23 #include "arrow/api.h"
24 #include "arrow/compute/api.h"
25 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
26 #include "arrow/acero/exec_plan.h"
28 #include "arrow/compute/exec/exec_plan.h"
30 #include "arrow/dataset/dataset.h"
31 #include "arrow/dataset/file_base.h"
32 #include "arrow/dataset/file_parquet.h"
33 #include "arrow/dataset/plan.h"
34 #include "arrow/dataset/scanner.h"
36 #include "graphar/arrow/chunk_writer.h"
37 #include "graphar/filesystem.h"
38 #include "graphar/general_params.h"
39 #include "graphar/graph_info.h"
40 #include "graphar/result.h"
41 #include "graphar/status.h"
42 #include "graphar/types.h"
43 #include "graphar/util.h"
// Alias for the namespace that provides ExecPlan / MakeExecNode. Arrow
// >= 12.0.0 uses arrow::acero; older Arrow releases alias arrow::compute.
48 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
49 namespace arrow_acero_namespace = arrow::acero;
51 namespace arrow_acero_namespace = arrow::compute;
// Async generator type produced by a sink node: std::optional for Arrow
// >= 10.0.0, arrow::util::optional for older releases.
54 #if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000
55 using AsyncGeneratorType =
56 arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>;
58 using AsyncGeneratorType =
59 arrow::AsyncGenerator<arrow::util::optional<arrow::compute::ExecBatch>>;
70 Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
71 const arrow::compute::ExecContext& exec_context,
72 std::shared_ptr<arrow_acero_namespace::ExecPlan> plan,
73 std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) {
75 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
76 arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen),
77 exec_context.memory_pool());
80 RETURN_NOT_ARROW_OK(plan->Validate());
82 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
83 plan->StartProducing();
85 RETURN_NOT_ARROW_OK(plan->StartProducing());
89 std::shared_ptr<arrow::Table> response_table;
90 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
91 response_table, arrow::Table::FromRecordBatchReader(sink_reader.get()));
94 plan->StopProducing();
96 RETURN_NOT_ARROW_OK(plan->finished().status());
97 return response_table;
// VertexPropertyWriter constructor (parameters and initializers).
// default_validate is rejected as the writer-wide level because the validate
// methods use default_validate to mean "use the writer's own level".
// fs_ is obtained from `prefix` via FileSystemFromUriOrPath, which also fills
// prefix_ through its out-parameter; GAR_ASSIGN_OR_RAISE_ERROR presumably
// throws on failure since a constructor cannot return a Status — confirm.
103 const std::shared_ptr<VertexInfo>& vertex_info,
const std::string& prefix,
104 const ValidateLevel& validate_level)
105 : vertex_info_(vertex_info),
107 validate_level_(validate_level) {
108 if (validate_level_ == ValidateLevel::default_validate) {
109 throw std::runtime_error(
110 "default_validate is not allowed to be set as the global validate "
111 "level for VertexPropertyWriter");
113 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
// Validate a vertex count before it is written. default_validate is replaced
// by the writer's level, and no_validate short-circuits all checks.
117 Status VertexPropertyWriter::validate(
const IdType& count,
118 ValidateLevel validate_level)
const {
120 if (validate_level == ValidateLevel::default_validate)
121 validate_level = validate_level_;
123 if (validate_level == ValidateLevel::no_validate)
// Validate a (property group, chunk index) write target. The group must be
// one of vertex_info_'s property groups, and a negative chunk index is
// rejected.
133 Status VertexPropertyWriter::validate(
134 const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
135 ValidateLevel validate_level)
const {
137 if (validate_level == ValidateLevel::default_validate)
138 validate_level = validate_level_;
140 if (validate_level == ValidateLevel::no_validate)
143 if (!vertex_info_->HasPropertyGroup(property_group)) {
145 vertex_info_->GetLabel(),
" vertex info.");
147 if (chunk_index < 0) {
// Validate an input table destined for one property group and vertex chunk.
// Weak level: reuses the (property group, chunk index) checks and rejects
// tables with more rows than the vertex chunk size.
// Strong level: also runs arrow::Table::Validate() and requires each property
// of the group to exist as a column whose converted type compares equal to
// property.type.
154 Status VertexPropertyWriter::validate(
155 const std::shared_ptr<arrow::Table>& input_table,
156 const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
157 ValidateLevel validate_level)
const {
159 if (validate_level == ValidateLevel::default_validate) {
160 validate_level = validate_level_;
163 if (validate_level == ValidateLevel::no_validate) {
167 GAR_RETURN_NOT_OK(validate(property_group, chunk_index, validate_level));
// NOTE(review): the literal " which is larger than the vertex chunk size"
// has no trailing space, so the number is printed glued to "size". The edge
// writer's equivalent message uses " edge chunk size ".
169 if (input_table->num_rows() > vertex_info_->GetChunkSize()) {
171 input_table->num_rows(),
172 " which is larger than the vertex chunk size",
173 vertex_info_->GetChunkSize(),
".");
176 if (validate_level == ValidateLevel::strong_validate) {
178 RETURN_NOT_ARROW_OK(input_table->Validate());
180 auto schema = input_table->schema();
181 for (
auto& property : property_group->GetProperties()) {
182 int indice = schema->GetFieldIndex(property.name);
185 " of property group ", property_group,
186 " does not exist in the input table.");
188 auto field = schema->field(indice);
189 if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) {
191 "The data type of property: ", property.name,
" is ",
192 property.type->ToTypeName(),
", but got ",
193 DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
// Presumably VertexPropertyWriter::WriteVerticesNum. Validates `count`, then
// writes it as a single IdType value to prefix_ + the vertex-count file path
// reported by vertex_info_.
202 const IdType& count, ValidateLevel validate_level)
const {
203 GAR_RETURN_NOT_OK(validate(count, validate_level));
204 GAR_ASSIGN_OR_RAISE(
auto suffix, vertex_info_->GetVerticesNumFilePath());
205 std::string path = prefix_ + suffix;
206 return fs_->WriteValueToFile<IdType>(count, path);
// Presumably VertexPropertyWriter::WriteChunk for one property group.
// Validates the input, then projects the vertex index column followed by the
// group's property columns (in group order). The projection is written to
// prefix_ + the chunk file path of (property_group, chunk_index) in the
// group's file type. A missing index or property column is reported as an
// error.
210 const std::shared_ptr<arrow::Table>& input_table,
211 const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
212 ValidateLevel validate_level)
const {
214 validate(input_table, property_group, chunk_index, validate_level));
215 auto file_type = property_group->GetFileType();
216 auto schema = input_table->schema();
217 int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
220 GeneralParams::kVertexIndexCol,
221 " does not exist in the input table.");
224 std::vector<int> indices({indice});
225 for (
auto& property : property_group->GetProperties()) {
226 int indice = schema->GetFieldIndex(property.name);
229 " of property group ", property_group,
230 " of vertex ", vertex_info_->GetLabel(),
231 " does not exist in the input table.");
233 indices.push_back(indice);
235 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto in_table,
236 input_table->SelectColumns(indices));
237 GAR_ASSIGN_OR_RAISE(
auto suffix,
238 vertex_info_->GetFilePath(property_group, chunk_index));
239 std::string path = prefix_ + suffix;
240 return fs_->WriteTableToFile(in_table, file_type, path);
// Presumably VertexPropertyWriter::WriteChunk for every property group.
// Writes the same input chunk once per property group of vertex_info_,
// stopping at the first failing group.
244 const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
245 ValidateLevel validate_level)
const {
246 auto property_groups = vertex_info_->GetPropertyGroups();
247 for (
auto& property_group : property_groups) {
249 WriteChunk(input_table, property_group, chunk_index, validate_level));
// VertexPropertyWriter::WriteTable for one property group.
// The vertex index column is looked up first. Presumably it is added by
// addIndexColumn only when absent (guarded by an `indice == -1` check on an
// elided line), numbering rows from start_chunk_index * chunk size.
// The table is then cut into chunk_size-row slices, written as consecutive
// chunks starting at start_chunk_index.
255 const std::shared_ptr<arrow::Table>& input_table,
256 const std::shared_ptr<PropertyGroup>& property_group,
257 IdType start_chunk_index, ValidateLevel validate_level)
const {
258 auto schema = input_table->schema();
259 int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
260 auto table_with_index = input_table;
263 GAR_ASSIGN_OR_RAISE(table_with_index,
264 addIndexColumn(input_table, start_chunk_index,
265 vertex_info_->GetChunkSize()));
267 IdType chunk_size = vertex_info_->GetChunkSize();
268 int64_t length = table_with_index->num_rows();
269 IdType chunk_index = start_chunk_index;
270 for (int64_t offset = 0; offset < length;
271 offset += chunk_size, chunk_index++) {
272 auto in_chunk = table_with_index->Slice(offset, chunk_size);
274 WriteChunk(in_chunk, property_group, chunk_index, validate_level));
280 const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
281 ValidateLevel validate_level)
const {
282 auto property_groups = vertex_info_->GetPropertyGroups();
283 GAR_ASSIGN_OR_RAISE(
auto table_with_index,
284 addIndexColumn(input_table, start_chunk_index,
285 vertex_info_->GetChunkSize()));
286 for (
auto& property_group : property_groups) {
287 GAR_RETURN_NOT_OK(
WriteTable(table_with_index, property_group,
288 start_chunk_index, validate_level));
// Presumably VertexPropertyWriter::Make(vertex_info, prefix, validate_level).
// Constructs the writer via std::make_shared. The constructor throws for
// default_validate instead of returning a Status.
294 const std::shared_ptr<VertexInfo>& vertex_info,
const std::string& prefix,
295 const ValidateLevel& validate_level) {
296 return std::make_shared<VertexPropertyWriter>(vertex_info, prefix,
// Presumably VertexPropertyWriter::Make(graph_info, label, validate_level).
// Looks up the vertex info for `label` and builds a writer rooted at the
// graph's prefix. A missing label is presumably handled on the elided lines
// before the final Make call — confirm.
301 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& label,
302 const ValidateLevel& validate_level) {
303 auto vertex_info = graph_info->GetVertexInfo(label);
307 return Make(vertex_info, graph_info->GetPrefix(), validate_level);
310 Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn(
311 const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
312 IdType chunk_size)
const {
313 arrow::Int64Builder array_builder;
314 RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
315 int64_t length = table->num_rows();
316 for (IdType i = 0; i < length; i++) {
317 RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
319 std::shared_ptr<arrow::Array> array;
320 RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
321 std::shared_ptr<arrow::ChunkedArray> chunked_array =
322 std::make_shared<arrow::ChunkedArray>(array);
323 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
324 auto ret, table->AddColumn(0,
325 arrow::field(GeneralParams::kVertexIndexCol,
326 arrow::int64(),
false),
// EdgeChunkWriter constructor (parameters and initializers).
// Rejects default_validate as the writer-wide level and obtains fs_ from
// `prefix` via FileSystemFromUriOrPath (which also fills prefix_).
// It caches the edge chunk size and the vertex chunk size of the side the
// adjacency list is keyed on: source for *_by_source, destination for
// *_by_dest, and source for the remaining (presumably default) case.
// vertex_chunk_size_ later bounds the vertex id range in getOffsetTable.
334 const std::string& prefix,
335 AdjListType adj_list_type,
336 const ValidateLevel& validate_level)
337 : edge_info_(edge_info),
338 adj_list_type_(adj_list_type),
339 validate_level_(validate_level) {
340 if (validate_level_ == ValidateLevel::default_validate) {
341 throw std::runtime_error(
342 "default_validate is not allowed to be set as the global validate "
343 "level for EdgeChunkWriter");
345 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
346 chunk_size_ = edge_info_->GetChunkSize();
347 switch (adj_list_type) {
348 case AdjListType::unordered_by_source:
349 vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
351 case AdjListType::ordered_by_source:
352 vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
354 case AdjListType::unordered_by_dest:
355 vertex_chunk_size_ = edge_info_->GetDstChunkSize();
357 case AdjListType::ordered_by_dest:
358 vertex_chunk_size_ = edge_info_->GetDstChunkSize();
361 vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
// Shared weak checks for a pair of counts or indices. edge_info_ must declare
// the writer's adj list type, and both values must be non-negative. Callers
// pass e.g. (vertex_chunk_index, chunk_index) or (0, count).
365 Status EdgeChunkWriter::validate(IdType count_or_index1, IdType count_or_index2,
366 ValidateLevel validate_level)
const {
368 if (validate_level == ValidateLevel::default_validate)
369 validate_level = validate_level_;
371 if (validate_level == ValidateLevel::no_validate)
374 if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
376 "Adj list type ", AdjListTypeToString(adj_list_type_),
377 " does not exist in the ", edge_info_->GetEdgeLabel(),
" edge info.");
380 if (count_or_index1 < 0 || count_or_index2 < 0) {
382 "The count or index must be non-negative, but got ", count_or_index1,
383 " and ", count_or_index2,
".");
// Validate a (property group, vertex chunk, edge chunk) write target. It runs
// the index checks above, then requires property_group to belong to
// edge_info_.
389 Status EdgeChunkWriter::validate(
390 const std::shared_ptr<PropertyGroup>& property_group,
391 IdType vertex_chunk_index, IdType chunk_index,
392 ValidateLevel validate_level)
const {
394 if (validate_level == ValidateLevel::default_validate)
395 validate_level = validate_level_;
397 if (validate_level == ValidateLevel::no_validate)
400 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
402 if (!edge_info_->HasPropertyGroup(property_group)) {
404 edge_info_->GetEdgeLabel(),
" edge info.");
410 Status EdgeChunkWriter::validate(
411 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
412 ValidateLevel validate_level)
const {
414 if (validate_level == ValidateLevel::default_validate)
415 validate_level = validate_level_;
417 if (validate_level == ValidateLevel::no_validate)
420 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, 0, validate_level));
422 if (adj_list_type_ != AdjListType::ordered_by_source &&
423 adj_list_type_ != AdjListType::ordered_by_dest) {
425 "The adj list type has to be ordered_by_source or ordered_by_dest, but "
427 std::string(AdjListTypeToString(adj_list_type_)));
429 if (adj_list_type_ == AdjListType::ordered_by_source &&
430 input_table->num_rows() > edge_info_->GetSrcChunkSize() + 1) {
432 "The number of rows of input offset table is ", input_table->num_rows(),
433 " which is larger than the offset size of source vertex chunk ",
434 edge_info_->GetSrcChunkSize() + 1,
".");
436 if (adj_list_type_ == AdjListType::ordered_by_dest &&
437 input_table->num_rows() > edge_info_->GetDstChunkSize() + 1) {
439 "The number of rows of input offset table is ", input_table->num_rows(),
440 " which is larger than the offset size of destination vertex chunk ",
441 edge_info_->GetSrcChunkSize() + 1,
".");
444 if (validate_level == ValidateLevel::strong_validate) {
446 RETURN_NOT_ARROW_OK(input_table->Validate());
448 auto schema = input_table->schema();
449 int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
451 return Status::Invalid(
"The offset column ", GeneralParams::kOffsetCol,
452 " does not exist in the input table");
454 auto field = schema->field(index);
455 if (field->type()->id() != arrow::Type::INT64) {
457 "The data type for offset column should be INT64, but got ",
458 field->type()->name());
// Validate an adjacency-list table for one edge chunk.
// Weak level: index checks and at most edge chunk size rows.
// Strong level: the source and destination index columns must exist and be
// INT64.
// NOTE(review): unlike the offset and property overloads, the strong level
// here does not call input_table->Validate(). Confirm whether that omission is
// intended.
465 Status EdgeChunkWriter::validate(
466 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
467 IdType chunk_index, ValidateLevel validate_level)
const {
469 if (validate_level == ValidateLevel::default_validate)
470 validate_level = validate_level_;
472 if (validate_level == ValidateLevel::no_validate)
475 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
477 if (input_table->num_rows() > edge_info_->GetChunkSize()) {
479 "The number of rows of input table is ", input_table->num_rows(),
480 " which is larger than the ", edge_info_->GetEdgeLabel(),
481 " edge chunk size ", edge_info_->GetChunkSize(),
".");
484 if (validate_level == ValidateLevel::strong_validate) {
485 auto schema = input_table->schema();
486 int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
489 GeneralParams::kSrcIndexCol,
490 " does not exist in the input table");
492 auto field = schema->field(index);
493 if (field->type()->id() != arrow::Type::INT64) {
495 "The data type for source index column should be INT64, but got ",
496 field->type()->name());
498 index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
501 GeneralParams::kDstIndexCol,
502 " does not exist in the input table");
504 field = schema->field(index);
505 if (field->type()->id() != arrow::Type::INT64) {
507 "The data type for destination index column should be INT64, but "
509 field->type()->name());
// Validate a property table for one property group and edge chunk.
// Weak level: group and index checks, plus at most edge chunk size rows.
// Strong level: also runs arrow::Table::Validate() and requires each property
// of the group to exist as a column whose converted type compares equal to
// property.type.
516 Status EdgeChunkWriter::validate(
517 const std::shared_ptr<arrow::Table>& input_table,
518 const std::shared_ptr<PropertyGroup>& property_group,
519 IdType vertex_chunk_index, IdType chunk_index,
520 ValidateLevel validate_level)
const {
522 if (validate_level == ValidateLevel::default_validate)
523 validate_level = validate_level_;
525 if (validate_level == ValidateLevel::no_validate)
528 GAR_RETURN_NOT_OK(validate(property_group, vertex_chunk_index, chunk_index,
531 if (input_table->num_rows() > edge_info_->GetChunkSize()) {
533 "The number of rows of input table is ", input_table->num_rows(),
534 " which is larger than the ", edge_info_->GetEdgeLabel(),
535 " edge chunk size ", edge_info_->GetChunkSize(),
".");
538 if (validate_level == ValidateLevel::strong_validate) {
540 RETURN_NOT_ARROW_OK(input_table->Validate());
542 auto schema = input_table->schema();
543 for (
auto& property : property_group->GetProperties()) {
544 int indice = schema->GetFieldIndex(property.name);
547 " of property group ", property_group,
548 " does not exist in the input table.");
550 auto field = schema->field(indice);
551 if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) {
553 "The data type of property: ", property.name,
" is ",
554 property.type->ToTypeName(),
", but got ",
555 DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
// Presumably EdgeChunkWriter::WriteEdgesNum. Validates (vertex_chunk_index,
// count), then writes `count` to the edge-count file of that vertex chunk for
// adj_list_type_.
565 ValidateLevel validate_level)
const {
566 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, count, validate_level));
567 GAR_ASSIGN_OR_RAISE(
auto suffix, edge_info_->GetEdgesNumFilePath(
568 vertex_chunk_index, adj_list_type_));
569 std::string path = prefix_ + suffix;
570 return fs_->WriteValueToFile<IdType>(count, path);
// Presumably EdgeChunkWriter::WriteVerticesNum. Validates `count` (paired with
// index 0), then writes it to the vertex-count file for adj_list_type_.
574 ValidateLevel validate_level)
const {
575 GAR_RETURN_NOT_OK(validate(0, count, validate_level));
576 GAR_ASSIGN_OR_RAISE(
auto suffix,
577 edge_info_->GetVerticesNumFilePath(adj_list_type_));
578 std::string path = prefix_ + suffix;
579 return fs_->WriteValueToFile<IdType>(count, path);
583 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
584 ValidateLevel validate_level)
const {
585 GAR_RETURN_NOT_OK(validate(input_table, vertex_chunk_index, validate_level));
586 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
587 auto schema = input_table->schema();
588 int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
590 return Status::Invalid(
"The offset column ", GeneralParams::kOffsetCol,
591 " does not exist in the input table");
593 auto in_table = input_table->SelectColumns({index}).ValueOrDie();
594 GAR_ASSIGN_OR_RAISE(
auto suffix, edge_info_->GetAdjListOffsetFilePath(
595 vertex_chunk_index, adj_list_type_));
596 std::string path = prefix_ + suffix;
597 return fs_->WriteTableToFile(in_table, file_type, path);
// Presumably EdgeChunkWriter::WriteAdjListChunk. Validates the table, projects
// the source and destination index columns (both required, in that order) and
// writes them to the adj-list file of (vertex_chunk_index, chunk_index) in the
// adjacent list's file type.
// NOTE(review): SelectColumns(...).ValueOrDie() aborts the process on an Arrow
// error. Prefer GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN, as the vertex writer
// does.
601 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
602 IdType chunk_index, ValidateLevel validate_level)
const {
604 validate(input_table, vertex_chunk_index, chunk_index, validate_level));
605 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
606 std::vector<int> indices;
608 auto schema = input_table->schema();
609 int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
612 GeneralParams::kSrcIndexCol,
613 " does not exist in the input table");
615 indices.push_back(index);
616 index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
619 GeneralParams::kDstIndexCol,
620 " does not exist in the input table");
622 indices.push_back(index);
623 auto in_table = input_table->SelectColumns(indices).ValueOrDie();
626 auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index,
627 chunk_index, adj_list_type_));
628 std::string path = prefix_ + suffix;
629 return fs_->WriteTableToFile(in_table, file_type, path);
// Presumably EdgeChunkWriter::WritePropertyChunk for one property group.
// Validates the table, projects the group's property columns in group order
// and writes them to the property file of (property_group, vertex_chunk_index,
// chunk_index) in the group's file type.
// NOTE(review): SelectColumns(...).ValueOrDie() aborts the process on an Arrow
// error; the Status-returning macros used elsewhere would be safer.
633 const std::shared_ptr<arrow::Table>& input_table,
634 const std::shared_ptr<PropertyGroup>& property_group,
635 IdType vertex_chunk_index, IdType chunk_index,
636 ValidateLevel validate_level)
const {
637 GAR_RETURN_NOT_OK(validate(input_table, property_group, vertex_chunk_index,
638 chunk_index, validate_level));
639 auto file_type = property_group->GetFileType();
641 std::vector<int> indices;
643 auto schema = input_table->schema();
644 for (
auto& property : property_group->GetProperties()) {
645 int indice = schema->GetFieldIndex(property.name);
648 " of property group ", property_group,
" of edge ",
649 edge_info_->GetEdgeLabel(),
650 " does not exist in the input table.");
652 indices.push_back(indice);
654 auto in_table = input_table->SelectColumns(indices).ValueOrDie();
655 GAR_ASSIGN_OR_RAISE(
auto suffix, edge_info_->GetPropertyFilePath(
656 property_group, adj_list_type_,
657 vertex_chunk_index, chunk_index));
658 std::string path = prefix_ + suffix;
659 return fs_->WriteTableToFile(in_table, file_type, path);
// Presumably EdgeChunkWriter::WritePropertyChunk for every property group.
// Repeats the single-group write for each property group of edge_info_.
663 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
664 IdType chunk_index, ValidateLevel validate_level)
const {
665 const auto& property_groups = edge_info_->GetPropertyGroups();
666 for (
auto& property_group : property_groups) {
668 vertex_chunk_index, chunk_index,
// Presumably EdgeChunkWriter::WriteChunk: writes the adj list and all property
// groups for one edge chunk — confirm against the full source.
675 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
676 IdType chunk_index, ValidateLevel validate_level)
const {
678 chunk_index, validate_level));
// The four writers below share one pattern. Each cuts input_table into
// chunk_size_-row slices and writes every slice as consecutive edge chunks
// starting at start_chunk_index, all under the same vertex_chunk_index.
// Presumably WriteAdjListTable (slices go to the adj-list chunk writer).
684 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
685 IdType start_chunk_index, ValidateLevel validate_level)
const {
686 int64_t length = input_table->num_rows();
687 IdType chunk_index = start_chunk_index;
688 for (int64_t offset = 0; offset < length;
689 offset += chunk_size_, chunk_index++) {
690 auto in_chunk = input_table->Slice(offset, chunk_size_);
692 chunk_index, validate_level));
// Presumably WritePropertyTable for a single property group.
698 const std::shared_ptr<arrow::Table>& input_table,
699 const std::shared_ptr<PropertyGroup>& property_group,
700 IdType vertex_chunk_index, IdType start_chunk_index,
701 ValidateLevel validate_level)
const {
702 int64_t length = input_table->num_rows();
703 IdType chunk_index = start_chunk_index;
704 for (int64_t offset = 0; offset < length;
705 offset += chunk_size_, chunk_index++) {
706 auto in_chunk = input_table->Slice(offset, chunk_size_);
708 vertex_chunk_index, chunk_index,
// Presumably WritePropertyTable for every property group.
715 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
716 IdType start_chunk_index, ValidateLevel validate_level)
const {
717 int64_t length = input_table->num_rows();
718 IdType chunk_index = start_chunk_index;
719 for (int64_t offset = 0; offset < length;
720 offset += chunk_size_, chunk_index++) {
721 auto in_chunk = input_table->Slice(offset, chunk_size_);
723 chunk_index, validate_level));
// Presumably WriteTable: each slice goes through WriteChunk (adj list plus
// property groups).
729 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
730 IdType start_chunk_index, ValidateLevel validate_level)
const {
731 int64_t length = input_table->num_rows();
732 IdType chunk_index = start_chunk_index;
733 for (int64_t offset = 0; offset < length;
734 offset += chunk_size_, chunk_index++) {
735 auto in_chunk = input_table->Slice(offset, chunk_size_);
737 WriteChunk(in_chunk, vertex_chunk_index, chunk_index, validate_level));
// The sort-and-write entry points below each sort input_table ascending by
// getSortColumnName(adj_list_type_) and then delegate to the matching Write*
// method.
// Presumably SortAndWriteAdjListTable. For ordered layouts it also derives an
// offset table from the sorted rows via getOffsetTable before writing.
743 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
744 IdType start_chunk_index, ValidateLevel validate_level)
const {
747 sortTable(input_table, getSortColumnName(adj_list_type_)));
748 if (adj_list_type_ == AdjListType::ordered_by_source ||
749 adj_list_type_ == AdjListType::ordered_by_dest) {
752 getOffsetTable(response_table, getSortColumnName(adj_list_type_),
753 vertex_chunk_index));
758 start_chunk_index, validate_level);
// Presumably SortAndWritePropertyTable for a single property group.
762 const std::shared_ptr<arrow::Table>& input_table,
763 const std::shared_ptr<PropertyGroup>& property_group,
764 IdType vertex_chunk_index, IdType start_chunk_index,
765 ValidateLevel validate_level)
const {
768 sortTable(input_table, getSortColumnName(adj_list_type_)));
770 start_chunk_index, validate_level);
// Presumably SortAndWritePropertyTable for every property group.
774 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
775 IdType start_chunk_index, ValidateLevel validate_level)
const {
778 sortTable(input_table, getSortColumnName(adj_list_type_)));
780 start_chunk_index, validate_level);
// Presumably SortAndWriteTable: offset table for ordered layouts, then
// WriteTable on the sorted rows.
784 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
785 IdType start_chunk_index, ValidateLevel validate_level)
const {
788 sortTable(input_table, getSortColumnName(adj_list_type_)));
790 if (adj_list_type_ == AdjListType::ordered_by_source ||
791 adj_list_type_ == AdjListType::ordered_by_dest) {
794 getOffsetTable(response_table, getSortColumnName(adj_list_type_),
795 vertex_chunk_index));
800 return WriteTable(response_table, vertex_chunk_index, start_chunk_index,
// Build a one-column (GeneralParams::kOffsetCol, int64) offset table for vertex
// chunk `vertex_chunk_index` from the id column `column_name`.
// The result starts with 0 and then holds one running row count (global_index)
// per vertex id i in [vertex_chunk_index * vertex_chunk_size_,
// (vertex_chunk_index + 1) * vertex_chunk_size_), i.e. vertex_chunk_size_ + 1
// rows. The column's chunks are walked in order, with null entries checked
// explicitly. It expects `input_table` to be sorted by `column_name`; the
// callers pass the output of sortTable.
// NOTE(review): column->chunk(0) is read before num_chunks() is checked, so a
// column with zero chunks (e.g. from an empty table) would index out of range.
// NOTE(review): GetColumnByName presumably returns nullptr for an unknown
// name, which is dereferenced here without a check.
804 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::getOffsetTable(
805 const std::shared_ptr<arrow::Table>& input_table,
806 const std::string& column_name, IdType vertex_chunk_index)
const {
807 std::shared_ptr<arrow::ChunkedArray> column =
808 input_table->GetColumnByName(column_name);
809 int64_t array_index = 0, index = 0;
811 std::static_pointer_cast<arrow::Int64Array>(column->chunk(array_index));
813 arrow::Int64Builder builder;
814 IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
815 end_index = begin_index + vertex_chunk_size_;
816 RETURN_NOT_ARROW_OK(builder.Append(0));
817 std::vector<std::shared_ptr<arrow::Array>> arrays;
818 std::vector<std::shared_ptr<arrow::Field>> schema_vector;
819 std::string
property = GeneralParams::kOffsetCol;
820 schema_vector.push_back(
821 arrow::field(property, DataType::DataTypeToArrowDataType(int64())));
823 int64_t global_index = 0;
824 for (IdType i = begin_index; i < end_index; i++) {
826 if (array_index >= column->num_chunks())
828 if (index >= ids->length()) {
830 if (array_index == column->num_chunks())
832 ids = std::static_pointer_cast<arrow::Int64Array>(
833 column->chunk(array_index));
836 if (ids->IsNull(index) || !ids->IsValid(index)) {
841 int64_t x = ids->Value(index);
849 RETURN_NOT_ARROW_OK(builder.Append(global_index));
852 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto array, builder.Finish());
853 arrays.push_back(array);
854 auto schema = std::make_shared<arrow::Schema>(schema_vector);
855 return arrow::Table::Make(schema, arrays);
858 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
859 const std::shared_ptr<arrow::Table>& input_table,
860 const std::string& column_name) {
861 auto exec_context = arrow::compute::default_exec_context();
862 auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie();
863 auto table_source_options =
864 arrow_acero_namespace::TableSourceNodeOptions{input_table};
865 auto source = arrow_acero_namespace::MakeExecNode(
"table_source", plan.get(),
866 {}, table_source_options)
868 AsyncGeneratorType sink_gen;
870 arrow_acero_namespace::MakeExecNode(
871 "order_by_sink", plan.get(), {source},
872 arrow_acero_namespace::OrderBySinkNodeOptions{
873 arrow::compute::SortOptions{{arrow::compute::SortKey{
874 column_name, arrow::compute::SortOrder::Ascending}}},
877 return ExecutePlanAndCollectAsTable(*exec_context, plan,
878 input_table->schema(), sink_gen);
881 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
882 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
883 AdjListType adj_list_type,
const ValidateLevel& validate_level) {
884 if (!edge_info->HasAdjacentListType(adj_list_type)) {
885 return Status::KeyError(
886 "The adjacent list type ", AdjListTypeToString(adj_list_type),
887 " doesn't exist in edge ", edge_info->GetEdgeLabel(),
".");
889 return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type,
893 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
894 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_label,
895 const std::string& edge_label,
const std::string& dst_label,
896 AdjListType adj_list_type,
const ValidateLevel& validate_level) {
897 auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
899 return Status::KeyError(
"The edge ", src_label,
" ", edge_label,
" ",
900 dst_label,
" doesn't exist.");
902 return Make(edge_info, graph_info->GetPrefix(), adj_list_type,
906 std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) {
907 switch (adj_list_type) {
908 case AdjListType::unordered_by_source:
909 return GeneralParams::kSrcIndexCol;
910 case AdjListType::ordered_by_source:
911 return GeneralParams::kSrcIndexCol;
912 case AdjListType::unordered_by_dest:
913 return GeneralParams::kDstIndexCol;
914 case AdjListType::ordered_by_dest:
915 return GeneralParams::kDstIndexCol;
917 return GeneralParams::kSrcIndexCol;
919 return GeneralParams::kSrcIndexCol;
Status WritePropertyChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single edge property group for an edge chunk.
Status SortAndWriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write the adj list chunks for the edges of a vertex chunk.
Status WriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list chunks for the edges of a vertex chunk.
Status WritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of a single property group for the edges of a vertex chunk.
EdgeChunkWriter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the EdgeChunkWriter.
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list and all property groups for an edge chunk.
Status SortAndWriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status SortAndWritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of a single property group for the edges of a vertex chunk.
Status WriteAdjListChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the adj list chunk for an edge chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
static Status IndexError(Args &&... args)
static Status TypeError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
VertexPropertyWriter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the VertexPropertyWriter.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single property group for a single vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType start_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write a single property group for multiple vertex chunks to corresponding files.
static Result< std::shared_ptr< VertexPropertyWriter > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Construct a VertexPropertyWriter from vertex info.