20 #include <arrow/acero/api.h>
23 #include <unordered_map>
25 #include "arrow/api.h"
26 #include "arrow/compute/api.h"
27 #include "graphar/fwd.h"
28 #include "graphar/writer_util.h"
29 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
30 #include "arrow/acero/exec_plan.h"
32 #include "arrow/compute/exec/exec_plan.h"
34 #include "arrow/dataset/dataset.h"
35 #include "arrow/dataset/file_base.h"
36 #include "arrow/dataset/file_parquet.h"
37 #include "arrow/dataset/plan.h"
38 #include "arrow/dataset/scanner.h"
40 #include "graphar/arrow/chunk_writer.h"
41 #include "graphar/filesystem.h"
42 #include "graphar/general_params.h"
43 #include "graphar/graph_info.h"
44 #include "graphar/result.h"
45 #include "graphar/status.h"
46 #include "graphar/types.h"
47 #include "graphar/util.h"
52 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
53 namespace arrow_acero_namespace = arrow::acero;
55 namespace arrow_acero_namespace = arrow::compute;
58 #if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000
59 using AsyncGeneratorType =
60 arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>;
62 using AsyncGeneratorType =
63 arrow::AsyncGenerator<arrow::util::optional<arrow::compute::ExecBatch>>;
74 Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
75 const arrow::compute::ExecContext& exec_context,
76 std::shared_ptr<arrow_acero_namespace::ExecPlan> plan,
77 std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) {
79 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
80 arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen),
81 exec_context.memory_pool());
84 RETURN_NOT_ARROW_OK(plan->Validate());
86 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
87 plan->StartProducing();
89 RETURN_NOT_ARROW_OK(plan->StartProducing());
93 std::shared_ptr<arrow::Table> response_table;
94 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
95 response_table, arrow::Table::FromRecordBatchReader(sink_reader.get()));
98 plan->StopProducing();
100 RETURN_NOT_ARROW_OK(plan->finished().status());
101 return response_table;
107 const std::shared_ptr<VertexInfo>& vertex_info,
const std::string& prefix,
108 const std::shared_ptr<WriterOptions>& options,
109 const ValidateLevel& validate_level)
110 : vertex_info_(vertex_info),
112 validate_level_(validate_level),
115 options_ = WriterOptions::DefaultWriterOption();
117 if (validate_level_ == ValidateLevel::default_validate) {
118 throw std::runtime_error(
119 "default_validate is not allowed to be set as the global validate "
120 "level for VertexPropertyWriter");
122 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
126 Status VertexPropertyWriter::validate(
const IdType& count,
127 ValidateLevel validate_level)
const {
129 if (validate_level == ValidateLevel::default_validate)
130 validate_level = validate_level_;
132 if (validate_level == ValidateLevel::no_validate)
142 Status VertexPropertyWriter::validate(
143 const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
144 ValidateLevel validate_level)
const {
146 if (validate_level == ValidateLevel::default_validate)
147 validate_level = validate_level_;
149 if (validate_level == ValidateLevel::no_validate)
152 if (!vertex_info_->HasPropertyGroup(property_group)) {
154 vertex_info_->GetType(),
" vertex info.");
156 if (chunk_index < 0) {
163 Status VertexPropertyWriter::validate(
164 const std::shared_ptr<arrow::Table>& input_table,
165 const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
166 ValidateLevel validate_level)
const {
168 if (validate_level == ValidateLevel::default_validate) {
169 validate_level = validate_level_;
172 if (validate_level == ValidateLevel::no_validate) {
176 GAR_RETURN_NOT_OK(validate(property_group, chunk_index, validate_level));
178 if (input_table->num_rows() > vertex_info_->GetChunkSize()) {
180 input_table->num_rows(),
181 " which is larger than the vertex chunk size",
182 vertex_info_->GetChunkSize(),
".");
185 if (validate_level == ValidateLevel::strong_validate) {
187 RETURN_NOT_ARROW_OK(input_table->Validate());
189 auto schema = input_table->schema();
190 for (
auto& property : property_group->GetProperties()) {
191 int indice = schema->GetFieldIndex(property.name);
194 " of property group ", property_group,
195 " does not exist in the input table.");
197 auto field = schema->field(indice);
198 auto schema_data_type = DataType::DataTypeToArrowDataType(property.type);
199 if (property.cardinality != Cardinality::SINGLE) {
200 schema_data_type = arrow::list(schema_data_type);
202 if (!DataType::ArrowDataTypeToDataType(field->type())
203 ->Equals(DataType::ArrowDataTypeToDataType(schema_data_type))) {
205 "The data type of property: ", property.name,
" is ",
206 DataType::ArrowDataTypeToDataType(schema_data_type)->ToTypeName(),
208 DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
217 const IdType& count, ValidateLevel validate_level)
const {
218 GAR_RETURN_NOT_OK(validate(count, validate_level));
219 GAR_ASSIGN_OR_RAISE(
auto suffix, vertex_info_->GetVerticesNumFilePath());
220 std::string path = prefix_ + suffix;
221 return fs_->WriteValueToFile<IdType>(count, path);
225 const std::shared_ptr<arrow::Table>& input_table,
226 const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
227 ValidateLevel validate_level)
const {
229 validate(input_table, property_group, chunk_index, validate_level));
230 auto file_type = property_group->GetFileType();
231 auto schema = input_table->schema();
232 int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
235 GeneralParams::kVertexIndexCol,
236 " does not exist in the input table.");
239 std::vector<int> indices({indice});
240 for (
auto& property : property_group->GetProperties()) {
241 int indice = schema->GetFieldIndex(property.name);
244 " of property group ", property_group,
245 " of vertex ", vertex_info_->GetType(),
246 " does not exist in the input table.");
248 indices.push_back(indice);
250 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto in_table,
251 input_table->SelectColumns(indices));
252 GAR_ASSIGN_OR_RAISE(
auto suffix,
253 vertex_info_->GetFilePath(property_group, chunk_index));
254 std::string path = prefix_ + suffix;
255 return fs_->WriteTableToFile(in_table, file_type, path, options_);
259 const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
260 ValidateLevel validate_level)
const {
261 auto property_groups = vertex_info_->GetPropertyGroups();
262 for (
auto& property_group : property_groups) {
264 WriteChunk(input_table, property_group, chunk_index, validate_level));
270 const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
271 FileType file_type, ValidateLevel validate_level)
const {
272 auto schema = input_table->schema();
273 std::vector<int> indices;
274 for (
int i = 0; i < schema->num_fields(); i++) {
275 indices.push_back(i);
278 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto in_table,
279 input_table->SelectColumns(indices));
281 vertex_info_->GetPrefix() +
"labels/chunk" + std::to_string(chunk_index);
282 std::string path = prefix_ + suffix;
283 return fs_->WriteLabelTableToFile(input_table, path);
287 const std::shared_ptr<arrow::Table>& input_table,
288 const std::shared_ptr<PropertyGroup>& property_group,
289 IdType start_chunk_index, ValidateLevel validate_level)
const {
290 auto schema = input_table->schema();
291 int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
292 auto table_with_index = input_table;
295 GAR_ASSIGN_OR_RAISE(table_with_index,
296 AddIndexColumn(input_table, start_chunk_index,
297 vertex_info_->GetChunkSize()));
299 IdType chunk_size = vertex_info_->GetChunkSize();
300 int64_t length = table_with_index->num_rows();
301 IdType chunk_index = start_chunk_index;
302 for (int64_t offset = 0; offset < length;
303 offset += chunk_size, chunk_index++) {
304 auto in_chunk = table_with_index->Slice(offset, chunk_size);
306 WriteChunk(in_chunk, property_group, chunk_index, validate_level));
312 const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
313 ValidateLevel validate_level)
const {
314 auto property_groups = vertex_info_->GetPropertyGroups();
315 GAR_ASSIGN_OR_RAISE(
auto table_with_index,
316 AddIndexColumn(input_table, start_chunk_index,
317 vertex_info_->GetChunkSize()));
318 for (
auto& property_group : property_groups) {
319 GAR_RETURN_NOT_OK(
WriteTable(table_with_index, property_group,
320 start_chunk_index, validate_level));
322 auto labels = vertex_info_->GetLabels();
323 if (!labels.empty()) {
324 GAR_ASSIGN_OR_RAISE(
auto label_table,
GetLabelTable(input_table, labels))
326 FileType::PARQUET, validate_level));
333 const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
334 FileType file_type, ValidateLevel validate_level)
const {
335 auto schema = input_table->schema();
336 int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
337 IdType chunk_size = vertex_info_->GetChunkSize();
338 int64_t length = input_table->num_rows();
339 IdType chunk_index = start_chunk_index;
340 for (int64_t offset = 0; offset < length;
341 offset += chunk_size, chunk_index++) {
342 auto in_chunk = input_table->Slice(offset, chunk_size);
350 const std::shared_ptr<arrow::Table>& input_table,
351 const std::vector<std::string>& labels)
const {
354 input_table->schema()->GetFieldIndex(GeneralParams::kLabelCol);
355 if (label_col_idx == -1) {
361 std::vector<std::vector<bool>> bool_matrix(
362 input_table->num_rows(), std::vector<bool>(labels.size(),
false));
365 std::unordered_map<std::string, int> label_to_index;
366 for (
size_t i = 0; i < labels.size(); ++i) {
367 label_to_index[labels[i]] = i;
372 for (int64_t chunk_idx = 0;
373 chunk_idx < input_table->column(label_col_idx)->num_chunks();
375 auto chunk = input_table->column(label_col_idx)->chunk(chunk_idx);
376 auto label_column = std::static_pointer_cast<arrow::StringArray>(chunk);
381 for (int64_t row = 0; row < label_column->length(); ++row) {
382 if (label_column->IsValid(row)) {
383 std::string labels_string = label_column->GetString(row);
384 auto row_labels = SplitString(labels_string,
';');
385 for (
const auto& lbl : row_labels) {
386 if (label_to_index.find(lbl) != label_to_index.end()) {
387 bool_matrix[row_offset + row][label_to_index[lbl]] =
true;
394 label_column->length();
398 arrow::FieldVector fields;
399 arrow::ArrayVector arrays;
401 for (
const auto& label : labels) {
402 arrow::BooleanBuilder builder;
403 for (
const auto& row : bool_matrix) {
404 RETURN_NOT_ARROW_OK(builder.Append(row[label_to_index[label]]));
407 std::shared_ptr<arrow::Array> array;
408 RETURN_NOT_ARROW_OK(builder.Finish(&array));
409 fields.push_back(arrow::field(label, arrow::boolean()));
410 arrays.push_back(array);
414 auto schema = std::make_shared<arrow::Schema>(fields);
415 auto result_table = arrow::Table::Make(schema, arrays);
421 const std::shared_ptr<VertexInfo>& vertex_info,
const std::string& prefix,
422 const std::shared_ptr<WriterOptions>& options,
423 const ValidateLevel& validate_level) {
424 return std::make_shared<VertexPropertyWriter>(vertex_info, prefix, options,
429 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
430 const std::shared_ptr<WriterOptions>& options,
431 const ValidateLevel& validate_level) {
432 auto vertex_info = graph_info->GetVertexInfo(type);
436 return Make(vertex_info, graph_info->GetPrefix(), options, validate_level);
440 const std::shared_ptr<VertexInfo>& vertex_info,
const std::string& prefix,
441 const ValidateLevel& validate_level) {
442 return Make(vertex_info, prefix, WriterOptions::DefaultWriterOption(),
447 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& type,
448 const ValidateLevel& validate_level) {
449 return Make(graph_info, type, WriterOptions::DefaultWriterOption(),
453 Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
454 const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
455 IdType chunk_size)
const {
456 arrow::Int64Builder array_builder;
457 RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
458 int64_t length = table->num_rows();
459 for (IdType i = 0; i < length; i++) {
460 RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
462 std::shared_ptr<arrow::Array> array;
463 RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
464 std::shared_ptr<arrow::ChunkedArray> chunked_array =
465 std::make_shared<arrow::ChunkedArray>(array);
466 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
467 auto ret, table->AddColumn(0,
468 arrow::field(GeneralParams::kVertexIndexCol,
469 arrow::int64(),
false),
477 const std::string& prefix,
478 AdjListType adj_list_type,
479 const std::shared_ptr<WriterOptions>& options,
480 const ValidateLevel& validate_level)
481 : edge_info_(edge_info),
482 adj_list_type_(adj_list_type),
483 validate_level_(validate_level),
486 options_ = WriterOptions::DefaultWriterOption();
488 if (validate_level_ == ValidateLevel::default_validate) {
489 throw std::runtime_error(
490 "default_validate is not allowed to be set as the global validate "
491 "level for EdgeChunkWriter");
493 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
494 chunk_size_ = edge_info_->GetChunkSize();
495 switch (adj_list_type) {
496 case AdjListType::unordered_by_source:
497 vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
499 case AdjListType::ordered_by_source:
500 vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
502 case AdjListType::unordered_by_dest:
503 vertex_chunk_size_ = edge_info_->GetDstChunkSize();
505 case AdjListType::ordered_by_dest:
506 vertex_chunk_size_ = edge_info_->GetDstChunkSize();
509 vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
513 Status EdgeChunkWriter::validate(IdType count_or_index1, IdType count_or_index2,
514 ValidateLevel validate_level)
const {
516 if (validate_level == ValidateLevel::default_validate)
517 validate_level = validate_level_;
519 if (validate_level == ValidateLevel::no_validate)
522 if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
524 "Adj list type ", AdjListTypeToString(adj_list_type_),
525 " does not exist in the ", edge_info_->GetEdgeType(),
" edge info.");
528 if (count_or_index1 < 0 || count_or_index2 < 0) {
530 "The count or index must be non-negative, but got ", count_or_index1,
531 " and ", count_or_index2,
".");
537 Status EdgeChunkWriter::validate(
538 const std::shared_ptr<PropertyGroup>& property_group,
539 IdType vertex_chunk_index, IdType chunk_index,
540 ValidateLevel validate_level)
const {
542 if (validate_level == ValidateLevel::default_validate)
543 validate_level = validate_level_;
545 if (validate_level == ValidateLevel::no_validate)
548 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
550 if (!edge_info_->HasPropertyGroup(property_group)) {
552 edge_info_->GetEdgeType(),
" edge info.");
558 Status EdgeChunkWriter::validate(
559 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
560 ValidateLevel validate_level)
const {
562 if (validate_level == ValidateLevel::default_validate)
563 validate_level = validate_level_;
565 if (validate_level == ValidateLevel::no_validate)
568 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, 0, validate_level));
570 if (adj_list_type_ != AdjListType::ordered_by_source &&
571 adj_list_type_ != AdjListType::ordered_by_dest) {
573 "The adj list type has to be ordered_by_source or ordered_by_dest, but "
575 std::string(AdjListTypeToString(adj_list_type_)));
577 if (adj_list_type_ == AdjListType::ordered_by_source &&
578 input_table->num_rows() > edge_info_->GetSrcChunkSize() + 1) {
580 "The number of rows of input offset table is ", input_table->num_rows(),
581 " which is larger than the offset size of source vertex chunk ",
582 edge_info_->GetSrcChunkSize() + 1,
".");
584 if (adj_list_type_ == AdjListType::ordered_by_dest &&
585 input_table->num_rows() > edge_info_->GetDstChunkSize() + 1) {
587 "The number of rows of input offset table is ", input_table->num_rows(),
588 " which is larger than the offset size of destination vertex chunk ",
589 edge_info_->GetSrcChunkSize() + 1,
".");
592 if (validate_level == ValidateLevel::strong_validate) {
594 RETURN_NOT_ARROW_OK(input_table->Validate());
596 auto schema = input_table->schema();
597 int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
599 return Status::Invalid(
"The offset column ", GeneralParams::kOffsetCol,
600 " does not exist in the input table");
602 auto field = schema->field(index);
603 if (field->type()->id() != arrow::Type::INT64) {
605 "The data type for offset column should be INT64, but got ",
606 field->type()->name());
613 Status EdgeChunkWriter::validate(
614 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
615 IdType chunk_index, ValidateLevel validate_level)
const {
617 if (validate_level == ValidateLevel::default_validate)
618 validate_level = validate_level_;
620 if (validate_level == ValidateLevel::no_validate)
623 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
625 if (input_table->num_rows() > edge_info_->GetChunkSize()) {
627 "The number of rows of input table is ", input_table->num_rows(),
628 " which is larger than the ", edge_info_->GetEdgeType(),
629 " edge chunk size ", edge_info_->GetChunkSize(),
".");
632 if (validate_level == ValidateLevel::strong_validate) {
633 auto schema = input_table->schema();
634 int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
637 GeneralParams::kSrcIndexCol,
638 " does not exist in the input table");
640 auto field = schema->field(index);
641 if (field->type()->id() != arrow::Type::INT64) {
643 "The data type for source index column should be INT64, but got ",
644 field->type()->name());
646 index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
649 GeneralParams::kDstIndexCol,
650 " does not exist in the input table");
652 field = schema->field(index);
653 if (field->type()->id() != arrow::Type::INT64) {
655 "The data type for destination index column should be INT64, but "
657 field->type()->name());
664 Status EdgeChunkWriter::validate(
665 const std::shared_ptr<arrow::Table>& input_table,
666 const std::shared_ptr<PropertyGroup>& property_group,
667 IdType vertex_chunk_index, IdType chunk_index,
668 ValidateLevel validate_level)
const {
670 if (validate_level == ValidateLevel::default_validate)
671 validate_level = validate_level_;
673 if (validate_level == ValidateLevel::no_validate)
676 GAR_RETURN_NOT_OK(validate(property_group, vertex_chunk_index, chunk_index,
679 if (input_table->num_rows() > edge_info_->GetChunkSize()) {
681 "The number of rows of input table is ", input_table->num_rows(),
682 " which is larger than the ", edge_info_->GetEdgeType(),
683 " edge chunk size ", edge_info_->GetChunkSize(),
".");
686 if (validate_level == ValidateLevel::strong_validate) {
688 RETURN_NOT_ARROW_OK(input_table->Validate());
690 auto schema = input_table->schema();
691 for (
auto& property : property_group->GetProperties()) {
692 int indice = schema->GetFieldIndex(property.name);
695 " of property group ", property_group,
696 " does not exist in the input table.");
698 auto field = schema->field(indice);
699 if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) {
701 "The data type of property: ", property.name,
" is ",
702 property.type->ToTypeName(),
", but got ",
703 DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
713 ValidateLevel validate_level)
const {
714 GAR_RETURN_NOT_OK(validate(vertex_chunk_index, count, validate_level));
715 GAR_ASSIGN_OR_RAISE(
auto suffix, edge_info_->GetEdgesNumFilePath(
716 vertex_chunk_index, adj_list_type_));
717 std::string path = prefix_ + suffix;
718 return fs_->WriteValueToFile<IdType>(count, path);
722 ValidateLevel validate_level)
const {
723 GAR_RETURN_NOT_OK(validate(0, count, validate_level));
724 GAR_ASSIGN_OR_RAISE(
auto suffix,
725 edge_info_->GetVerticesNumFilePath(adj_list_type_));
726 std::string path = prefix_ + suffix;
727 return fs_->WriteValueToFile<IdType>(count, path);
731 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
732 ValidateLevel validate_level)
const {
733 GAR_RETURN_NOT_OK(validate(input_table, vertex_chunk_index, validate_level));
734 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
735 auto schema = input_table->schema();
736 int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
738 return Status::Invalid(
"The offset column ", GeneralParams::kOffsetCol,
739 " does not exist in the input table");
741 auto in_table = input_table->SelectColumns({index}).ValueOrDie();
742 GAR_ASSIGN_OR_RAISE(
auto suffix, edge_info_->GetAdjListOffsetFilePath(
743 vertex_chunk_index, adj_list_type_));
744 std::string path = prefix_ + suffix;
745 return fs_->WriteTableToFile(in_table, file_type, path, options_);
749 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
750 IdType chunk_index, ValidateLevel validate_level)
const {
752 validate(input_table, vertex_chunk_index, chunk_index, validate_level));
753 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
754 std::vector<int> indices;
756 auto schema = input_table->schema();
757 int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
760 GeneralParams::kSrcIndexCol,
761 " does not exist in the input table");
763 indices.push_back(index);
764 index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
767 GeneralParams::kDstIndexCol,
768 " does not exist in the input table");
770 indices.push_back(index);
771 auto in_table = input_table->SelectColumns(indices).ValueOrDie();
774 auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index,
775 chunk_index, adj_list_type_));
776 std::string path = prefix_ + suffix;
777 return fs_->WriteTableToFile(in_table, file_type, path, options_);
781 const std::shared_ptr<arrow::Table>& input_table,
782 const std::shared_ptr<PropertyGroup>& property_group,
783 IdType vertex_chunk_index, IdType chunk_index,
784 ValidateLevel validate_level)
const {
785 GAR_RETURN_NOT_OK(validate(input_table, property_group, vertex_chunk_index,
786 chunk_index, validate_level));
787 auto file_type = property_group->GetFileType();
789 std::vector<int> indices;
791 auto schema = input_table->schema();
792 for (
auto& property : property_group->GetProperties()) {
793 int indice = schema->GetFieldIndex(property.name);
796 " of property group ", property_group,
" of edge ",
797 edge_info_->GetEdgeType(),
798 " does not exist in the input table.");
800 indices.push_back(indice);
802 auto in_table = input_table->SelectColumns(indices).ValueOrDie();
803 GAR_ASSIGN_OR_RAISE(
auto suffix, edge_info_->GetPropertyFilePath(
804 property_group, adj_list_type_,
805 vertex_chunk_index, chunk_index));
806 std::string path = prefix_ + suffix;
807 return fs_->WriteTableToFile(in_table, file_type, path, options_);
811 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
812 IdType chunk_index, ValidateLevel validate_level)
const {
813 const auto& property_groups = edge_info_->GetPropertyGroups();
814 for (
auto& property_group : property_groups) {
816 vertex_chunk_index, chunk_index,
823 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
824 IdType chunk_index, ValidateLevel validate_level)
const {
826 chunk_index, validate_level));
832 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
833 IdType start_chunk_index, ValidateLevel validate_level)
const {
834 int64_t length = input_table->num_rows();
835 IdType chunk_index = start_chunk_index;
836 for (int64_t offset = 0; offset < length;
837 offset += chunk_size_, chunk_index++) {
838 auto in_chunk = input_table->Slice(offset, chunk_size_);
840 chunk_index, validate_level));
846 const std::shared_ptr<arrow::Table>& input_table,
847 const std::shared_ptr<PropertyGroup>& property_group,
848 IdType vertex_chunk_index, IdType start_chunk_index,
849 ValidateLevel validate_level)
const {
850 int64_t length = input_table->num_rows();
851 IdType chunk_index = start_chunk_index;
852 for (int64_t offset = 0; offset < length;
853 offset += chunk_size_, chunk_index++) {
854 auto in_chunk = input_table->Slice(offset, chunk_size_);
856 vertex_chunk_index, chunk_index,
863 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
864 IdType start_chunk_index, ValidateLevel validate_level)
const {
865 int64_t length = input_table->num_rows();
866 IdType chunk_index = start_chunk_index;
867 for (int64_t offset = 0; offset < length;
868 offset += chunk_size_, chunk_index++) {
869 auto in_chunk = input_table->Slice(offset, chunk_size_);
871 chunk_index, validate_level));
877 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
878 IdType start_chunk_index, ValidateLevel validate_level)
const {
879 int64_t length = input_table->num_rows();
880 IdType chunk_index = start_chunk_index;
881 for (int64_t offset = 0; offset < length;
882 offset += chunk_size_, chunk_index++) {
883 auto in_chunk = input_table->Slice(offset, chunk_size_);
885 WriteChunk(in_chunk, vertex_chunk_index, chunk_index, validate_level));
891 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
892 IdType start_chunk_index, ValidateLevel validate_level)
const {
895 sortTable(input_table, getSortColumnName(adj_list_type_)));
896 if (adj_list_type_ == AdjListType::ordered_by_source ||
897 adj_list_type_ == AdjListType::ordered_by_dest) {
900 getOffsetTable(response_table, getSortColumnName(adj_list_type_),
901 vertex_chunk_index));
906 start_chunk_index, validate_level);
910 const std::shared_ptr<arrow::Table>& input_table,
911 const std::shared_ptr<PropertyGroup>& property_group,
912 IdType vertex_chunk_index, IdType start_chunk_index,
913 ValidateLevel validate_level)
const {
916 sortTable(input_table, getSortColumnName(adj_list_type_)));
918 start_chunk_index, validate_level);
922 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
923 IdType start_chunk_index, ValidateLevel validate_level)
const {
926 sortTable(input_table, getSortColumnName(adj_list_type_)));
928 start_chunk_index, validate_level);
932 const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
933 IdType start_chunk_index, ValidateLevel validate_level)
const {
936 sortTable(input_table, getSortColumnName(adj_list_type_)));
938 if (adj_list_type_ == AdjListType::ordered_by_source ||
939 adj_list_type_ == AdjListType::ordered_by_dest) {
942 getOffsetTable(response_table, getSortColumnName(adj_list_type_),
943 vertex_chunk_index));
948 return WriteTable(response_table, vertex_chunk_index, start_chunk_index,
952 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::getOffsetTable(
953 const std::shared_ptr<arrow::Table>& input_table,
954 const std::string& column_name, IdType vertex_chunk_index)
const {
955 std::shared_ptr<arrow::ChunkedArray> column =
956 input_table->GetColumnByName(column_name);
957 int64_t array_index = 0, index = 0;
959 std::static_pointer_cast<arrow::Int64Array>(column->chunk(array_index));
961 arrow::Int64Builder builder;
962 IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
963 end_index = begin_index + vertex_chunk_size_;
964 RETURN_NOT_ARROW_OK(builder.Append(0));
965 std::vector<std::shared_ptr<arrow::Array>> arrays;
966 std::vector<std::shared_ptr<arrow::Field>> schema_vector;
967 std::string
property = GeneralParams::kOffsetCol;
968 schema_vector.push_back(
969 arrow::field(property, DataType::DataTypeToArrowDataType(int64())));
971 int64_t global_index = 0;
972 for (IdType i = begin_index; i < end_index; i++) {
974 if (array_index >= column->num_chunks())
976 if (index >= ids->length()) {
978 if (array_index == column->num_chunks())
980 ids = std::static_pointer_cast<arrow::Int64Array>(
981 column->chunk(array_index));
984 if (ids->IsNull(index) || !ids->IsValid(index)) {
989 int64_t x = ids->Value(index);
997 RETURN_NOT_ARROW_OK(builder.Append(global_index));
1000 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto array, builder.Finish());
1001 arrays.push_back(array);
1002 auto schema = std::make_shared<arrow::Schema>(schema_vector);
1003 return arrow::Table::Make(schema, arrays);
1006 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
1007 const std::shared_ptr<arrow::Table>& input_table,
1008 const std::string& column_name) {
1009 #if ARROW_VERSION >= 21000000
1010 RETURN_NOT_ARROW_OK(arrow::compute::Initialize());
1012 auto exec_context = arrow::compute::default_exec_context();
1013 auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie();
1014 auto table_source_options =
1015 arrow_acero_namespace::TableSourceNodeOptions{input_table};
1016 auto source = arrow_acero_namespace::MakeExecNode(
"table_source", plan.get(),
1017 {}, table_source_options)
1019 AsyncGeneratorType sink_gen;
1020 RETURN_NOT_ARROW_OK(
1021 arrow_acero_namespace::MakeExecNode(
1022 "order_by_sink", plan.get(), {source},
1023 arrow_acero_namespace::OrderBySinkNodeOptions{
1024 arrow::compute::SortOptions{{arrow::compute::SortKey{
1025 column_name, arrow::compute::SortOrder::Ascending}}},
1028 return ExecutePlanAndCollectAsTable(*exec_context, plan,
1029 input_table->schema(), sink_gen);
1032 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1033 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
1034 AdjListType adj_list_type,
const std::shared_ptr<WriterOptions>& options,
1035 const ValidateLevel& validate_level) {
1036 if (!edge_info->HasAdjacentListType(adj_list_type)) {
1037 return Status::KeyError(
1038 "The adjacent list type ", AdjListTypeToString(adj_list_type),
1039 " doesn't exist in edge ", edge_info->GetEdgeType(),
".");
1041 return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type,
1042 options, validate_level);
1045 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1046 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
1047 AdjListType adj_list_type,
const ValidateLevel& validate_level) {
1048 return Make(edge_info, prefix, adj_list_type,
1049 WriterOptions::DefaultWriterOption(), validate_level);
1052 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1053 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
1054 const std::string& edge_type,
const std::string& dst_type,
1055 AdjListType adj_list_type,
const std::shared_ptr<WriterOptions>& options,
1056 const ValidateLevel& validate_level) {
1057 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
1059 return Status::KeyError(
"The edge ", src_type,
" ", edge_type,
" ",
1060 dst_type,
" doesn't exist.");
1062 return Make(edge_info, graph_info->GetPrefix(), adj_list_type, options,
1066 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1067 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_type,
1068 const std::string& edge_type,
const std::string& dst_type,
1069 AdjListType adj_list_type,
const ValidateLevel& validate_level) {
1070 return Make(graph_info, src_type, edge_type, dst_type, adj_list_type,
1071 WriterOptions::DefaultWriterOption(), validate_level);
1074 std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) {
1075 switch (adj_list_type) {
1076 case AdjListType::unordered_by_source:
1077 return GeneralParams::kSrcIndexCol;
1078 case AdjListType::ordered_by_source:
1079 return GeneralParams::kSrcIndexCol;
1080 case AdjListType::unordered_by_dest:
1081 return GeneralParams::kDstIndexCol;
1082 case AdjListType::ordered_by_dest:
1083 return GeneralParams::kDstIndexCol;
1085 return GeneralParams::kSrcIndexCol;
1087 return GeneralParams::kSrcIndexCol;
Status WritePropertyChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single edge property group for an edge chunk.
Status SortAndWriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write the adj list chunks for the edges of a vertex chunk.
Status WriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list chunks for the edges of a vertex chunk.
Status WritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of a single property group for the edges of a vertex chunk.
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list and all property groups for an edge chunk.
Status SortAndWriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status SortAndWritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of a single property group for the edges of a vertex chunk.
Status WriteAdjListChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the adj list chunk for an edge chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
EdgeChunkWriter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, const std::shared_ptr< WriterOptions > &options=WriterOptions::DefaultWriterOption(), const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the EdgeChunkWriter.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
static Status IndexError(Args &&... args)
static Status TypeError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
Status WriteLabelTable(const std::shared_ptr< arrow::Table > &input_table, IdType start_chunk_index, FileType file_type, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write all labels for multiple vertex chunks to corresponding files.
VertexPropertyWriter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const std::shared_ptr< WriterOptions > &options=WriterOptions::DefaultWriterOption(), const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the VertexPropertyWriter.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single property group for a single vertex chunk.
Result< std::shared_ptr< arrow::Table > > GetLabelTable(const std::shared_ptr< arrow::Table > &input_table, const std::vector< std::string > &labels) const
Get label column from table to formulate label table.
static Result< std::shared_ptr< VertexPropertyWriter > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const std::shared_ptr< WriterOptions > &options, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Construct a VertexPropertyWriter from vertex info.
Status WriteLabelChunk(const std::shared_ptr< arrow::Table > &input_table, IdType chunk_index, FileType file_type, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write all labels of a single vertex chunk to corresponding files.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType start_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write a single property group for multiple vertex chunks to corresponding files.