Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
chunk_reader.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <utility>
21 
22 #include "arrow/api.h"
23 #include "arrow/compute/api.h"
24 
25 #include "graphar/arrow/chunk_reader.h"
26 #include "graphar/filesystem.h"
27 #include "graphar/general_params.h"
28 #include "graphar/graph_info.h"
29 #include "graphar/reader_util.h"
30 #include "graphar/result.h"
31 #include "graphar/status.h"
32 #include "graphar/types.h"
33 #include "graphar/util.h"
34 
35 namespace graphar {
36 
37 namespace {
38 
39 Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
40  const std::shared_ptr<PropertyGroup> pg,
41  bool contain_index_column = false) {
42  std::vector<std::shared_ptr<arrow::Field>> fields;
43  if (contain_index_column) {
44  fields.push_back(std::make_shared<arrow::Field>(
45  GeneralParams::kVertexIndexCol, arrow::int64()));
46  }
47  for (const auto& prop : pg->GetProperties()) {
48  fields.push_back(std::make_shared<arrow::Field>(
49  prop.name, DataType::DataTypeToArrowDataType(prop.type)));
50  }
51  return arrow::schema(fields);
52 }
53 
54 Status GeneralCast(const std::shared_ptr<arrow::Array>& in,
55  const std::shared_ptr<arrow::DataType>& to_type,
56  std::shared_ptr<arrow::Array>* out) {
57  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(*out,
58  arrow::compute::Cast(*in, to_type));
59  return Status::OK();
60 }
61 
62 Status CastStringToLargeString(const std::shared_ptr<arrow::Array>& in,
63  const std::shared_ptr<arrow::DataType>& to_type,
64  std::shared_ptr<arrow::Array>* out) {
65  auto array_data = in->data()->Copy();
66  auto offset = array_data->buffers[1];
67  using from_offset_type = typename arrow::StringArray::offset_type;
68  using to_string_offset_type = typename arrow::LargeStringArray::offset_type;
69  auto raw_value_offsets_ =
70  offset == NULLPTR
71  ? NULLPTR
72  : reinterpret_cast<const from_offset_type*>(offset->data());
73  std::vector<to_string_offset_type> to_offset(offset->size() /
74  sizeof(from_offset_type));
75  for (size_t i = 0; i < to_offset.size(); ++i) {
76  to_offset[i] = raw_value_offsets_[i];
77  }
78  std::shared_ptr<arrow::Buffer> buffer;
79  arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
80  RETURN_NOT_ARROW_OK(
81  buffer_builder.Append(to_offset.data(), to_offset.size()));
82  RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
83  array_data->type = to_type;
84  array_data->buffers[1] = buffer;
85  *out = arrow::MakeArray(array_data);
86  RETURN_NOT_ARROW_OK((*out)->ValidateFull());
87  return Status::OK();
88 }
89 
90 // helper function to cast arrow::Table with a schema
91 Status CastTableWithSchema(const std::shared_ptr<arrow::Table>& table,
92  const std::shared_ptr<arrow::Schema>& schema,
93  std::shared_ptr<arrow::Table>* out_table) {
94  if (table->schema()->Equals(*schema)) {
95  *out_table = table;
96  }
97  std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
98  for (int64_t i = 0; i < table->num_columns(); ++i) {
99  auto column = table->column(i);
100  if (table->field(i)->type()->Equals(schema->field(i)->type())) {
101  columns.push_back(column);
102  continue;
103  }
104  auto from_t = table->field(i)->type();
105  auto to_t = schema->field(i)->type();
106  std::vector<std::shared_ptr<arrow::Array>> chunks;
107  // process cast for each chunk
108  for (int64_t j = 0; j < column->num_chunks(); ++j) {
109  auto chunk = column->chunk(j);
110  std::shared_ptr<arrow::Array> out;
111  if (arrow::compute::CanCast(*from_t, *to_t)) {
112  GAR_RETURN_NOT_OK(GeneralCast(chunk, to_t, &out));
113  chunks.push_back(out);
114  } else if (from_t->Equals(arrow::utf8()) &&
115  to_t->Equals(arrow::large_utf8())) {
116  GAR_RETURN_NOT_OK(CastStringToLargeString(chunk, to_t, &out));
117  chunks.push_back(out);
118  }
119  }
120  columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks, to_t));
121  }
122 
123  *out_table = arrow::Table::Make(schema, columns);
124  return Status::OK();
125 }
126 } // namespace
127 
129  const std::shared_ptr<VertexInfo>& vertex_info,
130  const std::shared_ptr<PropertyGroup>& property_group,
131  const std::string& prefix, const util::FilterOptions& options)
132  : vertex_info_(std::move(vertex_info)),
133  property_group_(std::move(property_group)),
134  chunk_index_(0),
135  seek_id_(0),
136  schema_(nullptr),
137  chunk_table_(nullptr),
138  filter_options_(options) {
139  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
140  GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
141  vertex_info->GetPathPrefix(property_group));
142  std::string base_dir = prefix_ + pg_path_prefix;
143  GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
144  util::GetVertexChunkNum(prefix_, vertex_info));
145  GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
146  util::GetVertexNum(prefix_, vertex_info_));
147  GAR_ASSIGN_OR_RAISE_ERROR(schema_,
148  PropertyGroupToSchema(property_group_, true));
149 }
150 
152  seek_id_ = id;
153  IdType pre_chunk_index = chunk_index_;
154  chunk_index_ = id / vertex_info_->GetChunkSize();
155  if (chunk_index_ != pre_chunk_index) {
156  // TODO(@acezen): use a cache to avoid reloading the same chunk, could use
157  // a LRU cache.
158  chunk_table_.reset();
159  }
160  if (chunk_index_ >= chunk_num_) {
161  return Status::IndexError("Internal vertex id ", id, " is out of range [0,",
162  chunk_num_ * vertex_info_->GetChunkSize(),
163  ") of vertex ", vertex_info_->GetLabel());
164  }
165  return Status::OK();
166 }
167 
168 Result<std::shared_ptr<arrow::Table>>
170  GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
171  if (chunk_table_ == nullptr) {
172  GAR_ASSIGN_OR_RAISE(
173  auto chunk_file_path,
174  vertex_info_->GetFilePath(property_group_, chunk_index_));
175  std::string path = prefix_ + chunk_file_path;
176  GAR_ASSIGN_OR_RAISE(
177  chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
178  filter_options_));
179  // TODO(acezen): filter pushdown doesn't support cast schema now
180  if (schema_ != nullptr && filter_options_.filter == nullptr) {
181  GAR_RETURN_NOT_OK(
182  CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
183  }
184  }
185  IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
186  return chunk_table_->Slice(row_offset);
187 }
188 
190  if (++chunk_index_ >= chunk_num_) {
191  return Status::IndexError(
192  "vertex chunk index ", chunk_index_, " is out-of-bounds for vertex ",
193  vertex_info_->GetLabel(), " chunk num ", chunk_num_);
194  }
195  seek_id_ = chunk_index_ * vertex_info_->GetChunkSize();
196  chunk_table_.reset();
197 
198  return Status::OK();
199 }
200 
201 void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
202  filter_options_.filter = filter;
203 }
204 
205 void VertexPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
206  filter_options_.columns = column_names;
207 }
208 
209 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
211  const std::shared_ptr<VertexInfo>& vertex_info,
212  const std::shared_ptr<PropertyGroup>& property_group,
213  const std::string& prefix, const util::FilterOptions& options) {
214  return std::make_shared<VertexPropertyArrowChunkReader>(
215  vertex_info, property_group, prefix, options);
216 }
217 
218 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
220  const std::shared_ptr<GraphInfo>& graph_info, const std::string& label,
221  const std::shared_ptr<PropertyGroup>& property_group,
222  const util::FilterOptions& options) {
223  auto vertex_info = graph_info->GetVertexInfo(label);
224  if (!vertex_info) {
225  return Status::KeyError("The vertex type ", label,
226  " doesn't exist in graph ", graph_info->GetName(),
227  ".");
228  }
229  return Make(vertex_info, property_group, graph_info->GetPrefix(), options);
230 }
231 
232 Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
234  const std::shared_ptr<GraphInfo>& graph_info, const std::string& label,
235  const std::string& property_name, const util::FilterOptions& options) {
236  auto vertex_info = graph_info->GetVertexInfo(label);
237  if (!vertex_info) {
238  return Status::KeyError("The vertex type ", label,
239  " doesn't exist in graph ", graph_info->GetName(),
240  ".");
241  }
242  auto property_group = vertex_info->GetPropertyGroup(property_name);
243  if (!property_group) {
244  return Status::KeyError("The property ", property_name,
245  " doesn't exist in vertex type ", label, ".");
246  }
247  return Make(vertex_info, property_group, graph_info->GetPrefix(), options);
248 }
249 
251  const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
252  const std::string& prefix)
253  : edge_info_(edge_info),
254  adj_list_type_(adj_list_type),
255  prefix_(prefix),
256  vertex_chunk_index_(0),
257  chunk_index_(0),
258  seek_offset_(0),
259  chunk_table_(nullptr),
260  chunk_num_(-1) /* -1 means uninitialized */ {
261  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
262  GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix,
263  edge_info->GetAdjListPathPrefix(adj_list_type));
264  base_dir_ = prefix_ + adj_list_path_prefix;
265  GAR_ASSIGN_OR_RAISE_ERROR(
266  vertex_chunk_num_,
267  util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
268 }
269 
271  const AdjListArrowChunkReader& other)
272  : edge_info_(other.edge_info_),
273  adj_list_type_(other.adj_list_type_),
274  prefix_(other.prefix_),
275  vertex_chunk_index_(other.vertex_chunk_index_),
276  chunk_index_(other.chunk_index_),
277  seek_offset_(other.seek_offset_),
278  chunk_table_(nullptr),
279  vertex_chunk_num_(other.vertex_chunk_num_),
280  chunk_num_(other.chunk_num_),
281  base_dir_(other.base_dir_),
282  fs_(other.fs_) {}
283 
285  if (adj_list_type_ != AdjListType::unordered_by_source &&
286  adj_list_type_ != AdjListType::ordered_by_source) {
287  return Status::Invalid("The seek_src operation is invalid in edge ",
288  edge_info_->GetEdgeLabel(), " reader with ",
289  AdjListTypeToString(adj_list_type_), " type.");
290  }
291 
292  IdType new_vertex_chunk_index = id / edge_info_->GetSrcChunkSize();
293  if (new_vertex_chunk_index >= vertex_chunk_num_) {
294  return Status::IndexError(
295  "The source internal id ", id, " is out of range [0,",
296  edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
297  edge_info_->GetEdgeLabel(), " reader.");
298  }
299  if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
300  // initialize or update chunk_num_
301  vertex_chunk_index_ = new_vertex_chunk_index;
302  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
303  chunk_table_.reset();
304  }
305 
306  if (adj_list_type_ == AdjListType::unordered_by_source) {
307  return seek(0); // start from first chunk
308  } else {
309  GAR_ASSIGN_OR_RAISE(auto range,
310  util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
311  adj_list_type_, id));
312  return seek(range.first);
313  }
314  return Status::OK();
315 }
316 
318  if (adj_list_type_ != AdjListType::unordered_by_dest &&
319  adj_list_type_ != AdjListType::ordered_by_dest) {
320  return Status::Invalid("The seek_dst operation is invalid in edge ",
321  edge_info_->GetEdgeLabel(), " reader with ",
322  AdjListTypeToString(adj_list_type_), " type.");
323  }
324 
325  IdType new_vertex_chunk_index = id / edge_info_->GetDstChunkSize();
326  if (new_vertex_chunk_index >= vertex_chunk_num_) {
327  return Status::IndexError(
328  "The destination internal id ", id, " is out of range [0,",
329  edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
330  edge_info_->GetEdgeLabel(), " reader.");
331  }
332  if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
333  // initialize or update chunk_num_
334  vertex_chunk_index_ = new_vertex_chunk_index;
335  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
336  chunk_table_.reset();
337  }
338 
339  if (adj_list_type_ == AdjListType::unordered_by_dest) {
340  return seek(0); // start from the first chunk
341  } else {
342  GAR_ASSIGN_OR_RAISE(auto range,
343  util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
344  adj_list_type_, id));
345  return seek(range.first);
346  }
347 }
348 
350  seek_offset_ = offset;
351  IdType pre_chunk_index = chunk_index_;
352  chunk_index_ = offset / edge_info_->GetChunkSize();
353  if (chunk_index_ != pre_chunk_index) {
354  chunk_table_.reset();
355  }
356  if (chunk_num_ < 0) {
357  // initialize chunk_num_
358  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
359  }
360  if (chunk_index_ >= chunk_num_) {
361  return Status::IndexError("The edge offset ", offset,
362  " is out of range [0,",
363  edge_info_->GetChunkSize() * chunk_num_,
364  "), edge label: ", edge_info_->GetEdgeLabel());
365  }
366  return Status::OK();
367 }
368 
369 Result<std::shared_ptr<arrow::Table>> AdjListArrowChunkReader::GetChunk() {
370  if (chunk_table_ == nullptr) {
371  // check if the edge num of the current vertex chunk is 0
372  GAR_ASSIGN_OR_RAISE(auto edge_num,
373  util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
374  vertex_chunk_index_));
375  if (edge_num == 0) {
376  return nullptr;
377  }
378  GAR_ASSIGN_OR_RAISE(auto chunk_file_path,
379  edge_info_->GetAdjListFilePath(
380  vertex_chunk_index_, chunk_index_, adj_list_type_));
381  std::string path = prefix_ + chunk_file_path;
382  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
383  GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
384  }
385  IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
386  return chunk_table_->Slice(row_offset);
387 }
388 
390  ++chunk_index_;
391  if (chunk_num_ < 0) {
392  // initialize chunk_num_
393  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
394  }
395  while (chunk_index_ >= chunk_num_) {
396  ++vertex_chunk_index_;
397  if (vertex_chunk_index_ >= vertex_chunk_num_) {
398  return Status::IndexError("vertex chunk index ", vertex_chunk_index_,
399  " is out-of-bounds for vertex chunk num ",
400  vertex_chunk_num_);
401  }
402  chunk_index_ = 0;
403  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
404  }
405  seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
406  chunk_table_.reset();
407  return Status::OK();
408 }
409 
411  IdType chunk_index) {
412  if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
413  vertex_chunk_index_ = vertex_chunk_index;
414  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
415  chunk_table_.reset();
416  }
417  if (chunk_index_ != chunk_index) {
418  chunk_index_ = chunk_index;
419  seek_offset_ = chunk_index * edge_info_->GetChunkSize();
420  chunk_table_.reset();
421  }
422  return Status::OK();
423 }
424 
426  if (chunk_table_ == nullptr) {
427  GAR_ASSIGN_OR_RAISE(auto chunk_file_path,
428  edge_info_->GetAdjListFilePath(
429  vertex_chunk_index_, chunk_index_, adj_list_type_));
430  std::string path = prefix_ + chunk_file_path;
431  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
432  GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
433  }
434  return chunk_table_->num_rows();
435 }
436 
437 Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
438  const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
439  const std::string& prefix) {
440  if (!edge_info->HasAdjacentListType(adj_list_type)) {
441  return Status::KeyError(
442  "The adjacent list type ", AdjListTypeToString(adj_list_type),
443  " doesn't exist in edge ", edge_info->GetEdgeLabel(), ".");
444  }
445  return std::make_shared<AdjListArrowChunkReader>(edge_info, adj_list_type,
446  prefix);
447 }
448 
449 Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
450  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_label,
451  const std::string& edge_label, const std::string& dst_label,
452  AdjListType adj_list_type) {
453  auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
454  if (!edge_info) {
455  return Status::KeyError("The edge ", src_label, " ", edge_label, " ",
456  dst_label, " doesn't exist.");
457  }
458  return Make(edge_info, adj_list_type, graph_info->GetPrefix());
459 }
460 
461 Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
462  GAR_ASSIGN_OR_RAISE(chunk_num_,
463  util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
464  vertex_chunk_index_));
465  return Status::OK();
466 }
467 
469  const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
470  const std::string& prefix)
471  : edge_info_(std::move(edge_info)),
472  adj_list_type_(adj_list_type),
473  prefix_(prefix),
474  chunk_index_(0),
475  seek_id_(0),
476  chunk_table_(nullptr) {
477  std::string base_dir;
478  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
479  GAR_ASSIGN_OR_RAISE_ERROR(auto dir_path,
480  edge_info->GetOffsetPathPrefix(adj_list_type));
481  base_dir_ = prefix_ + dir_path;
482  if (adj_list_type == AdjListType::ordered_by_source ||
483  adj_list_type == AdjListType::ordered_by_dest) {
484  GAR_ASSIGN_OR_RAISE_ERROR(
485  vertex_chunk_num_,
486  util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
487  vertex_chunk_size_ = adj_list_type == AdjListType::ordered_by_source
488  ? edge_info_->GetSrcChunkSize()
489  : edge_info_->GetDstChunkSize();
490  } else {
491  std::string err_msg = "Invalid adj list type " +
492  std::string(AdjListTypeToString(adj_list_type)) +
493  " to construct AdjListOffsetReader.";
494  throw std::runtime_error(err_msg);
495  }
496 }
497 
499  seek_id_ = id;
500  IdType pre_chunk_index = chunk_index_;
501  chunk_index_ = id / vertex_chunk_size_;
502  if (chunk_index_ != pre_chunk_index) {
503  chunk_table_.reset();
504  }
505  if (chunk_index_ >= vertex_chunk_num_) {
506  return Status::IndexError("Internal vertex id ", id, "is out of range [0,",
507  vertex_chunk_num_ * vertex_chunk_size_,
508  "), of edge ", edge_info_->GetEdgeLabel(),
509  " of adj list type ",
510  AdjListTypeToString(adj_list_type_), ".");
511  }
512  return Status::OK();
513 }
514 
515 Result<std::shared_ptr<arrow::Array>>
517  if (chunk_table_ == nullptr) {
518  GAR_ASSIGN_OR_RAISE(
519  auto chunk_file_path,
520  edge_info_->GetAdjListOffsetFilePath(chunk_index_, adj_list_type_));
521  std::string path = prefix_ + chunk_file_path;
522  auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
523  GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
524  }
525  IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_;
526  return chunk_table_->Slice(row_offset)->column(0)->chunk(0);
527 }
528 
530  if (++chunk_index_ >= vertex_chunk_num_) {
531  return Status::IndexError("vertex chunk index ", chunk_index_,
532  " is out-of-bounds for vertex chunk num ",
533  vertex_chunk_num_, " of edge ",
534  edge_info_->GetEdgeLabel(), " of adj list type ",
535  AdjListTypeToString(adj_list_type_), ".");
536  }
537  seek_id_ = chunk_index_ * vertex_chunk_size_;
538  chunk_table_.reset();
539 
540  return Status::OK();
541 }
542 
543 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
544 AdjListOffsetArrowChunkReader::Make(const std::shared_ptr<EdgeInfo>& edge_info,
545  AdjListType adj_list_type,
546  const std::string& prefix) {
547  if (!edge_info->HasAdjacentListType(adj_list_type)) {
548  return Status::KeyError(
549  "The adjacent list type ", AdjListTypeToString(adj_list_type),
550  " doesn't exist in edge ", edge_info->GetEdgeLabel(), ".");
551  }
552  return std::make_shared<AdjListOffsetArrowChunkReader>(edge_info,
553  adj_list_type, prefix);
554 }
555 
556 Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
558  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_label,
559  const std::string& edge_label, const std::string& dst_label,
560  AdjListType adj_list_type) {
561  auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
562  if (!edge_info) {
563  return Status::KeyError("The edge ", src_label, " ", edge_label, " ",
564  dst_label, " doesn't exist.");
565  }
566  return Make(edge_info, adj_list_type, graph_info->GetPrefix());
567 }
568 
570  const std::shared_ptr<EdgeInfo>& edge_info,
571  const std::shared_ptr<PropertyGroup>& property_group,
572  AdjListType adj_list_type, const std::string prefix,
573  const util::FilterOptions& options)
574  : edge_info_(std::move(edge_info)),
575  property_group_(std::move(property_group)),
576  adj_list_type_(adj_list_type),
577  prefix_(prefix),
578  vertex_chunk_index_(0),
579  chunk_index_(0),
580  seek_offset_(0),
581  schema_(nullptr),
582  chunk_table_(nullptr),
583  filter_options_(options),
584  chunk_num_(-1) /* -1 means uninitialized */ {
585  GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
586  GAR_ASSIGN_OR_RAISE_ERROR(
587  auto pg_path_prefix,
588  edge_info->GetPropertyGroupPathPrefix(property_group, adj_list_type));
589  base_dir_ = prefix_ + pg_path_prefix;
590  GAR_ASSIGN_OR_RAISE_ERROR(
591  vertex_chunk_num_,
592  util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
593  GAR_ASSIGN_OR_RAISE_ERROR(schema_,
594  PropertyGroupToSchema(property_group, false));
595 }
596 
598  const AdjListPropertyArrowChunkReader& other)
599  : edge_info_(other.edge_info_),
600  property_group_(other.property_group_),
601  adj_list_type_(other.adj_list_type_),
602  prefix_(other.prefix_),
603  vertex_chunk_index_(other.vertex_chunk_index_),
604  chunk_index_(other.chunk_index_),
605  seek_offset_(other.seek_offset_),
606  schema_(other.schema_),
607  chunk_table_(nullptr),
608  filter_options_(other.filter_options_),
609  vertex_chunk_num_(other.vertex_chunk_num_),
610  chunk_num_(other.chunk_num_),
611  base_dir_(other.base_dir_),
612  fs_(other.fs_) {}
613 
615  if (adj_list_type_ != AdjListType::unordered_by_source &&
616  adj_list_type_ != AdjListType::ordered_by_source) {
617  return Status::Invalid("The seek_src operation is invalid in edge ",
618  edge_info_->GetEdgeLabel(), " reader with ",
619  AdjListTypeToString(adj_list_type_), " type.");
620  }
621 
622  IdType new_vertex_chunk_index = id / edge_info_->GetSrcChunkSize();
623  if (new_vertex_chunk_index >= vertex_chunk_num_) {
624  return Status::IndexError(
625  "The source internal id ", id, " is out of range [0,",
626  edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
627  edge_info_->GetEdgeLabel(), " reader.");
628  }
629  if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
630  vertex_chunk_index_ = new_vertex_chunk_index;
631  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
632  chunk_table_.reset();
633  }
634 
635  if (adj_list_type_ == AdjListType::unordered_by_source) {
636  return seek(0); // start from first chunk
637  } else {
638  GAR_ASSIGN_OR_RAISE(auto range,
639  util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
640  adj_list_type_, id));
641  return seek(range.first);
642  }
643  return Status::OK();
644 }
645 
647  if (adj_list_type_ != AdjListType::unordered_by_dest &&
648  adj_list_type_ != AdjListType::ordered_by_dest) {
649  return Status::Invalid("The seek_dst operation is invalid in edge ",
650  edge_info_->GetEdgeLabel(), " reader with ",
651  AdjListTypeToString(adj_list_type_), " type.");
652  }
653 
654  IdType new_vertex_chunk_index = id / edge_info_->GetDstChunkSize();
655  if (new_vertex_chunk_index >= vertex_chunk_num_) {
656  return Status::IndexError(
657  "The destination internal id ", id, " is out of range [0,",
658  edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
659  edge_info_->GetEdgeLabel(), " reader.");
660  }
661  if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
662  vertex_chunk_index_ = new_vertex_chunk_index;
663  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
664  chunk_table_.reset();
665  }
666 
667  if (adj_list_type_ == AdjListType::unordered_by_dest) {
668  return seek(0); // start from the first chunk
669  } else {
670  GAR_ASSIGN_OR_RAISE(auto range,
671  util::GetAdjListOffsetOfVertex(edge_info_, prefix_,
672  adj_list_type_, id));
673  return seek(range.first);
674  }
675 }
676 
678  IdType pre_chunk_index = chunk_index_;
679  seek_offset_ = offset;
680  chunk_index_ = offset / edge_info_->GetChunkSize();
681  if (chunk_index_ != pre_chunk_index) {
682  chunk_table_.reset();
683  }
684  if (chunk_num_ < 0) {
685  // initialize chunk_num_
686  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
687  }
688  if (chunk_index_ >= chunk_num_) {
689  return Status::IndexError("The edge offset ", offset,
690  " is out of range [0,",
691  edge_info_->GetChunkSize() * chunk_num_,
692  "), edge label: ", edge_info_->GetEdgeLabel());
693  }
694  return Status::OK();
695 }
696 
697 Result<std::shared_ptr<arrow::Table>>
699  GAR_RETURN_NOT_OK(util::CheckFilterOptions(filter_options_, property_group_));
700  if (chunk_table_ == nullptr) {
701  // check if the edge num of the current vertex chunk is 0
702  GAR_ASSIGN_OR_RAISE(auto edge_num,
703  util::GetEdgeNum(prefix_, edge_info_, adj_list_type_,
704  vertex_chunk_index_));
705  if (edge_num == 0) {
706  return nullptr;
707  }
708  GAR_ASSIGN_OR_RAISE(
709  auto chunk_file_path,
710  edge_info_->GetPropertyFilePath(property_group_, adj_list_type_,
711  vertex_chunk_index_, chunk_index_));
712  std::string path = prefix_ + chunk_file_path;
713  GAR_ASSIGN_OR_RAISE(
714  chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
715  filter_options_));
716  // TODO(acezen): filter pushdown doesn't support cast schema now
717  if (schema_ != nullptr && filter_options_.filter == nullptr) {
718  GAR_RETURN_NOT_OK(
719  CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
720  }
721  }
722  IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
723  return chunk_table_->Slice(row_offset);
724 }
725 
727  ++chunk_index_;
728  if (chunk_num_ < 0) {
729  // initialize chunk_num_
730  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
731  }
732  while (chunk_index_ >= chunk_num_) {
733  ++vertex_chunk_index_;
734  if (vertex_chunk_index_ >= vertex_chunk_num_) {
735  return Status::IndexError(
736  "vertex chunk index ", vertex_chunk_index_,
737  " is out-of-bounds for vertex chunk num ", vertex_chunk_num_,
738  " of edge ", edge_info_->GetEdgeLabel(), " of adj list type ",
739  AdjListTypeToString(adj_list_type_), ", property group ",
740  property_group_, ".");
741  }
742  chunk_index_ = 0;
743  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
744  }
745  seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
746  chunk_table_.reset();
747  return Status::OK();
748 }
749 
751  IdType vertex_chunk_index, IdType chunk_index) {
752  if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
753  vertex_chunk_index_ = vertex_chunk_index;
754  GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
755  chunk_table_.reset();
756  }
757  if (chunk_index_ != chunk_index) {
758  chunk_index_ = chunk_index;
759  seek_offset_ = chunk_index * edge_info_->GetChunkSize();
760  chunk_table_.reset();
761  }
762  return Status::OK();
763 }
764 
765 void AdjListPropertyArrowChunkReader::Filter(util::Filter filter) {
766  filter_options_.filter = filter;
767 }
768 
769 void AdjListPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
770  filter_options_.columns = column_names;
771 }
772 
773 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
775  const std::shared_ptr<EdgeInfo>& edge_info,
776  const std::shared_ptr<PropertyGroup>& property_group,
777  AdjListType adj_list_type, const std::string& prefix,
778  const util::FilterOptions& options) {
779  if (!edge_info->HasAdjacentListType(adj_list_type)) {
780  return Status::KeyError(
781  "The adjacent list type ", AdjListTypeToString(adj_list_type),
782  " doesn't exist in edge ", edge_info->GetEdgeLabel(), ".");
783  }
784  return std::make_shared<AdjListPropertyArrowChunkReader>(
785  edge_info, property_group, adj_list_type, prefix, options);
786 }
787 
788 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
790  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_label,
791  const std::string& edge_label, const std::string& dst_label,
792  const std::shared_ptr<PropertyGroup>& property_group,
793  AdjListType adj_list_type, const util::FilterOptions& options) {
794  auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
795  if (!edge_info) {
796  return Status::KeyError("The edge ", src_label, " ", edge_label, " ",
797  dst_label, " doesn't exist.");
798  }
799  return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
800  options);
801 }
802 
803 Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
805  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_label,
806  const std::string& edge_label, const std::string& dst_label,
807  const std::string& property_name, AdjListType adj_list_type,
808  const util::FilterOptions& options) {
809  auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
810  if (!edge_info) {
811  return Status::KeyError("The edge ", src_label, " ", edge_label, " ",
812  dst_label, " doesn't exist.");
813  }
814  auto property_group = edge_info->GetPropertyGroup(property_name);
815  if (!property_group) {
816  return Status::KeyError("The property ", property_name,
817  " doesn't exist in edge ", src_label, " ",
818  edge_label, " ", dst_label, ".");
819  }
820  return Make(edge_info, property_group, adj_list_type, graph_info->GetPrefix(),
821  options);
822 }
823 
824 Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() {
825  GAR_ASSIGN_OR_RAISE(chunk_num_,
826  util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
827  vertex_chunk_index_));
828  return Status::OK();
829 }
830 
831 } // namespace graphar
The arrow chunk reader for adj list topology chunk.
Definition: chunk_reader.h:158
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
AdjListArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListArrowChunkReader.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Returns the chunk at the current chunk position indicator as an arrow::Table; if the current vertex chunk contains no edges, returns nullptr.
static Result< std::shared_ptr< AdjListArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListArrowChunkReader instance from edge info.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek_dst(IdType offset)
Sets chunk position indicator for reader by destination vertex id.
Status seek(IdType id)
Sets the chunk position indicator for the reader by internal vertex id. If the id is out of range, returns Status::IndexError.
Result< std::shared_ptr< arrow::Array > > GetChunk()
Get the current offset chunk as arrow::Array.
AdjListOffsetArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Initialize the AdjListOffsetArrowChunkReader.
static Result< std::shared_ptr< AdjListOffsetArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, AdjListType adj_list_type, const std::string &prefix)
Create an AdjListOffsetArrowChunkReader instance from edge info.
Status next_chunk()
Sets the chunk position indicator to the next chunk. If the current chunk is the last chunk, returns Status::IndexError.
The arrow chunk reader for edge property group chunks.
Definition: chunk_reader.h:355
Status seek_src(IdType id)
Sets chunk position indicator for reader by source vertex id.
Status seek_dst(IdType id)
Sets chunk position indicator for reader by destination vertex id.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
static Result< std::shared_ptr< AdjListPropertyArrowChunkReader > > Make(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string &prefix, const util::FilterOptions &options={})
Create an AdjListPropertyArrowChunkReader instance from edge info.
AdjListPropertyArrowChunkReader(const std::shared_ptr< EdgeInfo > &edge_info, const std::shared_ptr< PropertyGroup > &property_group, AdjListType adj_list_type, const std::string prefix, const util::FilterOptions &options={})
Initialize the AdjListPropertyArrowChunkReader.
void Filter(util::Filter filter=nullptr)
Applies the row filter to the table. Calling Filter() with no argument clears the filter.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Returns the chunk at the current chunk position indicator as an arrow::Table; if the current vertex chunk contains no edges, returns nullptr.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
void Select(util::ColumnNames column_names=std::nullopt)
Applies the projection to the table to be read. Calling Select() with no argument clears the projection.
Status outcome object (success or error)
Definition: status.h:123
static Status IndexError(Args &&... args)
Definition: status.h:197
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
Result< std::shared_ptr< arrow::Table > > GetChunk()
Returns the arrow chunk table at the current chunk position indicator.
VertexPropertyArrowChunkReader(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Initialize the VertexPropertyArrowChunkReader.
void Filter(util::Filter filter=nullptr)
Applies the row filter to the table. Calling Filter() with no argument clears the filter.
Status next_chunk()
Sets chunk position indicator to next chunk.
Status seek(IdType id)
Sets the chunk position indicator for the reader by internal vertex id. If the id is out of range, returns Status::IndexError.
void Select(util::ColumnNames column_names=std::nullopt)
Applies the projection to the table to be read. Calling Select() with no argument clears the projection.
static Result< std::shared_ptr< VertexPropertyArrowChunkReader > > Make(const std::shared_ptr< VertexInfo > &vertex_info, const std::shared_ptr< PropertyGroup > &property_group, const std::string &prefix, const util::FilterOptions &options={})
Create a VertexPropertyArrowChunkReader instance from vertex info.