Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
chunk_writer.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <arrow/acero/api.h>
21 #include <cstddef>
22 #include <iostream>
23 #include <unordered_map>
24 #include <utility>
25 #include "arrow/api.h"
26 #include "arrow/compute/api.h"
27 #include "graphar/fwd.h"
28 #include "graphar/writer_util.h"
29 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
30 #include "arrow/acero/exec_plan.h"
31 #else
32 #include "arrow/compute/exec/exec_plan.h"
33 #endif
34 #include "arrow/dataset/dataset.h"
35 #include "arrow/dataset/file_base.h"
36 #include "arrow/dataset/file_parquet.h"
37 #include "arrow/dataset/plan.h"
38 #include "arrow/dataset/scanner.h"
39 
40 #include "graphar/arrow/chunk_writer.h"
41 #include "graphar/filesystem.h"
42 #include "graphar/general_params.h"
43 #include "graphar/graph_info.h"
44 #include "graphar/result.h"
45 #include "graphar/status.h"
46 #include "graphar/types.h"
47 #include "graphar/util.h"
48 
49 namespace graphar {
50 // common methods
51 
52 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
53 namespace arrow_acero_namespace = arrow::acero;
54 #else
55 namespace arrow_acero_namespace = arrow::compute;
56 #endif
57 
58 #if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000
59 using AsyncGeneratorType =
60  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>;
61 #else
62 using AsyncGeneratorType =
63  arrow::AsyncGenerator<arrow::util::optional<arrow::compute::ExecBatch>>;
64 #endif
65 
74 Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
75  const arrow::compute::ExecContext& exec_context,
76  std::shared_ptr<arrow_acero_namespace::ExecPlan> plan,
77  std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) {
78  // translate sink_gen (async) to sink_reader (sync)
79  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
80  arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen),
81  exec_context.memory_pool());
82 
83  // validate the ExecPlan
84  RETURN_NOT_ARROW_OK(plan->Validate());
85  // start the ExecPlan
86 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
87  plan->StartProducing(); // arrow 12.0.0 or later return void, not Status
88 #else
89  RETURN_NOT_ARROW_OK(plan->StartProducing());
90 #endif
91 
92  // collect sink_reader into a Table
93  std::shared_ptr<arrow::Table> response_table;
94  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
95  response_table, arrow::Table::FromRecordBatchReader(sink_reader.get()));
96 
97  // stop producing
98  plan->StopProducing();
99  // plan mark finished
100  RETURN_NOT_ARROW_OK(plan->finished().status());
101  return response_table;
102 }
103 
104 // implementations for VertexPropertyWriter
105 
107  const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
108  const std::shared_ptr<WriterOptions>& options,
109  const ValidateLevel& validate_level)
110  : vertex_info_(vertex_info),
111  prefix_(prefix),
112  validate_level_(validate_level),
113  options_(options) {
114  if (!options) {
115  options_ = WriterOptions::DefaultWriterOption();
116  }
117  if (validate_level_ == ValidateLevel::default_validate) {
118  throw std::runtime_error(
119  "default_validate is not allowed to be set as the global validate "
120  "level for VertexPropertyWriter");
121  }
122  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
123 }
124 
125 // Check if the operation of writing vertices number is allowed.
126 Status VertexPropertyWriter::validate(const IdType& count,
127  ValidateLevel validate_level) const {
128  // use the writer's validate level
129  if (validate_level == ValidateLevel::default_validate)
130  validate_level = validate_level_;
131  // no validate
132  if (validate_level == ValidateLevel::no_validate)
133  return Status::OK();
134  // weak & strong validate
135  if (count < 0) {
136  return Status::Invalid("The number of vertices is negative.");
137  }
138  return Status::OK();
139 }
140 
141 // Check if the operation of copying a file as a chunk is allowed.
142 Status VertexPropertyWriter::validate(
143  const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
144  ValidateLevel validate_level) const {
145  // use the writer's validate level
146  if (validate_level == ValidateLevel::default_validate)
147  validate_level = validate_level_;
148  // no validate
149  if (validate_level == ValidateLevel::no_validate)
150  return Status::OK();
151  // weak & strong validate
152  if (!vertex_info_->HasPropertyGroup(property_group)) {
153  return Status::KeyError("The property group", " does not exist in ",
154  vertex_info_->GetType(), " vertex info.");
155  }
156  if (chunk_index < 0) {
157  return Status::IndexError("Negative chunk index ", chunk_index, ".");
158  }
159  return Status::OK();
160 }
161 
162 // Check if the operation of writing a table as a chunk is allowed.
163 Status VertexPropertyWriter::validate(
164  const std::shared_ptr<arrow::Table>& input_table,
165  const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
166  ValidateLevel validate_level) const {
167  // use the writer's validate level
168  if (validate_level == ValidateLevel::default_validate) {
169  validate_level = validate_level_;
170  }
171  // no validate
172  if (validate_level == ValidateLevel::no_validate) {
173  return Status::OK();
174  }
175  // validate property_group & chunk_index
176  GAR_RETURN_NOT_OK(validate(property_group, chunk_index, validate_level));
177  // weak validate for the input_table
178  if (input_table->num_rows() > vertex_info_->GetChunkSize()) {
179  return Status::Invalid("The number of rows of input table is ",
180  input_table->num_rows(),
181  " which is larger than the vertex chunk size",
182  vertex_info_->GetChunkSize(), ".");
183  }
184  // strong validate for the input_table
185  if (validate_level == ValidateLevel::strong_validate) {
186  // validate the input table
187  RETURN_NOT_ARROW_OK(input_table->Validate());
188  // validate the schema
189  auto schema = input_table->schema();
190  for (auto& property : property_group->GetProperties()) {
191  int indice = schema->GetFieldIndex(property.name);
192  if (indice == -1) {
193  return Status::Invalid("Column named ", property.name,
194  " of property group ", property_group,
195  " does not exist in the input table.");
196  }
197  auto field = schema->field(indice);
198  auto schema_data_type = DataType::DataTypeToArrowDataType(property.type);
199  if (property.cardinality != Cardinality::SINGLE) {
200  schema_data_type = arrow::list(schema_data_type);
201  }
202  if (!DataType::ArrowDataTypeToDataType(field->type())
203  ->Equals(DataType::ArrowDataTypeToDataType(schema_data_type))) {
204  return Status::TypeError(
205  "The data type of property: ", property.name, " is ",
206  DataType::ArrowDataTypeToDataType(schema_data_type)->ToTypeName(),
207  ", but got ",
208  DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
209  ".");
210  }
211  }
212  }
213  return Status::OK();
214 }
215 
217  const IdType& count, ValidateLevel validate_level) const {
218  GAR_RETURN_NOT_OK(validate(count, validate_level));
219  GAR_ASSIGN_OR_RAISE(auto suffix, vertex_info_->GetVerticesNumFilePath());
220  std::string path = prefix_ + suffix;
221  return fs_->WriteValueToFile<IdType>(count, path);
222 }
223 
225  const std::shared_ptr<arrow::Table>& input_table,
226  const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
227  ValidateLevel validate_level) const {
228  GAR_RETURN_NOT_OK(
229  validate(input_table, property_group, chunk_index, validate_level));
230  auto file_type = property_group->GetFileType();
231  auto schema = input_table->schema();
232  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
233  if (indice == -1) {
234  return Status::Invalid("The internal id Column named ",
235  GeneralParams::kVertexIndexCol,
236  " does not exist in the input table.");
237  }
238 
239  std::vector<int> indices({indice});
240  for (auto& property : property_group->GetProperties()) {
241  int indice = schema->GetFieldIndex(property.name);
242  if (indice == -1) {
243  return Status::Invalid("Column named ", property.name,
244  " of property group ", property_group,
245  " of vertex ", vertex_info_->GetType(),
246  " does not exist in the input table.");
247  }
248  indices.push_back(indice);
249  }
250  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table,
251  input_table->SelectColumns(indices));
252  GAR_ASSIGN_OR_RAISE(auto suffix,
253  vertex_info_->GetFilePath(property_group, chunk_index));
254  std::string path = prefix_ + suffix;
255  return fs_->WriteTableToFile(in_table, file_type, path, options_);
256 }
257 
259  const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
260  ValidateLevel validate_level) const {
261  auto property_groups = vertex_info_->GetPropertyGroups();
262  for (auto& property_group : property_groups) {
263  GAR_RETURN_NOT_OK(
264  WriteChunk(input_table, property_group, chunk_index, validate_level));
265  }
266  return Status::OK();
267 }
268 
270  const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
271  FileType file_type, ValidateLevel validate_level) const {
272  auto schema = input_table->schema();
273  std::vector<int> indices;
274  for (int i = 0; i < schema->num_fields(); i++) {
275  indices.push_back(i);
276  }
277 
278  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table,
279  input_table->SelectColumns(indices));
280  std::string suffix =
281  vertex_info_->GetPrefix() + "labels/chunk" + std::to_string(chunk_index);
282  std::string path = prefix_ + suffix;
283  return fs_->WriteLabelTableToFile(input_table, path);
284 }
285 
287  const std::shared_ptr<arrow::Table>& input_table,
288  const std::shared_ptr<PropertyGroup>& property_group,
289  IdType start_chunk_index, ValidateLevel validate_level) const {
290  auto schema = input_table->schema();
291  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
292  auto table_with_index = input_table;
293  if (indice == -1) {
294  // add index column
295  GAR_ASSIGN_OR_RAISE(table_with_index,
296  AddIndexColumn(input_table, start_chunk_index,
297  vertex_info_->GetChunkSize()));
298  }
299  IdType chunk_size = vertex_info_->GetChunkSize();
300  int64_t length = table_with_index->num_rows();
301  IdType chunk_index = start_chunk_index;
302  for (int64_t offset = 0; offset < length;
303  offset += chunk_size, chunk_index++) {
304  auto in_chunk = table_with_index->Slice(offset, chunk_size);
305  GAR_RETURN_NOT_OK(
306  WriteChunk(in_chunk, property_group, chunk_index, validate_level));
307  }
308  return Status::OK();
309 }
310 
312  const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
313  ValidateLevel validate_level) const {
314  auto property_groups = vertex_info_->GetPropertyGroups();
315  GAR_ASSIGN_OR_RAISE(auto table_with_index,
316  AddIndexColumn(input_table, start_chunk_index,
317  vertex_info_->GetChunkSize()));
318  for (auto& property_group : property_groups) {
319  GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
320  start_chunk_index, validate_level));
321  }
322  auto labels = vertex_info_->GetLabels();
323  if (!labels.empty()) {
324  GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTable(input_table, labels))
325  GAR_RETURN_NOT_OK(WriteLabelTable(label_table, start_chunk_index,
326  FileType::PARQUET, validate_level));
327  }
328 
329  return Status::OK();
330 }
331 
333  const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
334  FileType file_type, ValidateLevel validate_level) const {
335  auto schema = input_table->schema();
336  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
337  IdType chunk_size = vertex_info_->GetChunkSize();
338  int64_t length = input_table->num_rows();
339  IdType chunk_index = start_chunk_index;
340  for (int64_t offset = 0; offset < length;
341  offset += chunk_size, chunk_index++) {
342  auto in_chunk = input_table->Slice(offset, chunk_size);
343  GAR_RETURN_NOT_OK(
344  WriteLabelChunk(in_chunk, chunk_index, file_type, validate_level));
345  }
346  return Status::OK();
347 }
348 
349 Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable(
350  const std::shared_ptr<arrow::Table>& input_table,
351  const std::vector<std::string>& labels) const {
352  // Find the label column index
353  auto label_col_idx =
354  input_table->schema()->GetFieldIndex(GeneralParams::kLabelCol);
355  if (label_col_idx == -1) {
356  return Status::KeyError("label column not found in the input table.");
357  }
358 
359  // Create a matrix of booleans with dimensions [number of rows, number of
360  // labels]
361  std::vector<std::vector<bool>> bool_matrix(
362  input_table->num_rows(), std::vector<bool>(labels.size(), false));
363 
364  // Create a map for labels to column indices
365  std::unordered_map<std::string, int> label_to_index;
366  for (size_t i = 0; i < labels.size(); ++i) {
367  label_to_index[labels[i]] = i;
368  }
369 
370  int row_offset = 0; // Offset for where to fill the bool_matrix
371  // Iterate through each chunk of the :LABEL column
372  for (int64_t chunk_idx = 0;
373  chunk_idx < input_table->column(label_col_idx)->num_chunks();
374  ++chunk_idx) {
375  auto chunk = input_table->column(label_col_idx)->chunk(chunk_idx);
376  auto label_column = std::static_pointer_cast<arrow::StringArray>(chunk);
377 
378  // Populate the matrix based on :LABEL column values
379  // TODO(@yangxk): store array in the label_column, split the string when
380  // reading file
381  for (int64_t row = 0; row < label_column->length(); ++row) {
382  if (label_column->IsValid(row)) {
383  std::string labels_string = label_column->GetString(row);
384  auto row_labels = SplitString(labels_string, ';');
385  for (const auto& lbl : row_labels) {
386  if (label_to_index.find(lbl) != label_to_index.end()) {
387  bool_matrix[row_offset + row][label_to_index[lbl]] = true;
388  }
389  }
390  }
391  }
392 
393  row_offset +=
394  label_column->length(); // Update the row offset for the next chunk
395  }
396 
397  // Create Arrow arrays for each label column
398  arrow::FieldVector fields;
399  arrow::ArrayVector arrays;
400 
401  for (const auto& label : labels) {
402  arrow::BooleanBuilder builder;
403  for (const auto& row : bool_matrix) {
404  RETURN_NOT_ARROW_OK(builder.Append(row[label_to_index[label]]));
405  }
406 
407  std::shared_ptr<arrow::Array> array;
408  RETURN_NOT_ARROW_OK(builder.Finish(&array));
409  fields.push_back(arrow::field(label, arrow::boolean()));
410  arrays.push_back(array);
411  }
412 
413  // Create the Arrow Table with the boolean columns
414  auto schema = std::make_shared<arrow::Schema>(fields);
415  auto result_table = arrow::Table::Make(schema, arrays);
416 
417  return result_table;
418 }
419 
420 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
421  const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
422  const std::shared_ptr<WriterOptions>& options,
423  const ValidateLevel& validate_level) {
424  return std::make_shared<VertexPropertyWriter>(vertex_info, prefix, options,
425  validate_level);
426 }
427 
428 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
429  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
430  const std::shared_ptr<WriterOptions>& options,
431  const ValidateLevel& validate_level) {
432  auto vertex_info = graph_info->GetVertexInfo(type);
433  if (!vertex_info) {
434  return Status::KeyError("The vertex ", type, " doesn't exist.");
435  }
436  return Make(vertex_info, graph_info->GetPrefix(), options, validate_level);
437 }
438 
439 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
440  const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
441  const ValidateLevel& validate_level) {
442  return Make(vertex_info, prefix, WriterOptions::DefaultWriterOption(),
443  validate_level);
444 }
445 
446 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
447  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
448  const ValidateLevel& validate_level) {
449  return Make(graph_info, type, WriterOptions::DefaultWriterOption(),
450  validate_level);
451 }
452 
453 Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
454  const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
455  IdType chunk_size) const {
456  arrow::Int64Builder array_builder;
457  RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
458  int64_t length = table->num_rows();
459  for (IdType i = 0; i < length; i++) {
460  RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
461  }
462  std::shared_ptr<arrow::Array> array;
463  RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
464  std::shared_ptr<arrow::ChunkedArray> chunked_array =
465  std::make_shared<arrow::ChunkedArray>(array);
466  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
467  auto ret, table->AddColumn(0,
468  arrow::field(GeneralParams::kVertexIndexCol,
469  arrow::int64(), false),
470  chunked_array));
471  return ret;
472 }
473 
474 // implementations for EdgeChunkWriter
475 
476 EdgeChunkWriter::EdgeChunkWriter(const std::shared_ptr<EdgeInfo>& edge_info,
477  const std::string& prefix,
478  AdjListType adj_list_type,
479  const std::shared_ptr<WriterOptions>& options,
480  const ValidateLevel& validate_level)
481  : edge_info_(edge_info),
482  adj_list_type_(adj_list_type),
483  validate_level_(validate_level),
484  options_(options) {
485  if (!options) {
486  options_ = WriterOptions::DefaultWriterOption();
487  }
488  if (validate_level_ == ValidateLevel::default_validate) {
489  throw std::runtime_error(
490  "default_validate is not allowed to be set as the global validate "
491  "level for EdgeChunkWriter");
492  }
493  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
494  chunk_size_ = edge_info_->GetChunkSize();
495  switch (adj_list_type) {
496  case AdjListType::unordered_by_source:
497  vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
498  break;
499  case AdjListType::ordered_by_source:
500  vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
501  break;
502  case AdjListType::unordered_by_dest:
503  vertex_chunk_size_ = edge_info_->GetDstChunkSize();
504  break;
505  case AdjListType::ordered_by_dest:
506  vertex_chunk_size_ = edge_info_->GetDstChunkSize();
507  break;
508  default:
509  vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
510  }
511 }
512 // Check if the operation of writing number or copying a file is allowed.
513 Status EdgeChunkWriter::validate(IdType count_or_index1, IdType count_or_index2,
514  ValidateLevel validate_level) const {
515  // use the writer's validate level
516  if (validate_level == ValidateLevel::default_validate)
517  validate_level = validate_level_;
518  // no validate
519  if (validate_level == ValidateLevel::no_validate)
520  return Status::OK();
521  // weak & strong validate for adj list type
522  if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
523  return Status::KeyError(
524  "Adj list type ", AdjListTypeToString(adj_list_type_),
525  " does not exist in the ", edge_info_->GetEdgeType(), " edge info.");
526  }
527  // weak & strong validate for count or index
528  if (count_or_index1 < 0 || count_or_index2 < 0) {
529  return Status::IndexError(
530  "The count or index must be non-negative, but got ", count_or_index1,
531  " and ", count_or_index2, ".");
532  }
533  return Status::OK();
534 }
535 
536 // Check if the operation of copying a file as a property chunk is allowed.
537 Status EdgeChunkWriter::validate(
538  const std::shared_ptr<PropertyGroup>& property_group,
539  IdType vertex_chunk_index, IdType chunk_index,
540  ValidateLevel validate_level) const {
541  // use the writer's validate level
542  if (validate_level == ValidateLevel::default_validate)
543  validate_level = validate_level_;
544  // no validate
545  if (validate_level == ValidateLevel::no_validate)
546  return Status::OK();
547  // validate for adj list type & index
548  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
549  // weak & strong validate for property group
550  if (!edge_info_->HasPropertyGroup(property_group)) {
551  return Status::KeyError("Property group", " does not exist in the ",
552  edge_info_->GetEdgeType(), " edge info.");
553  }
554  return Status::OK();
555 }
556 
557 // Check if the operation of writing a table as an offset chunk is allowed.
558 Status EdgeChunkWriter::validate(
559  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
560  ValidateLevel validate_level) const {
561  // use the writer's validate level
562  if (validate_level == ValidateLevel::default_validate)
563  validate_level = validate_level_;
564  // no validate
565  if (validate_level == ValidateLevel::no_validate)
566  return Status::OK();
567  // validate for adj list type & index
568  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, 0, validate_level));
569  // weak validate for the input table
570  if (adj_list_type_ != AdjListType::ordered_by_source &&
571  adj_list_type_ != AdjListType::ordered_by_dest) {
572  return Status::Invalid(
573  "The adj list type has to be ordered_by_source or ordered_by_dest, but "
574  "got " +
575  std::string(AdjListTypeToString(adj_list_type_)));
576  }
577  if (adj_list_type_ == AdjListType::ordered_by_source &&
578  input_table->num_rows() > edge_info_->GetSrcChunkSize() + 1) {
579  return Status::Invalid(
580  "The number of rows of input offset table is ", input_table->num_rows(),
581  " which is larger than the offset size of source vertex chunk ",
582  edge_info_->GetSrcChunkSize() + 1, ".");
583  }
584  if (adj_list_type_ == AdjListType::ordered_by_dest &&
585  input_table->num_rows() > edge_info_->GetDstChunkSize() + 1) {
586  return Status::Invalid(
587  "The number of rows of input offset table is ", input_table->num_rows(),
588  " which is larger than the offset size of destination vertex chunk ",
589  edge_info_->GetSrcChunkSize() + 1, ".");
590  }
591  // strong validate for the input_table
592  if (validate_level == ValidateLevel::strong_validate) {
593  // validate the input table
594  RETURN_NOT_ARROW_OK(input_table->Validate());
595  // validate the schema
596  auto schema = input_table->schema();
597  int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
598  if (index == -1) {
599  return Status::Invalid("The offset column ", GeneralParams::kOffsetCol,
600  " does not exist in the input table");
601  }
602  auto field = schema->field(index);
603  if (field->type()->id() != arrow::Type::INT64) {
604  return Status::TypeError(
605  "The data type for offset column should be INT64, but got ",
606  field->type()->name());
607  }
608  }
609  return Status::OK();
610 }
611 
612 // Check if the operation of writing a table as an adj list chunk is allowed.
613 Status EdgeChunkWriter::validate(
614  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
615  IdType chunk_index, ValidateLevel validate_level) const {
616  // use the writer's validate level
617  if (validate_level == ValidateLevel::default_validate)
618  validate_level = validate_level_;
619  // no validate
620  if (validate_level == ValidateLevel::no_validate)
621  return Status::OK();
622  // validate for adj list type & index
623  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
624  // weak validate for the input table
625  if (input_table->num_rows() > edge_info_->GetChunkSize()) {
626  return Status::Invalid(
627  "The number of rows of input table is ", input_table->num_rows(),
628  " which is larger than the ", edge_info_->GetEdgeType(),
629  " edge chunk size ", edge_info_->GetChunkSize(), ".");
630  }
631  // strong validate for the input table
632  if (validate_level == ValidateLevel::strong_validate) {
633  auto schema = input_table->schema();
634  int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
635  if (index == -1) {
636  return Status::Invalid("The source index column ",
637  GeneralParams::kSrcIndexCol,
638  " does not exist in the input table");
639  }
640  auto field = schema->field(index);
641  if (field->type()->id() != arrow::Type::INT64) {
642  return Status::TypeError(
643  "The data type for source index column should be INT64, but got ",
644  field->type()->name());
645  }
646  index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
647  if (index == -1) {
648  return Status::Invalid("The destination index column ",
649  GeneralParams::kDstIndexCol,
650  " does not exist in the input table");
651  }
652  field = schema->field(index);
653  if (field->type()->id() != arrow::Type::INT64) {
654  return Status::TypeError(
655  "The data type for destination index column should be INT64, but "
656  "got ",
657  field->type()->name());
658  }
659  }
660  return Status::OK();
661 }
662 
663 // Check if the operation of writing a table as a property chunk is allowed.
664 Status EdgeChunkWriter::validate(
665  const std::shared_ptr<arrow::Table>& input_table,
666  const std::shared_ptr<PropertyGroup>& property_group,
667  IdType vertex_chunk_index, IdType chunk_index,
668  ValidateLevel validate_level) const {
669  // use the writer's validate level
670  if (validate_level == ValidateLevel::default_validate)
671  validate_level = validate_level_;
672  // no validate
673  if (validate_level == ValidateLevel::no_validate)
674  return Status::OK();
675  // validate for property group, adj list type & index
676  GAR_RETURN_NOT_OK(validate(property_group, vertex_chunk_index, chunk_index,
677  validate_level));
678  // weak validate for the input table
679  if (input_table->num_rows() > edge_info_->GetChunkSize()) {
680  return Status::Invalid(
681  "The number of rows of input table is ", input_table->num_rows(),
682  " which is larger than the ", edge_info_->GetEdgeType(),
683  " edge chunk size ", edge_info_->GetChunkSize(), ".");
684  }
685  // strong validate for the input table
686  if (validate_level == ValidateLevel::strong_validate) {
687  // validate the input table
688  RETURN_NOT_ARROW_OK(input_table->Validate());
689  // validate the schema
690  auto schema = input_table->schema();
691  for (auto& property : property_group->GetProperties()) {
692  int indice = schema->GetFieldIndex(property.name);
693  if (indice == -1) {
694  return Status::Invalid("Column named ", property.name,
695  " of property group ", property_group,
696  " does not exist in the input table.");
697  }
698  auto field = schema->field(indice);
699  if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) {
700  return Status::TypeError(
701  "The data type of property: ", property.name, " is ",
702  property.type->ToTypeName(), ", but got ",
703  DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
704  ".");
705  }
706  }
707  }
708  return Status::OK();
709 }
710 
711 Status EdgeChunkWriter::WriteEdgesNum(IdType vertex_chunk_index,
712  const IdType& count,
713  ValidateLevel validate_level) const {
714  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, count, validate_level));
715  GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetEdgesNumFilePath(
716  vertex_chunk_index, adj_list_type_));
717  std::string path = prefix_ + suffix;
718  return fs_->WriteValueToFile<IdType>(count, path);
719 }
720 
722  ValidateLevel validate_level) const {
723  GAR_RETURN_NOT_OK(validate(0, count, validate_level));
724  GAR_ASSIGN_OR_RAISE(auto suffix,
725  edge_info_->GetVerticesNumFilePath(adj_list_type_));
726  std::string path = prefix_ + suffix;
727  return fs_->WriteValueToFile<IdType>(count, path);
728 }
729 
731  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
732  ValidateLevel validate_level) const {
733  GAR_RETURN_NOT_OK(validate(input_table, vertex_chunk_index, validate_level));
734  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
735  auto schema = input_table->schema();
736  int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
737  if (index == -1) {
738  return Status::Invalid("The offset column ", GeneralParams::kOffsetCol,
739  " does not exist in the input table");
740  }
741  auto in_table = input_table->SelectColumns({index}).ValueOrDie();
742  GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetAdjListOffsetFilePath(
743  vertex_chunk_index, adj_list_type_));
744  std::string path = prefix_ + suffix;
745  return fs_->WriteTableToFile(in_table, file_type, path, options_);
746 }
747 
749  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
750  IdType chunk_index, ValidateLevel validate_level) const {
751  GAR_RETURN_NOT_OK(
752  validate(input_table, vertex_chunk_index, chunk_index, validate_level));
753  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
754  std::vector<int> indices;
755  indices.clear();
756  auto schema = input_table->schema();
757  int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
758  if (index == -1) {
759  return Status::Invalid("The source index column ",
760  GeneralParams::kSrcIndexCol,
761  " does not exist in the input table");
762  }
763  indices.push_back(index);
764  index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
765  if (index == -1) {
766  return Status::Invalid("The destination index column ",
767  GeneralParams::kDstIndexCol,
768  " does not exist in the input table");
769  }
770  indices.push_back(index);
771  auto in_table = input_table->SelectColumns(indices).ValueOrDie();
772 
773  GAR_ASSIGN_OR_RAISE(
774  auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index,
775  chunk_index, adj_list_type_));
776  std::string path = prefix_ + suffix;
777  return fs_->WriteTableToFile(in_table, file_type, path, options_);
778 }
779 
781  const std::shared_ptr<arrow::Table>& input_table,
782  const std::shared_ptr<PropertyGroup>& property_group,
783  IdType vertex_chunk_index, IdType chunk_index,
784  ValidateLevel validate_level) const {
785  GAR_RETURN_NOT_OK(validate(input_table, property_group, vertex_chunk_index,
786  chunk_index, validate_level));
787  auto file_type = property_group->GetFileType();
788 
789  std::vector<int> indices;
790  indices.clear();
791  auto schema = input_table->schema();
792  for (auto& property : property_group->GetProperties()) {
793  int indice = schema->GetFieldIndex(property.name);
794  if (indice == -1) {
795  return Status::Invalid("Column named ", property.name,
796  " of property group ", property_group, " of edge ",
797  edge_info_->GetEdgeType(),
798  " does not exist in the input table.");
799  }
800  indices.push_back(indice);
801  }
802  auto in_table = input_table->SelectColumns(indices).ValueOrDie();
803  GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetPropertyFilePath(
804  property_group, adj_list_type_,
805  vertex_chunk_index, chunk_index));
806  std::string path = prefix_ + suffix;
807  return fs_->WriteTableToFile(in_table, file_type, path, options_);
808 }
809 
811  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
812  IdType chunk_index, ValidateLevel validate_level) const {
813  const auto& property_groups = edge_info_->GetPropertyGroups();
814  for (auto& property_group : property_groups) {
815  GAR_RETURN_NOT_OK(WritePropertyChunk(input_table, property_group,
816  vertex_chunk_index, chunk_index,
817  validate_level));
818  }
819  return Status::OK();
820 }
821 
823  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
824  IdType chunk_index, ValidateLevel validate_level) const {
825  GAR_RETURN_NOT_OK(WriteAdjListChunk(input_table, vertex_chunk_index,
826  chunk_index, validate_level));
827  return WritePropertyChunk(input_table, vertex_chunk_index, chunk_index,
828  validate_level);
829 }
830 
832  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
833  IdType start_chunk_index, ValidateLevel validate_level) const {
834  int64_t length = input_table->num_rows();
835  IdType chunk_index = start_chunk_index;
836  for (int64_t offset = 0; offset < length;
837  offset += chunk_size_, chunk_index++) {
838  auto in_chunk = input_table->Slice(offset, chunk_size_);
839  GAR_RETURN_NOT_OK(WriteAdjListChunk(in_chunk, vertex_chunk_index,
840  chunk_index, validate_level));
841  }
842  return Status::OK();
843 }
844 
846  const std::shared_ptr<arrow::Table>& input_table,
847  const std::shared_ptr<PropertyGroup>& property_group,
848  IdType vertex_chunk_index, IdType start_chunk_index,
849  ValidateLevel validate_level) const {
850  int64_t length = input_table->num_rows();
851  IdType chunk_index = start_chunk_index;
852  for (int64_t offset = 0; offset < length;
853  offset += chunk_size_, chunk_index++) {
854  auto in_chunk = input_table->Slice(offset, chunk_size_);
855  GAR_RETURN_NOT_OK(WritePropertyChunk(in_chunk, property_group,
856  vertex_chunk_index, chunk_index,
857  validate_level));
858  }
859  return Status::OK();
860 }
861 
863  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
864  IdType start_chunk_index, ValidateLevel validate_level) const {
865  int64_t length = input_table->num_rows();
866  IdType chunk_index = start_chunk_index;
867  for (int64_t offset = 0; offset < length;
868  offset += chunk_size_, chunk_index++) {
869  auto in_chunk = input_table->Slice(offset, chunk_size_);
870  GAR_RETURN_NOT_OK(WritePropertyChunk(in_chunk, vertex_chunk_index,
871  chunk_index, validate_level));
872  }
873  return Status::OK();
874 }
875 
877  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
878  IdType start_chunk_index, ValidateLevel validate_level) const {
879  int64_t length = input_table->num_rows();
880  IdType chunk_index = start_chunk_index;
881  for (int64_t offset = 0; offset < length;
882  offset += chunk_size_, chunk_index++) {
883  auto in_chunk = input_table->Slice(offset, chunk_size_);
884  GAR_RETURN_NOT_OK(
885  WriteChunk(in_chunk, vertex_chunk_index, chunk_index, validate_level));
886  }
887  return Status::OK();
888 }
889 
891  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
892  IdType start_chunk_index, ValidateLevel validate_level) const {
893  GAR_ASSIGN_OR_RAISE(
894  auto response_table,
895  sortTable(input_table, getSortColumnName(adj_list_type_)));
896  if (adj_list_type_ == AdjListType::ordered_by_source ||
897  adj_list_type_ == AdjListType::ordered_by_dest) {
898  GAR_ASSIGN_OR_RAISE(
899  auto offset_table,
900  getOffsetTable(response_table, getSortColumnName(adj_list_type_),
901  vertex_chunk_index));
902  GAR_RETURN_NOT_OK(
903  WriteOffsetChunk(offset_table, vertex_chunk_index, validate_level));
904  }
905  return WriteAdjListTable(response_table, vertex_chunk_index,
906  start_chunk_index, validate_level);
907 }
908 
910  const std::shared_ptr<arrow::Table>& input_table,
911  const std::shared_ptr<PropertyGroup>& property_group,
912  IdType vertex_chunk_index, IdType start_chunk_index,
913  ValidateLevel validate_level) const {
914  GAR_ASSIGN_OR_RAISE(
915  auto response_table,
916  sortTable(input_table, getSortColumnName(adj_list_type_)));
917  return WritePropertyTable(response_table, property_group, vertex_chunk_index,
918  start_chunk_index, validate_level);
919 }
920 
922  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
923  IdType start_chunk_index, ValidateLevel validate_level) const {
924  GAR_ASSIGN_OR_RAISE(
925  auto response_table,
926  sortTable(input_table, getSortColumnName(adj_list_type_)));
927  return WritePropertyTable(response_table, vertex_chunk_index,
928  start_chunk_index, validate_level);
929 }
930 
932  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
933  IdType start_chunk_index, ValidateLevel validate_level) const {
934  GAR_ASSIGN_OR_RAISE(
935  auto response_table,
936  sortTable(input_table, getSortColumnName(adj_list_type_)));
937 
938  if (adj_list_type_ == AdjListType::ordered_by_source ||
939  adj_list_type_ == AdjListType::ordered_by_dest) {
940  GAR_ASSIGN_OR_RAISE(
941  auto offset_table,
942  getOffsetTable(response_table, getSortColumnName(adj_list_type_),
943  vertex_chunk_index));
944  GAR_RETURN_NOT_OK(
945  WriteOffsetChunk(offset_table, vertex_chunk_index, validate_level));
946  }
947 
948  return WriteTable(response_table, vertex_chunk_index, start_chunk_index,
949  validate_level);
950 }
951 
952 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::getOffsetTable(
953  const std::shared_ptr<arrow::Table>& input_table,
954  const std::string& column_name, IdType vertex_chunk_index) const {
955  std::shared_ptr<arrow::ChunkedArray> column =
956  input_table->GetColumnByName(column_name);
957  int64_t array_index = 0, index = 0;
958  auto ids =
959  std::static_pointer_cast<arrow::Int64Array>(column->chunk(array_index));
960 
961  arrow::Int64Builder builder;
962  IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
963  end_index = begin_index + vertex_chunk_size_;
964  RETURN_NOT_ARROW_OK(builder.Append(0));
965  std::vector<std::shared_ptr<arrow::Array>> arrays;
966  std::vector<std::shared_ptr<arrow::Field>> schema_vector;
967  std::string property = GeneralParams::kOffsetCol;
968  schema_vector.push_back(
969  arrow::field(property, DataType::DataTypeToArrowDataType(int64())));
970 
971  int64_t global_index = 0;
972  for (IdType i = begin_index; i < end_index; i++) {
973  while (true) {
974  if (array_index >= column->num_chunks())
975  break;
976  if (index >= ids->length()) {
977  array_index++;
978  if (array_index == column->num_chunks())
979  break;
980  ids = std::static_pointer_cast<arrow::Int64Array>(
981  column->chunk(array_index));
982  index = 0;
983  }
984  if (ids->IsNull(index) || !ids->IsValid(index)) {
985  index++;
986  global_index++;
987  continue;
988  }
989  int64_t x = ids->Value(index);
990  if (x <= i) {
991  index++;
992  global_index++;
993  } else {
994  break;
995  }
996  }
997  RETURN_NOT_ARROW_OK(builder.Append(global_index));
998  }
999 
1000  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto array, builder.Finish());
1001  arrays.push_back(array);
1002  auto schema = std::make_shared<arrow::Schema>(schema_vector);
1003  return arrow::Table::Make(schema, arrays);
1004 }
1005 
1006 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
1007  const std::shared_ptr<arrow::Table>& input_table,
1008  const std::string& column_name) {
1009 #if ARROW_VERSION >= 21000000
1010  RETURN_NOT_ARROW_OK(arrow::compute::Initialize());
1011 #endif
1012  auto exec_context = arrow::compute::default_exec_context();
1013  auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie();
1014  auto table_source_options =
1015  arrow_acero_namespace::TableSourceNodeOptions{input_table};
1016  auto source = arrow_acero_namespace::MakeExecNode("table_source", plan.get(),
1017  {}, table_source_options)
1018  .ValueOrDie();
1019  AsyncGeneratorType sink_gen;
1020  RETURN_NOT_ARROW_OK(
1021  arrow_acero_namespace::MakeExecNode(
1022  "order_by_sink", plan.get(), {source},
1023  arrow_acero_namespace::OrderBySinkNodeOptions{
1024  arrow::compute::SortOptions{{arrow::compute::SortKey{
1025  column_name, arrow::compute::SortOrder::Ascending}}},
1026  &sink_gen})
1027  .status());
1028  return ExecutePlanAndCollectAsTable(*exec_context, plan,
1029  input_table->schema(), sink_gen);
1030 }
1031 
1032 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1033  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
1034  AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
1035  const ValidateLevel& validate_level) {
1036  if (!edge_info->HasAdjacentListType(adj_list_type)) {
1037  return Status::KeyError(
1038  "The adjacent list type ", AdjListTypeToString(adj_list_type),
1039  " doesn't exist in edge ", edge_info->GetEdgeType(), ".");
1040  }
1041  return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type,
1042  options, validate_level);
1043 }
1044 
1045 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1046  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
1047  AdjListType adj_list_type, const ValidateLevel& validate_level) {
1048  return Make(edge_info, prefix, adj_list_type,
1049  WriterOptions::DefaultWriterOption(), validate_level);
1050 }
1051 
1052 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1053  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
1054  const std::string& edge_type, const std::string& dst_type,
1055  AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
1056  const ValidateLevel& validate_level) {
1057  auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
1058  if (!edge_info) {
1059  return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
1060  dst_type, " doesn't exist.");
1061  }
1062  return Make(edge_info, graph_info->GetPrefix(), adj_list_type, options,
1063  validate_level);
1064 }
1065 
1066 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
1067  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
1068  const std::string& edge_type, const std::string& dst_type,
1069  AdjListType adj_list_type, const ValidateLevel& validate_level) {
1070  return Make(graph_info, src_type, edge_type, dst_type, adj_list_type,
1071  WriterOptions::DefaultWriterOption(), validate_level);
1072 }
1073 
1074 std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) {
1075  switch (adj_list_type) {
1076  case AdjListType::unordered_by_source:
1077  return GeneralParams::kSrcIndexCol;
1078  case AdjListType::ordered_by_source:
1079  return GeneralParams::kSrcIndexCol;
1080  case AdjListType::unordered_by_dest:
1081  return GeneralParams::kDstIndexCol;
1082  case AdjListType::ordered_by_dest:
1083  return GeneralParams::kDstIndexCol;
1084  default:
1085  return GeneralParams::kSrcIndexCol;
1086  }
1087  return GeneralParams::kSrcIndexCol;
1088 }
1089 } // namespace graphar
Status WritePropertyChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single edge property group for an edge chunk.
Status SortAndWriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write the adj list chunks for the edges of a vertex chunk.
Status WriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list chunks for the edges of a vertex chunk.
Status WritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of a single property group for the edges of a vertex chunk.
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list and all property groups for an edge chunk.
Status SortAndWriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status SortAndWritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of a single property group for the edges of a vertex chunk.
Status WriteAdjListChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the adj list chunk for an edge chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
EdgeChunkWriter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, const std::shared_ptr< WriterOptions > &options=WriterOptions::DefaultWriterOption(), const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the EdgeChunkWriter.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
Definition: status.h:123
static Status IndexError(Args &&... args)
Definition: status.h:197
static Status TypeError(Args &&... args)
Definition: status.h:178
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
Status WriteLabelTable(const std::shared_ptr< arrow::Table > &input_table, IdType start_chunk_index, FileType file_type, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write all labels for multiple vertex chunks to corresponding files.
VertexPropertyWriter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const std::shared_ptr< WriterOptions > &options=WriterOptions::DefaultWriterOption(), const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the VertexPropertyWriter.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single property group for a single vertex chunk.
Result< std::shared_ptr< arrow::Table > > GetLabelTable(const std::shared_ptr< arrow::Table > &input_table, const std::vector< std::string > &labels) const
Get label column from table to formulate label table.
static Result< std::shared_ptr< VertexPropertyWriter > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const std::shared_ptr< WriterOptions > &options, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Construct a VertexPropertyWriter from vertex info.
Status WriteLabelChunk(const std::shared_ptr< arrow::Table > &input_table, IdType chunk_index, FileType file_type, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write all labels of a single vertex chunk to corresponding files.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType start_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write a single property group for multiple vertex chunks to corresponding files.