Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
graph_reader.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include "graphar/high-level/graph_reader.h"
21 #include <algorithm>
22 #include <unordered_set>
23 #include "arrow/array.h"
24 #include "graphar/api/arrow_reader.h"
25 #include "graphar/convert_to_arrow_type.h"
26 #include "graphar/label.h"
27 #include "graphar/types.h"
28 
29 namespace graphar {
30 
31 template <Type type>
32 Status CastToAny(std::shared_ptr<arrow::Array> array,
33  std::any& any) { // NOLINT
34  if (array->IsNull(0)) {
35  any = std::any();
36  return Status::OK();
37  }
38  using ArrayType = typename TypeToArrowType<type>::ArrayType;
39  auto column = std::dynamic_pointer_cast<ArrayType>(array);
40  any = column->GetView(0);
41  return Status::OK();
42 }
43 
44 template <>
45 Status CastToAny<Type::STRING>(std::shared_ptr<arrow::Array> array,
46  std::any& any) { // NOLINT
47  using ArrayType = typename TypeToArrowType<Type::STRING>::ArrayType;
48  auto column = std::dynamic_pointer_cast<ArrayType>(array);
49  any = column->GetString(0);
50  return Status::OK();
51 }
52 
53 Status TryToCastToAny(const std::shared_ptr<DataType>& type,
54  std::shared_ptr<arrow::Array> array,
55  std::any& any) { // NOLINT
56  switch (type->id()) {
57  case Type::BOOL:
58  return CastToAny<Type::BOOL>(array, any);
59  case Type::INT32:
60  return CastToAny<Type::INT32>(array, any);
61  case Type::INT64:
62  return CastToAny<Type::INT64>(array, any);
63  case Type::FLOAT:
64  return CastToAny<Type::FLOAT>(array, any);
65  case Type::DOUBLE:
66  return CastToAny<Type::DOUBLE>(array, any);
67  case Type::STRING:
68  return CastToAny<Type::STRING>(array, any);
69  case Type::DATE:
70  return CastToAny<Type::DATE>(array, any);
71  case Type::TIMESTAMP:
72  return CastToAny<Type::TIMESTAMP>(array, any);
73  default:
74  return Status::TypeError("Unsupported type.");
75  }
76  return Status::OK();
77 }
78 
79 Vertex::Vertex(IdType id,
80  std::vector<VertexPropertyArrowChunkReader>& readers) // NOLINT
81  : id_(id) {
82  // get the first row of table
83  for (auto& reader : readers) {
84  GAR_ASSIGN_OR_RAISE_ERROR(auto chunk_table,
85  reader.GetChunk(graphar::GetChunkVersion::V1));
86  auto schema = chunk_table->schema();
87  for (int i = 0; i < schema->num_fields(); ++i) {
88  auto field = chunk_table->field(i);
89  if (field->type()->id() == arrow::Type::LIST) {
90  auto list_array = std::dynamic_pointer_cast<arrow::ListArray>(
91  chunk_table->column(i)->chunk(0));
92  list_properties_[field->name()] = list_array->value_slice(0);
93  } else {
94  auto type = DataType::ArrowDataTypeToDataType(field->type());
95  GAR_RAISE_ERROR_NOT_OK(TryToCastToAny(type,
96  chunk_table->column(i)->chunk(0),
97  properties_[field->name()]));
98  }
99  }
100  }
101 }
102 
103 Result<bool> VertexIter::hasLabel(const std::string& label) noexcept {
104  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
105  label_reader_.seek(cur_offset_);
106  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
107  column = util::GetArrowColumnByName(chunk_table, label);
108  if (column != nullptr) {
109  auto array = util::GetArrowArrayByChunkIndex(column, 0);
110  auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
111  return bool_array->Value(0);
112  }
113  return Status::KeyError("label with name ", label,
114  " does not exist in the vertex.");
115 }
116 
117 Result<std::vector<std::string>> VertexIter::label() noexcept {
118  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
119  std::vector<std::string> vertex_label;
120  if (is_filtered_)
121  label_reader_.seek(filtered_ids_[cur_offset_]);
122  else
123  label_reader_.seek(cur_offset_);
124  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
125  for (auto label : labels_) {
126  column = util::GetArrowColumnByName(chunk_table, label);
127  if (column != nullptr) {
128  auto array = util::GetArrowArrayByChunkIndex(column, 0);
129  auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
130  if (bool_array->Value(0)) {
131  vertex_label.push_back(label);
132  }
133  }
134  }
135  return vertex_label;
136 }
137 
/**
 * @brief Row predicate with AND semantics over per-label flags.
 *
 * Returns true only when each of the first `column_number` entries of
 * `state` is true; an empty range (column_number <= 0) is accepted.
 * To switch to OR semantics, return true on the first set flag and false
 * after the loop instead.
 *
 * @param state Array of per-label flags for one row.
 * @param column_number Number of entries of `state` to inspect.
 */
static inline bool IsValid(bool* state, int column_number) {
  int idx = 0;
  while (idx < column_number) {
    if (state[idx] == false) {
      return false;
    }
    ++idx;
  }
  return true;
}
151 
152 Result<std::vector<IdType>> VerticesCollection::filter(
153  const std::vector<std::string>& filter_labels,
154  std::vector<IdType>* new_valid_chunk) {
155  std::vector<int> indices;
156  const int TOT_ROWS_NUM = vertex_num_;
157  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
158  const int TOT_LABEL_NUM = labels_.size();
159  const int TESTED_LABEL_NUM = filter_labels.size();
160  std::vector<int> tested_label_ids;
161 
162  for (const auto& filter_label : filter_labels) {
163  auto it = std::find(labels_.begin(), labels_.end(), filter_label);
164  if (it != labels_.end()) {
165  tested_label_ids.push_back(std::distance(labels_.begin(), it));
166  }
167  }
168  if (tested_label_ids.empty())
169  return Status::KeyError(
170  "query label"
171  " does not exist in the vertex.");
172 
173  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
174  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
175  int total_count = 0;
176  int row_num;
177 
178  if (is_filtered_) {
179  for (int chunk_idx : valid_chunk_) {
180  row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
181  std::string new_filename =
182  prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
183  int count = read_parquet_file_and_get_valid_indices(
184  new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
185  tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
186  QUERY_TYPE::INDEX);
187  if (count != 0 && new_valid_chunk != nullptr)
188  new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
189  }
190  } else {
191  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
192  ++chunk_idx) {
193  row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
194  std::string new_filename =
195  prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
196  int count = read_parquet_file_and_get_valid_indices(
197  new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
198  tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
199  QUERY_TYPE::INDEX);
200  if (count != 0)
201  valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
202  }
203  }
204  // std::cout << "Total valid count: " << total_count << std::endl;
205  std::vector<int64_t> indices64;
206 
207  for (int value : indices) {
208  indices64.push_back(static_cast<int64_t>(value));
209  }
210 
211  delete[] bitmap;
212 
213  return indices64;
214 }
215 
216 Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
217  const std::vector<std::string>& filter_labels) const {
218  std::vector<int> indices;
219  const int TOT_ROWS_NUM = vertex_num_;
220  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
221 
222  std::vector<int> tested_label_ids;
223  for (const auto& filter_label : filter_labels) {
224  auto it = std::find(labels_.begin(), labels_.end(), filter_label);
225  if (it != labels_.end()) {
226  tested_label_ids.push_back(std::distance(labels_.begin(), it));
227  }
228  }
229  int total_count = 0;
230  int row_num;
231  std::vector<std::shared_ptr<Expression>> filters;
232  std::shared_ptr<Expression> combined_filter = nullptr;
233 
234  for (const auto& label : filter_labels) {
235  filters.emplace_back(
236  graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
237  }
238 
239  for (const auto& filter : filters) {
240  if (!combined_filter) {
241  combined_filter = graphar::_And(filter, filter);
242  } else {
243  combined_filter = graphar::_And(combined_filter, filter);
244  }
245  }
246 
247  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
248  vertex_info_, labels_, prefix_, {});
249  auto filter_reader = maybe_filter_reader.value();
250  filter_reader->Filter(combined_filter);
251  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
252  auto filter_result = filter_reader->GetLabelChunk();
253  auto filter_table = filter_result.value();
254  total_count += filter_table->num_rows();
255  filter_reader->next_chunk();
256  }
257  // std::cout << "Total valid count: " << total_count << std::endl;
258  std::vector<int64_t> indices64;
259 
260  for (int value : indices) {
261  indices64.push_back(static_cast<int64_t>(value));
262  }
263 
264  return indices64;
265 }
266 
267 Result<std::vector<IdType>> VerticesCollection::filter(
268  const std::string& property_name,
269  std::shared_ptr<Expression> filter_expression,
270  std::vector<IdType>* new_valid_chunk) {
271  std::vector<int> indices;
272  const int TOT_ROWS_NUM = vertex_num_;
273  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
274  int total_count = 0;
275  auto property_group = vertex_info_->GetPropertyGroup(property_name);
276  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
277  vertex_info_, property_group, prefix_, {});
278  auto filter_reader = maybe_filter_reader.value();
279  filter_reader->Filter(filter_expression);
280  std::vector<int64_t> indices64;
281  if (is_filtered_) {
282  for (int chunk_idx : valid_chunk_) {
283  // how to itetate valid_chunk_?
284  filter_reader->seek(chunk_idx * CHUNK_SIZE);
285  auto filter_result =
286  filter_reader->GetChunk(graphar::GetChunkVersion::V1);
287  auto filter_table = filter_result.value();
288  int count = filter_table->num_rows();
289  if (count != 0 && new_valid_chunk != nullptr) {
290  new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
291  // TODO(elssky): record indices
292  int kVertexIndexCol = filter_table->schema()->GetFieldIndex(
293  GeneralParams::kVertexIndexCol);
294  auto column_array = filter_table->column(kVertexIndexCol)->chunk(0);
295  auto int64_array =
296  std::static_pointer_cast<arrow::Int64Array>(column_array);
297  for (int64_t i = 0; i < int64_array->length(); ++i) {
298  if (!int64_array->IsNull(i)) {
299  indices64.push_back(int64_array->Value(i));
300  }
301  }
302  }
303  }
304  } else {
305  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
306  ++chunk_idx) {
307  auto filter_result =
308  filter_reader->GetChunk(graphar::GetChunkVersion::V1);
309  auto filter_table = filter_result.value();
310  int count = filter_table->num_rows();
311  filter_reader->next_chunk();
312  total_count += count;
313  if (count != 0) {
314  valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
315  // TODO(elssky): record indices
316  int kVertexIndexCol = filter_table->schema()->GetFieldIndex(
317  GeneralParams::kVertexIndexCol);
318  auto column_array = filter_table->column(kVertexIndexCol)->chunk(0);
319  auto int64_array =
320  std::static_pointer_cast<arrow::Int64Array>(column_array);
321  for (int64_t i = 0; i < int64_array->length(); ++i) {
322  if (!int64_array->IsNull(i)) {
323  indices64.push_back(int64_array->Value(i));
324  }
325  }
326  }
327  }
328  }
329  // std::cout << "Total valid count: " << total_count << std::endl;
330  return indices64;
331 }
332 
333 Result<std::shared_ptr<VerticesCollection>>
335  const std::string& filter_label,
336  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
337  auto prefix = graph_info->GetPrefix();
338  auto vertex_info = graph_info->GetVertexInfo(type);
339  auto labels = vertex_info->GetLabels();
340  auto vertices_collection =
341  std::make_shared<VerticesCollection>(vertex_info, prefix);
342  vertices_collection->filtered_ids_ =
343  vertices_collection->filter({filter_label}).value();
344  vertices_collection->is_filtered_ = true;
345  return vertices_collection;
346 }
347 
348 Result<std::shared_ptr<VerticesCollection>>
349 VerticesCollection::verticesWithLabelbyAcero(
350  const std::string& filter_label,
351  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
352  auto prefix = graph_info->GetPrefix();
353  auto vertex_info = graph_info->GetVertexInfo(type);
354  auto labels = vertex_info->GetLabels();
355  auto vertices_collection =
356  std::make_shared<VerticesCollection>(vertex_info, prefix);
357  vertices_collection->filtered_ids_ =
358  vertices_collection->filter_by_acero({filter_label}).value();
359  vertices_collection->is_filtered_ = true;
360  return vertices_collection;
361 }
362 
363 Result<std::shared_ptr<VerticesCollection>>
365  const std::string& filter_label,
366  const std::shared_ptr<VerticesCollection>& vertices_collection) {
367  auto new_vertices_collection = std::make_shared<VerticesCollection>(
368  vertices_collection->vertex_info_, vertices_collection->prefix_);
369  auto filtered_ids =
370  new_vertices_collection
371  ->filter({filter_label}, &new_vertices_collection->valid_chunk_)
372  .value();
373  if (vertices_collection->is_filtered_) {
374  std::unordered_set<IdType> origin_set(
375  vertices_collection->filtered_ids_.begin(),
376  vertices_collection->filtered_ids_.end());
377  std::unordered_set<int> intersection;
378  for (int num : filtered_ids) {
379  if (origin_set.count(num)) {
380  intersection.insert(num);
381  }
382  }
383  filtered_ids =
384  std::vector<IdType>(intersection.begin(), intersection.end());
385 
386  new_vertices_collection->is_filtered_ = true;
387  }
388  new_vertices_collection->filtered_ids_ = filtered_ids;
389 
390  return new_vertices_collection;
391 }
392 
393 Result<std::shared_ptr<VerticesCollection>>
395  const std::vector<std::string>& filter_labels,
396  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
397  auto prefix = graph_info->GetPrefix();
398  auto vertex_info = graph_info->GetVertexInfo(type);
399  auto labels = vertex_info->GetLabels();
400  auto vertices_collection =
401  std::make_shared<VerticesCollection>(vertex_info, prefix);
402  vertices_collection->filtered_ids_ =
403  vertices_collection->filter(filter_labels).value();
404  vertices_collection->is_filtered_ = true;
405  return vertices_collection;
406 }
407 
408 Result<std::shared_ptr<VerticesCollection>>
409 VerticesCollection::verticesWithMultipleLabelsbyAcero(
410  const std::vector<std::string>& filter_labels,
411  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
412  auto prefix = graph_info->GetPrefix();
413  auto vertex_info = graph_info->GetVertexInfo(type);
414  auto labels = vertex_info->GetLabels();
415  auto vertices_collection =
416  std::make_shared<VerticesCollection>(vertex_info, prefix);
417  vertices_collection->filtered_ids_ =
418  vertices_collection->filter_by_acero(filter_labels).value();
419  vertices_collection->is_filtered_ = true;
420  return vertices_collection;
421 }
422 
423 Result<std::shared_ptr<VerticesCollection>>
425  const std::vector<std::string>& filter_labels,
426  const std::shared_ptr<VerticesCollection>& vertices_collection) {
427  auto new_vertices_collection = std::make_shared<VerticesCollection>(
428  vertices_collection->vertex_info_, vertices_collection->prefix_);
429  auto filtered_ids =
430  vertices_collection
431  ->filter(filter_labels, &new_vertices_collection->valid_chunk_)
432  .value();
433  if (vertices_collection->is_filtered_) {
434  std::unordered_set<IdType> origin_set(
435  vertices_collection->filtered_ids_.begin(),
436  vertices_collection->filtered_ids_.end());
437  std::unordered_set<int> intersection;
438  for (int num : filtered_ids) {
439  if (origin_set.count(num)) {
440  intersection.insert(num);
441  }
442  }
443  filtered_ids =
444  std::vector<IdType>(intersection.begin(), intersection.end());
445 
446  new_vertices_collection->is_filtered_ = true;
447  }
448  new_vertices_collection->filtered_ids_ = filtered_ids;
449 
450  return new_vertices_collection;
451 }
452 
453 Result<std::shared_ptr<VerticesCollection>>
454 VerticesCollection::verticesWithProperty(
455  const std::string property_name, const graphar::util::Filter filter,
456  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
457  auto prefix = graph_info->GetPrefix();
458  auto vertex_info = graph_info->GetVertexInfo(type);
459  auto vertices_collection =
460  std::make_shared<VerticesCollection>(vertex_info, prefix);
461  vertices_collection->filtered_ids_ =
462  vertices_collection->filter(property_name, filter).value();
463  vertices_collection->is_filtered_ = true;
464  return vertices_collection;
465 }
466 
467 Result<std::shared_ptr<VerticesCollection>>
468 VerticesCollection::verticesWithProperty(
469  const std::string property_name, const graphar::util::Filter filter,
470  const std::shared_ptr<VerticesCollection>& vertices_collection) {
471  auto new_vertices_collection = std::make_shared<VerticesCollection>(
472  vertices_collection->vertex_info_, vertices_collection->prefix_);
473  auto filtered_ids = vertices_collection
474  ->filter(property_name, filter,
475  &new_vertices_collection->valid_chunk_)
476  .value();
477  if (vertices_collection->is_filtered_) {
478  std::unordered_set<IdType> origin_set(
479  vertices_collection->filtered_ids_.begin(),
480  vertices_collection->filtered_ids_.end());
481  std::unordered_set<int> intersection;
482  for (int num : filtered_ids) {
483  if (origin_set.count(num)) {
484  intersection.insert(num);
485  }
486  }
487  filtered_ids =
488  std::vector<IdType>(intersection.begin(), intersection.end());
489  new_vertices_collection->is_filtered_ = true;
490  }
491  new_vertices_collection->filtered_ids_ = filtered_ids;
492  return new_vertices_collection;
493 }
494 
495 template <typename T>
496 Result<T> Vertex::property(const std::string& property) const {
497  if constexpr (std::is_final<T>::value) {
498  auto it = list_properties_.find(property);
499  if (it == list_properties_.end()) {
500  return Status::KeyError("The list property ", property,
501  " doesn't exist.");
502  }
503  auto array = std::dynamic_pointer_cast<
505  it->second);
506  const typename T::ValueType* values = array->raw_values();
507  return T(values, array->length());
508  } else {
509  if (properties_.find(property) == properties_.end()) {
510  return Status::KeyError("Property with name ", property,
511  " does not exist in the vertex.");
512  }
513  try {
514  if (!properties_.at(property).has_value())
515  return Status::TypeError("The value of the ", property, " is null.");
516  T ret = std::any_cast<T>(properties_.at(property));
517  return ret;
518  } catch (const std::bad_any_cast& e) {
519  return Status::TypeError("Any cast failed, the property type of ",
520  property, " is not matched ", e.what());
521  }
522  }
523 }
524 
525 template <>
526 Result<Date> Vertex::property(const std::string& property) const {
527  if (properties_.find(property) == properties_.end()) {
528  return Status::KeyError("Property with name ", property,
529  " does not exist in the vertex.");
530  }
531  try {
532  if (!properties_.at(property).has_value())
533  return Status::TypeError("The value of the ", property, " is null.");
534  Date ret(std::any_cast<Date::c_type>(properties_.at(property)));
535  return ret;
536  } catch (const std::bad_any_cast& e) {
537  return Status::TypeError("Any cast failed, the property type of ", property,
538  " is not matched ", e.what());
539  }
540 }
541 
542 template <>
543 Result<Timestamp> Vertex::property(const std::string& property) const {
544  if (properties_.find(property) == properties_.end()) {
545  return Status::KeyError("Property with name ", property,
546  " does not exist in the vertex.");
547  }
548  try {
549  if (!properties_.at(property).has_value())
550  return Status::TypeError("The value of the ", property, " is null.");
551  Timestamp ret(std::any_cast<Timestamp::c_type>(properties_.at(property)));
552  return ret;
553  } catch (const std::bad_any_cast& e) {
554  return Status::TypeError("Any cast failed, the property type of ", property,
555  " is not matched ", e.what());
556  }
557 }
558 
559 template <>
560 Result<StringArray> Vertex::property(const std::string& property) const {
561  auto it = list_properties_.find(property);
562  if (it == list_properties_.end()) {
563  return Status::KeyError("The list property ", property, " doesn't exist.");
564  }
565  auto array = std::dynamic_pointer_cast<arrow::StringArray>(it->second);
566  return StringArray(array->raw_value_offsets(), array->raw_data(),
567  array->length());
568 }
569 
571  AdjListArrowChunkReader& adj_list_reader, // NOLINT
572  std::vector<AdjListPropertyArrowChunkReader>& property_readers) { // NOLINT
573  // get the first row of table
574  GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_chunk_table,
575  adj_list_reader.GetChunk());
576  src_id_ = std::dynamic_pointer_cast<arrow::Int64Array>(
577  adj_list_chunk_table->column(0)->chunk(0))
578  ->GetView(0);
579  dst_id_ = std::dynamic_pointer_cast<arrow::Int64Array>(
580  adj_list_chunk_table->column(1)->chunk(0))
581  ->GetView(0);
582  for (auto& reader : property_readers) {
583  // get the first row of table
584  GAR_ASSIGN_OR_RAISE_ERROR(auto chunk_table, reader.GetChunk());
585  auto schema = chunk_table->schema();
586  for (int i = 0; i < schema->num_fields(); ++i) {
587  auto field = chunk_table->field(i);
588  if (field->type()->id() == arrow::Type::LIST) {
589  auto list_array = std::dynamic_pointer_cast<arrow::ListArray>(
590  chunk_table->column(i)->chunk(0));
591  list_properties_[field->name()] = list_array->value_slice(0);
592  } else {
593  auto type = DataType::ArrowDataTypeToDataType(field->type());
594  GAR_RAISE_ERROR_NOT_OK(TryToCastToAny(type,
595  chunk_table->column(i)->chunk(0),
596  properties_[field->name()]));
597  }
598  }
599  }
600 }
601 
602 template <typename T>
603 Result<T> Edge::property(const std::string& property) const {
604  if constexpr (std::is_final<T>::value) {
605  auto it = list_properties_.find(property);
606  if (it == list_properties_.end()) {
607  return Status::KeyError("The list property ", property,
608  " doesn't exist.");
609  }
610  auto array = std::dynamic_pointer_cast<
612  it->second);
613  const typename T::ValueType* values = array->raw_values();
614  return T(values, array->length());
615  } else {
616  if (properties_.find(property) == properties_.end()) {
617  return Status::KeyError("Property with name ", property,
618  " does not exist in the edge.");
619  }
620  try {
621  if (!properties_.at(property).has_value())
622  return Status::TypeError("The value of the ", property, " is null.");
623  T ret = std::any_cast<T>(properties_.at(property));
624  return ret;
625  } catch (const std::bad_any_cast& e) {
626  return Status::TypeError("Any cast failed, the property type of ",
627  property, " is not matched ", e.what());
628  }
629  }
630 }
631 
632 template <>
633 Result<Date> Edge::property(const std::string& property) const {
634  if (properties_.find(property) == properties_.end()) {
635  return Status::KeyError("Property with name ", property,
636  " does not exist in the edge.");
637  }
638  try {
639  if (!properties_.at(property).has_value())
640  return Status::TypeError("The value of the ", property, " is null.");
641  Date ret(std::any_cast<Date::c_type>(properties_.at(property)));
642  return ret;
643  } catch (const std::bad_any_cast& e) {
644  return Status::TypeError("Any cast failed, the property type of ", property,
645  " is not matched ", e.what());
646  }
647 }
648 
649 template <>
650 Result<Timestamp> Edge::property(const std::string& property) const {
651  if (properties_.find(property) == properties_.end()) {
652  return Status::KeyError("Property with name ", property,
653  " does not exist in the edge.");
654  }
655  try {
656  if (!properties_.at(property).has_value())
657  return Status::TypeError("The value of the ", property, " is null.");
658  Timestamp ret(std::any_cast<Timestamp::c_type>(properties_.at(property)));
659  return ret;
660  } catch (const std::bad_any_cast& e) {
661  return Status::TypeError("Any cast failed, the property type of ", property,
662  " is not matched ", e.what());
663  }
664 }
665 
666 template <>
667 Result<StringArray> Edge::property(const std::string& property) const {
668  auto it = list_properties_.find(property);
669  if (it == list_properties_.end()) {
670  return Status::KeyError("The list property ", property, " doesn't exist.");
671  }
672  auto array = std::dynamic_pointer_cast<arrow::StringArray>(it->second);
673  return StringArray(array->raw_value_offsets(), array->raw_data(),
674  array->length());
675 }
676 
677 #define INSTANTIATE_PROPERTY(T) \
678  template Result<T> Vertex::property<T>(const std::string& name) const; \
679  template Result<T> Edge::property<T>(const std::string& name) const;
680 
681 INSTANTIATE_PROPERTY(bool)
682 INSTANTIATE_PROPERTY(const bool&)
683 INSTANTIATE_PROPERTY(int32_t)
684 INSTANTIATE_PROPERTY(const int32_t&)
685 INSTANTIATE_PROPERTY(Int32Array)
686 INSTANTIATE_PROPERTY(int64_t)
687 INSTANTIATE_PROPERTY(const int64_t&)
688 INSTANTIATE_PROPERTY(Int64Array)
689 INSTANTIATE_PROPERTY(float)
690 INSTANTIATE_PROPERTY(const float&)
691 INSTANTIATE_PROPERTY(FloatArray)
692 INSTANTIATE_PROPERTY(double)
693 INSTANTIATE_PROPERTY(const double&)
694 INSTANTIATE_PROPERTY(DoubleArray)
695 INSTANTIATE_PROPERTY(std::string)
696 INSTANTIATE_PROPERTY(const std::string&)
697 
698 IdType EdgeIter::source() {
699  adj_list_reader_.seek(cur_offset_);
700  GAR_ASSIGN_OR_RAISE_ERROR(auto chunk, adj_list_reader_.GetChunk());
701  auto src_column = chunk->column(0);
702  return std::dynamic_pointer_cast<arrow::Int64Array>(src_column->chunk(0))
703  ->GetView(0);
704 }
705 
707  adj_list_reader_.seek(cur_offset_);
708  GAR_ASSIGN_OR_RAISE_ERROR(auto chunk, adj_list_reader_.GetChunk());
709  auto src_column = chunk->column(1);
710  return std::dynamic_pointer_cast<arrow::Int64Array>(src_column->chunk(0))
711  ->GetView(0);
712 }
713 
714 bool EdgeIter::first_src(const EdgeIter& from, IdType id) {
715  if (from.is_end())
716  return false;
717 
718  // ordered_by_dest or unordered_by_dest
719  if (adj_list_type_ == AdjListType::ordered_by_dest ||
720  adj_list_type_ == AdjListType::unordered_by_dest) {
721  if (from.global_chunk_index_ >= chunk_end_) {
722  return false;
723  }
724  if (from.global_chunk_index_ == global_chunk_index_) {
725  cur_offset_ = from.cur_offset_;
726  } else if (from.global_chunk_index_ < chunk_begin_) {
727  this->to_begin();
728  } else {
729  global_chunk_index_ = from.global_chunk_index_;
730  cur_offset_ = from.cur_offset_;
731  vertex_chunk_index_ = from.vertex_chunk_index_;
732  this->refresh();
733  }
734  while (!this->is_end()) {
735  if (this->source() == id)
736  return true;
737  this->operator++();
738  }
739  return false;
740  }
741 
742  // unordered_by_source
743  if (adj_list_type_ == AdjListType::unordered_by_source) {
744  IdType expect_chunk_index =
745  index_converter_->IndexPairToGlobalChunkIndex(id / src_chunk_size_, 0);
746  if (expect_chunk_index > chunk_end_)
747  return false;
748  if (from.global_chunk_index_ >= chunk_end_) {
749  return false;
750  }
751  bool need_refresh = false;
752  if (from.global_chunk_index_ == global_chunk_index_) {
753  cur_offset_ = from.cur_offset_;
754  } else if (from.global_chunk_index_ < chunk_begin_) {
755  this->to_begin();
756  } else {
757  global_chunk_index_ = from.global_chunk_index_;
758  cur_offset_ = from.cur_offset_;
759  vertex_chunk_index_ = from.vertex_chunk_index_;
760  need_refresh = true;
761  }
762  if (global_chunk_index_ < expect_chunk_index) {
763  global_chunk_index_ = expect_chunk_index;
764  cur_offset_ = 0;
765  vertex_chunk_index_ = id / src_chunk_size_;
766  need_refresh = true;
767  }
768  if (need_refresh)
769  this->refresh();
770  while (!this->is_end()) {
771  if (this->source() == id)
772  return true;
773  if (vertex_chunk_index_ > id / src_chunk_size_)
774  return false;
775  this->operator++();
776  }
777  return false;
778  }
779 
780  // ordered_by_source
781  auto st = offset_reader_->seek(id);
782  if (!st.ok()) {
783  return false;
784  }
785  auto maybe_offset_chunk = offset_reader_->GetChunk();
786  if (!maybe_offset_chunk.status().ok()) {
787  return false;
788  }
789  auto offset_array =
790  std::dynamic_pointer_cast<arrow::Int64Array>(maybe_offset_chunk.value());
791  auto begin_offset = static_cast<IdType>(offset_array->Value(0));
792  auto end_offset = static_cast<IdType>(offset_array->Value(1));
793  if (begin_offset >= end_offset) {
794  return false;
795  }
796  auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex();
797  auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex(
798  vertex_chunk_index_of_id, begin_offset / chunk_size_);
799  auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex(
800  vertex_chunk_index_of_id, end_offset / chunk_size_);
801  if (begin_global_index <= from.global_chunk_index_ &&
802  from.global_chunk_index_ <= end_global_index) {
803  if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) {
804  global_chunk_index_ = from.global_chunk_index_;
805  cur_offset_ = from.cur_offset_;
806  vertex_chunk_index_ = from.vertex_chunk_index_;
807  refresh();
808  return true;
809  } else if (from.cur_offset_ <= begin_offset) {
810  global_chunk_index_ = begin_global_index;
811  cur_offset_ = begin_offset;
812  vertex_chunk_index_ = vertex_chunk_index_of_id;
813  refresh();
814  return true;
815  } else {
816  return false;
817  }
818  } else if (from.global_chunk_index_ < begin_global_index) {
819  global_chunk_index_ = begin_global_index;
820  cur_offset_ = begin_offset;
821  vertex_chunk_index_ = vertex_chunk_index_of_id;
822  refresh();
823  return true;
824  } else {
825  return false;
826  }
827 }
828 
829 bool EdgeIter::first_dst(const EdgeIter& from, IdType id) {
830  if (from.is_end())
831  return false;
832 
833  // ordered_by_source or unordered_by_source
834  if (adj_list_type_ == AdjListType::ordered_by_source ||
835  adj_list_type_ == AdjListType::unordered_by_source) {
836  if (from.global_chunk_index_ >= chunk_end_) {
837  return false;
838  }
839  if (from.global_chunk_index_ == global_chunk_index_) {
840  cur_offset_ = from.cur_offset_;
841  } else if (from.global_chunk_index_ < chunk_begin_) {
842  this->to_begin();
843  } else {
844  global_chunk_index_ = from.global_chunk_index_;
845  cur_offset_ = from.cur_offset_;
846  vertex_chunk_index_ = from.vertex_chunk_index_;
847  this->refresh();
848  }
849  while (!this->is_end()) {
850  if (this->destination() == id)
851  return true;
852  this->operator++();
853  }
854  return false;
855  }
856 
857  // unordered_by_dest
858  if (adj_list_type_ == AdjListType::unordered_by_dest) {
859  IdType expect_chunk_index =
860  index_converter_->IndexPairToGlobalChunkIndex(id / dst_chunk_size_, 0);
861  if (expect_chunk_index > chunk_end_)
862  return false;
863  if (from.global_chunk_index_ >= chunk_end_) {
864  return false;
865  }
866  bool need_refresh = false;
867  if (from.global_chunk_index_ == global_chunk_index_) {
868  cur_offset_ = from.cur_offset_;
869  } else if (from.global_chunk_index_ < chunk_begin_) {
870  this->to_begin();
871  } else {
872  global_chunk_index_ = from.global_chunk_index_;
873  cur_offset_ = from.cur_offset_;
874  vertex_chunk_index_ = from.vertex_chunk_index_;
875  need_refresh = true;
876  }
877  if (global_chunk_index_ < expect_chunk_index) {
878  global_chunk_index_ = expect_chunk_index;
879  cur_offset_ = 0;
880  vertex_chunk_index_ = id / dst_chunk_size_;
881  need_refresh = true;
882  }
883  if (need_refresh)
884  this->refresh();
885  while (!this->is_end()) {
886  if (this->destination() == id)
887  return true;
888  if (vertex_chunk_index_ > id / dst_chunk_size_)
889  return false;
890  this->operator++();
891  }
892  return false;
893  }
894 
895  // ordered_by_dest
896  auto st = offset_reader_->seek(id);
897  if (!st.ok()) {
898  return false;
899  }
900  auto maybe_offset_chunk = offset_reader_->GetChunk();
901  if (!maybe_offset_chunk.status().ok()) {
902  return false;
903  }
904  auto offset_array =
905  std::dynamic_pointer_cast<arrow::Int64Array>(maybe_offset_chunk.value());
906  auto begin_offset = static_cast<IdType>(offset_array->Value(0));
907  auto end_offset = static_cast<IdType>(offset_array->Value(1));
908  if (begin_offset >= end_offset) {
909  return false;
910  }
911  auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex();
912  auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex(
913  vertex_chunk_index_of_id, begin_offset / chunk_size_);
914  auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex(
915  vertex_chunk_index_of_id, end_offset / chunk_size_);
916  if (begin_global_index <= from.global_chunk_index_ &&
917  from.global_chunk_index_ <= end_global_index) {
918  if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) {
919  global_chunk_index_ = from.global_chunk_index_;
920  cur_offset_ = from.cur_offset_;
921  vertex_chunk_index_ = from.vertex_chunk_index_;
922  refresh();
923  return true;
924  } else if (from.cur_offset_ <= begin_offset) {
925  global_chunk_index_ = begin_global_index;
926  cur_offset_ = begin_offset;
927  vertex_chunk_index_ = vertex_chunk_index_of_id;
928  refresh();
929  return true;
930  } else {
931  return false;
932  }
933  } else if (from.global_chunk_index_ < begin_global_index) {
934  global_chunk_index_ = begin_global_index;
935  cur_offset_ = begin_offset;
936  vertex_chunk_index_ = vertex_chunk_index_of_id;
937  refresh();
938  return true;
939  } else {
940  return false;
941  }
942 }
943 
944 Result<std::shared_ptr<EdgesCollection>> EdgesCollection::Make(
945  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
946  const std::string& edge_type, const std::string& dst_type,
947  AdjListType adj_list_type, const IdType vertex_chunk_begin,
948  const IdType vertex_chunk_end) noexcept {
949  auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
950  if (!edge_info) {
951  return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
952  dst_type, " doesn't exist.");
953  }
954  if (!edge_info->HasAdjacentListType(adj_list_type)) {
955  return Status::Invalid("The edge ", edge_type, " of adj list type ",
956  AdjListTypeToString(adj_list_type),
957  " doesn't exist.");
958  }
959  switch (adj_list_type) {
960  case AdjListType::ordered_by_source:
961  return std::make_shared<OBSEdgeCollection>(
962  edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
963  vertex_chunk_end);
964  case AdjListType::ordered_by_dest:
965  return std::make_shared<OBDEdgesCollection>(
966  edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
967  vertex_chunk_end);
968  case AdjListType::unordered_by_source:
969  return std::make_shared<UBSEdgesCollection>(
970  edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
971  vertex_chunk_end);
972  case AdjListType::unordered_by_dest:
973  return std::make_shared<UBDEdgesCollection>(
974  edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
975  vertex_chunk_end);
976  default:
977  return Status::Invalid("Unknown adj list type.");
978  }
979  return Status::OK();
980 }
981 
982 } // namespace graphar
The arrow chunk reader for adj list topology chunk.
Definition: chunk_reader.h:277
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk of chunk position indicator as arrow::Table, if the chunk is empty,...
Edge(AdjListArrowChunkReader &adj_list_reader, std::vector< AdjListPropertyArrowChunkReader > &property_readers)
Result< T > property(const std::string &property) const
Get the property value of the edge.
The iterator for traversing a type of edges.
Definition: graph_reader.h:491
EdgeIter & operator++()
Definition: graph_reader.h:591
bool first_dst(const EdgeIter &from, IdType id)
bool first_src(const EdgeIter &from, IdType id)
bool is_end() const
Definition: graph_reader.h:713
static Result< std::shared_ptr< EdgesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &src_type, const std::string &edge_type, const std::string &dst_type, AdjListType adj_list_type, const IdType vertex_chunk_begin=0, const IdType vertex_chunk_end=std::numeric_limits< int64_t >::max()) noexcept
Construct an EdgesCollection from graph info and edge type.
static Status TypeError(Args &&... args)
Definition: status.h:178
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
Result< T > property(const std::string &property) const
Get the property value of the vertex.
Vertex(IdType id, std::vector< VertexPropertyArrowChunkReader > &readers)
Definition: graph_reader.cc:79
Result< std::vector< std::string > > label() noexcept
Result< bool > hasLabel(const std::string &label) noexcept
Result< std::shared_ptr< arrow::Table > > GetLabelChunk()
Return the current arrow label chunk table of chunk position indicator.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from vertex info.
static Result< std::shared_ptr< VerticesCollection > > verticesWithMultipleLabels(const std::vector< std::string > &filter_labels, const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Query vertices with multiple labels.
static Result< std::shared_ptr< VerticesCollection > > verticesWithLabel(const std::string &filter_label, const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Query vertices with a specific label.
Result< std::vector< IdType > > filter(const std::vector< std::string > &filter_labels, std::vector< IdType > *new_valid_chunk=nullptr)