Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
chunk_writer.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <iostream>
21 #include <utility>
22 
23 #include "arrow/api.h"
24 #include "arrow/compute/api.h"
25 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
26 #include "arrow/acero/exec_plan.h"
27 #else
28 #include "arrow/compute/exec/exec_plan.h"
29 #endif
30 #include "arrow/dataset/dataset.h"
31 #include "arrow/dataset/file_base.h"
32 #include "arrow/dataset/file_parquet.h"
33 #include "arrow/dataset/plan.h"
34 #include "arrow/dataset/scanner.h"
35 
36 #include "graphar/arrow/chunk_writer.h"
37 #include "graphar/filesystem.h"
38 #include "graphar/general_params.h"
39 #include "graphar/graph_info.h"
40 #include "graphar/result.h"
41 #include "graphar/status.h"
42 #include "graphar/types.h"
43 #include "graphar/util.h"
44 
45 namespace graphar {
46 // common methods
47 
48 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
49 namespace arrow_acero_namespace = arrow::acero;
50 #else
51 namespace arrow_acero_namespace = arrow::compute;
52 #endif
53 
54 #if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000
55 using AsyncGeneratorType =
56  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>;
57 #else
58 using AsyncGeneratorType =
59  arrow::AsyncGenerator<arrow::util::optional<arrow::compute::ExecBatch>>;
60 #endif
61 
70 Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
71  const arrow::compute::ExecContext& exec_context,
72  std::shared_ptr<arrow_acero_namespace::ExecPlan> plan,
73  std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) {
74  // translate sink_gen (async) to sink_reader (sync)
75  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
76  arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen),
77  exec_context.memory_pool());
78 
79  // validate the ExecPlan
80  RETURN_NOT_ARROW_OK(plan->Validate());
81  // start the ExecPlan
82 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
83  plan->StartProducing(); // arrow 12.0.0 or later return void, not Status
84 #else
85  RETURN_NOT_ARROW_OK(plan->StartProducing());
86 #endif
87 
88  // collect sink_reader into a Table
89  std::shared_ptr<arrow::Table> response_table;
90  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
91  response_table, arrow::Table::FromRecordBatchReader(sink_reader.get()));
92 
93  // stop producing
94  plan->StopProducing();
95  // plan mark finished
96  RETURN_NOT_ARROW_OK(plan->finished().status());
97  return response_table;
98 }
99 
100 // implementations for VertexPropertyChunkWriter
101 
103  const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
104  const ValidateLevel& validate_level)
105  : vertex_info_(vertex_info),
106  prefix_(prefix),
107  validate_level_(validate_level) {
108  if (validate_level_ == ValidateLevel::default_validate) {
109  throw std::runtime_error(
110  "default_validate is not allowed to be set as the global validate "
111  "level for VertexPropertyWriter");
112  }
113  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
114 }
115 
116 // Check if the operation of writing vertices number is allowed.
117 Status VertexPropertyWriter::validate(const IdType& count,
118  ValidateLevel validate_level) const {
119  // use the writer's validate level
120  if (validate_level == ValidateLevel::default_validate)
121  validate_level = validate_level_;
122  // no validate
123  if (validate_level == ValidateLevel::no_validate)
124  return Status::OK();
125  // weak & strong validate
126  if (count < 0) {
127  return Status::Invalid("The number of vertices is negative.");
128  }
129  return Status::OK();
130 }
131 
132 // Check if the operation of copying a file as a chunk is allowed.
133 Status VertexPropertyWriter::validate(
134  const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
135  ValidateLevel validate_level) const {
136  // use the writer's validate level
137  if (validate_level == ValidateLevel::default_validate)
138  validate_level = validate_level_;
139  // no validate
140  if (validate_level == ValidateLevel::no_validate)
141  return Status::OK();
142  // weak & strong validate
143  if (!vertex_info_->HasPropertyGroup(property_group)) {
144  return Status::KeyError("The property group", " does not exist in ",
145  vertex_info_->GetLabel(), " vertex info.");
146  }
147  if (chunk_index < 0) {
148  return Status::IndexError("Negative chunk index ", chunk_index, ".");
149  }
150  return Status::OK();
151 }
152 
153 // Check if the operation of writing a table as a chunk is allowed.
154 Status VertexPropertyWriter::validate(
155  const std::shared_ptr<arrow::Table>& input_table,
156  const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
157  ValidateLevel validate_level) const {
158  // use the writer's validate level
159  if (validate_level == ValidateLevel::default_validate) {
160  validate_level = validate_level_;
161  }
162  // no validate
163  if (validate_level == ValidateLevel::no_validate) {
164  return Status::OK();
165  }
166  // validate property_group & chunk_index
167  GAR_RETURN_NOT_OK(validate(property_group, chunk_index, validate_level));
168  // weak validate for the input_table
169  if (input_table->num_rows() > vertex_info_->GetChunkSize()) {
170  return Status::Invalid("The number of rows of input table is ",
171  input_table->num_rows(),
172  " which is larger than the vertex chunk size",
173  vertex_info_->GetChunkSize(), ".");
174  }
175  // strong validate for the input_table
176  if (validate_level == ValidateLevel::strong_validate) {
177  // validate the input table
178  RETURN_NOT_ARROW_OK(input_table->Validate());
179  // validate the schema
180  auto schema = input_table->schema();
181  for (auto& property : property_group->GetProperties()) {
182  int indice = schema->GetFieldIndex(property.name);
183  if (indice == -1) {
184  return Status::Invalid("Column named ", property.name,
185  " of property group ", property_group,
186  " does not exist in the input table.");
187  }
188  auto field = schema->field(indice);
189  if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) {
190  return Status::TypeError(
191  "The data type of property: ", property.name, " is ",
192  property.type->ToTypeName(), ", but got ",
193  DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
194  ".");
195  }
196  }
197  }
198  return Status::OK();
199 }
200 
202  const IdType& count, ValidateLevel validate_level) const {
203  GAR_RETURN_NOT_OK(validate(count, validate_level));
204  GAR_ASSIGN_OR_RAISE(auto suffix, vertex_info_->GetVerticesNumFilePath());
205  std::string path = prefix_ + suffix;
206  return fs_->WriteValueToFile<IdType>(count, path);
207 }
208 
210  const std::shared_ptr<arrow::Table>& input_table,
211  const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index,
212  ValidateLevel validate_level) const {
213  GAR_RETURN_NOT_OK(
214  validate(input_table, property_group, chunk_index, validate_level));
215  auto file_type = property_group->GetFileType();
216  auto schema = input_table->schema();
217  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
218  if (indice == -1) {
219  return Status::Invalid("The internal id Column named ",
220  GeneralParams::kVertexIndexCol,
221  " does not exist in the input table.");
222  }
223 
224  std::vector<int> indices({indice});
225  for (auto& property : property_group->GetProperties()) {
226  int indice = schema->GetFieldIndex(property.name);
227  if (indice == -1) {
228  return Status::Invalid("Column named ", property.name,
229  " of property group ", property_group,
230  " of vertex ", vertex_info_->GetLabel(),
231  " does not exist in the input table.");
232  }
233  indices.push_back(indice);
234  }
235  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table,
236  input_table->SelectColumns(indices));
237  GAR_ASSIGN_OR_RAISE(auto suffix,
238  vertex_info_->GetFilePath(property_group, chunk_index));
239  std::string path = prefix_ + suffix;
240  return fs_->WriteTableToFile(in_table, file_type, path);
241 }
242 
244  const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index,
245  ValidateLevel validate_level) const {
246  auto property_groups = vertex_info_->GetPropertyGroups();
247  for (auto& property_group : property_groups) {
248  GAR_RETURN_NOT_OK(
249  WriteChunk(input_table, property_group, chunk_index, validate_level));
250  }
251  return Status::OK();
252 }
253 
255  const std::shared_ptr<arrow::Table>& input_table,
256  const std::shared_ptr<PropertyGroup>& property_group,
257  IdType start_chunk_index, ValidateLevel validate_level) const {
258  auto schema = input_table->schema();
259  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
260  auto table_with_index = input_table;
261  if (indice == -1) {
262  // add index column
263  GAR_ASSIGN_OR_RAISE(table_with_index,
264  addIndexColumn(input_table, start_chunk_index,
265  vertex_info_->GetChunkSize()));
266  }
267  IdType chunk_size = vertex_info_->GetChunkSize();
268  int64_t length = table_with_index->num_rows();
269  IdType chunk_index = start_chunk_index;
270  for (int64_t offset = 0; offset < length;
271  offset += chunk_size, chunk_index++) {
272  auto in_chunk = table_with_index->Slice(offset, chunk_size);
273  GAR_RETURN_NOT_OK(
274  WriteChunk(in_chunk, property_group, chunk_index, validate_level));
275  }
276  return Status::OK();
277 }
278 
280  const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
281  ValidateLevel validate_level) const {
282  auto property_groups = vertex_info_->GetPropertyGroups();
283  GAR_ASSIGN_OR_RAISE(auto table_with_index,
284  addIndexColumn(input_table, start_chunk_index,
285  vertex_info_->GetChunkSize()));
286  for (auto& property_group : property_groups) {
287  GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
288  start_chunk_index, validate_level));
289  }
290  return Status::OK();
291 }
292 
293 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
294  const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
295  const ValidateLevel& validate_level) {
296  return std::make_shared<VertexPropertyWriter>(vertex_info, prefix,
297  validate_level);
298 }
299 
300 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
301  const std::shared_ptr<GraphInfo>& graph_info, const std::string& label,
302  const ValidateLevel& validate_level) {
303  auto vertex_info = graph_info->GetVertexInfo(label);
304  if (!vertex_info) {
305  return Status::KeyError("The vertex ", label, " doesn't exist.");
306  }
307  return Make(vertex_info, graph_info->GetPrefix(), validate_level);
308 }
309 
310 Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn(
311  const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
312  IdType chunk_size) const {
313  arrow::Int64Builder array_builder;
314  RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
315  int64_t length = table->num_rows();
316  for (IdType i = 0; i < length; i++) {
317  RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
318  }
319  std::shared_ptr<arrow::Array> array;
320  RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
321  std::shared_ptr<arrow::ChunkedArray> chunked_array =
322  std::make_shared<arrow::ChunkedArray>(array);
323  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
324  auto ret, table->AddColumn(0,
325  arrow::field(GeneralParams::kVertexIndexCol,
326  arrow::int64(), false),
327  chunked_array));
328  return ret;
329 }
330 
331 // implementations for EdgeChunkWriter
332 
333 EdgeChunkWriter::EdgeChunkWriter(const std::shared_ptr<EdgeInfo>& edge_info,
334  const std::string& prefix,
335  AdjListType adj_list_type,
336  const ValidateLevel& validate_level)
337  : edge_info_(edge_info),
338  adj_list_type_(adj_list_type),
339  validate_level_(validate_level) {
340  if (validate_level_ == ValidateLevel::default_validate) {
341  throw std::runtime_error(
342  "default_validate is not allowed to be set as the global validate "
343  "level for EdgeChunkWriter");
344  }
345  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
346  chunk_size_ = edge_info_->GetChunkSize();
347  switch (adj_list_type) {
348  case AdjListType::unordered_by_source:
349  vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
350  break;
351  case AdjListType::ordered_by_source:
352  vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
353  break;
354  case AdjListType::unordered_by_dest:
355  vertex_chunk_size_ = edge_info_->GetDstChunkSize();
356  break;
357  case AdjListType::ordered_by_dest:
358  vertex_chunk_size_ = edge_info_->GetDstChunkSize();
359  break;
360  default:
361  vertex_chunk_size_ = edge_info_->GetSrcChunkSize();
362  }
363 }
364 // Check if the operation of writing number or copying a file is allowed.
365 Status EdgeChunkWriter::validate(IdType count_or_index1, IdType count_or_index2,
366  ValidateLevel validate_level) const {
367  // use the writer's validate level
368  if (validate_level == ValidateLevel::default_validate)
369  validate_level = validate_level_;
370  // no validate
371  if (validate_level == ValidateLevel::no_validate)
372  return Status::OK();
373  // weak & strong validate for adj list type
374  if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
375  return Status::KeyError(
376  "Adj list type ", AdjListTypeToString(adj_list_type_),
377  " does not exist in the ", edge_info_->GetEdgeLabel(), " edge info.");
378  }
379  // weak & strong validate for count or index
380  if (count_or_index1 < 0 || count_or_index2 < 0) {
381  return Status::IndexError(
382  "The count or index must be non-negative, but got ", count_or_index1,
383  " and ", count_or_index2, ".");
384  }
385  return Status::OK();
386 }
387 
388 // Check if the operation of copying a file as a property chunk is allowed.
389 Status EdgeChunkWriter::validate(
390  const std::shared_ptr<PropertyGroup>& property_group,
391  IdType vertex_chunk_index, IdType chunk_index,
392  ValidateLevel validate_level) const {
393  // use the writer's validate level
394  if (validate_level == ValidateLevel::default_validate)
395  validate_level = validate_level_;
396  // no validate
397  if (validate_level == ValidateLevel::no_validate)
398  return Status::OK();
399  // validate for adj list type & index
400  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
401  // weak & strong validate for property group
402  if (!edge_info_->HasPropertyGroup(property_group)) {
403  return Status::KeyError("Property group", " does not exist in the ",
404  edge_info_->GetEdgeLabel(), " edge info.");
405  }
406  return Status::OK();
407 }
408 
409 // Check if the operation of writing a table as an offset chunk is allowed.
410 Status EdgeChunkWriter::validate(
411  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
412  ValidateLevel validate_level) const {
413  // use the writer's validate level
414  if (validate_level == ValidateLevel::default_validate)
415  validate_level = validate_level_;
416  // no validate
417  if (validate_level == ValidateLevel::no_validate)
418  return Status::OK();
419  // validate for adj list type & index
420  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, 0, validate_level));
421  // weak validate for the input table
422  if (adj_list_type_ != AdjListType::ordered_by_source &&
423  adj_list_type_ != AdjListType::ordered_by_dest) {
424  return Status::Invalid(
425  "The adj list type has to be ordered_by_source or ordered_by_dest, but "
426  "got " +
427  std::string(AdjListTypeToString(adj_list_type_)));
428  }
429  if (adj_list_type_ == AdjListType::ordered_by_source &&
430  input_table->num_rows() > edge_info_->GetSrcChunkSize() + 1) {
431  return Status::Invalid(
432  "The number of rows of input offset table is ", input_table->num_rows(),
433  " which is larger than the offset size of source vertex chunk ",
434  edge_info_->GetSrcChunkSize() + 1, ".");
435  }
436  if (adj_list_type_ == AdjListType::ordered_by_dest &&
437  input_table->num_rows() > edge_info_->GetDstChunkSize() + 1) {
438  return Status::Invalid(
439  "The number of rows of input offset table is ", input_table->num_rows(),
440  " which is larger than the offset size of destination vertex chunk ",
441  edge_info_->GetSrcChunkSize() + 1, ".");
442  }
443  // strong validate for the input_table
444  if (validate_level == ValidateLevel::strong_validate) {
445  // validate the input table
446  RETURN_NOT_ARROW_OK(input_table->Validate());
447  // validate the schema
448  auto schema = input_table->schema();
449  int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
450  if (index == -1) {
451  return Status::Invalid("The offset column ", GeneralParams::kOffsetCol,
452  " does not exist in the input table");
453  }
454  auto field = schema->field(index);
455  if (field->type()->id() != arrow::Type::INT64) {
456  return Status::TypeError(
457  "The data type for offset column should be INT64, but got ",
458  field->type()->name());
459  }
460  }
461  return Status::OK();
462 }
463 
464 // Check if the operation of writing a table as an adj list chunk is allowed.
465 Status EdgeChunkWriter::validate(
466  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
467  IdType chunk_index, ValidateLevel validate_level) const {
468  // use the writer's validate level
469  if (validate_level == ValidateLevel::default_validate)
470  validate_level = validate_level_;
471  // no validate
472  if (validate_level == ValidateLevel::no_validate)
473  return Status::OK();
474  // validate for adj list type & index
475  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level));
476  // weak validate for the input table
477  if (input_table->num_rows() > edge_info_->GetChunkSize()) {
478  return Status::Invalid(
479  "The number of rows of input table is ", input_table->num_rows(),
480  " which is larger than the ", edge_info_->GetEdgeLabel(),
481  " edge chunk size ", edge_info_->GetChunkSize(), ".");
482  }
483  // strong validate for the input table
484  if (validate_level == ValidateLevel::strong_validate) {
485  auto schema = input_table->schema();
486  int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
487  if (index == -1) {
488  return Status::Invalid("The source index column ",
489  GeneralParams::kSrcIndexCol,
490  " does not exist in the input table");
491  }
492  auto field = schema->field(index);
493  if (field->type()->id() != arrow::Type::INT64) {
494  return Status::TypeError(
495  "The data type for source index column should be INT64, but got ",
496  field->type()->name());
497  }
498  index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
499  if (index == -1) {
500  return Status::Invalid("The destination index column ",
501  GeneralParams::kDstIndexCol,
502  " does not exist in the input table");
503  }
504  field = schema->field(index);
505  if (field->type()->id() != arrow::Type::INT64) {
506  return Status::TypeError(
507  "The data type for destination index column should be INT64, but "
508  "got ",
509  field->type()->name());
510  }
511  }
512  return Status::OK();
513 }
514 
515 // Check if the operation of writing a table as a property chunk is allowed.
516 Status EdgeChunkWriter::validate(
517  const std::shared_ptr<arrow::Table>& input_table,
518  const std::shared_ptr<PropertyGroup>& property_group,
519  IdType vertex_chunk_index, IdType chunk_index,
520  ValidateLevel validate_level) const {
521  // use the writer's validate level
522  if (validate_level == ValidateLevel::default_validate)
523  validate_level = validate_level_;
524  // no validate
525  if (validate_level == ValidateLevel::no_validate)
526  return Status::OK();
527  // validate for property group, adj list type & index
528  GAR_RETURN_NOT_OK(validate(property_group, vertex_chunk_index, chunk_index,
529  validate_level));
530  // weak validate for the input table
531  if (input_table->num_rows() > edge_info_->GetChunkSize()) {
532  return Status::Invalid(
533  "The number of rows of input table is ", input_table->num_rows(),
534  " which is larger than the ", edge_info_->GetEdgeLabel(),
535  " edge chunk size ", edge_info_->GetChunkSize(), ".");
536  }
537  // strong validate for the input table
538  if (validate_level == ValidateLevel::strong_validate) {
539  // validate the input table
540  RETURN_NOT_ARROW_OK(input_table->Validate());
541  // validate the schema
542  auto schema = input_table->schema();
543  for (auto& property : property_group->GetProperties()) {
544  int indice = schema->GetFieldIndex(property.name);
545  if (indice == -1) {
546  return Status::Invalid("Column named ", property.name,
547  " of property group ", property_group,
548  " does not exist in the input table.");
549  }
550  auto field = schema->field(indice);
551  if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) {
552  return Status::TypeError(
553  "The data type of property: ", property.name, " is ",
554  property.type->ToTypeName(), ", but got ",
555  DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(),
556  ".");
557  }
558  }
559  }
560  return Status::OK();
561 }
562 
563 Status EdgeChunkWriter::WriteEdgesNum(IdType vertex_chunk_index,
564  const IdType& count,
565  ValidateLevel validate_level) const {
566  GAR_RETURN_NOT_OK(validate(vertex_chunk_index, count, validate_level));
567  GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetEdgesNumFilePath(
568  vertex_chunk_index, adj_list_type_));
569  std::string path = prefix_ + suffix;
570  return fs_->WriteValueToFile<IdType>(count, path);
571 }
572 
574  ValidateLevel validate_level) const {
575  GAR_RETURN_NOT_OK(validate(0, count, validate_level));
576  GAR_ASSIGN_OR_RAISE(auto suffix,
577  edge_info_->GetVerticesNumFilePath(adj_list_type_));
578  std::string path = prefix_ + suffix;
579  return fs_->WriteValueToFile<IdType>(count, path);
580 }
581 
583  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
584  ValidateLevel validate_level) const {
585  GAR_RETURN_NOT_OK(validate(input_table, vertex_chunk_index, validate_level));
586  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
587  auto schema = input_table->schema();
588  int index = schema->GetFieldIndex(GeneralParams::kOffsetCol);
589  if (index == -1) {
590  return Status::Invalid("The offset column ", GeneralParams::kOffsetCol,
591  " does not exist in the input table");
592  }
593  auto in_table = input_table->SelectColumns({index}).ValueOrDie();
594  GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetAdjListOffsetFilePath(
595  vertex_chunk_index, adj_list_type_));
596  std::string path = prefix_ + suffix;
597  return fs_->WriteTableToFile(in_table, file_type, path);
598 }
599 
601  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
602  IdType chunk_index, ValidateLevel validate_level) const {
603  GAR_RETURN_NOT_OK(
604  validate(input_table, vertex_chunk_index, chunk_index, validate_level));
605  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
606  std::vector<int> indices;
607  indices.clear();
608  auto schema = input_table->schema();
609  int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol);
610  if (index == -1) {
611  return Status::Invalid("The source index column ",
612  GeneralParams::kSrcIndexCol,
613  " does not exist in the input table");
614  }
615  indices.push_back(index);
616  index = schema->GetFieldIndex(GeneralParams::kDstIndexCol);
617  if (index == -1) {
618  return Status::Invalid("The destination index column ",
619  GeneralParams::kDstIndexCol,
620  " does not exist in the input table");
621  }
622  indices.push_back(index);
623  auto in_table = input_table->SelectColumns(indices).ValueOrDie();
624 
625  GAR_ASSIGN_OR_RAISE(
626  auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index,
627  chunk_index, adj_list_type_));
628  std::string path = prefix_ + suffix;
629  return fs_->WriteTableToFile(in_table, file_type, path);
630 }
631 
633  const std::shared_ptr<arrow::Table>& input_table,
634  const std::shared_ptr<PropertyGroup>& property_group,
635  IdType vertex_chunk_index, IdType chunk_index,
636  ValidateLevel validate_level) const {
637  GAR_RETURN_NOT_OK(validate(input_table, property_group, vertex_chunk_index,
638  chunk_index, validate_level));
639  auto file_type = property_group->GetFileType();
640 
641  std::vector<int> indices;
642  indices.clear();
643  auto schema = input_table->schema();
644  for (auto& property : property_group->GetProperties()) {
645  int indice = schema->GetFieldIndex(property.name);
646  if (indice == -1) {
647  return Status::Invalid("Column named ", property.name,
648  " of property group ", property_group, " of edge ",
649  edge_info_->GetEdgeLabel(),
650  " does not exist in the input table.");
651  }
652  indices.push_back(indice);
653  }
654  auto in_table = input_table->SelectColumns(indices).ValueOrDie();
655  GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetPropertyFilePath(
656  property_group, adj_list_type_,
657  vertex_chunk_index, chunk_index));
658  std::string path = prefix_ + suffix;
659  return fs_->WriteTableToFile(in_table, file_type, path);
660 }
661 
663  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
664  IdType chunk_index, ValidateLevel validate_level) const {
665  const auto& property_groups = edge_info_->GetPropertyGroups();
666  for (auto& property_group : property_groups) {
667  GAR_RETURN_NOT_OK(WritePropertyChunk(input_table, property_group,
668  vertex_chunk_index, chunk_index,
669  validate_level));
670  }
671  return Status::OK();
672 }
673 
675  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
676  IdType chunk_index, ValidateLevel validate_level) const {
677  GAR_RETURN_NOT_OK(WriteAdjListChunk(input_table, vertex_chunk_index,
678  chunk_index, validate_level));
679  return WritePropertyChunk(input_table, vertex_chunk_index, chunk_index,
680  validate_level);
681 }
682 
684  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
685  IdType start_chunk_index, ValidateLevel validate_level) const {
686  int64_t length = input_table->num_rows();
687  IdType chunk_index = start_chunk_index;
688  for (int64_t offset = 0; offset < length;
689  offset += chunk_size_, chunk_index++) {
690  auto in_chunk = input_table->Slice(offset, chunk_size_);
691  GAR_RETURN_NOT_OK(WriteAdjListChunk(in_chunk, vertex_chunk_index,
692  chunk_index, validate_level));
693  }
694  return Status::OK();
695 }
696 
698  const std::shared_ptr<arrow::Table>& input_table,
699  const std::shared_ptr<PropertyGroup>& property_group,
700  IdType vertex_chunk_index, IdType start_chunk_index,
701  ValidateLevel validate_level) const {
702  int64_t length = input_table->num_rows();
703  IdType chunk_index = start_chunk_index;
704  for (int64_t offset = 0; offset < length;
705  offset += chunk_size_, chunk_index++) {
706  auto in_chunk = input_table->Slice(offset, chunk_size_);
707  GAR_RETURN_NOT_OK(WritePropertyChunk(in_chunk, property_group,
708  vertex_chunk_index, chunk_index,
709  validate_level));
710  }
711  return Status::OK();
712 }
713 
715  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
716  IdType start_chunk_index, ValidateLevel validate_level) const {
717  int64_t length = input_table->num_rows();
718  IdType chunk_index = start_chunk_index;
719  for (int64_t offset = 0; offset < length;
720  offset += chunk_size_, chunk_index++) {
721  auto in_chunk = input_table->Slice(offset, chunk_size_);
722  GAR_RETURN_NOT_OK(WritePropertyChunk(in_chunk, vertex_chunk_index,
723  chunk_index, validate_level));
724  }
725  return Status::OK();
726 }
727 
729  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
730  IdType start_chunk_index, ValidateLevel validate_level) const {
731  int64_t length = input_table->num_rows();
732  IdType chunk_index = start_chunk_index;
733  for (int64_t offset = 0; offset < length;
734  offset += chunk_size_, chunk_index++) {
735  auto in_chunk = input_table->Slice(offset, chunk_size_);
736  GAR_RETURN_NOT_OK(
737  WriteChunk(in_chunk, vertex_chunk_index, chunk_index, validate_level));
738  }
739  return Status::OK();
740 }
741 
743  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
744  IdType start_chunk_index, ValidateLevel validate_level) const {
745  GAR_ASSIGN_OR_RAISE(
746  auto response_table,
747  sortTable(input_table, getSortColumnName(adj_list_type_)));
748  if (adj_list_type_ == AdjListType::ordered_by_source ||
749  adj_list_type_ == AdjListType::ordered_by_dest) {
750  GAR_ASSIGN_OR_RAISE(
751  auto offset_table,
752  getOffsetTable(response_table, getSortColumnName(adj_list_type_),
753  vertex_chunk_index));
754  GAR_RETURN_NOT_OK(
755  WriteOffsetChunk(offset_table, vertex_chunk_index, validate_level));
756  }
757  return WriteAdjListTable(response_table, vertex_chunk_index,
758  start_chunk_index, validate_level);
759 }
760 
762  const std::shared_ptr<arrow::Table>& input_table,
763  const std::shared_ptr<PropertyGroup>& property_group,
764  IdType vertex_chunk_index, IdType start_chunk_index,
765  ValidateLevel validate_level) const {
766  GAR_ASSIGN_OR_RAISE(
767  auto response_table,
768  sortTable(input_table, getSortColumnName(adj_list_type_)));
769  return WritePropertyTable(response_table, property_group, vertex_chunk_index,
770  start_chunk_index, validate_level);
771 }
772 
774  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
775  IdType start_chunk_index, ValidateLevel validate_level) const {
776  GAR_ASSIGN_OR_RAISE(
777  auto response_table,
778  sortTable(input_table, getSortColumnName(adj_list_type_)));
779  return WritePropertyTable(response_table, vertex_chunk_index,
780  start_chunk_index, validate_level);
781 }
782 
784  const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index,
785  IdType start_chunk_index, ValidateLevel validate_level) const {
786  GAR_ASSIGN_OR_RAISE(
787  auto response_table,
788  sortTable(input_table, getSortColumnName(adj_list_type_)));
789 
790  if (adj_list_type_ == AdjListType::ordered_by_source ||
791  adj_list_type_ == AdjListType::ordered_by_dest) {
792  GAR_ASSIGN_OR_RAISE(
793  auto offset_table,
794  getOffsetTable(response_table, getSortColumnName(adj_list_type_),
795  vertex_chunk_index));
796  GAR_RETURN_NOT_OK(
797  WriteOffsetChunk(offset_table, vertex_chunk_index, validate_level));
798  }
799 
800  return WriteTable(response_table, vertex_chunk_index, start_chunk_index,
801  validate_level);
802 }
803 
804 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::getOffsetTable(
805  const std::shared_ptr<arrow::Table>& input_table,
806  const std::string& column_name, IdType vertex_chunk_index) const {
807  std::shared_ptr<arrow::ChunkedArray> column =
808  input_table->GetColumnByName(column_name);
809  int64_t array_index = 0, index = 0;
810  auto ids =
811  std::static_pointer_cast<arrow::Int64Array>(column->chunk(array_index));
812 
813  arrow::Int64Builder builder;
814  IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
815  end_index = begin_index + vertex_chunk_size_;
816  RETURN_NOT_ARROW_OK(builder.Append(0));
817  std::vector<std::shared_ptr<arrow::Array>> arrays;
818  std::vector<std::shared_ptr<arrow::Field>> schema_vector;
819  std::string property = GeneralParams::kOffsetCol;
820  schema_vector.push_back(
821  arrow::field(property, DataType::DataTypeToArrowDataType(int64())));
822 
823  int64_t global_index = 0;
824  for (IdType i = begin_index; i < end_index; i++) {
825  while (true) {
826  if (array_index >= column->num_chunks())
827  break;
828  if (index >= ids->length()) {
829  array_index++;
830  if (array_index == column->num_chunks())
831  break;
832  ids = std::static_pointer_cast<arrow::Int64Array>(
833  column->chunk(array_index));
834  index = 0;
835  }
836  if (ids->IsNull(index) || !ids->IsValid(index)) {
837  index++;
838  global_index++;
839  continue;
840  }
841  int64_t x = ids->Value(index);
842  if (x <= i) {
843  index++;
844  global_index++;
845  } else {
846  break;
847  }
848  }
849  RETURN_NOT_ARROW_OK(builder.Append(global_index));
850  }
851 
852  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto array, builder.Finish());
853  arrays.push_back(array);
854  auto schema = std::make_shared<arrow::Schema>(schema_vector);
855  return arrow::Table::Make(schema, arrays);
856 }
857 
858 Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
859  const std::shared_ptr<arrow::Table>& input_table,
860  const std::string& column_name) {
861  auto exec_context = arrow::compute::default_exec_context();
862  auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie();
863  auto table_source_options =
864  arrow_acero_namespace::TableSourceNodeOptions{input_table};
865  auto source = arrow_acero_namespace::MakeExecNode("table_source", plan.get(),
866  {}, table_source_options)
867  .ValueOrDie();
868  AsyncGeneratorType sink_gen;
869  RETURN_NOT_ARROW_OK(
870  arrow_acero_namespace::MakeExecNode(
871  "order_by_sink", plan.get(), {source},
872  arrow_acero_namespace::OrderBySinkNodeOptions{
873  arrow::compute::SortOptions{{arrow::compute::SortKey{
874  column_name, arrow::compute::SortOrder::Ascending}}},
875  &sink_gen})
876  .status());
877  return ExecutePlanAndCollectAsTable(*exec_context, plan,
878  input_table->schema(), sink_gen);
879 }
880 
881 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
882  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
883  AdjListType adj_list_type, const ValidateLevel& validate_level) {
884  if (!edge_info->HasAdjacentListType(adj_list_type)) {
885  return Status::KeyError(
886  "The adjacent list type ", AdjListTypeToString(adj_list_type),
887  " doesn't exist in edge ", edge_info->GetEdgeLabel(), ".");
888  }
889  return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type,
890  validate_level);
891 }
892 
893 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
894  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_label,
895  const std::string& edge_label, const std::string& dst_label,
896  AdjListType adj_list_type, const ValidateLevel& validate_level) {
897  auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
898  if (!edge_info) {
899  return Status::KeyError("The edge ", src_label, " ", edge_label, " ",
900  dst_label, " doesn't exist.");
901  }
902  return Make(edge_info, graph_info->GetPrefix(), adj_list_type,
903  validate_level);
904 }
905 
906 std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) {
907  switch (adj_list_type) {
908  case AdjListType::unordered_by_source:
909  return GeneralParams::kSrcIndexCol;
910  case AdjListType::ordered_by_source:
911  return GeneralParams::kSrcIndexCol;
912  case AdjListType::unordered_by_dest:
913  return GeneralParams::kDstIndexCol;
914  case AdjListType::ordered_by_dest:
915  return GeneralParams::kDstIndexCol;
916  default:
917  return GeneralParams::kSrcIndexCol;
918  }
919  return GeneralParams::kSrcIndexCol;
920 }
921 } // namespace graphar
Status WritePropertyChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single edge property group for an edge chunk.
Status SortAndWriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write the adj list chunks for the edges of a vertex chunk.
Status WriteAdjListTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list chunks for the edges of a vertex chunk.
Status WritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of a single property group for the edges of a vertex chunk.
EdgeChunkWriter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the EdgeChunkWriter.
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the adj list and all property groups for an edge chunk.
Status SortAndWriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status SortAndWritePropertyTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Sort the edges, and write chunks of a single property group for the edges of a vertex chunk.
Status WriteAdjListChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the adj list chunk for an edge chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
Definition: status.h:123
static Status IndexError(Args &&... args)
Definition: status.h:197
static Status TypeError(Args &&... args)
Definition: status.h:178
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
VertexPropertyWriter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Initialize the VertexPropertyWriter.
Status WriteChunk(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write a single property group for a single vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, const std::shared_ptr< PropertyGroup > &property_group, IdType start_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write a single property group for multiple vertex chunks to corresponding files.
static Result< std::shared_ptr< VertexPropertyWriter > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const ValidateLevel &validate_level=ValidateLevel::no_validate)
Construct a VertexPropertyWriter from vertex info.