20 #include "graphar/high-level/graph_reader.h"
22 #include <unordered_set>
23 #include "arrow/array.h"
24 #include "graphar/api/arrow_reader.h"
25 #include "graphar/convert_to_arrow_type.h"
26 #include "graphar/label.h"
27 #include "graphar/types.h"
32 Status CastToAny(std::shared_ptr<arrow::Array> array,
34 if (array->IsNull(0)) {
38 using ArrayType =
typename TypeToArrowType<type>::ArrayType;
39 auto column = std::dynamic_pointer_cast<ArrayType>(array);
40 any = column->GetView(0);
45 Status CastToAny<Type::STRING>(std::shared_ptr<arrow::Array> array,
47 using ArrayType =
typename TypeToArrowType<Type::STRING>::ArrayType;
48 auto column = std::dynamic_pointer_cast<ArrayType>(array);
49 any = column->GetString(0);
53 Status TryToCastToAny(
const std::shared_ptr<DataType>& type,
54 std::shared_ptr<arrow::Array> array,
58 return CastToAny<Type::BOOL>(array, any);
60 return CastToAny<Type::INT32>(array, any);
62 return CastToAny<Type::INT64>(array, any);
64 return CastToAny<Type::FLOAT>(array, any);
66 return CastToAny<Type::DOUBLE>(array, any);
68 return CastToAny<Type::STRING>(array, any);
70 return CastToAny<Type::DATE>(array, any);
72 return CastToAny<Type::TIMESTAMP>(array, any);
80 std::vector<VertexPropertyArrowChunkReader>& readers)
83 for (
auto& reader : readers) {
84 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk_table,
85 reader.GetChunk(graphar::GetChunkVersion::V1));
86 auto schema = chunk_table->schema();
87 for (
int i = 0; i < schema->num_fields(); ++i) {
88 auto field = chunk_table->field(i);
89 if (field->type()->id() == arrow::Type::LIST) {
90 auto list_array = std::dynamic_pointer_cast<arrow::ListArray>(
91 chunk_table->column(i)->chunk(0));
92 list_properties_[field->name()] = list_array->value_slice(0);
94 auto type = DataType::ArrowDataTypeToDataType(field->type());
95 GAR_RAISE_ERROR_NOT_OK(TryToCastToAny(type,
96 chunk_table->column(i)->chunk(0),
97 properties_[field->name()]));
104 std::shared_ptr<arrow::ChunkedArray> column(
nullptr);
105 label_reader_.seek(cur_offset_);
106 GAR_ASSIGN_OR_RAISE(
auto chunk_table, label_reader_.GetLabelChunk());
107 column = util::GetArrowColumnByName(chunk_table, label);
108 if (column !=
nullptr) {
109 auto array = util::GetArrowArrayByChunkIndex(column, 0);
110 auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
111 return bool_array->Value(0);
114 " does not exist in the vertex.");
118 std::shared_ptr<arrow::ChunkedArray> column(
nullptr);
119 std::vector<std::string> vertex_label;
121 label_reader_.
seek(filtered_ids_[cur_offset_]);
123 label_reader_.
seek(cur_offset_);
124 GAR_ASSIGN_OR_RAISE(
auto chunk_table, label_reader_.
GetLabelChunk());
125 for (
auto label : labels_) {
126 column = util::GetArrowColumnByName(chunk_table,
label);
127 if (column !=
nullptr) {
128 auto array = util::GetArrowArrayByChunkIndex(column, 0);
129 auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
130 if (bool_array->Value(0)) {
131 vertex_label.push_back(
label);
138 static inline bool IsValid(
bool* state,
int column_number) {
139 for (
int i = 0; i < column_number; ++i) {
153 const std::vector<std::string>& filter_labels,
154 std::vector<IdType>* new_valid_chunk) {
155 std::vector<int> indices;
156 const int TOT_ROWS_NUM = vertex_num_;
157 const int CHUNK_SIZE = vertex_info_->GetChunkSize();
158 const int TOT_LABEL_NUM = labels_.size();
159 const int TESTED_LABEL_NUM = filter_labels.size();
160 std::vector<int> tested_label_ids;
162 for (
const auto& filter_label : filter_labels) {
163 auto it = std::find(labels_.begin(), labels_.end(), filter_label);
164 if (it != labels_.end()) {
165 tested_label_ids.push_back(std::distance(labels_.begin(), it));
168 if (tested_label_ids.empty())
171 " does not exist in the vertex.");
173 uint64_t* bitmap =
new uint64_t[TOT_ROWS_NUM / 64 + 1];
174 memset(bitmap, 0,
sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
179 for (
int chunk_idx : valid_chunk_) {
180 row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
181 std::string new_filename =
182 prefix_ + vertex_info_->GetPrefix() +
"labels/chunk";
183 int count = read_parquet_file_and_get_valid_indices(
184 new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
185 tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
187 if (count != 0 && new_valid_chunk !=
nullptr)
188 new_valid_chunk->emplace_back(
static_cast<IdType
>(chunk_idx));
191 for (
int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
193 row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
194 std::string new_filename =
195 prefix_ + vertex_info_->GetPrefix() +
"labels/chunk";
196 int count = read_parquet_file_and_get_valid_indices(
197 new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
198 tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
201 valid_chunk_.emplace_back(
static_cast<IdType
>(chunk_idx));
205 std::vector<int64_t> indices64;
207 for (
int value : indices) {
208 indices64.push_back(
static_cast<int64_t
>(value));
216 Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
217 const std::vector<std::string>& filter_labels)
const {
218 std::vector<int> indices;
219 const int TOT_ROWS_NUM = vertex_num_;
220 const int CHUNK_SIZE = vertex_info_->GetChunkSize();
222 std::vector<int> tested_label_ids;
223 for (
const auto& filter_label : filter_labels) {
224 auto it = std::find(labels_.begin(), labels_.end(), filter_label);
225 if (it != labels_.end()) {
226 tested_label_ids.push_back(std::distance(labels_.begin(), it));
231 std::vector<std::shared_ptr<Expression>> filters;
232 std::shared_ptr<Expression> combined_filter =
nullptr;
234 for (
const auto& label : filter_labels) {
235 filters.emplace_back(
236 graphar::_Equal(graphar::_Property(label), graphar::_Literal(
true)));
239 for (
const auto&
filter : filters) {
240 if (!combined_filter) {
243 combined_filter = graphar::_And(combined_filter,
filter);
248 vertex_info_, labels_, prefix_, {});
249 auto filter_reader = maybe_filter_reader.value();
250 filter_reader->Filter(combined_filter);
251 for (
int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
252 auto filter_result = filter_reader->GetLabelChunk();
253 auto filter_table = filter_result.value();
254 total_count += filter_table->num_rows();
255 filter_reader->next_chunk();
258 std::vector<int64_t> indices64;
260 for (
int value : indices) {
261 indices64.push_back(
static_cast<int64_t
>(value));
268 const std::string& property_name,
269 std::shared_ptr<Expression> filter_expression,
270 std::vector<IdType>* new_valid_chunk) {
271 std::vector<int> indices;
272 const int TOT_ROWS_NUM = vertex_num_;
273 const int CHUNK_SIZE = vertex_info_->GetChunkSize();
275 auto property_group = vertex_info_->GetPropertyGroup(property_name);
277 vertex_info_, property_group, prefix_, {});
278 auto filter_reader = maybe_filter_reader.value();
279 filter_reader->Filter(filter_expression);
280 std::vector<int64_t> indices64;
282 for (
int chunk_idx : valid_chunk_) {
284 filter_reader->seek(chunk_idx * CHUNK_SIZE);
286 filter_reader->GetChunk(graphar::GetChunkVersion::V1);
287 auto filter_table = filter_result.value();
288 int count = filter_table->num_rows();
289 if (count != 0 && new_valid_chunk !=
nullptr) {
290 new_valid_chunk->emplace_back(
static_cast<IdType
>(chunk_idx));
292 int kVertexIndexCol = filter_table->schema()->GetFieldIndex(
293 GeneralParams::kVertexIndexCol);
294 auto column_array = filter_table->column(kVertexIndexCol)->chunk(0);
296 std::static_pointer_cast<arrow::Int64Array>(column_array);
297 for (int64_t i = 0; i < int64_array->length(); ++i) {
298 if (!int64_array->IsNull(i)) {
299 indices64.push_back(int64_array->Value(i));
305 for (
int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
308 filter_reader->GetChunk(graphar::GetChunkVersion::V1);
309 auto filter_table = filter_result.value();
310 int count = filter_table->num_rows();
311 filter_reader->next_chunk();
312 total_count += count;
314 valid_chunk_.emplace_back(
static_cast<IdType
>(chunk_idx));
316 int kVertexIndexCol = filter_table->schema()->GetFieldIndex(
317 GeneralParams::kVertexIndexCol);
318 auto column_array = filter_table->column(kVertexIndexCol)->chunk(0);
320 std::static_pointer_cast<arrow::Int64Array>(column_array);
321 for (int64_t i = 0; i < int64_array->length(); ++i) {
322 if (!int64_array->IsNull(i)) {
323 indices64.push_back(int64_array->Value(i));
333 Result<std::shared_ptr<VerticesCollection>>
335 const std::string& filter_label,
336 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type) {
337 auto prefix = graph_info->GetPrefix();
338 auto vertex_info = graph_info->GetVertexInfo(type);
339 auto labels = vertex_info->GetLabels();
340 auto vertices_collection =
341 std::make_shared<VerticesCollection>(vertex_info, prefix);
342 vertices_collection->filtered_ids_ =
343 vertices_collection->filter({filter_label}).value();
344 vertices_collection->is_filtered_ =
true;
345 return vertices_collection;
// Builds a new VerticesCollection for vertex type `type` of `graph_info` and
// restricts it to the vertex ids returned by filter_by_acero() for the single
// label `filter_label`.
//
// @param filter_label  label handed to filter_by_acero() as a one-element list.
// @param graph_info    supplies the path prefix and the VertexInfo for `type`.
// @param type          vertex type passed to GraphInfo::GetVertexInfo().
// @return the newly created collection, with filtered_ids_ filled in and
//         is_filtered_ set to true.
348 Result<std::shared_ptr<VerticesCollection>>
349 VerticesCollection::verticesWithLabelbyAcero(
350 const std::string& filter_label,
351 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type) {
352 auto prefix = graph_info->GetPrefix();
// NOTE(review): vertex_info is dereferenced on the following line without a
// null check - confirm GetVertexInfo() cannot return nullptr for an unknown
// type.
353 auto vertex_info = graph_info->GetVertexInfo(type);
// NOTE(review): `labels` is not read again in the visible lines of this
// function.
354 auto labels = vertex_info->GetLabels();
355 auto vertices_collection =
356 std::make_shared<VerticesCollection>(vertex_info, prefix);
// NOTE(review): .value() is taken directly from the Result, so an error
// status from filter_by_acero() is not propagated to the caller here -
// confirm this is intended.
357 vertices_collection->filtered_ids_ =
358 vertices_collection->filter_by_acero({filter_label}).value();
// Record that filtered_ids_ now holds a filter result.
359 vertices_collection->is_filtered_ =
true;
360 return vertices_collection;
363 Result<std::shared_ptr<VerticesCollection>>
365 const std::string& filter_label,
366 const std::shared_ptr<VerticesCollection>& vertices_collection) {
367 auto new_vertices_collection = std::make_shared<VerticesCollection>(
368 vertices_collection->vertex_info_, vertices_collection->prefix_);
370 new_vertices_collection
371 ->filter({filter_label}, &new_vertices_collection->valid_chunk_)
373 if (vertices_collection->is_filtered_) {
374 std::unordered_set<IdType> origin_set(
375 vertices_collection->filtered_ids_.begin(),
376 vertices_collection->filtered_ids_.end());
377 std::unordered_set<int> intersection;
378 for (
int num : filtered_ids) {
379 if (origin_set.count(num)) {
380 intersection.insert(num);
384 std::vector<IdType>(intersection.begin(), intersection.end());
386 new_vertices_collection->is_filtered_ =
true;
388 new_vertices_collection->filtered_ids_ = filtered_ids;
390 return new_vertices_collection;
393 Result<std::shared_ptr<VerticesCollection>>
395 const std::vector<std::string>& filter_labels,
396 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type) {
397 auto prefix = graph_info->GetPrefix();
398 auto vertex_info = graph_info->GetVertexInfo(type);
399 auto labels = vertex_info->GetLabels();
400 auto vertices_collection =
401 std::make_shared<VerticesCollection>(vertex_info, prefix);
402 vertices_collection->filtered_ids_ =
403 vertices_collection->filter(filter_labels).value();
404 vertices_collection->is_filtered_ =
true;
405 return vertices_collection;
// Builds a new VerticesCollection for vertex type `type` of `graph_info` and
// restricts it to the vertex ids returned by filter_by_acero() for the label
// list `filter_labels`.
//
// @param filter_labels  labels forwarded unchanged to filter_by_acero().
// @param graph_info     supplies the path prefix and the VertexInfo for `type`.
// @param type           vertex type passed to GraphInfo::GetVertexInfo().
// @return the newly created collection, with filtered_ids_ filled in and
//         is_filtered_ set to true.
408 Result<std::shared_ptr<VerticesCollection>>
409 VerticesCollection::verticesWithMultipleLabelsbyAcero(
410 const std::vector<std::string>& filter_labels,
411 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type) {
412 auto prefix = graph_info->GetPrefix();
// NOTE(review): vertex_info is dereferenced on the following line without a
// null check - confirm GetVertexInfo() cannot return nullptr for an unknown
// type.
413 auto vertex_info = graph_info->GetVertexInfo(type);
// NOTE(review): `labels` is not read again in the visible lines of this
// function.
414 auto labels = vertex_info->GetLabels();
415 auto vertices_collection =
416 std::make_shared<VerticesCollection>(vertex_info, prefix);
// NOTE(review): .value() is taken directly from the Result, so an error
// status from filter_by_acero() is not propagated to the caller here -
// confirm this is intended.
417 vertices_collection->filtered_ids_ =
418 vertices_collection->filter_by_acero(filter_labels).value();
// Record that filtered_ids_ now holds a filter result.
419 vertices_collection->is_filtered_ =
true;
420 return vertices_collection;
423 Result<std::shared_ptr<VerticesCollection>>
425 const std::vector<std::string>& filter_labels,
426 const std::shared_ptr<VerticesCollection>& vertices_collection) {
427 auto new_vertices_collection = std::make_shared<VerticesCollection>(
428 vertices_collection->vertex_info_, vertices_collection->prefix_);
431 ->filter(filter_labels, &new_vertices_collection->valid_chunk_)
433 if (vertices_collection->is_filtered_) {
434 std::unordered_set<IdType> origin_set(
435 vertices_collection->filtered_ids_.begin(),
436 vertices_collection->filtered_ids_.end());
437 std::unordered_set<int> intersection;
438 for (
int num : filtered_ids) {
439 if (origin_set.count(num)) {
440 intersection.insert(num);
444 std::vector<IdType>(intersection.begin(), intersection.end());
446 new_vertices_collection->is_filtered_ =
true;
448 new_vertices_collection->filtered_ids_ = filtered_ids;
450 return new_vertices_collection;
// Builds a new VerticesCollection for vertex type `type` of `graph_info` and
// restricts it to the vertex ids returned by filter(property_name, filter).
//
// @param property_name  property name forwarded to filter().
// @param filter         filter object forwarded to filter().
// @param graph_info     supplies the path prefix and the VertexInfo for `type`.
// @param type           vertex type passed to GraphInfo::GetVertexInfo().
// @return the newly created collection, with filtered_ids_ filled in and
//         is_filtered_ set to true.
//
// NOTE(review): property_name is declared `const std::string` (by value), so
// the string is copied on every call; the other overloads in this file take
// their string arguments by const reference.
453 Result<std::shared_ptr<VerticesCollection>>
454 VerticesCollection::verticesWithProperty(
455 const std::string property_name,
const graphar::util::Filter filter,
456 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type) {
457 auto prefix = graph_info->GetPrefix();
// NOTE(review): vertex_info is passed on without a null check - confirm
// GetVertexInfo() cannot return nullptr for an unknown type.
458 auto vertex_info = graph_info->GetVertexInfo(type);
459 auto vertices_collection =
460 std::make_shared<VerticesCollection>(vertex_info, prefix);
// NOTE(review): .value() is taken directly from the Result, so an error
// status from filter() is not propagated to the caller here - confirm this
// is intended.
461 vertices_collection->filtered_ids_ =
462 vertices_collection->filter(property_name,
filter).value();
// Record that filtered_ids_ now holds a filter result.
463 vertices_collection->is_filtered_ =
true;
464 return vertices_collection;
467 Result<std::shared_ptr<VerticesCollection>>
468 VerticesCollection::verticesWithProperty(
469 const std::string property_name,
const graphar::util::Filter filter,
470 const std::shared_ptr<VerticesCollection>& vertices_collection) {
471 auto new_vertices_collection = std::make_shared<VerticesCollection>(
472 vertices_collection->vertex_info_, vertices_collection->prefix_);
473 auto filtered_ids = vertices_collection
474 ->filter(property_name,
filter,
475 &new_vertices_collection->valid_chunk_)
477 if (vertices_collection->is_filtered_) {
478 std::unordered_set<IdType> origin_set(
479 vertices_collection->filtered_ids_.begin(),
480 vertices_collection->filtered_ids_.end());
481 std::unordered_set<int> intersection;
482 for (
int num : filtered_ids) {
483 if (origin_set.count(num)) {
484 intersection.insert(num);
488 std::vector<IdType>(intersection.begin(), intersection.end());
489 new_vertices_collection->is_filtered_ =
true;
491 new_vertices_collection->filtered_ids_ = filtered_ids;
492 return new_vertices_collection;
495 template <
typename T>
497 if constexpr (std::is_final<T>::value) {
498 auto it = list_properties_.find(
property);
499 if (it == list_properties_.end()) {
503 auto array = std::dynamic_pointer_cast<
506 const typename T::ValueType* values = array->raw_values();
507 return T(values, array->length());
509 if (properties_.find(
property) == properties_.end()) {
511 " does not exist in the vertex.");
514 if (!properties_.at(
property).has_value())
516 T ret = std::any_cast<T>(properties_.at(
property));
518 }
catch (
const std::bad_any_cast& e) {
520 property,
" is not matched ", e.what());
527 if (properties_.find(
property) == properties_.end()) {
529 " does not exist in the vertex.");
532 if (!properties_.at(
property).has_value())
534 Date ret(std::any_cast<Date::c_type>(properties_.at(
property)));
536 }
catch (
const std::bad_any_cast& e) {
538 " is not matched ", e.what());
544 if (properties_.find(
property) == properties_.end()) {
546 " does not exist in the vertex.");
549 if (!properties_.at(
property).has_value())
551 Timestamp ret(std::any_cast<Timestamp::c_type>(properties_.at(
property)));
553 }
catch (
const std::bad_any_cast& e) {
555 " is not matched ", e.what());
561 auto it = list_properties_.find(
property);
562 if (it == list_properties_.end()) {
565 auto array = std::dynamic_pointer_cast<arrow::StringArray>(it->second);
566 return StringArray(array->raw_value_offsets(), array->raw_data(),
572 std::vector<AdjListPropertyArrowChunkReader>& property_readers) {
574 GAR_ASSIGN_OR_RAISE_ERROR(
auto adj_list_chunk_table,
576 src_id_ = std::dynamic_pointer_cast<arrow::Int64Array>(
577 adj_list_chunk_table->column(0)->chunk(0))
579 dst_id_ = std::dynamic_pointer_cast<arrow::Int64Array>(
580 adj_list_chunk_table->column(1)->chunk(0))
582 for (
auto& reader : property_readers) {
584 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk_table, reader.GetChunk());
585 auto schema = chunk_table->schema();
586 for (
int i = 0; i < schema->num_fields(); ++i) {
587 auto field = chunk_table->field(i);
588 if (field->type()->id() == arrow::Type::LIST) {
589 auto list_array = std::dynamic_pointer_cast<arrow::ListArray>(
590 chunk_table->column(i)->chunk(0));
591 list_properties_[field->name()] = list_array->value_slice(0);
593 auto type = DataType::ArrowDataTypeToDataType(field->type());
594 GAR_RAISE_ERROR_NOT_OK(TryToCastToAny(type,
595 chunk_table->column(i)->chunk(0),
596 properties_[field->name()]));
602 template <
typename T>
604 if constexpr (std::is_final<T>::value) {
605 auto it = list_properties_.find(
property);
606 if (it == list_properties_.end()) {
610 auto array = std::dynamic_pointer_cast<
613 const typename T::ValueType* values = array->raw_values();
614 return T(values, array->length());
616 if (properties_.find(
property) == properties_.end()) {
618 " does not exist in the edge.");
621 if (!properties_.at(
property).has_value())
623 T ret = std::any_cast<T>(properties_.at(
property));
625 }
catch (
const std::bad_any_cast& e) {
627 property,
" is not matched ", e.what());
634 if (properties_.find(
property) == properties_.end()) {
636 " does not exist in the edge.");
639 if (!properties_.at(
property).has_value())
641 Date ret(std::any_cast<Date::c_type>(properties_.at(
property)));
643 }
catch (
const std::bad_any_cast& e) {
645 " is not matched ", e.what());
650 Result<Timestamp>
Edge::property(
const std::string& property)
const {
651 if (properties_.find(
property) == properties_.end()) {
653 " does not exist in the edge.");
656 if (!properties_.at(
property).has_value())
658 Timestamp ret(std::any_cast<Timestamp::c_type>(properties_.at(
property)));
660 }
catch (
const std::bad_any_cast& e) {
662 " is not matched ", e.what());
667 Result<StringArray>
Edge::property(
const std::string& property)
const {
668 auto it = list_properties_.find(
property);
669 if (it == list_properties_.end()) {
672 auto array = std::dynamic_pointer_cast<arrow::StringArray>(it->second);
673 return StringArray(array->raw_value_offsets(), array->raw_data(),
// Emits explicit instantiations of the member templates Vertex::property<T>
// and Edge::property<T> for one type T, each returning Result<T>.
677 #define INSTANTIATE_PROPERTY(T) \
678 template Result<T> Vertex::property<T>(const std::string& name) const; \
679 template Result<T> Edge::property<T>(const std::string& name) const;
// Instantiate property<T> for bool, int32_t, int64_t, float, double and
// std::string, for the const-reference form of each, and for the
// Int32Array / Int64Array / FloatArray / DoubleArray types.
681 INSTANTIATE_PROPERTY(
bool)
682 INSTANTIATE_PROPERTY(
const bool&)
683 INSTANTIATE_PROPERTY(int32_t)
684 INSTANTIATE_PROPERTY(
const int32_t&)
685 INSTANTIATE_PROPERTY(Int32Array)
686 INSTANTIATE_PROPERTY(int64_t)
687 INSTANTIATE_PROPERTY(
const int64_t&)
688 INSTANTIATE_PROPERTY(Int64Array)
689 INSTANTIATE_PROPERTY(
float)
690 INSTANTIATE_PROPERTY(
const float&)
691 INSTANTIATE_PROPERTY(FloatArray)
692 INSTANTIATE_PROPERTY(
double)
693 INSTANTIATE_PROPERTY(
const double&)
694 INSTANTIATE_PROPERTY(DoubleArray)
695 INSTANTIATE_PROPERTY(std::string)
696 INSTANTIATE_PROPERTY(
const std::string&)
699 adj_list_reader_.seek(cur_offset_);
700 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk, adj_list_reader_.GetChunk());
701 auto src_column = chunk->column(0);
702 return std::dynamic_pointer_cast<arrow::Int64Array>(src_column->chunk(0))
707 adj_list_reader_.
seek(cur_offset_);
708 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk, adj_list_reader_.
GetChunk());
709 auto src_column = chunk->column(1);
710 return std::dynamic_pointer_cast<arrow::Int64Array>(src_column->chunk(0))
719 if (adj_list_type_ == AdjListType::ordered_by_dest ||
720 adj_list_type_ == AdjListType::unordered_by_dest) {
721 if (from.global_chunk_index_ >= chunk_end_) {
724 if (from.global_chunk_index_ == global_chunk_index_) {
725 cur_offset_ = from.cur_offset_;
726 }
else if (from.global_chunk_index_ < chunk_begin_) {
729 global_chunk_index_ = from.global_chunk_index_;
730 cur_offset_ = from.cur_offset_;
731 vertex_chunk_index_ = from.vertex_chunk_index_;
743 if (adj_list_type_ == AdjListType::unordered_by_source) {
744 IdType expect_chunk_index =
745 index_converter_->IndexPairToGlobalChunkIndex(
id / src_chunk_size_, 0);
746 if (expect_chunk_index > chunk_end_)
748 if (from.global_chunk_index_ >= chunk_end_) {
751 bool need_refresh =
false;
752 if (from.global_chunk_index_ == global_chunk_index_) {
753 cur_offset_ = from.cur_offset_;
754 }
else if (from.global_chunk_index_ < chunk_begin_) {
757 global_chunk_index_ = from.global_chunk_index_;
758 cur_offset_ = from.cur_offset_;
759 vertex_chunk_index_ = from.vertex_chunk_index_;
762 if (global_chunk_index_ < expect_chunk_index) {
763 global_chunk_index_ = expect_chunk_index;
765 vertex_chunk_index_ =
id / src_chunk_size_;
773 if (vertex_chunk_index_ >
id / src_chunk_size_)
781 auto st = offset_reader_->seek(
id);
785 auto maybe_offset_chunk = offset_reader_->GetChunk();
786 if (!maybe_offset_chunk.status().ok()) {
790 std::dynamic_pointer_cast<arrow::Int64Array>(maybe_offset_chunk.value());
791 auto begin_offset =
static_cast<IdType
>(offset_array->Value(0));
792 auto end_offset =
static_cast<IdType
>(offset_array->Value(1));
793 if (begin_offset >= end_offset) {
796 auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex();
797 auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex(
798 vertex_chunk_index_of_id, begin_offset / chunk_size_);
799 auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex(
800 vertex_chunk_index_of_id, end_offset / chunk_size_);
801 if (begin_global_index <= from.global_chunk_index_ &&
802 from.global_chunk_index_ <= end_global_index) {
803 if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) {
804 global_chunk_index_ = from.global_chunk_index_;
805 cur_offset_ = from.cur_offset_;
806 vertex_chunk_index_ = from.vertex_chunk_index_;
809 }
else if (from.cur_offset_ <= begin_offset) {
810 global_chunk_index_ = begin_global_index;
811 cur_offset_ = begin_offset;
812 vertex_chunk_index_ = vertex_chunk_index_of_id;
818 }
else if (from.global_chunk_index_ < begin_global_index) {
819 global_chunk_index_ = begin_global_index;
820 cur_offset_ = begin_offset;
821 vertex_chunk_index_ = vertex_chunk_index_of_id;
834 if (adj_list_type_ == AdjListType::ordered_by_source ||
835 adj_list_type_ == AdjListType::unordered_by_source) {
836 if (from.global_chunk_index_ >= chunk_end_) {
839 if (from.global_chunk_index_ == global_chunk_index_) {
840 cur_offset_ = from.cur_offset_;
841 }
else if (from.global_chunk_index_ < chunk_begin_) {
844 global_chunk_index_ = from.global_chunk_index_;
845 cur_offset_ = from.cur_offset_;
846 vertex_chunk_index_ = from.vertex_chunk_index_;
858 if (adj_list_type_ == AdjListType::unordered_by_dest) {
859 IdType expect_chunk_index =
860 index_converter_->IndexPairToGlobalChunkIndex(
id / dst_chunk_size_, 0);
861 if (expect_chunk_index > chunk_end_)
863 if (from.global_chunk_index_ >= chunk_end_) {
866 bool need_refresh =
false;
867 if (from.global_chunk_index_ == global_chunk_index_) {
868 cur_offset_ = from.cur_offset_;
869 }
else if (from.global_chunk_index_ < chunk_begin_) {
872 global_chunk_index_ = from.global_chunk_index_;
873 cur_offset_ = from.cur_offset_;
874 vertex_chunk_index_ = from.vertex_chunk_index_;
877 if (global_chunk_index_ < expect_chunk_index) {
878 global_chunk_index_ = expect_chunk_index;
880 vertex_chunk_index_ =
id / dst_chunk_size_;
888 if (vertex_chunk_index_ >
id / dst_chunk_size_)
896 auto st = offset_reader_->seek(
id);
900 auto maybe_offset_chunk = offset_reader_->GetChunk();
901 if (!maybe_offset_chunk.status().ok()) {
905 std::dynamic_pointer_cast<arrow::Int64Array>(maybe_offset_chunk.value());
906 auto begin_offset =
static_cast<IdType
>(offset_array->Value(0));
907 auto end_offset =
static_cast<IdType
>(offset_array->Value(1));
908 if (begin_offset >= end_offset) {
911 auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex();
912 auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex(
913 vertex_chunk_index_of_id, begin_offset / chunk_size_);
914 auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex(
915 vertex_chunk_index_of_id, end_offset / chunk_size_);
916 if (begin_global_index <= from.global_chunk_index_ &&
917 from.global_chunk_index_ <= end_global_index) {
918 if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) {
919 global_chunk_index_ = from.global_chunk_index_;
920 cur_offset_ = from.cur_offset_;
921 vertex_chunk_index_ = from.vertex_chunk_index_;
924 }
else if (from.cur_offset_ <= begin_offset) {
925 global_chunk_index_ = begin_global_index;
926 cur_offset_ = begin_offset;
927 vertex_chunk_index_ = vertex_chunk_index_of_id;
933 }
else if (from.global_chunk_index_ < begin_global_index) {
934 global_chunk_index_ = begin_global_index;
935 cur_offset_ = begin_offset;
936 vertex_chunk_index_ = vertex_chunk_index_of_id;
945 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
946 const std::string& edge_type,
const std::string& dst_type,
947 AdjListType adj_list_type,
const IdType vertex_chunk_begin,
948 const IdType vertex_chunk_end) noexcept {
949 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
952 dst_type,
" doesn't exist.");
954 if (!edge_info->HasAdjacentListType(adj_list_type)) {
956 AdjListTypeToString(adj_list_type),
959 switch (adj_list_type) {
960 case AdjListType::ordered_by_source:
961 return std::make_shared<OBSEdgeCollection>(
962 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
964 case AdjListType::ordered_by_dest:
965 return std::make_shared<OBDEdgesCollection>(
966 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
968 case AdjListType::unordered_by_source:
969 return std::make_shared<UBSEdgesCollection>(
970 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
972 case AdjListType::unordered_by_dest:
973 return std::make_shared<UBDEdgesCollection>(
974 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
The arrow chunk reader for adj list topology chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk of chunk position indicator as arrow::Table, if the chunk is empty,...
Edge(AdjListArrowChunkReader &adj_list_reader, std::vector< AdjListPropertyArrowChunkReader > &property_readers)
Result< T > property(const std::string &property) const
Get the property value of the edge.
The iterator for traversing a type of edges.
bool first_dst(const EdgeIter &from, IdType id)
bool first_src(const EdgeIter &from, IdType id)
static Result< std::shared_ptr< EdgesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &src_type, const std::string &edge_type, const std::string &dst_type, AdjListType adj_list_type, const IdType vertex_chunk_begin=0, const IdType vertex_chunk_end=std::numeric_limits< int64_t >::max()) noexcept
Construct an EdgesCollection from graph info and edge type.
static Status TypeError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
Result< T > property(const std::string &property) const
Get the property value of the vertex.
Vertex(IdType id, std::vector< VertexPropertyArrowChunkReader > &readers)
Result< std::vector< std::string > > label() noexcept
Result< bool > hasLabel(const std::string &label) noexcept
Result< std::shared_ptr< arrow::Table > > GetLabelChunk()
Return the current arrow label chunk table of chunk position indicator.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from vertex info.
static Result< std::shared_ptr< VerticesCollection > > verticesWithMultipleLabels(const std::vector< std::string > &filter_labels, const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Query vertices with multiple labels.
static Result< std::shared_ptr< VerticesCollection > > verticesWithLabel(const std::string &filter_label, const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Query vertices with a specific label.
Result< std::vector< IdType > > filter(const std::vector< std::string > &filter_labels, std::vector< IdType > *new_valid_chunk=nullptr)