Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
chunk_reader.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <algorithm>
21 #include <string>
22 #include <utility>
23 #include <vector>
24 
25 #include "arrow/api.h"
26 #include "arrow/compute/api.h"
27 
28 #include "graphar/arrow/chunk_reader.h"
29 #include "graphar/filesystem.h"
30 #include "graphar/fwd.h"
31 #include "graphar/general_params.h"
32 #include "graphar/graph_info.h"
33 #include "graphar/reader_util.h"
34 #include "graphar/result.h"
35 #include "graphar/status.h"
36 #include "graphar/types.h"
37 #include "graphar/util.h"
38 
39 namespace graphar {
40 
41 namespace {
42 
43 Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
44  const std::shared_ptr<PropertyGroup> pg,
45  bool contain_index_column = false) {
46  std::vector<std::shared_ptr<arrow::Field>> fields;
47  if (contain_index_column) {
48  fields.push_back(std::make_shared<arrow::Field>(
49  GeneralParams::kVertexIndexCol, arrow::int64()));
50  }
51  for (const auto& prop : pg->GetProperties()) {
52  auto dataType = DataType::DataTypeToArrowDataType(prop.type);
53  if (prop.cardinality != Cardinality::SINGLE) {
54  dataType = arrow::list(dataType);
55  }
56  fields.push_back(std::make_shared<arrow::Field>(prop.name, dataType));
57  }
58  return arrow::schema(fields);
59 }
60 
61 Result<std::shared_ptr<arrow::Schema>> LabelToSchema(
62  std::vector<std::string> labels, bool contain_index_column = false) {
63  std::vector<std::shared_ptr<arrow::Field>> fields;
64  if (contain_index_column) {
65  fields.push_back(std::make_shared<arrow::Field>(
66  GeneralParams::kVertexIndexCol, arrow::int64()));
67  }
68  for (const auto& lab : labels) {
69  fields.push_back(std::make_shared<arrow::Field>(lab, arrow::boolean()));
70  }
71  return arrow::schema(fields);
72 }
73 Status GeneralCast(const std::shared_ptr<arrow::Array>& in,
74  const std::shared_ptr<arrow::DataType>& to_type,
75  std::shared_ptr<arrow::Array>* out) {
76  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(*out,
77  arrow::compute::Cast(*in, to_type));
78  return Status::OK();
79 }
80 
81 Status CastStringToLargeString(const std::shared_ptr<arrow::Array>& in,
82  const std::shared_ptr<arrow::DataType>& to_type,
83  std::shared_ptr<arrow::Array>* out) {
84  auto array_data = in->data()->Copy();
85  auto offset = array_data->buffers[1];
86  using from_offset_type = typename arrow::StringArray::offset_type;
87  using to_string_offset_type = typename arrow::LargeStringArray::offset_type;
88  auto raw_value_offsets_ =
89  offset == NULLPTR
90  ? NULLPTR
91  : reinterpret_cast<const from_offset_type*>(offset->data());
92  std::vector<to_string_offset_type> to_offset(offset->size() /
93  sizeof(from_offset_type));
94  for (size_t i = 0; i < to_offset.size(); ++i) {
95  to_offset[i] = raw_value_offsets_[i];
96  }
97  std::shared_ptr<arrow::Buffer> buffer;
98  arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
99  RETURN_NOT_ARROW_OK(
100  buffer_builder.Append(to_offset.data(), to_offset.size()));
101  RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
102  array_data->type = to_type;
103  array_data->buffers[1] = buffer;
104  *out = arrow::MakeArray(array_data);
105  RETURN_NOT_ARROW_OK((*out)->ValidateFull());
106  return Status::OK();
107 }
108 
109 // helper function to cast arrow::Table with a schema
110 Status CastTableWithSchema(const std::shared_ptr<arrow::Table>& table,
111  const std::shared_ptr<arrow::Schema>& schema,
112  std::shared_ptr<arrow::Table>* out_table) {
113  if (table->schema()->Equals(*schema)) {
114  *out_table = table;
115  }
116  std::vector<std::shared_ptr<arrow::Field>> fields;
117  std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
118  for (int64_t i = 0; i < table->num_columns(); ++i) {
119  auto column = table->column(i);
120  auto table_field = table->field(i);
121  auto field_name = table_field->name();
122 
123  auto schema_field = schema->GetFieldByName(field_name);
124  if (table_field->type()->Equals(schema_field->type())) {
125  columns.push_back(column);
126  fields.push_back(table_field);
127  continue;
128  }
129  auto from_t = table_field->type();
130  auto to_t = schema_field->type();
131  std::vector<std::shared_ptr<arrow::Array>> chunks;
132  // process cast for each chunk
133  for (int64_t j = 0; j < column->num_chunks(); ++j) {
134  auto chunk = column->chunk(j);
135  std::shared_ptr<arrow::Array> out;
136  if (arrow::compute::CanCast(*from_t, *to_t)) {
137  GAR_RETURN_NOT_OK(GeneralCast(chunk, to_t, &out));
138  chunks.push_back(out);
139  } else if (from_t->Equals(arrow::utf8()) &&
140  to_t->Equals(arrow::large_utf8())) {
141  GAR_RETURN_NOT_OK(CastStringToLargeString(chunk, to_t, &out));
142  chunks.push_back(out);
143  }
144  }
145  fields.push_back(arrow::field(field_name, to_t));
146  columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks, to_t));
147  }
148  auto new_schema = std::make_shared<arrow::Schema>(fields);
149  *out_table = arrow::Table::Make(new_schema, columns);
150  return Status::OK();
151 }
152 } // namespace
153 
154 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
155  const std::shared_ptr<VertexInfo>& vertex_info,
156  const std::shared_ptr<PropertyGroup>& property_group,
157  const std::string& prefix, const util::FilterOptions& options)
158  : VertexPropertyArrowChunkReader(vertex_info, property_group, {}, prefix,
159  options) {}
160 
161 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
162  const std::shared_ptr<VertexInfo>& vertex_info,
163  const std::shared_ptr<PropertyGroup>& property_group,
164  const std::vector<std::string>& property_names, const std::string& prefix,
165  const util::FilterOptions& options)
166  : vertex_info_(std::move(vertex_info)),
167  property_group_(std::move(property_group)),
168  property_names_(std::move(property_names)),
169  chunk_index_(0),
170  seek_id_(0),
171  schema_(nullptr),
172  chunk_table_(nullptr),
173  filter_options_(options) {
174  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
175  GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
176  vertex_info->GetPathPrefix(property_group));
177  std::string base_dir = prefix_ + pg_path_prefix;
178  GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
179  util::GetVertexChunkNum(prefix_, vertex_info));
180  GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
181  util::GetVertexNum(prefix_, vertex_info_));
182  GAR_ASSIGN_OR_RAISE_ERROR(schema_,
183  PropertyGroupToSchema(property_group_, true));
184 }
185 
186 // initialize for labels
187 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
188  const std::shared_ptr<VertexInfo>& vertex_info,
189  const std::vector<std::string>& labels, const std::string& prefix,
190  const util::FilterOptions& options)
191  : vertex_info_(std::move(vertex_info)),
192  labels_(labels),
193  chunk_index_(0),
194  seek_id_(0),
195  schema_(nullptr),
196  chunk_table_(nullptr),
197  filter_options_(options) {
198  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
199 
200  std::string base_dir = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" +
201  std::to_string(chunk_index_);
202  GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
203  util::GetVertexChunkNum(prefix_, vertex_info));
204  GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
205  util::GetVertexNum(prefix_, vertex_info_));
206  GAR_ASSIGN_OR_RAISE_ERROR(schema_, LabelToSchema(labels));
207 }
208 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 208 to 210); the body uses a
// parameter named `id`.
//
// Moves the reader to the vertex with internal id `id`:
//  - stores `id` in seek_id_ and recomputes chunk_index_ as
//    id / vertex_info_->GetChunkSize();
//  - drops the cached chunk_table_ when the chunk index changed;
//  - returns Status::IndexError when the new chunk index is not below
//    chunk_num_, otherwise Status::OK().
// seek_id_ and chunk_index_ are already overwritten when the range error is
// reported.
210 seek_id_ = id;
211 IdType pre_chunk_index = chunk_index_;
212 chunk_index_ = id / vertex_info_->GetChunkSize();
213 if (chunk_index_ != pre_chunk_index) {
214 // TODO(@acezen): use a cache to avoid reloading the same chunk, could use
215 // a LRU cache.
216 chunk_table_.reset();
217 }
218 if (chunk_index_ >= chunk_num_) {
219 return Status::IndexError("Internal vertex id ", id, " is out of range [0,",
220 chunk_num_ * vertex_info_->GetChunkSize(),
221 ") of vertex ", vertex_info_->GetType());
222 }
223 return Status::OK();
224 }
225 
226 Result<std::shared_ptr<arrow::Table>>
227 VertexPropertyArrowChunkReader::GetChunkV2() {
228  if (chunk_table_ == nullptr) {
229  GAR_ASSIGN_OR_RAISE(
230  auto chunk_file_path,
231  vertex_info_->GetFilePath(property_group_, chunk_index_));
232  std::vector<int> column_indices = {};
233  std::vector<std::string> property_names;
234  if (!filter_options_.columns && !property_names_.empty()) {
235  property_names = property_names_;
236  } else {
237  if (!property_names_.empty()) {
238  for (const auto& col : filter_options_.columns.value().get()) {
239  if (std::find(property_names_.begin(), property_names_.end(), col) ==
240  property_names_.end()) {
241  return Status::Invalid("Column ", col,
242  " is not in select properties.");
243  }
244  property_names.push_back(col);
245  }
246  }
247  }
248  for (const auto& col : property_names) {
249  auto field_index = schema_->GetFieldIndex(col);
250  if (field_index == -1) {
251  return Status::Invalid("Column ", col, " is not in select properties.");
252  }
253  column_indices.push_back(field_index);
254  }
255  std::string path = prefix_ + chunk_file_path;
256  GAR_ASSIGN_OR_RAISE(
257  chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
258  column_indices));
259  if (schema_ != nullptr && filter_options_.filter == nullptr) {
260  GAR_RETURN_NOT_OK(
261  CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
262  }
263  }
264  IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
265  return chunk_table_->Slice(row_offset);
266 }
267 
268 Result<std::shared_ptr<arrow::Table>>
269 VertexPropertyArrowChunkReader::GetChunkV1() {
270  GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
271  if (chunk_table_ == nullptr) {
272  GAR_ASSIGN_OR_RAISE(
273  auto chunk_file_path,
274  vertex_info_->GetFilePath(property_group_, chunk_index_));
275  std::string path = prefix_ + chunk_file_path;
276  if (property_names_.empty()) {
277  GAR_ASSIGN_OR_RAISE(
278  chunk_table_,
279  fs_->ReadFileToTable(path, property_group_->GetFileType(),
280  filter_options_));
281  } else {
282  util::FilterOptions temp_filter_options;
283  temp_filter_options.filter = filter_options_.filter;
284  std::vector<std::string> intersection_columns;
285  if (!filter_options_.columns) {
286  temp_filter_options.columns = std::ref(property_names_);
287  } else {
288  for (const auto& col : filter_options_.columns.value().get()) {
289  if (std::find(property_names_.begin(), property_names_.end(), col) ==
290  property_names_.end()) {
291  return Status::Invalid("Column ", col,
292  " is not in select properties.");
293  }
294  }
295  temp_filter_options.columns = filter_options_.columns;
296  }
297  GAR_ASSIGN_OR_RAISE(
298  chunk_table_,
299  fs_->ReadFileToTable(path, property_group_->GetFileType(),
300  temp_filter_options));
301  }
302  // TODO(acezen): filter pushdown doesn't support cast schema now
303  if (schema_ != nullptr && filter_options_.filter == nullptr) {
304  GAR_RETURN_NOT_OK(
305  CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
306  }
307  }
308  IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
309  return chunk_table_->Slice(row_offset);
310 }
311 
312 Result<std::shared_ptr<arrow::Table>> VertexPropertyArrowChunkReader::GetChunk(
313  GetChunkVersion version) {
314  switch (version) {
315  case GetChunkVersion::V1:
316  return GetChunkV1();
317  case GetChunkVersion::V2:
318  return GetChunkV2();
319  case GetChunkVersion::AUTO:
320  if (filter_options_.filter != nullptr) {
321  return GetChunkV1();
322  } else {
323  return GetChunkV2();
324  }
325  default:
326  return Status::Invalid("unsupport GetChunkVersion ", version);
327  }
328 }
329 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 330 to 332).
//
// Returns the label table of the current chunk. When no table is cached, the
// file "<prefix_><vertex prefix>labels/chunk<chunk_index_>" is read as a
// PARQUET file with the reader's filter options. The result is the cached
// table sliced from the row that corresponds to seek_id_.
330 Result<std::shared_ptr<arrow::Table>>
332 FileType filetype = FileType::PARQUET;
333 if (chunk_table_ == nullptr) {
334 std::string path = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" +
335 std::to_string(chunk_index_);
336 GAR_ASSIGN_OR_RAISE(chunk_table_,
337 fs_->ReadFileToTable(path, filetype, filter_options_));
338 // TODO(acezen): filter pushdown doesn't support cast schema now
339 // if (schema_ != nullptr && filter_options_.filter == nullptr) {
340 // GAR_RETURN_NOT_OK(
341 // CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
342 // }
343 }
344 IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
345 return chunk_table_->Slice(row_offset);
346 }
347 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 347 to 349).
//
// Advances the reader to the next vertex chunk. chunk_index_ is incremented
// first; when it is no longer below chunk_num_, Status::IndexError is returned
// and the index stays incremented. Otherwise seek_id_ is set to the first
// vertex id of the new chunk and the cached table is dropped.
349 if (++chunk_index_ >= chunk_num_) {
350 return Status::IndexError(
351 "vertex chunk index ", chunk_index_, " is out-of-bounds for vertex ",
352 vertex_info_->GetType(), " chunk num ", chunk_num_);
353 }
354 seek_id_ = chunk_index_ * vertex_info_->GetChunkSize();
355 chunk_table_.reset();
356 
357 return Status::OK();
358 }
359 
360 void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
361  filter_options_.filter = filter;
362 }
363 
364 void VertexPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
365  filter_options_.columns = column_names;
366 }
367 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 368 to 370).
//
// Factory: creates a shared VertexPropertyArrowChunkReader from a vertex
// info, a property group, a prefix and filter options by forwarding them to
// the matching constructor.
368 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
370 const std::shared_ptr<VertexInfo>& vertex_info,
371 const std::shared_ptr<PropertyGroup>& property_group,
372 const std::string& prefix, const util::FilterOptions& options) {
373 return std::make_shared<VertexPropertyArrowChunkReader>(
374 vertex_info, property_group, prefix, options);
375 }
376 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 377 to 379).
//
// Factory: creates a shared VertexPropertyArrowChunkReader that reads the
// given property names of a property group, forwarding all arguments to the
// matching constructor.
377 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
379 const std::shared_ptr<VertexInfo>& vertex_info,
380 const std::shared_ptr<PropertyGroup>& property_group,
381 const std::vector<std::string>& property_names, const std::string& prefix,
382 const util::FilterOptions& options) {
383 return std::make_shared<VertexPropertyArrowChunkReader>(
384 vertex_info, property_group, property_names, prefix, options);
385 }
386 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 387 to 389).
//
// Factory: looks up the vertex info of `type` in `graph_info` and delegates to
// Make(vertex_info, property_group, prefix, options) with the graph prefix.
// Returns Status::KeyError when the vertex type is not part of the graph.
387 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
389 const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
390 const std::shared_ptr<PropertyGroup>& property_group,
391 const util::FilterOptions& options) {
392 auto vertex_info = graph_info->GetVertexInfo(type);
393 if (!vertex_info) {
394 return Status::KeyError("The vertex type ", type,
395 " doesn't exist in graph ", graph_info->GetName(),
396 ".");
397 }
398 return Make(vertex_info, property_group, graph_info->GetPrefix(), options);
399 }
400 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 401 to 403).
//
// Factory for a single property: resolves the vertex info of `type` and the
// property group that contains `property_name`, then delegates to the Make
// overload that takes a property-name list. The list holds `property_name`,
// preceded by GeneralParams::kVertexIndexCol unless the property name is the
// index column itself. Returns Status::KeyError when the vertex type or the
// property cannot be found.
401 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
403 const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
404 const std::string& property_name, const util::FilterOptions& options) {
405 auto vertex_info = graph_info->GetVertexInfo(type);
406 if (!vertex_info) {
407 return Status::KeyError("The vertex type ", type,
408 " doesn't exist in graph ", graph_info->GetName(),
409 ".");
410 }
411 auto property_group = vertex_info->GetPropertyGroup(property_name);
412 if (!property_group) {
413 return Status::KeyError("The property ", property_name,
414 " doesn't exist in vertex type ", type, ".");
415 }
416 std::vector<std::string> property_names = {property_name};
417 if (property_name != graphar::GeneralParams::kVertexIndexCol) {
418 property_names.insert(property_names.begin(),
419 graphar::GeneralParams::kVertexIndexCol);
420 }
421 return Make(vertex_info, property_group, property_names,
422 graph_info->GetPrefix(), options);
423 }
424 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 425 to 427).
//
// Factory that interprets `property_names_or_labels` according to
// `select_type`: SelectType::LABELS forwards to MakeForLabels and
// SelectType::PROPERTIES forwards to MakeForProperties.
// NOTE(review): there is no return statement after the switch, so control
// falls off the end of a value-returning function if `select_type` holds any
// other value — consider adding a default branch that returns an error.
425 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
427 const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
428 const std::vector<std::string>& property_names_or_labels,
429 const SelectType select_type, const util::FilterOptions& options) {
430 switch (select_type) {
431 case SelectType::LABELS:
432 return MakeForLabels(graph_info, type, property_names_or_labels, options);
433 case SelectType::PROPERTIES:
434 return MakeForProperties(graph_info, type, property_names_or_labels,
435 options);
436 }
437 }
438 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 439 to 441).
//
// Factory for several properties that must all belong to one property group.
// Steps:
//  1. resolve the vertex info of `type` (KeyError when missing) and reject an
//     empty name list (Invalid);
//  2. work on a mutable copy of the names; when the last name is the vertex
//     index column it is swapped with the first one, so the last name can be
//     used to look up the property group (KeyError when that lookup fails);
//  3. check every other name: its property group must exist (KeyError) and be
//     the same group as the one of the last name (Invalid);
//  4. insert the vertex index column in front when the list does not already
//     contain it, then delegate to Make with the graph prefix.
// NOTE(review): the loop in step 3 compares an `int` index with an unsigned
// size expression.
439 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
441 const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
442 const std::vector<std::string>& property_names,
443 const util::FilterOptions& options) {
444 auto vertex_info = graph_info->GetVertexInfo(type);
445 if (!vertex_info) {
446 return Status::KeyError("The vertex type ", type,
447 " doesn't exist in graph ", graph_info->GetName(),
448 ".");
449 }
450 if (property_names.empty()) {
451 return Status::Invalid("The property names cannot be empty.");
452 }
453 bool hasIndexCol = false;
454 std::vector<std::string> property_names_mutable = property_names;
455 if (property_names_mutable[property_names_mutable.size() - 1] ==
456 graphar::GeneralParams::kVertexIndexCol) {
457 hasIndexCol = true;
458 std::iter_swap(property_names_mutable.begin(),
459 property_names_mutable.end() - 1);
460 }
461 auto property_group = vertex_info->GetPropertyGroup(
462 property_names_mutable[property_names_mutable.size() - 1]);
463 if (!property_group) {
464 return Status::KeyError(
465 "The property ",
466 property_names_mutable[property_names_mutable.size() - 1],
467 " doesn't exist in vertex type ", type, ".");
468 }
469 for (int i = 0; i < property_names_mutable.size() - 1; i++) {
470 if (property_names_mutable[i] == graphar::GeneralParams::kVertexIndexCol) {
471 hasIndexCol = true;
472 }
473 auto pg = vertex_info->GetPropertyGroup(property_names_mutable[i]);
474 if (!pg) {
475 return Status::KeyError("The property ", property_names_mutable[i],
476 " doesn't exist in vertex type ", type, ".");
477 }
478 if (pg != property_group) {
479 return Status::Invalid(
480 "The properties ", property_names_mutable[i], " and ",
481 property_names_mutable[property_names_mutable.size() - 1],
482 " are not in the same property group, please use Make with "
483 "property_group instead.");
484 }
485 }
486 if (!hasIndexCol) {
487 property_names_mutable.insert(property_names_mutable.begin(),
488 graphar::GeneralParams::kVertexIndexCol);
489 }
490 return Make(vertex_info, property_group, property_names_mutable,
491 graph_info->GetPrefix(), options);
492 }
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 493 to 495).
//
// Factory: creates a shared VertexPropertyArrowChunkReader for label chunks by
// forwarding the vertex info, labels, prefix and filter options to the
// matching constructor.
493 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
495 const std::shared_ptr<VertexInfo>& vertex_info,
496 const std::vector<std::string>& labels, const std::string& prefix,
497 const util::FilterOptions& options) {
498 return std::make_shared<VertexPropertyArrowChunkReader>(vertex_info, labels,
499 prefix, options);
500 }
501 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 502 to 504).
//
// Factory for label reading: looks up the vertex info of `type` in
// `graph_info` and delegates to Make(vertex_info, labels, prefix, options)
// with the graph prefix. Returns Status::KeyError when the vertex type is not
// part of the graph.
502 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
504 const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
505 const std::vector<std::string>& labels,
506 const util::FilterOptions& options) {
507 auto vertex_info = graph_info->GetVertexInfo(type);
508 if (!vertex_info) {
509 return Status::KeyError("The vertex type ", type,
510 " doesn't exist in graph ", graph_info->GetName(),
511 ".");
512 }
513 return Make(vertex_info, labels, graph_info->GetPrefix(), options);
514 }
515 
// NOTE(review): the first line of this constructor definition is missing from
// this listing (the inner numbering jumps from 515 to 517).
//
// Constructor: stores the edge info and adjacency list type, resets all
// positions to 0 and marks chunk_num_ as uninitialized (-1). The body resolves
// the file system from `prefix` (which also fills prefix_), builds base_dir_
// from the adjacency list path prefix and loads vertex_chunk_num_. Failures
// are raised through GAR_ASSIGN_OR_RAISE_ERROR.
517 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
518 const std::string& prefix)
519 : edge_info_(edge_info),
520 adj_list_type_(adj_list_type),
521 prefix_(prefix),
522 vertex_chunk_index_(0),
523 chunk_index_(0),
524 seek_offset_(0),
525 chunk_table_(nullptr),
526 chunk_num_(-1) /* -1 means uninitialized */ {
527 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
528 GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix,
529 edge_info->GetAdjListPathPrefix(adj_list_type));
530 base_dir_ = prefix_ + adj_list_path_prefix;
531 GAR_ASSIGN_OR_RAISE_ERROR(
532 vertex_chunk_num_,
533 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
534 }
535 
// NOTE(review): the first line of this constructor definition is missing from
// this listing (the inner numbering jumps from 535 to 537).
//
// Copy constructor: copies the configuration and the current position of
// `other`. The cached chunk table is deliberately not copied; chunk_table_
// starts as nullptr in the new reader.
537 const AdjListArrowChunkReader& other)
538 : edge_info_(other.edge_info_),
539 adj_list_type_(other.adj_list_type_),
540 prefix_(other.prefix_),
541 vertex_chunk_index_(other.vertex_chunk_index_),
542 chunk_index_(other.chunk_index_),
543 seek_offset_(other.seek_offset_),
544 chunk_table_(nullptr),
545 vertex_chunk_num_(other.vertex_chunk_num_),
546 chunk_num_(other.chunk_num_),
547 base_dir_(other.base_dir_),
548 fs_(other.fs_) {}
549 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 549 to 551); the body uses a
// parameter named `id`.
//
// Positions the reader at the edges of the source vertex `id`.
//  - Only valid for the unordered_by_source and ordered_by_source layouts;
//    any other layout yields Status::Invalid.
//  - The vertex chunk index is id / GetSrcChunkSize(); an index that is not
//    below vertex_chunk_num_ yields Status::IndexError.
//  - When the vertex chunk changes (or chunk_num_ is still uninitialized) the
//    edge chunk count is reloaded and the cached table is dropped.
//  - unordered_by_source seeks to offset 0; ordered_by_source seeks to the
//    first offset of the vertex's adjacency range.
// NOTE(review): the final `return Status::OK();` cannot be reached because
// both branches of the preceding if/else return.
551 if (adj_list_type_ != AdjListType::unordered_by_source &&
552 adj_list_type_ != AdjListType::ordered_by_source) {
553 return Status::Invalid("The seek_src operation is invalid in edge ",
554 edge_info_->GetEdgeType(), " reader with ",
555 AdjListTypeToString(adj_list_type_), " type.");
556 }
557 
558 IdType new_vertex_chunk_index = id / edge_info_->GetSrcChunkSize();
559 if (new_vertex_chunk_index >= vertex_chunk_num_) {
560 return Status::IndexError(
561 "The source internal id ", id, " is out of range [0,",
562 edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
563 edge_info_->GetEdgeType(), " reader.");
564 }
565 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
566 // initialize or update chunk_num_
567 vertex_chunk_index_ = new_vertex_chunk_index;
568 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
569 chunk_table_.reset();
570 }
571 
572 if (adj_list_type_ == AdjListType::unordered_by_source) {
573 return seek(0); // start from first chunk
574 } else {
575 GAR_ASSIGN_OR_RAISE(auto range,
576 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
577 adj_list_type_, id));
578 return seek(range.first);
579 }
580 return Status::OK();
581 }
582 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 582 to 584); the body uses a
// parameter named `id`.
//
// Positions the reader at the edges of the destination vertex `id`.
//  - Only valid for the unordered_by_dest and ordered_by_dest layouts; any
//    other layout yields Status::Invalid.
//  - The vertex chunk index is id / GetDstChunkSize(); an index that is not
//    below vertex_chunk_num_ yields Status::IndexError.
//  - When the vertex chunk changes (or chunk_num_ is still uninitialized) the
//    edge chunk count is reloaded and the cached table is dropped.
//  - unordered_by_dest seeks to offset 0; ordered_by_dest seeks to the first
//    offset of the vertex's adjacency range.
584 if (adj_list_type_ != AdjListType::unordered_by_dest &&
585 adj_list_type_ != AdjListType::ordered_by_dest) {
586 return Status::Invalid("The seek_dst operation is invalid in edge ",
587 edge_info_->GetEdgeType(), " reader with ",
588 AdjListTypeToString(adj_list_type_), " type.");
589 }
590 
591 IdType new_vertex_chunk_index = id / edge_info_->GetDstChunkSize();
592 if (new_vertex_chunk_index >= vertex_chunk_num_) {
593 return Status::IndexError(
594 "The destination internal id ", id, " is out of range [0,",
595 edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
596 edge_info_->GetEdgeType(), " reader.");
597 }
598 if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
599 // initialize or update chunk_num_
600 vertex_chunk_index_ = new_vertex_chunk_index;
601 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
602 chunk_table_.reset();
603 }
604 
605 if (adj_list_type_ == AdjListType::unordered_by_dest) {
606 return seek(0); // start from the first chunk
607 } else {
608 GAR_ASSIGN_OR_RAISE(auto range,
609 util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
610 adj_list_type_, id));
611 return seek(range.first);
612 }
613 }
614 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 614 to 616); the body uses a
// parameter named `offset`.
//
// Moves the reader to the edge at `offset` within the current vertex chunk:
//  - stores `offset` in seek_offset_ and recomputes chunk_index_ as
//    offset / edge_info_->GetChunkSize();
//  - drops the cached table when the chunk index changed;
//  - loads chunk_num_ when it is still uninitialized (negative);
//  - returns Status::IndexError when the chunk index is not below chunk_num_.
// seek_offset_ and chunk_index_ are already overwritten when the range error
// is reported.
616 seek_offset_ = offset;
617 IdType pre_chunk_index = chunk_index_;
618 chunk_index_ = offset / edge_info_->GetChunkSize();
619 if (chunk_index_ != pre_chunk_index) {
620 chunk_table_.reset();
621 }
622 if (chunk_num_ < 0) {
623 // initialize chunk_num_
624 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
625 }
626 if (chunk_index_ >= chunk_num_) {
627 return Status::IndexError("The edge offset ", offset,
628 " is out of range [0,",
629 edge_info_->GetChunkSize() * chunk_num_,
630 "), edge type: ", edge_info_->GetEdgeType());
631 }
632 return Status::OK();
633 }
634 
635 Result<std::shared_ptr<arrow::Table>> AdjListArrowChunkReader::GetChunk() {
636  if (chunk_table_ == nullptr) {
637  // check if the edge num of the current vertex chunk is 0
638  GAR_ASSIGN_OR_RAISE(auto edge_num,
639  util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
640  vertex_chunk_index_));
641  if (edge_num == 0) {
642  return nullptr;
643  }
644  GAR_ASSIGN_OR_RAISE(auto chunk_file_path,
645  edge_info_->GetAdjListFilePath(
646  vertex_chunk_index_, chunk_index_, adj_list_type_));
647  std::string path = prefix_ + chunk_file_path;
648  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
649  GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
650  }
651  IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
652  return chunk_table_->Slice(row_offset);
653 }
654 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 654 to 656).
//
// Advances the reader to the next edge chunk. chunk_index_ is incremented and
// chunk_num_ is loaded when still uninitialized. While the chunk index is not
// below chunk_num_, the reader moves on to the next vertex chunk, restarts at
// chunk 0 and reloads chunk_num_; running out of vertex chunks yields
// Status::IndexError. On success seek_offset_ is set to the first offset of
// the new chunk and the cached table is dropped.
656 ++chunk_index_;
657 if (chunk_num_ < 0) {
658 // initialize chunk_num_
659 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
660 }
661 while (chunk_index_ >= chunk_num_) {
662 ++vertex_chunk_index_;
663 if (vertex_chunk_index_ >= vertex_chunk_num_) {
664 return Status::IndexError("vertex chunk index ", vertex_chunk_index_,
665 " is out-of-bounds for vertex chunk num ",
666 vertex_chunk_num_);
667 }
668 chunk_index_ = 0;
669 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
670 }
671 seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
672 chunk_table_.reset();
673 return Status::OK();
674 }
675 
// NOTE(review): the first line of this signature is missing from this listing
// (the inner numbering jumps from 675 to 677); the body uses parameters named
// `vertex_chunk_index` and `chunk_index`.
//
// Positions the reader by chunk indices instead of by id or offset. A changed
// vertex chunk index (or an uninitialized chunk_num_) reloads the edge chunk
// count and drops the cached table; a changed chunk index updates
// seek_offset_ to the first offset of that chunk and drops the cached table.
// No range check is performed on either index here.
677 IdType chunk_index) {
678 if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
679 vertex_chunk_index_ = vertex_chunk_index;
680 GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
681 chunk_table_.reset();
682 }
683 if (chunk_index_ != chunk_index) {
684 chunk_index_ = chunk_index;
685 seek_offset_ = chunk_index * edge_info_->GetChunkSize();
686 chunk_table_.reset();
687 }
688 return Status::OK();
689 }
690 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 690 to 692).
//
// Returns the number of rows of the current chunk table. When no table is
// cached, the adjacency list file of the current (vertex chunk, chunk) pair is
// read and cached first.
692 if (chunk_table_ == nullptr) {
693 GAR_ASSIGN_OR_RAISE(auto chunk_file_path,
694 edge_info_->GetAdjListFilePath(
695 vertex_chunk_index_, chunk_index_, adj_list_type_));
696 std::string path = prefix_ + chunk_file_path;
697 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
698 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
699 }
700 return chunk_table_->num_rows();
701 }
702 
703 Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
704  const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
705  const std::string& prefix) {
706  if (!edge_info->HasAdjacentListType(adj_list_type)) {
707  return Status::KeyError(
708  "The adjacent list type ", AdjListTypeToString(adj_list_type),
709  " doesn't exist in edge ", edge_info->GetEdgeType(), ".");
710  }
711  return std::make_shared<AdjListArrowChunkReader>(edge_info, adj_list_type,
712  prefix);
713 }
714 
715 Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
716  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
717  const std::string& edge_type, const std::string& dst_type,
718  AdjListType adj_list_type) {
719  auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
720  if (!edge_info) {
721  return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
722  dst_type, " doesn't exist.");
723  }
724  return Make(edge_info, adj_list_type, graph_info->GetPrefix());
725 }
726 
727 Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
728  GAR_ASSIGN_OR_RAISE(chunk_num_,
729  util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
730  vertex_chunk_index_));
731  return Status::OK();
732 }
733 
// NOTE(review): the first line of this constructor definition is missing from
// this listing (the inner numbering jumps from 733 to 735).
//
// Constructor: stores the edge info and adjacency list type and resets the
// position to 0. The body resolves the file system from `prefix` (which also
// fills prefix_) and builds base_dir_ from the offset path prefix. For the
// ordered_by_source / ordered_by_dest layouts it loads vertex_chunk_num_ and
// picks the source or destination chunk size as vertex_chunk_size_; for any
// other layout it throws std::runtime_error.
// NOTE(review): the local `base_dir` declared at the top of the body is never
// used; only the member base_dir_ is assigned.
735 const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
736 const std::string& prefix)
737 : edge_info_(std::move(edge_info)),
738 adj_list_type_(adj_list_type),
739 prefix_(prefix),
740 chunk_index_(0),
741 seek_id_(0),
742 chunk_table_(nullptr) {
743 std::string base_dir;
744 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
745 GAR_ASSIGN_OR_RAISE_ERROR(auto dir_path,
746 edge_info->GetOffsetPathPrefix(adj_list_type));
747 base_dir_ = prefix_ + dir_path;
748 if (adj_list_type == AdjListType::ordered_by_source ||
749 adj_list_type == AdjListType::ordered_by_dest) {
750 GAR_ASSIGN_OR_RAISE_ERROR(
751 vertex_chunk_num_,
752 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
753 vertex_chunk_size_ = adj_list_type == AdjListType::ordered_by_source
754 ? edge_info_->GetSrcChunkSize()
755 : edge_info_->GetDstChunkSize();
756 } else {
757 std::string err_msg = "Invalid adj list type " +
758 std::string(AdjListTypeToString(adj_list_type)) +
759 " to construct AdjListOffsetReader.";
760 throw std::runtime_error(err_msg);
761 }
762 }
763 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 763 to 765); the body uses a
// parameter named `id`.
//
// Moves the offset reader to the vertex with internal id `id`:
//  - stores `id` in seek_id_ and recomputes chunk_index_ as
//    id / vertex_chunk_size_;
//  - drops the cached table when the chunk index changed;
//  - returns Status::IndexError when the chunk index is not below
//    vertex_chunk_num_, otherwise Status::OK().
// seek_id_ and chunk_index_ are already overwritten when the range error is
// reported.
765 seek_id_ = id;
766 IdType pre_chunk_index = chunk_index_;
767 chunk_index_ = id / vertex_chunk_size_;
768 if (chunk_index_ != pre_chunk_index) {
769 chunk_table_.reset();
770 }
771 if (chunk_index_ >= vertex_chunk_num_) {
772 return Status::IndexError("Internal vertex id ", id, "is out of range [0,",
773 vertex_chunk_num_ * vertex_chunk_size_,
774 "), of edge ", edge_info_->GetEdgeType(),
775 " of adj list type ",
776 AdjListTypeToString(adj_list_type_), ".");
777 }
778 return Status::OK();
779 }
780 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 781 to 783).
//
// Returns the offset array of the current chunk. When no table is cached, the
// adjacency list offset file of the current chunk index is read and cached.
// The result is chunk 0 of column 0 of the cached table, sliced from the row
// that corresponds to seek_id_.
781 Result<std::shared_ptr<arrow::Array>>
783 if (chunk_table_ == nullptr) {
784 GAR_ASSIGN_OR_RAISE(
785 auto chunk_file_path,
786 edge_info_->GetAdjListOffsetFilePath(chunk_index_, adj_list_type_));
787 std::string path = prefix_ + chunk_file_path;
788 auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
789 GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
790 }
791 IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_;
792 return chunk_table_->Slice(row_offset)->column(0)->chunk(0);
793 }
794 
// NOTE(review): the signature line of this definition is missing from this
// listing (the inner numbering jumps from 794 to 796).
//
// Advances the offset reader to the next vertex chunk. chunk_index_ is
// incremented first; when it is no longer below vertex_chunk_num_,
// Status::IndexError is returned and the index stays incremented. Otherwise
// seek_id_ is set to the first vertex id of the new chunk and the cached
// table is dropped.
796 if (++chunk_index_ >= vertex_chunk_num_) {
797 return Status::IndexError("vertex chunk index ", chunk_index_,
798 " is out-of-bounds for vertex chunk num ",
799 vertex_chunk_num_, " of edge ",
800 edge_info_->GetEdgeType(), " of adj list type ",
801 AdjListTypeToString(adj_list_type_), ".");
802 }
803 seek_id_ = chunk_index_ * vertex_chunk_size_;
804 chunk_table_.reset();
805 
806 return Status::OK();
807 }
808 
809 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
810 AdjListOffsetArrowChunkReader::Make(const std::shared_ptr<EdgeInfo>& edge_info,
811  AdjListType adj_list_type,
812  const std::string& prefix) {
813  if (!edge_info->HasAdjacentListType(adj_list_type)) {
814  return Status::KeyError(
815  "The adjacent list type ", AdjListTypeToString(adj_list_type),
816  " doesn't exist in edge ", edge_info->GetEdgeType(), ".");
817  }
818  return std::make_shared<AdjListOffsetArrowChunkReader>(edge_info,
819  adj_list_type, prefix);
820 }
821 
// NOTE(review): the line carrying the qualified function name is missing from
// this listing (the inner numbering jumps from 822 to 824).
//
// Factory: looks up the edge info for (src_type, edge_type, dst_type) in
// `graph_info` and delegates to Make(edge_info, adj_list_type, prefix) with
// the graph prefix. Returns Status::KeyError when the edge is not part of the
// graph.
822 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
824 const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
825 const std::string& edge_type, const std::string& dst_type,
826 AdjListType adj_list_type) {
827 auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
828 if (!edge_info) {
829 return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
830 dst_type, " doesn't exist.");
831 }
832 return Make(edge_info, adj_list_type, graph_info->GetPrefix());
833 }
834 
// NOTE(review): the first line of this constructor definition is missing from
// this listing (the inner numbering jumps from 834 to 836).
//
// Constructor: stores the edge info, property group, adjacency list type and
// filter options, resets all positions to 0 and marks chunk_num_ as
// uninitialized (-1). The body resolves the file system from `prefix` (which
// also fills prefix_), builds base_dir_ from the property group path prefix,
// loads vertex_chunk_num_ and derives the arrow schema of the property group
// without an index column. Failures are raised through
// GAR_ASSIGN_OR_RAISE_ERROR.
836 const std::shared_ptr<EdgeInfo>& edge_info,
837 const std::shared_ptr<PropertyGroup>& property_group,
838 AdjListType adj_list_type, const std::string prefix,
839 const util::FilterOptions& options)
840 : edge_info_(std::move(edge_info)),
841 property_group_(std::move(property_group)),
842 adj_list_type_(adj_list_type),
843 prefix_(prefix),
844 vertex_chunk_index_(0),
845 chunk_index_(0),
846 seek_offset_(0),
847 schema_(nullptr),
848 chunk_table_(nullptr),
849 filter_options_(options),
850 chunk_num_(-1) /* -1 means uninitialized */ {
851 GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
852 GAR_ASSIGN_OR_RAISE_ERROR(
853 auto pg_path_prefix,
854 edge_info->GetPropertyGroupPathPrefix(property_group, adj_list_type));
855 base_dir_ = prefix_ + pg_path_prefix;
856 GAR_ASSIGN_OR_RAISE_ERROR(
857 vertex_chunk_num_,
858 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
859 GAR_ASSIGN_OR_RAISE_ERROR(schema_,
860 PropertyGroupToSchema(property_group, false));
861 }
862 
864  const AdjListPropertyArrowChunkReader& other)
865  : edge_info_(other.edge_info_),
866  property_group_(other.property_group_),
867  adj_list_type_(other.adj_list_type_),
868  prefix_(other.prefix_),
869  vertex_chunk_index_(other.vertex_chunk_index_),
870  chunk_index_(other.chunk_index_),
871  seek_offset_(other.seek_offset_),
872  schema_(other.schema_),
873  chunk_table_(nullptr),
874  filter_options_(other.filter_options_),
875  vertex_chunk_num_(other.vertex_chunk_num_),
876  chunk_num_(other.chunk_num_),
877  base_dir_(other.base_dir_),
878  fs_(other.fs_) {}
879 
881  if (adj_list_type_ != AdjListType::unordered_by_source &&
882  adj_list_type_ != AdjListType::ordered_by_source) {
883  return Status::Invalid("The seek_src operation is invalid in edge ",
884  edge_info_->GetEdgeType(), " reader with ",
885  AdjListTypeToString(adj_list_type_), " type.");
886  }
887 
888  IdType new_vertex_chunk_index = id / edge_info_->GetSrcChunkSize();
889  if (new_vertex_chunk_index >= vertex_chunk_num_) {
890  return Status::IndexError(
891  "The source internal id ", id, " is out of range [0,",
892  edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
893  edge_info_->GetEdgeType(), " reader.");
894  }
895  if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
896  vertex_chunk_index_ = new_vertex_chunk_index;
897  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
898  chunk_table_.reset();
899  }
900 
901  if (adj_list_type_ == AdjListType::unordered_by_source) {
902  return seek(0); // start from first chunk
903  } else {
904  GAR_ASSIGN_OR_RAISE(auto range,
905  util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
906  adj_list_type_, id));
907  return seek(range.first);
908  }
909  return Status::OK();
910 }
911 
913  if (adj_list_type_ != AdjListType::unordered_by_dest &&
914  adj_list_type_ != AdjListType::ordered_by_dest) {
915  return Status::Invalid("The seek_dst operation is invalid in edge ",
916  edge_info_->GetEdgeType(), " reader with ",
917  AdjListTypeToString(adj_list_type_), " type.");
918  }
919 
920  IdType new_vertex_chunk_index = id / edge_info_->GetDstChunkSize();
921  if (new_vertex_chunk_index >= vertex_chunk_num_) {
922  return Status::IndexError(
923  "The destination internal id ", id, " is out of range [0,",
924  edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
925  edge_info_->GetEdgeType(), " reader.");
926  }
927  if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
928  vertex_chunk_index_ = new_vertex_chunk_index;
929  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
930  chunk_table_.reset();
931  }
932 
933  if (adj_list_type_ == AdjListType::unordered_by_dest) {
934  return seek(0); // start from the first chunk
935  } else {
936  GAR_ASSIGN_OR_RAISE(auto range,
937  util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
938  adj_list_type_, id));
939  return seek(range.first);
940  }
941 }
942 
944  IdType pre_chunk_index = chunk_index_;
945  seek_offset_ = offset;
946  chunk_index_ = offset / edge_info_->GetChunkSize();
947  if (chunk_index_ != pre_chunk_index) {
948  chunk_table_.reset();
949  }
950  if (chunk_num_ < 0) {
951  // initialize chunk_num_
952  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
953  }
954  if (chunk_index_ >= chunk_num_) {
955  return Status::IndexError("The edge offset ", offset,
956  " is out of range [0,",
957  edge_info_->GetChunkSize() * chunk_num_,
958  "), edge type: ", edge_info_->GetEdgeType());
959  }
960  return Status::OK();
961 }
962 
963 Result<std::shared_ptr<arrow::Table>>
965  GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
966  if (chunk_table_ == nullptr) {
967  // check if the edge num of the current vertex chunk is 0
968  GAR_ASSIGN_OR_RAISE(auto edge_num,
969  util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
970  vertex_chunk_index_));
971  if (edge_num == 0) {
972  return nullptr;
973  }
974  GAR_ASSIGN_OR_RAISE(
975  auto chunk_file_path,
976  edge_info_->GetPropertyFilePath(property_group_, adj_list_type_,
977  vertex_chunk_index_, chunk_index_));
978  std::string path = prefix_ + chunk_file_path;
979  GAR_ASSIGN_OR_RAISE(
980  chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
981  filter_options_));
982  // TODO(acezen): filter pushdown doesn't support cast schema now
983  if (schema_ != nullptr && filter_options_.filter == nullptr) {
984  GAR_RETURN_NOT_OK(
985  CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
986  }
987  }
988  IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
989  return chunk_table_->Slice(row_offset);
990 }
991 
993  ++chunk_index_;
994  if (chunk_num_ < 0) {
995  // initialize chunk_num_
996  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
997  }
998  while (chunk_index_ >= chunk_num_) {
999  ++vertex_chunk_index_;
1000  if (vertex_chunk_index_ >= vertex_chunk_num_) {
1001  return Status::IndexError("vertex chunk index ", vertex_chunk_index_,
1002  " is out-of-bounds for vertex chunk num ",
1003  vertex_chunk_num_, " of edge ",
1004  edge_info_->GetEdgeType(), " of adj list type ",
1005  AdjListTypeToString(adj_list_type_),
1006  ", property group ", property_group_, ".");
1007  }
1008  chunk_index_ = 0;
1009  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
1010  }
1011  seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
1012  chunk_table_.reset();
1013  return Status::OK();
1014 }
1015 
1017  IdType vertex_chunk_index, IdType chunk_index) {
1018  if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
1019  vertex_chunk_index_ = vertex_chunk_index;
1020  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
1021  chunk_table_.reset();
1022  }
1023  if (chunk_index_ != chunk_index) {
1024  chunk_index_ = chunk_index;
1025  seek_offset_ = chunk_index * edge_info_->GetChunkSize();
1026  chunk_table_.reset();
1027  }
1028  return Status::OK();
1029 }
1030 
1031 void AdjListPropertyArrowChunkReader::Filter(util::Filter filter) {
1032  filter_options_.filter = filter;
1033 }
1034 
1035 void AdjListPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
1036  filter_options_.columns = column_names;
1037 }
1038 
1039 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
1041  const std::shared_ptr<EdgeInfo>& edge_info,
1042  const std::shared_ptr<PropertyGroup>& property_group,
1043  AdjListType adj_list_type, const std::string& prefix,
1044  const util::FilterOptions& options) {
1045  if (!edge_info->HasAdjacentListType(adj_list_type)) {
1046  return Status::KeyError(
1047  "The adjacent list type ", AdjListTypeToString(adj_list_type),
1048  " doesn't exist in edge ", edge_info->GetEdgeType(), ".");
1049  }
1050  return std::make_shared<AdjListPropertyArrowChunkReader>(
1051  edge_info, property_group, adj_list_type, prefix, options);
1052 }
1053 
1054 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
1056  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
1057  const std::string& edge_type, const std::string& dst_type,
1058  const std::shared_ptr<PropertyGroup>& property_group,
1059  AdjListType adj_list_type, const util::FilterOptions& options) {
1060  auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
1061  if (!edge_info) {
1062  return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
1063  dst_type, " doesn't exist.");
1064  }
1065  return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
1066  options);
1067 }
1068 
1069 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
1071  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
1072  const std::string& edge_type, const std::string& dst_type,
1073  const std::string& property_name, AdjListType adj_list_type,
1074  const util::FilterOptions& options) {
1075  auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
1076  if (!edge_info) {
1077  return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
1078  dst_type, " doesn't exist.");
1079  }
1080  auto property_group = edge_info->GetPropertyGroup(property_name);
1081  if (!property_group) {
1082  return Status::KeyError("The property ", property_name,
1083  " doesn't exist in edge ", src_type, " ", edge_type,
1084  " ", dst_type, ".");
1085  }
1086  return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
1087  options);
1088 }
1089 
1090 Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() {
1091  GAR_ASSIGN_OR_RAISE(chunk_num_,
1092  util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
1093  vertex_chunk_index_));
1094  return Status::OK();
1095 }
1096 
1097 } // namespace graphar
The arrow chunk reader for adj list topology chunk.
Definition: chunk_reader.h:277
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
AdjListArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListArrowChunkReader.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk of chunk position indicator as arrow::Table, if the chunk is empty,...
static Result< std::shared_ptr< AdjListArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek_dst(IdType offset)
Sets chunk position indicator for reader by destination vertex id.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
Result< std::shared_ptr< arrow::Array > > GetChunk()
Get the current offset chunk as arrow::Array.
AdjListOffsetArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListOffsetArrowChunkReader.
static Result< std::shared_ptr< AdjListOffsetArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListOffsetArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk. If the current chunk is the last chunk, returns Status::...
The arrow chunk reader for edge property group chunks.
Definition: chunk_reader.h:474
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek_dst(IdType id)
Sets chunk position indicator for reader by destination vertex id.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
static Result< std::shared_ptr< AdjListPropertyArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string &prefix, const util::FilterOptions &options={})
Create an AdjListPropertyArrowChunkReader instance from edge info.
AdjListPropertyArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string prefix, const util::FilterOptions &options={})
Initialize the AdjListPropertyArrowChunkReader.
void Filter(util::Filter filter=nullptr)
Apply the row filter to the table. Calling Filter() with no argument clears the filter.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk of chunk position indicator as arrow::Table, if the chunk is empty,...
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
void Select(util::ColumnNames column_names=std::nullopt)
Apply the projection to the table to be read. Calling Select() with no argument clears the projection.
Status outcome object (success or error)
Definition: status.h:123
static Status IndexError(Args &&... args)
Definition: status.h:197
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
The arrow chunk reader for vertex property group.
Definition: chunk_reader.h:43
Result< std::shared_ptr< arrow::Table > > GetLabelChunk()
Return the current arrow label chunk table of chunk position indicator.
void Filter(util::Filter filter=nullptr)
Apply the row filter to the table. Calling Filter() with no argument clears the filter.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > MakeForLabels(const std::shared_ptr< GraphInfo > &graph_info, const std::string &type, const std::vector< std::string > &labels, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from graph info for labels.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType id)
Sets chunk position indicator for reader by internal vertex id. If internal vertex id is not found,...
Result< std::shared_ptr< arrow::Table > > GetChunk(GetChunkVersion version=GetChunkVersion::AUTO)
Return the current arrow chunk table of chunk position indicator.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > MakeForProperties(const std::shared_ptr< GraphInfo > &graph_info, const std::string &type, const std::vector< std::string > &property_names, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from graph info for properties.
void Select(util::ColumnNames column_names=std::nullopt)
Apply the projection to the table to be read. Calling Select() with no argument clears the projection.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from vertex info.