25 #include "arrow/api.h"
26 #include "arrow/compute/api.h"
28 #include "graphar/arrow/chunk_reader.h"
29 #include "graphar/filesystem.h"
30 #include "graphar/fwd.h"
31 #include "graphar/general_params.h"
32 #include "graphar/graph_info.h"
33 #include "graphar/reader_util.h"
34 #include "graphar/result.h"
35 #include "graphar/status.h"
36 #include "graphar/types.h"
37 #include "graphar/util.h"
43 Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
44 const std::shared_ptr<PropertyGroup> pg,
45 bool contain_index_column =
false) {
46 std::vector<std::shared_ptr<arrow::Field>> fields;
47 if (contain_index_column) {
48 fields.push_back(std::make_shared<arrow::Field>(
49 GeneralParams::kVertexIndexCol, arrow::int64()));
51 for (
const auto& prop : pg->GetProperties()) {
52 auto dataType = DataType::DataTypeToArrowDataType(prop.type);
53 if (prop.cardinality != Cardinality::SINGLE) {
54 dataType = arrow::list(dataType);
56 fields.push_back(std::make_shared<arrow::Field>(prop.name, dataType));
58 return arrow::schema(fields);
61 Result<std::shared_ptr<arrow::Schema>> LabelToSchema(
62 std::vector<std::string> labels,
bool contain_index_column =
false) {
63 std::vector<std::shared_ptr<arrow::Field>> fields;
64 if (contain_index_column) {
65 fields.push_back(std::make_shared<arrow::Field>(
66 GeneralParams::kVertexIndexCol, arrow::int64()));
68 for (
const auto& lab : labels) {
69 fields.push_back(std::make_shared<arrow::Field>(lab, arrow::boolean()));
71 return arrow::schema(fields);
73 Status GeneralCast(
const std::shared_ptr<arrow::Array>& in,
74 const std::shared_ptr<arrow::DataType>& to_type,
75 std::shared_ptr<arrow::Array>* out) {
76 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(*out,
77 arrow::compute::Cast(*in, to_type));
81 Status CastStringToLargeString(
const std::shared_ptr<arrow::Array>& in,
82 const std::shared_ptr<arrow::DataType>& to_type,
83 std::shared_ptr<arrow::Array>* out) {
84 auto array_data = in->data()->Copy();
85 auto offset = array_data->buffers[1];
86 using from_offset_type =
typename arrow::StringArray::offset_type;
87 using to_string_offset_type =
typename arrow::LargeStringArray::offset_type;
88 auto raw_value_offsets_ =
91 :
reinterpret_cast<const from_offset_type*
>(offset->data());
92 std::vector<to_string_offset_type> to_offset(offset->size() /
93 sizeof(from_offset_type));
94 for (
size_t i = 0; i < to_offset.size(); ++i) {
95 to_offset[i] = raw_value_offsets_[i];
97 std::shared_ptr<arrow::Buffer> buffer;
98 arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
100 buffer_builder.Append(to_offset.data(), to_offset.size()));
101 RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
102 array_data->type = to_type;
103 array_data->buffers[1] = buffer;
104 *out = arrow::MakeArray(array_data);
105 RETURN_NOT_ARROW_OK((*out)->ValidateFull());
110 Status CastTableWithSchema(
const std::shared_ptr<arrow::Table>& table,
111 const std::shared_ptr<arrow::Schema>& schema,
112 std::shared_ptr<arrow::Table>* out_table) {
113 if (table->schema()->Equals(*schema)) {
116 std::vector<std::shared_ptr<arrow::Field>> fields;
117 std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
118 for (int64_t i = 0; i < table->num_columns(); ++i) {
119 auto column = table->column(i);
120 auto table_field = table->field(i);
121 auto field_name = table_field->name();
123 auto schema_field = schema->GetFieldByName(field_name);
124 if (table_field->type()->Equals(schema_field->type())) {
125 columns.push_back(column);
126 fields.push_back(table_field);
129 auto from_t = table_field->type();
130 auto to_t = schema_field->type();
131 std::vector<std::shared_ptr<arrow::Array>> chunks;
133 for (int64_t j = 0; j < column->num_chunks(); ++j) {
134 auto chunk = column->chunk(j);
135 std::shared_ptr<arrow::Array> out;
136 if (arrow::compute::CanCast(*from_t, *to_t)) {
137 GAR_RETURN_NOT_OK(GeneralCast(chunk, to_t, &out));
138 chunks.push_back(out);
139 }
else if (from_t->Equals(arrow::utf8()) &&
140 to_t->Equals(arrow::large_utf8())) {
141 GAR_RETURN_NOT_OK(CastStringToLargeString(chunk, to_t, &out));
142 chunks.push_back(out);
145 fields.push_back(arrow::field(field_name, to_t));
146 columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks, to_t));
148 auto new_schema = std::make_shared<arrow::Schema>(fields);
149 *out_table = arrow::Table::Make(new_schema, columns);
// VertexPropertyArrowChunkReader constructor over a vertex property group.
// NOTE(review): the rest of this definition (remaining parameters,
// initializer list, body) is missing from this extract.
154 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
155 const std::shared_ptr<VertexInfo>& vertex_info,
156 const std::shared_ptr<PropertyGroup>& property_group,
// VertexPropertyArrowChunkReader constructor over `property_group` of
// `vertex_info`. It stores `property_names` as the selected properties and
// loads the chunk count, the vertex count and the expected schema.
// NOTE(review): some source lines of this definition are missing from this
// extract (gaps in the line numbering).
161 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
162 const std::shared_ptr<VertexInfo>& vertex_info,
163 const std::shared_ptr<PropertyGroup>& property_group,
164 const std::vector<std::string>& property_names,
const std::string& prefix,
// NOTE(review): these parameters are const references, so std::move below
// copies rather than moves.
166 : vertex_info_(std::move(vertex_info)),
167 property_group_(std::move(property_group)),
168 property_names_(std::move(property_names)),
172 chunk_table_(nullptr),
173 filter_options_(options) {
// Resolve the file system for `prefix`; FileSystemFromUriOrPath fills
// prefix_.
174 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
175 GAR_ASSIGN_OR_RAISE_ERROR(
auto pg_path_prefix,
176 vertex_info->GetPathPrefix(property_group));
// NOTE(review): base_dir is not used in any visible line.
177 std::string base_dir = prefix_ + pg_path_prefix;
178 GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
179 util::GetVertexChunkNum(prefix_, vertex_info));
180 GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
181 util::GetVertexNum(prefix_, vertex_info_));
// Expected chunk schema: the property group's fields with the vertex index
// column prepended (contain_index_column = true).
182 GAR_ASSIGN_OR_RAISE_ERROR(schema_,
183 PropertyGroupToSchema(property_group_,
true));
// VertexPropertyArrowChunkReader constructor for label chunks. The schema is
// built from `labels` by LabelToSchema (one boolean field per label).
// NOTE(review): some source lines of this definition are missing from this
// extract (gaps in the line numbering).
187 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
188 const std::shared_ptr<VertexInfo>& vertex_info,
189 const std::vector<std::string>& labels,
const std::string& prefix,
191 : vertex_info_(std::move(vertex_info)),
196 chunk_table_(nullptr),
197 filter_options_(options) {
198 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
// NOTE(review): base_dir uses chunk_index_ as it is at construction time and
// is not used in any visible line.
200 std::string base_dir = prefix_ + vertex_info_->GetPrefix() +
"labels/chunk" +
201 std::to_string(chunk_index_);
202 GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
203 util::GetVertexChunkNum(prefix_, vertex_info));
204 GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
205 util::GetVertexNum(prefix_, vertex_info_));
206 GAR_ASSIGN_OR_RAISE_ERROR(schema_, LabelToSchema(labels));
// Body of the vertex seek. The chunk holding `id` is id / chunk size, and the
// cached chunk table is dropped whenever that chunk changes. The visible
// message suggests an out-of-range error when chunk_index_ >= chunk_num_.
// NOTE(review): the signature and several lines of this body are missing
// from this extract.
211 IdType pre_chunk_index = chunk_index_;
212 chunk_index_ =
id / vertex_info_->GetChunkSize();
213 if (chunk_index_ != pre_chunk_index) {
216 chunk_table_.reset();
218 if (chunk_index_ >= chunk_num_) {
220 chunk_num_ * vertex_info_->GetChunkSize(),
221 ") of vertex ", vertex_info_->GetType());
// Reads the current property-group chunk (cached in chunk_table_), restricted
// to the selected columns, and returns its rows from seek_id_ onward.
// NOTE(review): some source lines of this definition are missing from this
// extract (gaps in the line numbering).
226 Result<std::shared_ptr<arrow::Table>>
227 VertexPropertyArrowChunkReader::GetChunkV2() {
228 if (chunk_table_ ==
nullptr) {
230 auto chunk_file_path,
231 vertex_info_->GetFilePath(property_group_, chunk_index_));
232 std::vector<int> column_indices = {};
233 std::vector<std::string> property_names;
// Without a column filter, use the properties chosen at construction. When a
// filter is set and properties were chosen, every filtered column must be
// one of them.
234 if (!filter_options_.columns && !property_names_.empty()) {
235 property_names = property_names_;
237 if (!property_names_.empty()) {
238 for (
const auto& col : filter_options_.columns.value().get()) {
239 if (std::find(property_names_.begin(), property_names_.end(), col) ==
240 property_names_.end()) {
242 " is not in select properties.");
244 property_names.push_back(col);
// Translate the selected names into field positions of schema_.
248 for (
const auto& col : property_names) {
249 auto field_index = schema_->GetFieldIndex(col);
250 if (field_index == -1) {
251 return Status::Invalid(
"Column ", col,
" is not in select properties.");
253 column_indices.push_back(field_index);
255 std::string path = prefix_ + chunk_file_path;
257 chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
// Cast column types to schema_ only when no row filter is set.
259 if (schema_ !=
nullptr && filter_options_.filter ==
nullptr) {
261 CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
// Row position of seek_id_ inside the current chunk.
264 IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
265 return chunk_table_->Slice(row_offset);
// Validates filter_options_ against the property group, then reads the
// current chunk (cached in chunk_table_) and returns its rows from seek_id_
// onward.
// NOTE(review): some source lines of this definition are missing from this
// extract (gaps in the line numbering).
268 Result<std::shared_ptr<arrow::Table>>
269 VertexPropertyArrowChunkReader::GetChunkV1() {
270 GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
271 if (chunk_table_ ==
nullptr) {
273 auto chunk_file_path,
274 vertex_info_->GetFilePath(property_group_, chunk_index_));
275 std::string path = prefix_ + chunk_file_path;
276 if (property_names_.empty()) {
279 fs_->ReadFileToTable(path, property_group_->GetFileType(),
// Properties were chosen at construction. Read them, or the requested subset
// of them, through a temporary copy of the filter options.
282 util::FilterOptions temp_filter_options;
283 temp_filter_options.filter = filter_options_.filter;
// NOTE(review): intersection_columns is not used in any visible line.
284 std::vector<std::string> intersection_columns;
285 if (!filter_options_.columns) {
286 temp_filter_options.columns = std::ref(property_names_);
288 for (
const auto& col : filter_options_.columns.value().get()) {
289 if (std::find(property_names_.begin(), property_names_.end(), col) ==
290 property_names_.end()) {
292 " is not in select properties.");
295 temp_filter_options.columns = filter_options_.columns;
299 fs_->ReadFileToTable(path, property_group_->GetFileType(),
300 temp_filter_options));
// Cast column types to schema_ only when no row filter is set.
303 if (schema_ !=
nullptr && filter_options_.filter ==
nullptr) {
305 CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
// Row position of seek_id_ inside the current chunk.
308 IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
309 return chunk_table_->Slice(row_offset);
// Tail of the chunk-version dispatcher. It handles V1, V2 and AUTO; for AUTO
// the visible code branches on whether a row filter is set.
// NOTE(review): the start of the signature and the case bodies are missing
// from this extract.
313 GetChunkVersion version) {
315 case GetChunkVersion::V1:
317 case GetChunkVersion::V2:
319 case GetChunkVersion::AUTO:
320 if (filter_options_.filter !=
nullptr) {
// Reads the label chunk file <prefix_><vertex prefix>labels/chunk<chunk_index_>
// (cached in chunk_table_) and returns its rows from seek_id_ onward. The
// label file type is fixed to PARQUET.
// NOTE(review): the function-name line and some body lines are missing from
// this extract.
330 Result<std::shared_ptr<arrow::Table>>
332 FileType filetype = FileType::PARQUET;
333 if (chunk_table_ ==
nullptr) {
334 std::string path = prefix_ + vertex_info_->GetPrefix() +
"labels/chunk" +
335 std::to_string(chunk_index_);
336 GAR_ASSIGN_OR_RAISE(chunk_table_,
337 fs_->ReadFileToTable(path, filetype, filter_options_));
344 IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
345 return chunk_table_->Slice(row_offset);
// Advances to the next vertex chunk, resets seek_id_ to that chunk's first
// vertex and drops the cached table.
// NOTE(review): chunk_index_ is incremented before the bounds check. If the
// missing line 350 returns an error, chunk_index_ is left past the end.
349 if (++chunk_index_ >= chunk_num_) {
351 "vertex chunk index ", chunk_index_,
" is out-of-bounds for vertex ",
352 vertex_info_->GetType(),
" chunk num ", chunk_num_);
354 seek_id_ = chunk_index_ * vertex_info_->GetChunkSize();
355 chunk_table_.reset();
// Row-filter setter body: stores `filter` in filter_options_.
// NOTE(review): no chunk_table_ reset is visible, so an already cached chunk
// may not reflect the new filter. Confirm.
361 filter_options_.filter = filter;
// Column-projection setter body: stores `column_names` in filter_options_.
365 filter_options_.columns = column_names;
// Factory: wraps the (vertex_info, property_group, prefix, options)
// constructor in a shared_ptr.
368 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
370 const std::shared_ptr<VertexInfo>& vertex_info,
371 const std::shared_ptr<PropertyGroup>& property_group,
373 return std::make_shared<VertexPropertyArrowChunkReader>(
374 vertex_info, property_group, prefix, options);
// Factory: creates a reader over `property_group` restricted to
// `property_names`.
377 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
379 const std::shared_ptr<VertexInfo>& vertex_info,
380 const std::shared_ptr<PropertyGroup>& property_group,
381 const std::vector<std::string>& property_names,
const std::string& prefix,
383 return std::make_shared<VertexPropertyArrowChunkReader>(
384 vertex_info, property_group, property_names, prefix, options);
// Factory from graph info. It looks up the vertex info of `type` (with an
// error message when it is missing), then builds the reader with the graph's
// prefix.
387 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
389 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
390 const std::shared_ptr<PropertyGroup>& property_group,
392 auto vertex_info = graph_info->GetVertexInfo(type);
395 " doesn't exist in graph ", graph_info->GetName(),
398 return Make(vertex_info, property_group, graph_info->GetPrefix(), options);
// Factory for a single property. It finds the property group containing
// `property_name` and reads that property together with the vertex index
// column. The index column goes first unless `property_name` is the index
// column itself.
401 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
403 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
405 auto vertex_info = graph_info->GetVertexInfo(type);
408 " doesn't exist in graph ", graph_info->GetName(),
411 auto property_group = vertex_info->GetPropertyGroup(property_name);
412 if (!property_group) {
414 " doesn't exist in vertex type ", type,
".");
416 std::vector<std::string> property_names = {property_name};
417 if (property_name != graphar::GeneralParams::kVertexIndexCol) {
418 property_names.insert(property_names.begin(),
419 graphar::GeneralParams::kVertexIndexCol);
421 return Make(vertex_info, property_group, property_names,
422 graph_info->GetPrefix(), options);
// Factory dispatching on `select_type`. LABELS builds a label reader through
// MakeForLabels; the PROPERTIES branch is on lines missing from this extract.
425 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
427 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
428 const std::vector<std::string>& property_names_or_labels,
430 switch (select_type) {
431 case SelectType::LABELS:
432 return MakeForLabels(graph_info, type, property_names_or_labels, options);
433 case SelectType::PROPERTIES:
// Factory for several properties that must all live in one property group.
// - A trailing vertex index column is swapped to the front.
// - The group is taken from the (new) last name.
// - Every other non-index name must belong to that same group.
// - The index column is then prepended, under a condition that is on a
//   missing line.
// NOTE(review): some source lines are missing from this extract, presumably
// including the one that sets hasIndexCol and the loop's skip of the index
// column.
439 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
441 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
442 const std::vector<std::string>& property_names,
444 auto vertex_info = graph_info->GetVertexInfo(type);
447 " doesn't exist in graph ", graph_info->GetName(),
450 if (property_names.empty()) {
453 bool hasIndexCol =
false;
454 std::vector<std::string> property_names_mutable = property_names;
455 if (property_names_mutable[property_names_mutable.size() - 1] ==
456 graphar::GeneralParams::kVertexIndexCol) {
458 std::iter_swap(property_names_mutable.begin(),
459 property_names_mutable.end() - 1);
461 auto property_group = vertex_info->GetPropertyGroup(
462 property_names_mutable[property_names_mutable.size() - 1]);
463 if (!property_group) {
466 property_names_mutable[property_names_mutable.size() - 1],
467 " doesn't exist in vertex type ", type,
".");
// NOTE(review): `int i` is compared with the unsigned size() - 1.
// NOTE(review): an index column passed anywhere other than last does not
// appear to set hasIndexCol, so it may be inserted a second time. Confirm.
469 for (
int i = 0; i < property_names_mutable.size() - 1; i++) {
470 if (property_names_mutable[i] == graphar::GeneralParams::kVertexIndexCol) {
473 auto pg = vertex_info->GetPropertyGroup(property_names_mutable[i]);
476 " doesn't exist in vertex type ", type,
".");
478 if (pg != property_group) {
480 "The properties ", property_names_mutable[i],
" and ",
481 property_names_mutable[property_names_mutable.size() - 1],
482 " are not in the same property group, please use Make with "
483 "property_group instead.");
487 property_names_mutable.insert(property_names_mutable.begin(),
488 graphar::GeneralParams::kVertexIndexCol);
490 return Make(vertex_info, property_group, property_names_mutable,
491 graph_info->GetPrefix(), options);
// Factory: creates a label reader for `labels` of `vertex_info`.
493 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
495 const std::shared_ptr<VertexInfo>& vertex_info,
496 const std::vector<std::string>& labels,
const std::string& prefix,
498 return std::make_shared<VertexPropertyArrowChunkReader>(vertex_info, labels,
// Factory from graph info. It looks up the vertex info of `type` (with an
// error message when it is missing), then creates a label reader with the
// graph's prefix.
502 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
504 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
505 const std::vector<std::string>& labels,
507 auto vertex_info = graph_info->GetVertexInfo(type);
510 " doesn't exist in graph ", graph_info->GetName(),
513 return Make(vertex_info, labels, graph_info->GetPrefix(), options);
// AdjListArrowChunkReader constructor. It resolves the file system and
// prefix_, sets base_dir_ to the adjacency-list directory for
// `adj_list_type`, and loads the vertex chunk count. The assigned member is
// on a line missing from this extract.
517 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
518 const std::string& prefix)
519 : edge_info_(edge_info),
520 adj_list_type_(adj_list_type),
522 vertex_chunk_index_(0),
525 chunk_table_(nullptr),
527 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
528 GAR_ASSIGN_OR_RAISE_ERROR(
auto adj_list_path_prefix,
529 edge_info->GetAdjListPathPrefix(adj_list_type));
530 base_dir_ = prefix_ + adj_list_path_prefix;
531 GAR_ASSIGN_OR_RAISE_ERROR(
533 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
// Copy constructor initializer list. It copies the configuration and read
// position of `other`, but starts with no cached chunk table (chunk_table_
// is null).
538 : edge_info_(other.edge_info_),
539 adj_list_type_(other.adj_list_type_),
540 prefix_(other.prefix_),
541 vertex_chunk_index_(other.vertex_chunk_index_),
542 chunk_index_(other.chunk_index_),
543 seek_offset_(other.seek_offset_),
544 chunk_table_(nullptr),
545 vertex_chunk_num_(other.vertex_chunk_num_),
546 chunk_num_(other.chunk_num_),
547 base_dir_(other.base_dir_),
// Body of seek_src. Only unordered_by_source / ordered_by_source adjacency
// lists are accepted.
// - The vertex chunk of `id` is id / src chunk size; ids beyond the last
//   vertex chunk get an out-of-range message.
// - When the vertex chunk changes (or chunk_num_ < 0), the edge chunk count
//   is refreshed and the cached table dropped.
// - Outside the unordered_by_source branch, the reader seeks to the first
//   offset returned by util::GetAdjListOffsetOfVertex.
// NOTE(review): the unordered_by_source branch body and the return lines are
// missing from this extract.
551 if (adj_list_type_ != AdjListType::unordered_by_source &&
552 adj_list_type_ != AdjListType::ordered_by_source) {
554 edge_info_->GetEdgeType(),
" reader with ",
555 AdjListTypeToString(adj_list_type_),
" type.");
558 IdType new_vertex_chunk_index =
id / edge_info_->GetSrcChunkSize();
559 if (new_vertex_chunk_index >= vertex_chunk_num_) {
561 "The source internal id ",
id,
" is out of range [0,",
562 edge_info_->GetSrcChunkSize() * vertex_chunk_num_,
") of edge ",
563 edge_info_->GetEdgeType(),
" reader.");
565 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
567 vertex_chunk_index_ = new_vertex_chunk_index;
568 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
569 chunk_table_.reset();
572 if (adj_list_type_ == AdjListType::unordered_by_source) {
575 GAR_ASSIGN_OR_RAISE(
auto range,
576 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
577 adj_list_type_,
id));
578 return seek(range.first);
// Body of seek_dst; mirrors seek_src for the destination side. Only
// unordered_by_dest / ordered_by_dest adjacency lists are accepted, and the
// vertex chunk of `id` is id / dst chunk size.
// NOTE(review): the unordered_by_dest branch body and the return lines are
// missing from this extract.
584 if (adj_list_type_ != AdjListType::unordered_by_dest &&
585 adj_list_type_ != AdjListType::ordered_by_dest) {
587 edge_info_->GetEdgeType(),
" reader with ",
588 AdjListTypeToString(adj_list_type_),
" type.");
591 IdType new_vertex_chunk_index =
id / edge_info_->GetDstChunkSize();
592 if (new_vertex_chunk_index >= vertex_chunk_num_) {
594 "The destination internal id ",
id,
" is out of range [0,",
595 edge_info_->GetDstChunkSize() * vertex_chunk_num_,
") of edge ",
596 edge_info_->GetEdgeType(),
" reader.");
598 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
600 vertex_chunk_index_ = new_vertex_chunk_index;
601 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
602 chunk_table_.reset();
605 if (adj_list_type_ == AdjListType::unordered_by_dest) {
608 GAR_ASSIGN_OR_RAISE(
auto range,
609 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
610 adj_list_type_,
id));
611 return seek(range.first);
// Body of seek(offset). It records `offset` and sets chunk_index_ to
// offset / edge chunk size, dropping the cached table when the chunk changes.
// chunk_num_ is loaded if it is still negative. The visible message reports
// offsets past chunk_num_ chunks as out of range.
616 seek_offset_ = offset;
617 IdType pre_chunk_index = chunk_index_;
618 chunk_index_ = offset / edge_info_->GetChunkSize();
619 if (chunk_index_ != pre_chunk_index) {
620 chunk_table_.reset();
622 if (chunk_num_ < 0) {
624 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
626 if (chunk_index_ >= chunk_num_) {
628 " is out of range [0,",
629 edge_info_->GetChunkSize() * chunk_num_,
630 "), edge type: ", edge_info_->GetEdgeType());
// Body of GetChunk. It reads the adjacency-list chunk for
// (vertex_chunk_index_, chunk_index_), caches it in chunk_table_, and returns
// its rows from seek_offset_ onward.
// NOTE(review): the lines after the GetEdgeNum call are missing from this
// extract; presumably they handle a vertex chunk with no edges. Confirm.
636 if (chunk_table_ ==
nullptr) {
638 GAR_ASSIGN_OR_RAISE(
auto edge_num,
639 util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
640 vertex_chunk_index_));
644 GAR_ASSIGN_OR_RAISE(
auto chunk_file_path,
645 edge_info_->GetAdjListFilePath(
646 vertex_chunk_index_, chunk_index_, adj_list_type_));
647 std::string path = prefix_ + chunk_file_path;
648 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
649 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
651 IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
652 return chunk_table_->Slice(row_offset);
// Body of next_chunk. While the current vertex chunk has no further edge
// chunk, it moves to the next vertex chunk and refreshes the edge chunk
// count. The visible message reports running past the last vertex chunk as
// out-of-bounds. Finally it resets seek_offset_ and drops the cached table.
// NOTE(review): several lines, including the chunk_index_ updates, are
// missing from this extract.
657 if (chunk_num_ < 0) {
659 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
661 while (chunk_index_ >= chunk_num_) {
662 ++vertex_chunk_index_;
663 if (vertex_chunk_index_ >= vertex_chunk_num_) {
665 " is out-of-bounds for vertex chunk num ",
669 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
671 seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
672 chunk_table_.reset();
// Body of seek_chunk_index. It moves to edge chunk `chunk_index` of vertex
// chunk `vertex_chunk_index`.
// - The edge chunk count is refreshed when the vertex chunk changes (or
//   chunk_num_ < 0).
// - The cached table is dropped whenever the position changes.
// NOTE(review): no bounds check on either index is visible here.
677 IdType chunk_index) {
678 if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
679 vertex_chunk_index_ = vertex_chunk_index;
680 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
681 chunk_table_.reset();
683 if (chunk_index_ != chunk_index) {
684 chunk_index_ = chunk_index;
685 seek_offset_ = chunk_index * edge_info_->GetChunkSize();
686 chunk_table_.reset();
// Body of GetRowNumOfChunk. It reads and caches the current adjacency-list
// chunk if it is not cached yet, then returns its row count.
692 if (chunk_table_ ==
nullptr) {
693 GAR_ASSIGN_OR_RAISE(
auto chunk_file_path,
694 edge_info_->GetAdjListFilePath(
695 vertex_chunk_index_, chunk_index_, adj_list_type_));
696 std::string path = prefix_ + chunk_file_path;
697 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
698 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
700 return chunk_table_->num_rows();
// Factory. Reports an error when `edge_info` has no adjacency list of
// `adj_list_type`; otherwise creates the reader.
704 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
705 const std::string& prefix) {
706 if (!edge_info->HasAdjacentListType(adj_list_type)) {
708 "The adjacent list type ", AdjListTypeToString(adj_list_type),
709 " doesn't exist in edge ", edge_info->GetEdgeType(),
".");
711 return std::make_shared<AdjListArrowChunkReader>(edge_info, adj_list_type,
// Factory from graph info. It looks up the edge info for (src_type,
// edge_type, dst_type), with an error message when it is missing, and
// creates the reader with the graph's prefix.
716 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
717 const std::string& edge_type,
const std::string& dst_type,
718 AdjListType adj_list_type) {
719 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
722 dst_type,
" doesn't exist.");
724 return Make(edge_info, adj_list_type, graph_info->GetPrefix());
727 Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
728 GAR_ASSIGN_OR_RAISE(chunk_num_,
729 util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
730 vertex_chunk_index_));
// AdjListOffsetArrowChunkReader constructor. Only ordered_by_source and
// ordered_by_dest are accepted; any other type throws std::runtime_error.
// vertex_chunk_size_ is the src or dst chunk size accordingly.
735 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
736 const std::string& prefix)
737 : edge_info_(std::move(edge_info)),
738 adj_list_type_(adj_list_type),
742 chunk_table_(nullptr) {
// NOTE(review): this local base_dir is never used; the member base_dir_ is
// what gets assigned below.
743 std::string base_dir;
744 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
745 GAR_ASSIGN_OR_RAISE_ERROR(
auto dir_path,
746 edge_info->GetOffsetPathPrefix(adj_list_type));
747 base_dir_ = prefix_ + dir_path;
748 if (adj_list_type == AdjListType::ordered_by_source ||
749 adj_list_type == AdjListType::ordered_by_dest) {
750 GAR_ASSIGN_OR_RAISE_ERROR(
752 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
753 vertex_chunk_size_ = adj_list_type == AdjListType::ordered_by_source
754 ? edge_info_->GetSrcChunkSize()
755 : edge_info_->GetDstChunkSize();
757 std::string err_msg =
"Invalid adj list type " +
758 std::string(AdjListTypeToString(adj_list_type)) +
759 " to construct AdjListOffsetReader.";
760 throw std::runtime_error(err_msg);
// Body of seek(id). It sets chunk_index_ to id / vertex_chunk_size_ and drops
// the cached table when the chunk changes. The visible message reports ids at
// or beyond vertex_chunk_num_ * vertex_chunk_size_ as out of range.
766 IdType pre_chunk_index = chunk_index_;
767 chunk_index_ =
id / vertex_chunk_size_;
768 if (chunk_index_ != pre_chunk_index) {
769 chunk_table_.reset();
771 if (chunk_index_ >= vertex_chunk_num_) {
773 vertex_chunk_num_ * vertex_chunk_size_,
774 "), of edge ", edge_info_->GetEdgeType(),
775 " of adj list type ",
776 AdjListTypeToString(adj_list_type_),
".");
// GetChunk for offsets. It reads and caches the current offset chunk, then
// returns its rows from seek_id_ onward as an arrow::Array.
// NOTE(review): only chunk(0) of the first column is returned. If the
// column has more than one chunk, the remaining rows are dropped. Confirm.
781 Result<std::shared_ptr<arrow::Array>>
783 if (chunk_table_ ==
nullptr) {
785 auto chunk_file_path,
786 edge_info_->GetAdjListOffsetFilePath(chunk_index_, adj_list_type_));
787 std::string path = prefix_ + chunk_file_path;
788 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
789 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
791 IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_;
792 return chunk_table_->Slice(row_offset)->column(0)->chunk(0);
// Body of next_chunk for offsets. It advances to the next vertex chunk,
// resets seek_id_ to that chunk's first vertex and drops the cached table.
// NOTE(review): chunk_index_ is incremented before the bounds check, so it
// stays past the end when the out-of-bounds path is taken.
796 if (++chunk_index_ >= vertex_chunk_num_) {
798 " is out-of-bounds for vertex chunk num ",
799 vertex_chunk_num_,
" of edge ",
800 edge_info_->GetEdgeType(),
" of adj list type ",
801 AdjListTypeToString(adj_list_type_),
".");
803 seek_id_ = chunk_index_ * vertex_chunk_size_;
804 chunk_table_.reset();
// Factory. Reports an error when `edge_info` has no adjacency list of
// `adj_list_type`; otherwise creates the offset reader.
809 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
811 AdjListType adj_list_type,
812 const std::string& prefix) {
813 if (!edge_info->HasAdjacentListType(adj_list_type)) {
815 "The adjacent list type ", AdjListTypeToString(adj_list_type),
816 " doesn't exist in edge ", edge_info->GetEdgeType(),
".");
818 return std::make_shared<AdjListOffsetArrowChunkReader>(edge_info,
819 adj_list_type, prefix);
// Factory from graph info. It looks up the edge info for (src_type,
// edge_type, dst_type), with an error message when it is missing, and
// creates the offset reader with the graph's prefix.
822 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
824 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
825 const std::string& edge_type,
const std::string& dst_type,
826 AdjListType adj_list_type) {
827 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
830 dst_type,
" doesn't exist.");
832 return Make(edge_info, adj_list_type, graph_info->GetPrefix());
// AdjListPropertyArrowChunkReader constructor. It:
// - resolves the file system and prefix_;
// - sets base_dir_ to the property group's directory for `adj_list_type`;
// - loads the vertex chunk count;
// - builds schema_ from the property group without an index column.
// NOTE(review): `prefix` is taken by value here, while the other readers'
// constructors take const std::string&.
836 const std::shared_ptr<EdgeInfo>& edge_info,
837 const std::shared_ptr<PropertyGroup>& property_group,
838 AdjListType adj_list_type,
const std::string prefix,
840 : edge_info_(std::move(edge_info)),
841 property_group_(std::move(property_group)),
842 adj_list_type_(adj_list_type),
844 vertex_chunk_index_(0),
848 chunk_table_(nullptr),
849 filter_options_(options),
851 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
852 GAR_ASSIGN_OR_RAISE_ERROR(
854 edge_info->GetPropertyGroupPathPrefix(property_group, adj_list_type));
855 base_dir_ = prefix_ + pg_path_prefix;
856 GAR_ASSIGN_OR_RAISE_ERROR(
858 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
859 GAR_ASSIGN_OR_RAISE_ERROR(schema_,
860 PropertyGroupToSchema(property_group,
false));
// Copy constructor initializer list. It copies the configuration, schema,
// filter options and read position of `other`, but starts with no cached
// chunk table (chunk_table_ is null).
865 : edge_info_(other.edge_info_),
866 property_group_(other.property_group_),
867 adj_list_type_(other.adj_list_type_),
868 prefix_(other.prefix_),
869 vertex_chunk_index_(other.vertex_chunk_index_),
870 chunk_index_(other.chunk_index_),
871 seek_offset_(other.seek_offset_),
872 schema_(other.schema_),
873 chunk_table_(nullptr),
874 filter_options_(other.filter_options_),
875 vertex_chunk_num_(other.vertex_chunk_num_),
876 chunk_num_(other.chunk_num_),
877 base_dir_(other.base_dir_),
// Body of seek_src for edge properties; same logic as the topology reader.
// - Only *_by_source adjacency lists are accepted.
// - The vertex chunk of `id` is id / src chunk size; the edge chunk count is
//   refreshed and the cache dropped when it changes (or chunk_num_ < 0).
// - Outside the unordered_by_source branch, the reader seeks to the first
//   offset from util::GetAdjListOffsetOfVertex.
// NOTE(review): the unordered_by_source branch body and the return lines are
// missing from this extract.
881 if (adj_list_type_ != AdjListType::unordered_by_source &&
882 adj_list_type_ != AdjListType::ordered_by_source) {
884 edge_info_->GetEdgeType(),
" reader with ",
885 AdjListTypeToString(adj_list_type_),
" type.");
888 IdType new_vertex_chunk_index =
id / edge_info_->GetSrcChunkSize();
889 if (new_vertex_chunk_index >= vertex_chunk_num_) {
891 "The source internal id ",
id,
" is out of range [0,",
892 edge_info_->GetSrcChunkSize() * vertex_chunk_num_,
") of edge ",
893 edge_info_->GetEdgeType(),
" reader.");
895 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
896 vertex_chunk_index_ = new_vertex_chunk_index;
897 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
898 chunk_table_.reset();
901 if (adj_list_type_ == AdjListType::unordered_by_source) {
904 GAR_ASSIGN_OR_RAISE(
auto range,
905 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
906 adj_list_type_,
id));
907 return seek(range.first);
// Body of seek_dst for edge properties; mirrors seek_src for the destination
// side, using the dst chunk size.
// NOTE(review): the unordered_by_dest branch body and the return lines are
// missing from this extract.
913 if (adj_list_type_ != AdjListType::unordered_by_dest &&
914 adj_list_type_ != AdjListType::ordered_by_dest) {
916 edge_info_->GetEdgeType(),
" reader with ",
917 AdjListTypeToString(adj_list_type_),
" type.");
920 IdType new_vertex_chunk_index =
id / edge_info_->GetDstChunkSize();
921 if (new_vertex_chunk_index >= vertex_chunk_num_) {
923 "The destination internal id ",
id,
" is out of range [0,",
924 edge_info_->GetDstChunkSize() * vertex_chunk_num_,
") of edge ",
925 edge_info_->GetEdgeType(),
" reader.");
927 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
928 vertex_chunk_index_ = new_vertex_chunk_index;
929 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
930 chunk_table_.reset();
933 if (adj_list_type_ == AdjListType::unordered_by_dest) {
936 GAR_ASSIGN_OR_RAISE(
auto range,
937 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
938 adj_list_type_,
id));
939 return seek(range.first);
// Body of seek(offset) for edge properties. It records `offset` and sets
// chunk_index_ to offset / edge chunk size, dropping the cache when the chunk
// changes. chunk_num_ is loaded if it is still negative. The visible message
// reports offsets past chunk_num_ chunks as out of range.
944 IdType pre_chunk_index = chunk_index_;
945 seek_offset_ = offset;
946 chunk_index_ = offset / edge_info_->GetChunkSize();
947 if (chunk_index_ != pre_chunk_index) {
948 chunk_table_.reset();
950 if (chunk_num_ < 0) {
952 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
954 if (chunk_index_ >= chunk_num_) {
956 " is out of range [0,",
957 edge_info_->GetChunkSize() * chunk_num_,
958 "), edge type: ", edge_info_->GetEdgeType());
// GetChunk for edge properties. It validates filter_options_ against the
// property group, then reads and caches the property chunk for
// (vertex_chunk_index_, chunk_index_). Column types are cast to schema_ only
// when no row filter is set. Returns the rows from seek_offset_ onward.
// NOTE(review): the lines after the GetEdgeNum call are missing from this
// extract; presumably they handle a vertex chunk with no edges. Confirm.
963 Result<std::shared_ptr<arrow::Table>>
965 GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
966 if (chunk_table_ ==
nullptr) {
968 GAR_ASSIGN_OR_RAISE(
auto edge_num,
969 util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
970 vertex_chunk_index_));
975 auto chunk_file_path,
976 edge_info_->GetPropertyFilePath(property_group_, adj_list_type_,
977 vertex_chunk_index_, chunk_index_));
978 std::string path = prefix_ + chunk_file_path;
980 chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
983 if (schema_ !=
nullptr && filter_options_.filter ==
nullptr) {
985 CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
988 IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
989 return chunk_table_->Slice(row_offset);
// Body of next_chunk for edge properties. While the current vertex chunk has
// no further edge chunk, it moves to the next vertex chunk and refreshes the
// edge chunk count. Finally it resets seek_offset_ and drops the cached
// table.
// NOTE(review): some lines, including the chunk_index_ updates, are missing
// from this extract.
994 if (chunk_num_ < 0) {
996 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
998 while (chunk_index_ >= chunk_num_) {
999 ++vertex_chunk_index_;
1000 if (vertex_chunk_index_ >= vertex_chunk_num_) {
1002 " is out-of-bounds for vertex chunk num ",
1003 vertex_chunk_num_,
" of edge ",
1004 edge_info_->GetEdgeType(),
" of adj list type ",
1005 AdjListTypeToString(adj_list_type_),
// NOTE(review): property_group_ is a shared_ptr, so this likely formats the
// pointer value rather than describing the group. Confirm.
1006 ", property group ", property_group_,
".");
1009 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
1011 seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
1012 chunk_table_.reset();
// Body of seek_chunk_index for edge properties. It moves to edge chunk
// `chunk_index` of vertex chunk `vertex_chunk_index`.
// - The edge chunk count is refreshed when the vertex chunk changes (or
//   chunk_num_ < 0).
// - The cached table is dropped whenever the position changes.
// NOTE(review): no bounds check on either index is visible here.
1017 IdType vertex_chunk_index, IdType chunk_index) {
1018 if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
1019 vertex_chunk_index_ = vertex_chunk_index;
1020 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
1021 chunk_table_.reset();
1023 if (chunk_index_ != chunk_index) {
1024 chunk_index_ = chunk_index;
1025 seek_offset_ = chunk_index * edge_info_->GetChunkSize();
1026 chunk_table_.reset();
// Row-filter setter body: stores `filter` in filter_options_.
// NOTE(review): no chunk_table_ reset is visible, so an already cached chunk
// may not reflect the new filter. Confirm.
1032 filter_options_.filter = filter;
// Column-projection setter body: stores `column_names` in filter_options_.
1036 filter_options_.columns = column_names;
// Factory. Reports an error when `edge_info` has no adjacency list of
// `adj_list_type`; otherwise creates the property reader.
1039 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
1041 const std::shared_ptr<EdgeInfo>& edge_info,
1042 const std::shared_ptr<PropertyGroup>& property_group,
1043 AdjListType adj_list_type,
const std::string& prefix,
1045 if (!edge_info->HasAdjacentListType(adj_list_type)) {
1047 "The adjacent list type ", AdjListTypeToString(adj_list_type),
1048 " doesn't exist in edge ", edge_info->GetEdgeType(),
".");
1050 return std::make_shared<AdjListPropertyArrowChunkReader>(
1051 edge_info, property_group, adj_list_type, prefix, options);
// Factory from graph info. It looks up the edge info for (src_type,
// edge_type, dst_type), with an error message when it is missing, and
// creates the property reader with the graph's prefix.
1054 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
1056 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
1057 const std::string& edge_type,
const std::string& dst_type,
1058 const std::shared_ptr<PropertyGroup>& property_group,
1060 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
1063 dst_type,
" doesn't exist.");
1065 return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
// Factory for one edge property. It looks up the edge info for (src_type,
// edge_type, dst_type) and the property group containing `property_name`;
// both lookups produce an error message when they fail. It then creates the
// property reader with the graph's prefix.
1069 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
1071 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
1072 const std::string& edge_type,
const std::string& dst_type,
1073 const std::string& property_name, AdjListType adj_list_type,
1075 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
1078 dst_type,
" doesn't exist.");
1080 auto property_group = edge_info->GetPropertyGroup(property_name);
1081 if (!property_group) {
1083 " doesn't exist in edge ", src_type,
" ", edge_type,
1084 " ", dst_type,
".");
1086 return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
1090 Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() {
1091 GAR_ASSIGN_OR_RAISE(chunk_num_,
1092 util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
1093 vertex_chunk_index_));
The arrow chunk reader for adjacency list topology chunks.
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
AdjListArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListArrowChunkReader.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the chunk at the current chunk position indicator as arrow::Table; if the chunk is empty,...
static Result< std::shared_ptr< AdjListArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek_dst(IdType offset)
Sets chunk position indicator for reader by destination vertex id.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
Result< std::shared_ptr< arrow::Array > > GetChunk()
Get the current offset chunk as arrow::Array.
AdjListOffsetArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListOffsetArrowChunkReader.
static Result< std::shared_ptr< AdjListOffsetArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListOffsetArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk. if current chunk is the last chunk, will return Status::...
The arrow chunk reader for edge property group chunks.
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek_dst(IdType id)
Sets chunk position indicator for reader by destination vertex id.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
static Result< std::shared_ptr< AdjListPropertyArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string &prefix, const util::FilterOptions &options={})
Create an AdjListPropertyArrowChunkReader instance from edge info.
AdjListPropertyArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string prefix, const util::FilterOptions &options={})
Initialize the AdjListPropertyArrowChunkReader.
void Filter(util::Filter filter=nullptr)
Apply the row filter to the table. No parameter call Filter() will clear the filter.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk of chunk position indicator as arrow::Table, if the chunk is empty,...
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
void Select(util::ColumnNames column_names=std::nullopt)
Apply the projection to the table to be read. No parameter call Select() will clear the projection.
Status outcome object (success or error)
static Status IndexError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
The arrow chunk reader for vertex property group.
Result< std::shared_ptr< arrow::Table > > GetLabelChunk()
Return the current arrow label chunk table of chunk position indicator.
void Filter(util::Filter filter=nullptr)
Apply the row filter to the table. No parameter call Filter() will clear the filter.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > MakeForLabels(const std::shared_ptr< GraphInfo > &graph_info, const std::string &type, const std::vector< std::string > &labels, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from graph info for labels.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
Result< std::shared_ptr< arrow::Table > > GetChunk(GetChunkVersion version=GetChunkVersion::AUTO)
Return the current arrow chunk table of chunk position indicator.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > MakeForProperties(const std::shared_ptr< GraphInfo > &graph_info, const std::string &type, const std::vector< std::string > &property_names, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from graph info for properties.
void Select(util::ColumnNames column_names=std::nullopt)
Apply the projection to the table to be read. No parameter call Select() will clear the projection.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from vertex info.