Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
graph_reader.h
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #pragma once
21 
22 #include <any>
23 #include <limits>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <variant>
29 #include <vector>
30 
31 #include "graphar/arrow/chunk_reader.h"
32 #include "graphar/filesystem.h"
33 #include "graphar/graph_info.h"
34 #include "graphar/reader_util.h"
35 #include "graphar/types.h"
36 #include "graphar/util.h"
37 
38 // forward declarations
39 namespace arrow {
40 class ChunkedArray;
41 class Array;
42 } // namespace arrow
43 
44 namespace graphar {
45 
49 class Vertex {
50  public:
57  explicit Vertex(
58  IdType id,
59  std::vector<VertexPropertyArrowChunkReader>& readers); // NOLINT
60 
66  inline IdType id() const noexcept { return id_; }
67 
74  template <typename T>
75  Result<T> property(const std::string& property) const;
76 
83  inline bool IsValid(const std::string& property) const {
84  if (properties_.find(property) != properties_.end()) {
85  return properties_.at(property).has_value();
86  }
87  if (list_properties_.find(property) != list_properties_.end()) {
88  return true;
89  }
90  throw std::invalid_argument("Property with name " + property +
91  " does not exist in the vertex.");
92  }
93 
94  private:
95  IdType id_;
96  std::map<std::string, std::any> properties_;
97  std::map<std::string, std::shared_ptr<arrow::Array>> list_properties_;
98 };
99 
103 class Edge {
104  public:
111  explicit Edge(AdjListArrowChunkReader& adj_list_reader, // NOLINT
112  std::vector<AdjListPropertyArrowChunkReader>&
113  property_readers); // NOLINT
114 
120  inline IdType source() const noexcept { return src_id_; }
121 
127  inline IdType destination() const noexcept { return dst_id_; }
128 
135  template <typename T>
136  Result<T> property(const std::string& property) const;
137 
144  inline bool IsValid(const std::string& property) const {
145  if (properties_.find(property) != properties_.end()) {
146  return properties_.at(property).has_value();
147  }
148  if (list_properties_.find(property) != list_properties_.end()) {
149  return true;
150  }
151  throw std::invalid_argument("Property with name " + property +
152  " does not exist in the edge.");
153  }
154 
155  private:
156  IdType src_id_, dst_id_;
157  std::map<std::string, std::any> properties_;
158  std::map<std::string, std::shared_ptr<arrow::Array>> list_properties_;
159 };
160 
165 class VertexIter {
166  public:
174  explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
175  const std::string& prefix, IdType offset) noexcept {
176  for (const auto& pg : vertex_info->GetPropertyGroups()) {
177  readers_.emplace_back(vertex_info, pg, prefix);
178  }
179  cur_offset_ = offset;
180  }
181 
183  VertexIter(const VertexIter& other)
184  : readers_(other.readers_), cur_offset_(other.cur_offset_) {}
185 
187  Vertex operator*() noexcept {
188  for (auto& reader : readers_) {
189  reader.seek(cur_offset_);
190  }
191  return Vertex(cur_offset_, readers_);
192  }
193 
195  IdType id() { return cur_offset_; }
196 
198  template <typename T>
199  Result<T> property(const std::string& property) noexcept {
200  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
201  for (auto& reader : readers_) {
202  reader.seek(cur_offset_);
203  GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
204  column = util::GetArrowColumnByName(chunk_table, property);
205  if (column != nullptr) {
206  break;
207  }
208  }
209  if (column != nullptr) {
210  auto array = util::GetArrowArrayByChunkIndex(column, 0);
211  GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array));
212  return util::ValueGetter<T>::Value(data, 0);
213  }
214  return Status::KeyError("Property with name ", property,
215  " does not exist in the vertex.");
216  }
217 
219  VertexIter& operator++() noexcept {
220  ++cur_offset_;
221  return *this;
222  }
223 
226  VertexIter ret(*this);
227  ++cur_offset_;
228  return ret;
229  }
230 
232  VertexIter operator+(IdType offset) {
233  VertexIter ret(*this);
234  ret.cur_offset_ += offset;
235  return ret;
236  }
237 
239  VertexIter& operator+=(IdType offset) {
240  cur_offset_ += offset;
241  return *this;
242  }
243 
245  bool operator==(const VertexIter& rhs) const noexcept {
246  return cur_offset_ == rhs.cur_offset_;
247  }
248 
250  bool operator!=(const VertexIter& rhs) const noexcept {
251  return cur_offset_ != rhs.cur_offset_;
252  }
253 
254  private:
255  std::vector<VertexPropertyArrowChunkReader> readers_;
256  IdType cur_offset_;
257 };
258 
264  public:
271  explicit VerticesCollection(const std::shared_ptr<VertexInfo>& vertex_info,
272  const std::string& prefix)
273  : vertex_info_(std::move(vertex_info)), prefix_(prefix) {
274  // get the vertex num
275  std::string base_dir;
276  GAR_ASSIGN_OR_RAISE_ERROR(auto fs,
277  FileSystemFromUriOrPath(prefix, &base_dir));
278  GAR_ASSIGN_OR_RAISE_ERROR(auto file_path,
279  vertex_info->GetVerticesNumFilePath());
280  std::string vertex_num_path = base_dir + file_path;
281  GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
282  fs->ReadFileToValue<IdType>(vertex_num_path));
283  }
284 
286  VertexIter begin() noexcept { return VertexIter(vertex_info_, prefix_, 0); }
287 
289  VertexIter end() noexcept {
290  return VertexIter(vertex_info_, prefix_, vertex_num_);
291  }
292 
294  VertexIter find(IdType id) { return VertexIter(vertex_info_, prefix_, id); }
295 
297  size_t size() const noexcept { return vertex_num_; }
298 
305  static Result<std::shared_ptr<VerticesCollection>> Make(
306  const std::shared_ptr<GraphInfo>& graph_info, const std::string& label) {
307  auto vertex_info = graph_info->GetVertexInfo(label);
308  if (!vertex_info) {
309  return Status::KeyError("The vertex ", label, " doesn't exist.");
310  }
311  return std::make_shared<VerticesCollection>(vertex_info,
312  graph_info->GetPrefix());
313  }
314 
315  private:
316  std::shared_ptr<VertexInfo> vertex_info_;
317  std::string prefix_;
318  IdType vertex_num_;
319 };
320 
325 class EdgeIter {
326  public:
340  explicit EdgeIter(const std::shared_ptr<EdgeInfo>& edge_info,
341  const std::string& prefix, AdjListType adj_list_type,
342  IdType global_chunk_index, IdType offset,
343  IdType chunk_begin, IdType chunk_end,
344  std::shared_ptr<util::IndexConverter> index_converter)
345  : adj_list_reader_(edge_info, adj_list_type, prefix),
346  global_chunk_index_(global_chunk_index),
347  cur_offset_(offset),
348  chunk_size_(edge_info->GetChunkSize()),
349  src_chunk_size_(edge_info->GetSrcChunkSize()),
350  dst_chunk_size_(edge_info->GetDstChunkSize()),
351  num_row_of_chunk_(0),
352  chunk_begin_(chunk_begin),
353  chunk_end_(chunk_end),
354  adj_list_type_(adj_list_type),
355  index_converter_(index_converter) {
356  vertex_chunk_index_ =
357  index_converter->GlobalChunkIndexToIndexPair(global_chunk_index).first;
358  adj_list_reader_.seek_chunk_index(vertex_chunk_index_);
359  const auto& property_groups = edge_info->GetPropertyGroups();
360  for (const auto& pg : property_groups) {
361  property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix),
362  property_readers_.back().seek_chunk_index(vertex_chunk_index_);
363  }
364  if (adj_list_type == AdjListType::ordered_by_source ||
365  adj_list_type == AdjListType::ordered_by_dest) {
366  offset_reader_ = std::make_shared<AdjListOffsetArrowChunkReader>(
367  edge_info, adj_list_type, prefix);
368  }
369  }
370 
372  EdgeIter(const EdgeIter& other)
373  : adj_list_reader_(other.adj_list_reader_),
374  offset_reader_(other.offset_reader_),
375  property_readers_(other.property_readers_),
376  global_chunk_index_(other.global_chunk_index_),
377  vertex_chunk_index_(other.vertex_chunk_index_),
378  cur_offset_(other.cur_offset_),
379  chunk_size_(other.chunk_size_),
380  src_chunk_size_(other.src_chunk_size_),
381  dst_chunk_size_(other.dst_chunk_size_),
382  num_row_of_chunk_(other.num_row_of_chunk_),
383  chunk_begin_(other.chunk_begin_),
384  chunk_end_(other.chunk_end_),
385  adj_list_type_(other.adj_list_type_),
386  index_converter_(other.index_converter_) {}
387 
390  adj_list_reader_.seek(cur_offset_);
391  for (auto& reader : property_readers_) {
392  reader.seek(cur_offset_);
393  }
394  return Edge(adj_list_reader_, property_readers_);
395  }
396 
398  IdType source();
399 
401  IdType destination();
402 
404  template <typename T>
405  Result<T> property(const std::string& property) noexcept {
406  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
407  for (auto& reader : property_readers_) {
408  reader.seek(cur_offset_);
409  GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
410  column = util::GetArrowColumnByName(chunk_table, property);
411  if (column != nullptr) {
412  break;
413  }
414  }
415  if (column != nullptr) {
416  auto array = util::GetArrowArrayByChunkIndex(column, 0);
417  GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array));
418  return util::ValueGetter<T>::Value(data, 0);
419  }
420  return Status::KeyError("Property with name ", property,
421  " does not exist in the edge.");
422  }
423 
426  if (num_row_of_chunk_ == 0) {
427  adj_list_reader_.seek(cur_offset_);
428  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
429  adj_list_reader_.GetRowNumOfChunk());
430  }
431  auto st = adj_list_reader_.seek(++cur_offset_);
432  if (st.ok() && num_row_of_chunk_ != chunk_size_) {
433  // check the row offset is overflow
434  auto row_offset = cur_offset_ % chunk_size_;
435  if (row_offset >= num_row_of_chunk_) {
436  cur_offset_ = (cur_offset_ / chunk_size_ + 1) * chunk_size_;
437  adj_list_reader_.seek(cur_offset_);
438  st =
439  Status::KeyError("The row offset is overflow, move to next chunk.");
440  }
441  }
442  if (st.ok() && num_row_of_chunk_ == chunk_size_ &&
443  cur_offset_ % chunk_size_ == 0) {
444  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
445  adj_list_reader_.GetRowNumOfChunk());
446  ++global_chunk_index_;
447  }
448  if (st.IsKeyError()) {
449  st = adj_list_reader_.next_chunk();
450  ++global_chunk_index_;
451  ++vertex_chunk_index_;
452  if (!st.IsIndexError()) {
453  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
454  adj_list_reader_.GetRowNumOfChunk());
455  for (auto& reader : property_readers_) {
456  reader.next_chunk();
457  }
458  }
459  cur_offset_ = 0;
460  adj_list_reader_.seek(cur_offset_);
461  }
462  return *this;
463  }
464 
467  EdgeIter ret(*this);
468  this->operator++();
469  return ret;
470  }
471 
473  EdgeIter operator=(const EdgeIter& other) {
474  adj_list_reader_ = other.adj_list_reader_;
475  offset_reader_ = other.offset_reader_;
476  property_readers_ = other.property_readers_;
477  global_chunk_index_ = other.global_chunk_index_;
478  vertex_chunk_index_ = other.vertex_chunk_index_;
479  cur_offset_ = other.cur_offset_;
480  chunk_size_ = other.chunk_size_;
481  src_chunk_size_ = other.src_chunk_size_;
482  dst_chunk_size_ = other.dst_chunk_size_;
483  num_row_of_chunk_ = other.num_row_of_chunk_;
484  chunk_begin_ = other.chunk_begin_;
485  chunk_end_ = other.chunk_end_;
486  adj_list_type_ = other.adj_list_type_;
487  index_converter_ = other.index_converter_;
488  return *this;
489  }
490 
492  bool operator==(const EdgeIter& rhs) const noexcept {
493  return global_chunk_index_ == rhs.global_chunk_index_ &&
494  cur_offset_ == rhs.cur_offset_ &&
495  adj_list_type_ == rhs.adj_list_type_;
496  }
497 
499  bool operator!=(const EdgeIter& rhs) const noexcept {
500  return global_chunk_index_ != rhs.global_chunk_index_ ||
501  cur_offset_ != rhs.cur_offset_ ||
502  adj_list_type_ != rhs.adj_list_type_;
503  }
504 
506  IdType global_chunk_index() const { return global_chunk_index_; }
507 
509  IdType cur_offset() const { return cur_offset_; }
510 
519  bool first_src(const EdgeIter& from, IdType id);
520 
529  bool first_dst(const EdgeIter& from, IdType id);
530 
532  void to_begin() {
533  global_chunk_index_ = chunk_begin_;
534  cur_offset_ = 0;
535  vertex_chunk_index_ =
536  index_converter_->GlobalChunkIndexToIndexPair(global_chunk_index_)
537  .first;
538  refresh();
539  }
540 
542  bool is_end() const { return global_chunk_index_ >= chunk_end_; }
543 
545  bool next_src() {
546  if (is_end())
547  return false;
548  IdType id = this->source();
549  IdType pre_vertex_chunk_index = vertex_chunk_index_;
550  if (adj_list_type_ == AdjListType::ordered_by_source) {
551  this->operator++();
552  if (is_end() || this->source() != id)
553  return false;
554  else
555  return true;
556  }
557  this->operator++();
558  while (!is_end()) {
559  if (this->source() == id) {
560  return true;
561  }
562  if (adj_list_type_ == AdjListType::unordered_by_source) {
563  if (vertex_chunk_index_ > pre_vertex_chunk_index)
564  return false;
565  }
566  this->operator++();
567  }
568  return false;
569  }
570 
575  bool next_dst() {
576  if (is_end())
577  return false;
578  IdType id = this->destination();
579  IdType pre_vertex_chunk_index = vertex_chunk_index_;
580  if (adj_list_type_ == AdjListType::ordered_by_dest) {
581  this->operator++();
582  if (is_end() || this->destination() != id)
583  return false;
584  else
585  return true;
586  }
587  this->operator++();
588  while (!is_end()) {
589  if (this->destination() == id) {
590  return true;
591  }
592  if (adj_list_type_ == AdjListType::unordered_by_dest) {
593  if (vertex_chunk_index_ > pre_vertex_chunk_index)
594  return false;
595  }
596  this->operator++();
597  }
598  return false;
599  }
600 
605  bool next_src(IdType id) {
606  if (is_end())
607  return false;
608  this->operator++();
609  return this->first_src(*this, id);
610  }
611 
616  bool next_dst(IdType id) {
617  if (is_end())
618  return false;
619  this->operator++();
620  return this->first_dst(*this, id);
621  }
622 
623  private:
624  // Refresh the readers to point to the current position.
625  void refresh() {
626  adj_list_reader_.seek_chunk_index(vertex_chunk_index_);
627  adj_list_reader_.seek(cur_offset_);
628  for (auto& reader : property_readers_) {
629  reader.seek_chunk_index(vertex_chunk_index_);
630  }
631  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
632  adj_list_reader_.GetRowNumOfChunk());
633  }
634 
635  private:
636  AdjListArrowChunkReader adj_list_reader_;
637  std::shared_ptr<AdjListOffsetArrowChunkReader> offset_reader_;
638  std::vector<AdjListPropertyArrowChunkReader> property_readers_;
639  IdType global_chunk_index_;
640  IdType vertex_chunk_index_;
641  IdType cur_offset_;
642  IdType chunk_size_;
643  IdType src_chunk_size_;
644  IdType dst_chunk_size_;
645  IdType num_row_of_chunk_;
646  IdType chunk_begin_, chunk_end_;
647  AdjListType adj_list_type_;
648  std::shared_ptr<util::IndexConverter> index_converter_;
649 
650  friend class OBSEdgeCollection;
651  friend class OBDEdgesCollection;
652  friend class UBSEdgesCollection;
653  friend class UBDEdgesCollection;
654 };
655 
660  public:
661  virtual ~EdgesCollection() {}
662 
664  virtual EdgeIter begin() {
665  if (begin_ == nullptr) {
666  EdgeIter iter(edge_info_, prefix_, adj_list_type_, chunk_begin_, 0,
667  chunk_begin_, chunk_end_, index_converter_);
668  begin_ = std::make_shared<EdgeIter>(iter);
669  }
670  return *begin_;
671  }
672 
674  virtual EdgeIter end() {
675  if (end_ == nullptr) {
676  EdgeIter iter(edge_info_, prefix_, adj_list_type_, chunk_end_, 0,
677  chunk_begin_, chunk_end_, index_converter_);
678  end_ = std::make_shared<EdgeIter>(iter);
679  }
680  return *end_;
681  }
682 
684  virtual size_t size() const noexcept { return edge_num_; }
685 
694  virtual EdgeIter find_src(IdType id, const EdgeIter& from) = 0;
695 
704  virtual EdgeIter find_dst(IdType id, const EdgeIter& from) = 0;
705 
718  static Result<std::shared_ptr<EdgesCollection>> Make(
719  const std::shared_ptr<GraphInfo>& graph_info,
720  const std::string& src_label, const std::string& edge_label,
721  const std::string& dst_label, AdjListType adj_list_type,
722  const IdType vertex_chunk_begin = 0,
723  const IdType vertex_chunk_end =
724  std::numeric_limits<int64_t>::max()) noexcept;
725 
726  protected:
736  explicit EdgesCollection(const std::shared_ptr<EdgeInfo>& edge_info,
737  const std::string& prefix, IdType vertex_chunk_begin,
738  IdType vertex_chunk_end, AdjListType adj_list_type)
739  : edge_info_(std::move(edge_info)),
740  prefix_(prefix),
741  adj_list_type_(adj_list_type) {
742  GAR_ASSIGN_OR_RAISE_ERROR(
743  auto vertex_chunk_num,
744  util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
745  std::vector<IdType> edge_chunk_nums(vertex_chunk_num, 0);
746  if (vertex_chunk_end == std::numeric_limits<int64_t>::max()) {
747  vertex_chunk_end = vertex_chunk_num;
748  }
749  chunk_begin_ = 0;
750  chunk_end_ = 0;
751  edge_num_ = 0;
752  for (IdType i = 0; i < vertex_chunk_num; ++i) {
753  GAR_ASSIGN_OR_RAISE_ERROR(
754  edge_chunk_nums[i],
755  util::GetEdgeChunkNum(prefix, edge_info, adj_list_type_, i));
756  if (i < vertex_chunk_begin) {
757  chunk_begin_ += edge_chunk_nums[i];
758  chunk_end_ += edge_chunk_nums[i];
759  }
760  if (i >= vertex_chunk_begin && i < vertex_chunk_end) {
761  chunk_end_ += edge_chunk_nums[i];
762  GAR_ASSIGN_OR_RAISE_ERROR(
763  auto chunk_edge_num_,
764  util::GetEdgeNum(prefix, edge_info, adj_list_type_, i));
765  edge_num_ += chunk_edge_num_;
766  }
767  }
768  index_converter_ =
769  std::make_shared<util::IndexConverter>(std::move(edge_chunk_nums));
770  }
771 
772  std::shared_ptr<EdgeInfo> edge_info_;
773  std::string prefix_;
774  AdjListType adj_list_type_;
775  IdType chunk_begin_, chunk_end_;
776  std::shared_ptr<util::IndexConverter> index_converter_;
777  std::shared_ptr<EdgeIter> begin_, end_;
778  IdType edge_num_;
779 };
780 
786  using Base = EdgesCollection;
787 
788  public:
798  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
799  IdType vertex_chunk_begin = 0,
800  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
801  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
802  AdjListType::ordered_by_source) {}
803 
812  EdgeIter find_src(IdType id, const EdgeIter& from) override {
813  auto result =
814  util::GetAdjListOffsetOfVertex(edge_info_, prefix_, adj_list_type_, id);
815  if (!result.status().ok()) {
816  return this->end();
817  }
818  auto begin_offset = result.value().first;
819  auto end_offset = result.value().second;
820  if (begin_offset >= end_offset) {
821  return this->end();
822  }
823  auto begin_global_chunk_index =
824  index_converter_->IndexPairToGlobalChunkIndex(
825  id / edge_info_->GetSrcChunkSize(),
826  begin_offset / edge_info_->GetChunkSize());
827  auto end_global_chunk_index = index_converter_->IndexPairToGlobalChunkIndex(
828  id / edge_info_->GetSrcChunkSize(),
829  end_offset / edge_info_->GetChunkSize());
830  if (begin_global_chunk_index > from.global_chunk_index_) {
831  return EdgeIter(edge_info_, prefix_, adj_list_type_,
832  begin_global_chunk_index, begin_offset, chunk_begin_,
833  chunk_end_, index_converter_);
834  } else if (end_global_chunk_index < from.global_chunk_index_) {
835  return this->end();
836  } else {
837  if (begin_offset > from.cur_offset_) {
838  return EdgeIter(edge_info_, prefix_, adj_list_type_,
839  begin_global_chunk_index, begin_offset, chunk_begin_,
840  chunk_end_, index_converter_);
841  } else if (end_offset <= from.cur_offset_) {
842  return this->end();
843  } else {
844  return EdgeIter(edge_info_, prefix_, adj_list_type_,
845  from.global_chunk_index_, from.cur_offset_,
846  chunk_begin_, chunk_end_, index_converter_);
847  }
848  }
849  return this->end();
850  }
851 
860  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
861  EdgeIter iter(from);
862  auto end = this->end();
863  while (iter != end) {
864  auto edge = *iter;
865  if (edge.destination() == id) {
866  break;
867  }
868  ++iter;
869  }
870  return iter;
871  }
872 };
873 
878  using Base = EdgesCollection;
879 
880  public:
890  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
891  IdType vertex_chunk_begin = 0,
892  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
893  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
894  AdjListType::ordered_by_dest) {}
895 
904  EdgeIter find_src(IdType id, const EdgeIter& from) override {
905  EdgeIter iter(from);
906  auto end = this->end();
907  while (iter != end) {
908  auto edge = *iter;
909  if (edge.source() == id) {
910  break;
911  }
912  ++iter;
913  }
914  return iter;
915  }
916 
925  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
926  auto result =
927  util::GetAdjListOffsetOfVertex(edge_info_, prefix_, adj_list_type_, id);
928  if (!result.status().ok()) {
929  return this->end();
930  }
931  auto begin_offset = result.value().first;
932  auto end_offset = result.value().second;
933  if (begin_offset >= end_offset) {
934  return this->end();
935  }
936  auto begin_global_chunk_index =
937  index_converter_->IndexPairToGlobalChunkIndex(
938  id / edge_info_->GetDstChunkSize(),
939  begin_offset / edge_info_->GetChunkSize());
940  auto end_global_chunk_index = index_converter_->IndexPairToGlobalChunkIndex(
941  id / edge_info_->GetDstChunkSize(),
942  end_offset / edge_info_->GetChunkSize());
943  if (begin_global_chunk_index > from.global_chunk_index_) {
944  return EdgeIter(edge_info_, prefix_, adj_list_type_,
945  begin_global_chunk_index, begin_offset, chunk_begin_,
946  chunk_end_, index_converter_);
947  } else if (end_global_chunk_index < from.global_chunk_index_) {
948  return this->end();
949  } else {
950  if (begin_offset >= from.cur_offset_) {
951  return EdgeIter(edge_info_, prefix_, adj_list_type_,
952  begin_global_chunk_index, begin_offset, chunk_begin_,
953  chunk_end_, index_converter_);
954  } else if (end_offset <= from.cur_offset_) {
955  return this->end();
956  } else {
957  return EdgeIter(edge_info_, prefix_, adj_list_type_,
958  from.global_chunk_index_, from.cur_offset_,
959  chunk_begin_, chunk_end_, index_converter_);
960  }
961  }
962  return this->end();
963  }
964 };
965 
970  using Base = EdgesCollection;
971 
972  public:
982  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
983  IdType vertex_chunk_begin = 0,
984  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
985  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
986  AdjListType::unordered_by_source) {}
987 
996  EdgeIter find_src(IdType id, const EdgeIter& from) override {
997  EdgeIter iter(from);
998  auto end = this->end();
999  while (iter != end) {
1000  auto edge = *iter;
1001  if (edge.source() == id) {
1002  break;
1003  }
1004  ++iter;
1005  }
1006  return iter;
1007  }
1008 
1017  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
1018  EdgeIter iter(from);
1019  auto end = this->end();
1020  while (iter != end) {
1021  auto edge = *iter;
1022  if (edge.destination() == id) {
1023  break;
1024  }
1025  ++iter;
1026  }
1027  return iter;
1028  }
1029 };
1030 
1035  using Base = EdgesCollection;
1036 
1037  public:
1047  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
1048  IdType vertex_chunk_begin = 0,
1049  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
1050  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
1051  AdjListType::unordered_by_dest) {}
1052 
1061  EdgeIter find_src(IdType id, const EdgeIter& from) override {
1062  EdgeIter iter(from);
1063  auto end = this->end();
1064  while (iter != end) {
1065  auto edge = *iter;
1066  if (edge.source() == id) {
1067  break;
1068  }
1069  ++iter;
1070  }
1071  return iter;
1072  }
1073 
1082  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
1083  EdgeIter iter(from);
1084  auto end = this->end();
1085  while (iter != end) {
1086  auto edge = *iter;
1087  if (edge.destination() == id) {
1088  break;
1089  }
1090  ++iter;
1091  }
1092  return iter;
1093  }
1094 };
1095 } // namespace graphar
The arrow chunk reader for adj list topology chunk.
Definition: chunk_reader.h:158
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Status next_chunk()
Sets chunk position indicator to next chunk.
Edge contains information of certain edge.
Definition: graph_reader.h:103
IdType source() const noexcept
Get source id of the edge.
Definition: graph_reader.h:120
Edge(AdjListArrowChunkReader &adj_list_reader, std::vector< AdjListPropertyArrowChunkReader > &property_readers)
IdType destination() const noexcept
Get destination id of the edge.
Definition: graph_reader.h:127
bool IsValid(const std::string &property) const
Return true if value at the property is valid (not null).
Definition: graph_reader.h:144
Result< T > property(const std::string &property) const
Get the property value of the edge.
EdgeInfo is a class to describe the edge information, including the source vertex label,...
Definition: graph_info.h:382
The iterator for traversing a type of edges.
Definition: graph_reader.h:325
EdgeIter & operator++()
Definition: graph_reader.h:425
IdType cur_offset() const
Definition: graph_reader.h:509
bool operator!=(const EdgeIter &rhs) const noexcept
Definition: graph_reader.h:499
EdgeIter(const EdgeIter &other)
Definition: graph_reader.h:372
bool first_dst(const EdgeIter &from, IdType id)
bool first_src(const EdgeIter &from, IdType id)
EdgeIter operator=(const EdgeIter &other)
Definition: graph_reader.h:473
bool next_src(IdType id)
Definition: graph_reader.h:605
IdType global_chunk_index() const
Definition: graph_reader.h:506
bool is_end() const
Definition: graph_reader.h:542
bool next_dst(IdType id)
Definition: graph_reader.h:616
EdgeIter operator++(int)
Definition: graph_reader.h:466
bool operator==(const EdgeIter &rhs) const noexcept
Definition: graph_reader.h:492
Result< T > property(const std::string &property) noexcept
Definition: graph_reader.h:405
EdgeIter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, IdType global_chunk_index, IdType offset, IdType chunk_begin, IdType chunk_end, std::shared_ptr< util::IndexConverter > index_converter)
Definition: graph_reader.h:340
EdgesCollection is designed for reading a collection of edges.
Definition: graph_reader.h:659
virtual EdgeIter find_dst(IdType id, const EdgeIter &from)=0
virtual EdgeIter find_src(IdType id, const EdgeIter &from)=0
static Result< std::shared_ptr< EdgesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &src_label, const std::string &edge_label, const std::string &dst_label, AdjListType adj_list_type, const IdType vertex_chunk_begin=0, const IdType vertex_chunk_end=std::numeric_limits< int64_t >::max()) noexcept
Construct an EdgesCollection from graph info and edge label.
virtual size_t size() const noexcept
Definition: graph_reader.h:684
virtual EdgeIter begin()
Definition: graph_reader.h:664
EdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin, IdType vertex_chunk_end, AdjListType adj_list_type)
Initialize the EdgesCollection with a range of chunks.
Definition: graph_reader.h:736
virtual EdgeIter end()
Definition: graph_reader.h:674
Ordered By Destination EdgesCollection implementation.
Definition: graph_reader.h:877
EdgeIter find_src(IdType id, const EdgeIter &from) override
Definition: graph_reader.h:904
EdgeIter find_dst(IdType id, const EdgeIter &from) override
Definition: graph_reader.h:925
OBDEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the OBDEdgesCollection with a range of chunks.
Definition: graph_reader.h:889
Ordered By Source EdgesCollection implementation.
Definition: graph_reader.h:785
OBSEdgeCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the OBSEdgeCollection with a range of chunks.
Definition: graph_reader.h:797
EdgeIter find_src(IdType id, const EdgeIter &from) override
Definition: graph_reader.h:812
EdgeIter find_dst(IdType id, const EdgeIter &from) override
Definition: graph_reader.h:860
static Status KeyError(Args &&... args)
Definition: status.h:172
Unordered By Destination EdgesCollection implementation.
EdgeIter find_src(IdType id, const EdgeIter &from) override
EdgeIter find_dst(IdType id, const EdgeIter &from) override
UBDEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the EdgesCollection with a range of chunks.
Unordered By Source EdgesCollection implementation.
Definition: graph_reader.h:969
EdgeIter find_dst(IdType id, const EdgeIter &from) override
UBSEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the EdgesCollection with a range of chunks.
Definition: graph_reader.h:981
EdgeIter find_src(IdType id, const EdgeIter &from) override
Definition: graph_reader.h:996
Vertex contains information of certain vertex.
Definition: graph_reader.h:49
bool IsValid(const std::string &property) const
Return true if value at the property is valid (not null).
Definition: graph_reader.h:83
IdType id() const noexcept
Get the id of the vertex.
Definition: graph_reader.h:66
Result< T > property(const std::string &property) const
Get the property value of the vertex.
Definition: graph_reader.cc:98
Vertex(IdType id, std::vector< VertexPropertyArrowChunkReader > &readers)
Definition: graph_reader.cc:74
The iterator for traversing a type of vertices.
Definition: graph_reader.h:165
VertexIter & operator++() noexcept
Definition: graph_reader.h:219
bool operator==(const VertexIter &rhs) const noexcept
Definition: graph_reader.h:245
VertexIter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, IdType offset) noexcept
Definition: graph_reader.h:174
Result< T > property(const std::string &property) noexcept
Definition: graph_reader.h:199
Vertex operator*() noexcept
Definition: graph_reader.h:187
VertexIter operator++(int)
Definition: graph_reader.h:225
VertexIter operator+(IdType offset)
Definition: graph_reader.h:232
VertexIter(const VertexIter &other)
Definition: graph_reader.h:183
bool operator!=(const VertexIter &rhs) const noexcept
Definition: graph_reader.h:250
VertexIter & operator+=(IdType offset)
Definition: graph_reader.h:239
VerticesCollection is designed for reading a collection of vertices.
Definition: graph_reader.h:263
VertexIter find(IdType id)
Definition: graph_reader.h:294
size_t size() const noexcept
Definition: graph_reader.h:297
VertexIter begin() noexcept
Definition: graph_reader.h:286
VertexIter end() noexcept
Definition: graph_reader.h:289
static Result< std::shared_ptr< VerticesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &label)
Construct a VerticesCollection from graph info and vertex label.
Definition: graph_reader.h:305
VerticesCollection(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix)
Initialize the VerticesCollection.
Definition: graph_reader.h:271