22 #include "arrow/api.h"
23 #include "arrow/compute/api.h"
25 #include "graphar/arrow/chunk_reader.h"
26 #include "graphar/filesystem.h"
27 #include "graphar/general_params.h"
28 #include "graphar/graph_info.h"
29 #include "graphar/reader_util.h"
30 #include "graphar/result.h"
31 #include "graphar/status.h"
32 #include "graphar/types.h"
33 #include "graphar/util.h"
39 Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
40 const std::shared_ptr<PropertyGroup> pg,
41 bool contain_index_column =
false) {
42 std::vector<std::shared_ptr<arrow::Field>> fields;
43 if (contain_index_column) {
44 fields.push_back(std::make_shared<arrow::Field>(
45 GeneralParams::kVertexIndexCol, arrow::int64()));
47 for (
const auto& prop : pg->GetProperties()) {
48 fields.push_back(std::make_shared<arrow::Field>(
49 prop.name, DataType::DataTypeToArrowDataType(prop.type)));
51 return arrow::schema(fields);
54 Status GeneralCast(
const std::shared_ptr<arrow::Array>& in,
55 const std::shared_ptr<arrow::DataType>& to_type,
56 std::shared_ptr<arrow::Array>* out) {
57 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(*out,
58 arrow::compute::Cast(*in, to_type));
62 Status CastStringToLargeString(
const std::shared_ptr<arrow::Array>& in,
63 const std::shared_ptr<arrow::DataType>& to_type,
64 std::shared_ptr<arrow::Array>* out) {
65 auto array_data = in->data()->Copy();
66 auto offset = array_data->buffers[1];
67 using from_offset_type =
typename arrow::StringArray::offset_type;
68 using to_string_offset_type =
typename arrow::LargeStringArray::offset_type;
69 auto raw_value_offsets_ =
72 :
reinterpret_cast<const from_offset_type*
>(offset->data());
73 std::vector<to_string_offset_type> to_offset(offset->size() /
74 sizeof(from_offset_type));
75 for (
size_t i = 0; i < to_offset.size(); ++i) {
76 to_offset[i] = raw_value_offsets_[i];
78 std::shared_ptr<arrow::Buffer> buffer;
79 arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
81 buffer_builder.Append(to_offset.data(), to_offset.size()));
82 RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
83 array_data->type = to_type;
84 array_data->buffers[1] = buffer;
85 *out = arrow::MakeArray(array_data);
86 RETURN_NOT_ARROW_OK((*out)->ValidateFull());
91 Status CastTableWithSchema(
const std::shared_ptr<arrow::Table>& table,
92 const std::shared_ptr<arrow::Schema>& schema,
93 std::shared_ptr<arrow::Table>* out_table) {
94 if (table->schema()->Equals(*schema)) {
97 std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
98 for (int64_t i = 0; i < table->num_columns(); ++i) {
99 auto column = table->column(i);
100 if (table->field(i)->type()->Equals(schema->field(i)->type())) {
101 columns.push_back(column);
104 auto from_t = table->field(i)->type();
105 auto to_t = schema->field(i)->type();
106 std::vector<std::shared_ptr<arrow::Array>> chunks;
108 for (int64_t j = 0; j < column->num_chunks(); ++j) {
109 auto chunk = column->chunk(j);
110 std::shared_ptr<arrow::Array> out;
111 if (arrow::compute::CanCast(*from_t, *to_t)) {
112 GAR_RETURN_NOT_OK(GeneralCast(chunk, to_t, &out));
113 chunks.push_back(out);
114 }
else if (from_t->Equals(arrow::utf8()) &&
115 to_t->Equals(arrow::large_utf8())) {
116 GAR_RETURN_NOT_OK(CastStringToLargeString(chunk, to_t, &out));
117 chunks.push_back(out);
120 columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks, to_t));
123 *out_table = arrow::Table::Make(schema, columns);
// VertexPropertyArrowChunkReader constructor (fragment: the signature's opening
// line is not visible in this listing). Resolves the file system for `prefix`,
// stores the vertex chunk count and vertex count, and builds the expected
// Arrow schema of the property group via PropertyGroupToSchema (with the
// vertex index column included).
// GAR_ASSIGN_OR_RAISE_ERROR presumably throws on failure, since a constructor
// cannot return a Status — confirm against the macro definition.
129 const std::shared_ptr<VertexInfo>& vertex_info,
130 const std::shared_ptr<PropertyGroup>& property_group,
// NOTE(review): std::move on a const reference parameter cannot move; these
// two initializers copy the shared_ptr.
132 : vertex_info_(std::move(vertex_info)),
133 property_group_(std::move(property_group)),
137 chunk_table_(nullptr),
138 filter_options_(options) {
// Resolves `prefix` to a file system handle (fs_) and writes the path prefix
// used for all later file paths into prefix_.
139 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
140 GAR_ASSIGN_OR_RAISE_ERROR(
auto pg_path_prefix,
141 vertex_info->GetPathPrefix(property_group));
// NOTE(review): base_dir is not read by any of the visible lines below.
142 std::string base_dir = prefix_ + pg_path_prefix;
143 GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
144 util::GetVertexChunkNum(prefix_, vertex_info));
145 GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
146 util::GetVertexNum(prefix_, vertex_info_));
// Expected schema (index column + the group's properties); GetChunk() casts
// freshly read tables to it when no row filter is set.
147 GAR_ASSIGN_OR_RAISE_ERROR(schema_,
148 PropertyGroupToSchema(property_group_,
true));
// VertexPropertyArrowChunkReader::seek(id) (fragment): moves the chunk
// position to the chunk containing vertex `id` (id / chunk size) and drops the
// cached table when that chunk differs from the previous one.
153 IdType pre_chunk_index = chunk_index_;
154 chunk_index_ =
id / vertex_info_->GetChunkSize();
155 if (chunk_index_ != pre_chunk_index) {
// The cached table belongs to another chunk; force GetChunk() to re-read.
158 chunk_table_.reset();
// NOTE(review): chunk_index_ has already been overwritten when this range
// check fails, so the reader is left pointing past the last chunk.
160 if (chunk_index_ >= chunk_num_) {
162 chunk_num_ * vertex_info_->GetChunkSize(),
163 ") of vertex ", vertex_info_->GetLabel());
// VertexPropertyArrowChunkReader::GetChunk() (fragment): validates the filter
// options, lazily reads the current chunk file, casts it to schema_ only when
// no row filter is set, and returns the rows from seek_id_ to the chunk end.
168 Result<std::shared_ptr<arrow::Table>>
170 GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
171 if (chunk_table_ ==
nullptr) {
173 auto chunk_file_path,
174 vertex_info_->GetFilePath(property_group_, chunk_index_));
175 std::string path = prefix_ + chunk_file_path;
177 chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
// The schema cast is skipped whenever a row filter is active.
180 if (schema_ !=
nullptr && filter_options_.filter ==
nullptr) {
182 CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
// Row of seek_id_ relative to the first vertex of the current chunk.
185 IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
186 return chunk_table_->Slice(row_offset);
// VertexPropertyArrowChunkReader::next_chunk() (fragment): advances to the
// next chunk, rewinds seek_id_ to that chunk's first vertex and drops the
// cached table.
// NOTE(review): chunk_index_ stays incremented when the bounds check fails.
190 if (++chunk_index_ >= chunk_num_) {
192 "vertex chunk index ", chunk_index_,
" is out-of-bounds for vertex ",
193 vertex_info_->GetLabel(),
" chunk num ", chunk_num_);
195 seek_id_ = chunk_index_ * vertex_info_->GetChunkSize();
196 chunk_table_.reset();
// Filter(filter) / Select(column_names) (fragments): store the row filter and
// the column projection in filter_options_, which GetChunk() reads.
202 filter_options_.filter = filter;
206 filter_options_.columns = column_names;
// VertexPropertyArrowChunkReader::Make(vertex_info, property_group, prefix,
// options) (fragment): constructs the reader with std::make_shared.
209 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
211 const std::shared_ptr<VertexInfo>& vertex_info,
212 const std::shared_ptr<PropertyGroup>& property_group,
214 return std::make_shared<VertexPropertyArrowChunkReader>(
215 vertex_info, property_group, prefix, options);
// Make(graph_info, label, property_group, ...) (fragment): looks up the vertex
// info for `label` and forwards to the overload above with the graph's prefix.
// The "doesn't exist in graph" error is presumably returned when no vertex
// info exists for `label`; the condition line is not visible here.
218 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
220 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& label,
221 const std::shared_ptr<PropertyGroup>& property_group,
223 auto vertex_info = graph_info->GetVertexInfo(label);
226 " doesn't exist in graph ", graph_info->GetName(),
229 return Make(vertex_info, property_group, graph_info->GetPrefix(), options);
// Make(graph_info, label, property_name, ...) (fragment): like the overload
// above, but also resolves the property group via
// vertex_info->GetPropertyGroup(property_name) and fails when that is null.
232 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
234 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& label,
236 auto vertex_info = graph_info->GetVertexInfo(label);
239 " doesn't exist in graph ", graph_info->GetName(),
242 auto property_group = vertex_info->GetPropertyGroup(property_name);
243 if (!property_group) {
245 " doesn't exist in vertex type ", label,
".");
247 return Make(vertex_info, property_group, graph_info->GetPrefix(), options);
// AdjListArrowChunkReader constructor (fragment: the signature's opening line
// is not visible). Resolves the file system for `prefix`, records the
// adjacency-list directory in base_dir_, and stores
// util::GetVertexChunkNum(...) (its assignment target is on a line not shown).
251 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
252 const std::string& prefix)
253 : edge_info_(edge_info),
254 adj_list_type_(adj_list_type),
256 vertex_chunk_index_(0),
259 chunk_table_(nullptr),
261 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
262 GAR_ASSIGN_OR_RAISE_ERROR(
auto adj_list_path_prefix,
263 edge_info->GetAdjListPathPrefix(adj_list_type));
264 base_dir_ = prefix_ + adj_list_path_prefix;
265 GAR_ASSIGN_OR_RAISE_ERROR(
267 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
// AdjListArrowChunkReader copy constructor (fragment): copies the position and
// bookkeeping of `other` but not its cached chunk table. The copy re-reads
// the chunk on demand, because GetChunk() loads whenever chunk_table_ is null.
272 : edge_info_(other.edge_info_),
273 adj_list_type_(other.adj_list_type_),
274 prefix_(other.prefix_),
275 vertex_chunk_index_(other.vertex_chunk_index_),
276 chunk_index_(other.chunk_index_),
277 seek_offset_(other.seek_offset_),
278 chunk_table_(nullptr),
279 vertex_chunk_num_(other.vertex_chunk_num_),
280 chunk_num_(other.chunk_num_),
281 base_dir_(other.base_dir_),
// AdjListArrowChunkReader::seek_src(id) (fragment): only valid for the
// unordered_by_source / ordered_by_source types. It selects the vertex chunk
// holding source vertex `id`, and refreshes the edge chunk count when that
// vertex chunk changes. On the ordered path it seeks to range.first returned
// by util::GetAdjListOffsetOfVertex.
285 if (adj_list_type_ != AdjListType::unordered_by_source &&
286 adj_list_type_ != AdjListType::ordered_by_source) {
288 edge_info_->GetEdgeLabel(),
" reader with ",
289 AdjListTypeToString(adj_list_type_),
" type.");
292 IdType new_vertex_chunk_index =
id / edge_info_->GetSrcChunkSize();
293 if (new_vertex_chunk_index >= vertex_chunk_num_) {
295 "The source internal id ",
id,
" is out of range [0,",
296 edge_info_->GetSrcChunkSize() * vertex_chunk_num_,
") of edge ",
297 edge_info_->GetEdgeLabel(),
" reader.");
// chunk_num_ < 0 appears to mean "edge chunk count not loaded yet".
299 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
301 vertex_chunk_index_ = new_vertex_chunk_index;
302 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
303 chunk_table_.reset();
// The body of the unordered_by_source branch is on lines not shown here.
306 if (adj_list_type_ == AdjListType::unordered_by_source) {
// Presumably the ordered_by_source path: fetch the edge range of `id` and seek
// to its start.
309 GAR_ASSIGN_OR_RAISE(
auto range,
310 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
311 adj_list_type_,
id));
312 return seek(range.first);
// AdjListArrowChunkReader::seek_dst(id) (fragment): mirror of seek_src for the
// unordered_by_dest / ordered_by_dest types, using the destination chunk size.
318 if (adj_list_type_ != AdjListType::unordered_by_dest &&
319 adj_list_type_ != AdjListType::ordered_by_dest) {
321 edge_info_->GetEdgeLabel(),
" reader with ",
322 AdjListTypeToString(adj_list_type_),
" type.");
325 IdType new_vertex_chunk_index =
id / edge_info_->GetDstChunkSize();
326 if (new_vertex_chunk_index >= vertex_chunk_num_) {
328 "The destination internal id ",
id,
" is out of range [0,",
329 edge_info_->GetDstChunkSize() * vertex_chunk_num_,
") of edge ",
330 edge_info_->GetEdgeLabel(),
" reader.");
332 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
334 vertex_chunk_index_ = new_vertex_chunk_index;
335 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
336 chunk_table_.reset();
// The body of the unordered_by_dest branch is on lines not shown here.
339 if (adj_list_type_ == AdjListType::unordered_by_dest) {
342 GAR_ASSIGN_OR_RAISE(
auto range,
343 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
344 adj_list_type_,
id));
345 return seek(range.first);
// AdjListArrowChunkReader::seek(offset) (fragment): positions the reader at
// edge `offset` inside the current vertex chunk. It drops the cached table on
// an edge-chunk change, lazily loads chunk_num_, and fails when the offset is
// past the last edge chunk.
350 seek_offset_ = offset;
351 IdType pre_chunk_index = chunk_index_;
352 chunk_index_ = offset / edge_info_->GetChunkSize();
353 if (chunk_index_ != pre_chunk_index) {
354 chunk_table_.reset();
356 if (chunk_num_ < 0) {
358 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
360 if (chunk_index_ >= chunk_num_) {
362 " is out of range [0,",
363 edge_info_->GetChunkSize() * chunk_num_,
364 "), edge label: ", edge_info_->GetEdgeLabel());
// AdjListArrowChunkReader::GetChunk() (fragment): lazily loads the current
// adjacency-list chunk file and returns its rows from seek_offset_ onwards.
// The edge count of the vertex chunk is queried first; how edge_num is used
// is on lines not shown here.
370 if (chunk_table_ ==
nullptr) {
372 GAR_ASSIGN_OR_RAISE(
auto edge_num,
373 util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
374 vertex_chunk_index_));
378 GAR_ASSIGN_OR_RAISE(
auto chunk_file_path,
379 edge_info_->GetAdjListFilePath(
380 vertex_chunk_index_, chunk_index_, adj_list_type_));
381 std::string path = prefix_ + chunk_file_path;
382 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
383 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
// Row of seek_offset_ inside the current edge chunk.
385 IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
386 return chunk_table_->Slice(row_offset);
// AdjListArrowChunkReader::next_chunk() (fragment): moves to the next edge
// chunk, rolling over to following vertex chunks while chunk_index_ is past
// the current vertex chunk's edge chunk count. It then rewinds seek_offset_ to
// the chunk start and drops the cached table.
391 if (chunk_num_ < 0) {
393 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
395 while (chunk_index_ >= chunk_num_) {
396 ++vertex_chunk_index_;
397 if (vertex_chunk_index_ >= vertex_chunk_num_) {
399 " is out-of-bounds for vertex chunk num ",
403 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
405 seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
406 chunk_table_.reset();
// AdjListArrowChunkReader::seek_chunk_index(vertex_chunk_index, chunk_index)
// (fragment): jumps directly to an edge chunk of a vertex chunk. It refreshes
// the edge chunk count and drops the cached table whenever either index
// changes. No range check is visible here.
411 IdType chunk_index) {
412 if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
413 vertex_chunk_index_ = vertex_chunk_index;
414 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
415 chunk_table_.reset();
417 if (chunk_index_ != chunk_index) {
418 chunk_index_ = chunk_index;
419 seek_offset_ = chunk_index * edge_info_->GetChunkSize();
420 chunk_table_.reset();
// AdjListArrowChunkReader::GetRowNumOfChunk() (fragment): loads the current
// chunk if needed and returns its full row count (seek_offset_ is ignored).
426 if (chunk_table_ ==
nullptr) {
427 GAR_ASSIGN_OR_RAISE(
auto chunk_file_path,
428 edge_info_->GetAdjListFilePath(
429 vertex_chunk_index_, chunk_index_, adj_list_type_));
430 std::string path = prefix_ + chunk_file_path;
431 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
432 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
434 return chunk_table_->num_rows();
// AdjListArrowChunkReader::Make(edge_info, adj_list_type, prefix) (fragment):
// rejects adjacency-list types the edge does not declare
// (HasAdjacentListType), then constructs the reader.
438 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
439 const std::string& prefix) {
440 if (!edge_info->HasAdjacentListType(adj_list_type)) {
442 "The adjacent list type ", AdjListTypeToString(adj_list_type),
443 " doesn't exist in edge ", edge_info->GetEdgeLabel(),
".");
445 return std::make_shared<AdjListArrowChunkReader>(edge_info, adj_list_type,
// Make(graph_info, src_label, edge_label, dst_label, adj_list_type)
// (fragment): looks up the edge info for the (src, edge, dst) triple and
// forwards to the overload above with the graph's prefix.
450 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_label,
451 const std::string& edge_label,
const std::string& dst_label,
452 AdjListType adj_list_type) {
453 auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
456 dst_label,
" doesn't exist.");
458 return Make(edge_info, adj_list_type, graph_info->GetPrefix());
// Sets chunk_num_ to the number of edge chunks of the current vertex chunk
// (vertex_chunk_index_), propagating any error from util::GetEdgeChunkNum.
461 Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
462 GAR_ASSIGN_OR_RAISE(chunk_num_,
463 util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
464 vertex_chunk_index_));
// AdjListOffsetArrowChunkReader constructor (fragment: the signature's opening
// line is not visible). Only ordered_by_source / ordered_by_dest are
// supported. The vertex chunk size is taken from the source or destination
// side accordingly; any other type ends in a std::runtime_error.
469 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
470 const std::string& prefix)
// NOTE(review): std::move on a const reference parameter copies.
471 : edge_info_(std::move(edge_info)),
472 adj_list_type_(adj_list_type),
476 chunk_table_(nullptr) {
// NOTE(review): this local is never used; base_dir_ is assigned below.
477 std::string base_dir;
478 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
479 GAR_ASSIGN_OR_RAISE_ERROR(
auto dir_path,
480 edge_info->GetOffsetPathPrefix(adj_list_type));
481 base_dir_ = prefix_ + dir_path;
482 if (adj_list_type == AdjListType::ordered_by_source ||
483 adj_list_type == AdjListType::ordered_by_dest) {
484 GAR_ASSIGN_OR_RAISE_ERROR(
486 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
487 vertex_chunk_size_ = adj_list_type == AdjListType::ordered_by_source
488 ? edge_info_->GetSrcChunkSize()
489 : edge_info_->GetDstChunkSize();
// Presumably the else-branch for non-ordered types (the `else` line is not
// shown): the constructor throws instead of returning a Status.
491 std::string err_msg =
"Invalid adj list type " +
492 std::string(AdjListTypeToString(adj_list_type)) +
493 " to construct AdjListOffsetReader.";
494 throw std::runtime_error(err_msg);
// AdjListOffsetArrowChunkReader::seek(id) (fragment): selects the offset chunk
// holding vertex `id` (id / vertex_chunk_size_). It drops the cached table on a
// chunk change and fails when `id` lies beyond the last vertex chunk.
// NOTE(review): chunk_index_ is already updated when the range check fails.
500 IdType pre_chunk_index = chunk_index_;
501 chunk_index_ =
id / vertex_chunk_size_;
502 if (chunk_index_ != pre_chunk_index) {
503 chunk_table_.reset();
505 if (chunk_index_ >= vertex_chunk_num_) {
507 vertex_chunk_num_ * vertex_chunk_size_,
508 "), of edge ", edge_info_->GetEdgeLabel(),
509 " of adj list type ",
510 AdjListTypeToString(adj_list_type_),
".");
// AdjListOffsetArrowChunkReader::GetChunk() (fragment): lazily reads the
// offset chunk file and returns its first column from the row of seek_id_
// onwards.
// NOTE(review): only chunk(0) of that column is returned; if the read column
// is split into several Arrow chunks, the remaining rows are silently lost.
515 Result<std::shared_ptr<arrow::Array>>
517 if (chunk_table_ ==
nullptr) {
519 auto chunk_file_path,
520 edge_info_->GetAdjListOffsetFilePath(chunk_index_, adj_list_type_));
521 std::string path = prefix_ + chunk_file_path;
522 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
523 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
525 IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_;
526 return chunk_table_->Slice(row_offset)->column(0)->chunk(0);
// AdjListOffsetArrowChunkReader::next_chunk() (fragment): advances to the
// next vertex chunk, rewinds seek_id_ to its first vertex and drops the cache.
// NOTE(review): chunk_index_ stays incremented when the bounds check fails.
530 if (++chunk_index_ >= vertex_chunk_num_) {
532 " is out-of-bounds for vertex chunk num ",
533 vertex_chunk_num_,
" of edge ",
534 edge_info_->GetEdgeLabel(),
" of adj list type ",
535 AdjListTypeToString(adj_list_type_),
".");
537 seek_id_ = chunk_index_ * vertex_chunk_size_;
538 chunk_table_.reset();
// AdjListOffsetArrowChunkReader::Make(edge_info, adj_list_type, prefix)
// (fragment): rejects adjacency-list types the edge does not declare, then
// constructs the reader.
// NOTE(review): a declared but unordered type passes this check, and the
// constructor then throws std::runtime_error instead of this function
// returning an error Result.
543 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
545 AdjListType adj_list_type,
546 const std::string& prefix) {
547 if (!edge_info->HasAdjacentListType(adj_list_type)) {
549 "The adjacent list type ", AdjListTypeToString(adj_list_type),
550 " doesn't exist in edge ", edge_info->GetEdgeLabel(),
".");
552 return std::make_shared<AdjListOffsetArrowChunkReader>(edge_info,
553 adj_list_type, prefix);
// Make(graph_info, src_label, edge_label, dst_label, adj_list_type)
// (fragment): looks up the edge info for the (src, edge, dst) triple and
// forwards to the overload above with the graph's prefix.
556 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
558 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_label,
559 const std::string& edge_label,
const std::string& dst_label,
560 AdjListType adj_list_type) {
561 auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
564 dst_label,
" doesn't exist.");
566 return Make(edge_info, adj_list_type, graph_info->GetPrefix());
// AdjListPropertyArrowChunkReader constructor (fragment: the signature's
// opening line is not visible). Resolves the file system for `prefix`,
// records the property-group directory in base_dir_, and stores
// util::GetVertexChunkNum(...). It also builds the expected schema without an
// index column.
// NOTE(review): `prefix` is taken by const value here, unlike the const&
// used by the other readers; std::move on the const& params below copies.
570 const std::shared_ptr<EdgeInfo>& edge_info,
571 const std::shared_ptr<PropertyGroup>& property_group,
572 AdjListType adj_list_type,
const std::string prefix,
574 : edge_info_(std::move(edge_info)),
575 property_group_(std::move(property_group)),
576 adj_list_type_(adj_list_type),
578 vertex_chunk_index_(0),
582 chunk_table_(nullptr),
583 filter_options_(options),
585 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
586 GAR_ASSIGN_OR_RAISE_ERROR(
588 edge_info->GetPropertyGroupPathPrefix(property_group, adj_list_type));
589 base_dir_ = prefix_ + pg_path_prefix;
590 GAR_ASSIGN_OR_RAISE_ERROR(
592 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
593 GAR_ASSIGN_OR_RAISE_ERROR(schema_,
594 PropertyGroupToSchema(property_group,
false));
// AdjListPropertyArrowChunkReader copy constructor (fragment): copies position,
// schema, filter options and bookkeeping from `other`, but not the cached
// chunk table (set to nullptr, so it is re-read on demand).
599 : edge_info_(other.edge_info_),
600 property_group_(other.property_group_),
601 adj_list_type_(other.adj_list_type_),
602 prefix_(other.prefix_),
603 vertex_chunk_index_(other.vertex_chunk_index_),
604 chunk_index_(other.chunk_index_),
605 seek_offset_(other.seek_offset_),
606 schema_(other.schema_),
607 chunk_table_(nullptr),
608 filter_options_(other.filter_options_),
609 vertex_chunk_num_(other.vertex_chunk_num_),
610 chunk_num_(other.chunk_num_),
611 base_dir_(other.base_dir_),
// AdjListPropertyArrowChunkReader::seek_src(id) (fragment): only valid for the
// unordered_by_source / ordered_by_source types. It selects the vertex chunk
// holding source vertex `id`, and refreshes the edge chunk count when that
// vertex chunk changes. On the ordered path it seeks to range.first returned
// by util::GetAdjListOffsetOfVertex.
615 if (adj_list_type_ != AdjListType::unordered_by_source &&
616 adj_list_type_ != AdjListType::ordered_by_source) {
618 edge_info_->GetEdgeLabel(),
" reader with ",
619 AdjListTypeToString(adj_list_type_),
" type.");
622 IdType new_vertex_chunk_index =
id / edge_info_->GetSrcChunkSize();
623 if (new_vertex_chunk_index >= vertex_chunk_num_) {
625 "The source internal id ",
id,
" is out of range [0,",
626 edge_info_->GetSrcChunkSize() * vertex_chunk_num_,
") of edge ",
627 edge_info_->GetEdgeLabel(),
" reader.");
// chunk_num_ < 0 appears to mean "edge chunk count not loaded yet".
629 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
630 vertex_chunk_index_ = new_vertex_chunk_index;
631 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
632 chunk_table_.reset();
// The body of the unordered_by_source branch is on lines not shown here.
635 if (adj_list_type_ == AdjListType::unordered_by_source) {
638 GAR_ASSIGN_OR_RAISE(
auto range,
639 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
640 adj_list_type_,
id));
641 return seek(range.first);
// AdjListPropertyArrowChunkReader::seek_dst(id) (fragment): mirror of seek_src
// for the *_by_dest types, using the destination chunk size.
647 if (adj_list_type_ != AdjListType::unordered_by_dest &&
648 adj_list_type_ != AdjListType::ordered_by_dest) {
650 edge_info_->GetEdgeLabel(),
" reader with ",
651 AdjListTypeToString(adj_list_type_),
" type.");
654 IdType new_vertex_chunk_index =
id / edge_info_->GetDstChunkSize();
655 if (new_vertex_chunk_index >= vertex_chunk_num_) {
657 "The destination internal id ",
id,
" is out of range [0,",
658 edge_info_->GetDstChunkSize() * vertex_chunk_num_,
") of edge ",
659 edge_info_->GetEdgeLabel(),
" reader.");
661 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
662 vertex_chunk_index_ = new_vertex_chunk_index;
663 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
664 chunk_table_.reset();
// The body of the unordered_by_dest branch is on lines not shown here.
667 if (adj_list_type_ == AdjListType::unordered_by_dest) {
670 GAR_ASSIGN_OR_RAISE(
auto range,
671 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
672 adj_list_type_,
id));
673 return seek(range.first);
// AdjListPropertyArrowChunkReader::seek(offset) (fragment): positions the
// reader at edge `offset` inside the current vertex chunk. It drops the cached
// table on an edge-chunk change, lazily loads chunk_num_, and fails when the
// offset is past the last edge chunk.
678 IdType pre_chunk_index = chunk_index_;
679 seek_offset_ = offset;
680 chunk_index_ = offset / edge_info_->GetChunkSize();
681 if (chunk_index_ != pre_chunk_index) {
682 chunk_table_.reset();
684 if (chunk_num_ < 0) {
686 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
688 if (chunk_index_ >= chunk_num_) {
690 " is out of range [0,",
691 edge_info_->GetChunkSize() * chunk_num_,
692 "), edge label: ", edge_info_->GetEdgeLabel());
// AdjListPropertyArrowChunkReader::GetChunk() (fragment): validates the filter
// options and lazily reads the current property chunk. It casts the chunk to
// schema_ only when no row filter is set, and returns the rows from
// seek_offset_ to the chunk end. How edge_num is used is on lines not shown.
697 Result<std::shared_ptr<arrow::Table>>
699 GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
700 if (chunk_table_ ==
nullptr) {
702 GAR_ASSIGN_OR_RAISE(
auto edge_num,
703 util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
704 vertex_chunk_index_));
709 auto chunk_file_path,
710 edge_info_->GetPropertyFilePath(property_group_, adj_list_type_,
711 vertex_chunk_index_, chunk_index_));
712 std::string path = prefix_ + chunk_file_path;
714 chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
// The schema cast is skipped whenever a row filter is active.
717 if (schema_ !=
nullptr && filter_options_.filter ==
nullptr) {
719 CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
// Row of seek_offset_ inside the current edge chunk.
722 IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
723 return chunk_table_->Slice(row_offset);
// AdjListPropertyArrowChunkReader::next_chunk() (fragment): moves to the next
// edge chunk, rolling over to following vertex chunks while chunk_index_ is
// past the current vertex chunk's edge chunk count. It then rewinds
// seek_offset_ and drops the cached table.
// NOTE(review): the error message below streams property_group_ (a
// shared_ptr), which appears to print a pointer value rather than the group's
// name — verify.
728 if (chunk_num_ < 0) {
730 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
732 while (chunk_index_ >= chunk_num_) {
733 ++vertex_chunk_index_;
734 if (vertex_chunk_index_ >= vertex_chunk_num_) {
736 "vertex chunk index ", vertex_chunk_index_,
737 " is out-of-bounds for vertex chunk num ", vertex_chunk_num_,
738 " of edge ", edge_info_->GetEdgeLabel(),
" of adj list type ",
739 AdjListTypeToString(adj_list_type_),
", property group ",
740 property_group_,
".");
743 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
745 seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
746 chunk_table_.reset();
// AdjListPropertyArrowChunkReader::seek_chunk_index(vertex_chunk_index,
// chunk_index) (fragment): jumps directly to an edge chunk of a vertex chunk.
// It refreshes the edge chunk count and drops the cached table whenever either
// index changes. No range check is visible here.
751 IdType vertex_chunk_index, IdType chunk_index) {
752 if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
753 vertex_chunk_index_ = vertex_chunk_index;
754 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
755 chunk_table_.reset();
757 if (chunk_index_ != chunk_index) {
758 chunk_index_ = chunk_index;
759 seek_offset_ = chunk_index * edge_info_->GetChunkSize();
760 chunk_table_.reset();
// Filter(filter) / Select(column_names) (fragments): store the row filter and
// the column projection in filter_options_, which GetChunk() reads.
766 filter_options_.filter = filter;
770 filter_options_.columns = column_names;
// AdjListPropertyArrowChunkReader::Make(edge_info, property_group,
// adj_list_type, prefix, options) (fragment): rejects adjacency-list types the
// edge does not declare, then constructs the reader.
// NOTE(review): no null check on property_group is visible here.
773 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
775 const std::shared_ptr<EdgeInfo>& edge_info,
776 const std::shared_ptr<PropertyGroup>& property_group,
777 AdjListType adj_list_type,
const std::string& prefix,
779 if (!edge_info->HasAdjacentListType(adj_list_type)) {
781 "The adjacent list type ", AdjListTypeToString(adj_list_type),
782 " doesn't exist in edge ", edge_info->GetEdgeLabel(),
".");
784 return std::make_shared<AdjListPropertyArrowChunkReader>(
785 edge_info, property_group, adj_list_type, prefix, options);
// Make(graph_info, src_label, edge_label, dst_label, property_group, ...)
// (fragment): looks up the edge info for the triple and forwards to the
// overload above with the graph's prefix.
788 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
790 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_label,
791 const std::string& edge_label,
const std::string& dst_label,
792 const std::shared_ptr<PropertyGroup>& property_group,
794 auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
797 dst_label,
" doesn't exist.");
799 return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
// Make(graph_info, src_label, edge_label, dst_label, property_name, ...)
// (fragment): additionally resolves the property group via
// edge_info->GetPropertyGroup(property_name) and fails when that is null.
803 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
805 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_label,
806 const std::string& edge_label,
const std::string& dst_label,
807 const std::string& property_name, AdjListType adj_list_type,
809 auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
812 dst_label,
" doesn't exist.");
814 auto property_group = edge_info->GetPropertyGroup(property_name);
815 if (!property_group) {
817 " doesn't exist in edge ", src_label,
" ",
818 edge_label,
" ", dst_label,
".");
820 return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
// Sets chunk_num_ to the number of edge chunks of the current vertex chunk
// (vertex_chunk_index_), propagating any error from util::GetEdgeChunkNum.
824 Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() {
825 GAR_ASSIGN_OR_RAISE(chunk_num_,
826 util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
827 vertex_chunk_index_));
The Arrow chunk reader for adjacency-list topology chunks.
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
AdjListArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListArrowChunkReader.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk at the chunk position indicator as an arrow::Table. If the chunk is empty,...
static Result< std::shared_ptr< AdjListArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek_dst(IdType offset)
Sets chunk position indicator for reader by destination vertex id.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
Result< std::shared_ptr< arrow::Array > > GetChunk()
Get the current offset chunk as arrow::Array.
AdjListOffsetArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListOffsetArrowChunkReader.
static Result< std::shared_ptr< AdjListOffsetArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListOffsetArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk. If the current chunk is the last chunk, will return Status::...
The arrow chunk reader for edge property group chunks.
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek_dst(IdType id)
Sets chunk position indicator for reader by destination vertex id.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
static Result< std::shared_ptr< AdjListPropertyArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string &prefix, const util::FilterOptions &options={})
Create an AdjListPropertyArrowChunkReader instance from edge info.
AdjListPropertyArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string prefix, const util::FilterOptions &options={})
Initialize the AdjListPropertyArrowChunkReader.
void Filter(util::Filter filter=nullptr)
Apply the row filter to the table. Calling Filter() with no argument clears the filter.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk at the chunk position indicator as an arrow::Table. If the chunk is empty,...
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
void Select(util::ColumnNames column_names=std::nullopt)
Apply the projection to the table to be read. Calling Select() with no argument clears the projection.
Status outcome object (success or error)
static Status IndexError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current Arrow chunk table at the chunk position indicator.
VertexPropertyArrowChunkReader(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Initialize the VertexPropertyArrowChunkReader.
void Filter(util::Filter filter=nullptr)
Apply the row filter to the table. Calling Filter() with no argument clears the filter.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
void Select(util::ColumnNames column_names=std::nullopt)
Apply the projection to the table to be read. Calling Select() with no argument clears the projection.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from vertex info.