Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
graph_reader.h
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #pragma once
21 
22 #include <any>
23 #include <limits>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "graphar/arrow/chunk_reader.h"
31 #include "graphar/filesystem.h"
32 #include "graphar/graph_info.h"
33 #include "graphar/reader_util.h"
34 #include "graphar/types.h"
35 #include "graphar/util.h"
36 
37 // forward declarations
38 namespace arrow {
39 class ChunkedArray;
40 class Array;
41 } // namespace arrow
42 
43 namespace graphar {
44 
48 class Vertex {
49  public:
56  explicit Vertex(
57  IdType id,
58  std::vector<VertexPropertyArrowChunkReader>& readers); // NOLINT
59 
65  inline IdType id() const noexcept { return id_; }
66 
73  template <typename T>
74  Result<T> property(const std::string& property) const;
75 
80  template <typename T>
81  Result<T> label() const;
82 
89  inline bool IsValid(const std::string& property) const {
90  if (properties_.find(property) != properties_.end()) {
91  return properties_.at(property).has_value();
92  }
93  if (list_properties_.find(property) != list_properties_.end()) {
94  return true;
95  }
96  throw std::invalid_argument("Property with name " + property +
97  " does not exist in the vertex.");
98  }
99 
100  private:
101  IdType id_;
102  std::map<std::string, std::any> properties_;
103  std::map<std::string, std::shared_ptr<arrow::Array>> list_properties_;
104 };
105 
109 class Edge {
110  public:
117  explicit Edge(AdjListArrowChunkReader& adj_list_reader, // NOLINT
118  std::vector<AdjListPropertyArrowChunkReader>&
119  property_readers); // NOLINT
120 
126  inline IdType source() const noexcept { return src_id_; }
127 
133  inline IdType destination() const noexcept { return dst_id_; }
134 
141  template <typename T>
142  Result<T> property(const std::string& property) const;
143 
150  inline bool IsValid(const std::string& property) const {
151  if (properties_.find(property) != properties_.end()) {
152  return properties_.at(property).has_value();
153  }
154  if (list_properties_.find(property) != list_properties_.end()) {
155  return true;
156  }
157  throw std::invalid_argument("Property with name " + property +
158  " does not exist in the edge.");
159  }
160 
161  private:
162  IdType src_id_, dst_id_;
163  std::map<std::string, std::any> properties_;
164  std::map<std::string, std::shared_ptr<arrow::Array>> list_properties_;
165 };
166 
171 class VertexIter {
172  public:
180  explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
181  const std::string& prefix, IdType offset,
182  const std::vector<std::string>& labels,
183  const bool& is_filtered = false,
184  const std::vector<IdType>& filtered_ids = {}) noexcept {
185  if (!labels.empty()) {
186  labels_ = labels;
187  label_reader_ =
188  VertexPropertyArrowChunkReader(vertex_info, labels, prefix);
189  }
190  for (const auto& pg : vertex_info->GetPropertyGroups()) {
191  readers_.emplace_back(vertex_info, pg, prefix);
192  }
193  is_filtered_ = is_filtered;
194  filtered_ids_ = filtered_ids;
195  cur_offset_ = offset;
196  }
197 
199  VertexIter(const VertexIter& other)
200  : readers_(other.readers_),
201  cur_offset_(other.cur_offset_),
202  labels_(other.labels_),
203  label_reader_(other.label_reader_),
204  is_filtered_(other.is_filtered_),
205  filtered_ids_(other.filtered_ids_) {}
206 
208  Vertex operator*() noexcept {
209  if (is_filtered_) {
210  for (auto& reader : readers_) {
211  reader.seek(filtered_ids_[cur_offset_]);
212  }
213  } else {
214  for (auto& reader : readers_) {
215  reader.seek(cur_offset_);
216  }
217  }
218 
219  return Vertex(cur_offset_, readers_);
220  }
221 
223  IdType id() {
224  if (is_filtered_) {
225  return filtered_ids_[cur_offset_];
226  } else {
227  return cur_offset_;
228  }
229  }
230 
232  template <typename T>
233  Result<T> property(const std::string& property) noexcept {
234  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
235  if (is_filtered_) {
236  for (auto& reader : readers_) {
237  reader.seek(filtered_ids_[cur_offset_]);
238  GAR_ASSIGN_OR_RAISE(auto chunk_table,
239  reader.GetChunk(graphar::GetChunkVersion::V1));
240  column = util::GetArrowColumnByName(chunk_table, property);
241  if (column != nullptr) {
242  break;
243  }
244  }
245  } else {
246  for (auto& reader : readers_) {
247  reader.seek(cur_offset_);
248  GAR_ASSIGN_OR_RAISE(auto chunk_table,
249  reader.GetChunk(graphar::GetChunkVersion::V1));
250  column = util::GetArrowColumnByName(chunk_table, property);
251  if (column != nullptr) {
252  break;
253  }
254  }
255  }
256 
257  if (column != nullptr) {
258  auto array = util::GetArrowArrayByChunkIndex(column, 0);
259  GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array));
260  return util::ValueGetter<T>::Value(data, 0);
261  }
262  return Status::KeyError("Property with name ", property,
263  " does not exist in the vertex.");
264  }
265 
267  Result<bool> hasLabel(const std::string& label) noexcept;
268 
270  Result<std::vector<std::string>> label() noexcept;
271 
273  VertexIter& operator++() noexcept {
274  ++cur_offset_;
275  return *this;
276  }
277 
280  VertexIter ret(*this);
281  ++cur_offset_;
282  return ret;
283  }
284 
286  VertexIter operator+(IdType offset) {
287  VertexIter ret(*this);
288  ret.cur_offset_ += offset;
289  return ret;
290  }
291 
293  VertexIter& operator+=(IdType offset) {
294  cur_offset_ += offset;
295  return *this;
296  }
297 
299  bool operator==(const VertexIter& rhs) const noexcept {
300  return cur_offset_ == rhs.cur_offset_;
301  }
302 
304  bool operator!=(const VertexIter& rhs) const noexcept {
305  return cur_offset_ != rhs.cur_offset_;
306  }
307 
308  private:
309  std::vector<VertexPropertyArrowChunkReader> readers_;
310  VertexPropertyArrowChunkReader label_reader_;
311  std::vector<std::string> labels_;
312  IdType cur_offset_;
313  bool is_filtered_;
314  std::vector<IdType> filtered_ids_;
315 };
316 
322  public:
330  explicit VerticesCollection(const std::shared_ptr<VertexInfo>& vertex_info,
331  const std::string& prefix,
332  const bool is_filtered = false,
333  std::vector<IdType> filtered_ids = {})
334  : vertex_info_(vertex_info),
335  prefix_(prefix),
336  labels_(vertex_info->GetLabels()),
337  is_filtered_(is_filtered),
338  filtered_ids_(std::move(filtered_ids)) {
339  // get the vertex num
340  std::string base_dir;
341  GAR_ASSIGN_OR_RAISE_ERROR(auto fs,
342  FileSystemFromUriOrPath(prefix, &base_dir));
343  GAR_ASSIGN_OR_RAISE_ERROR(auto file_path,
344  vertex_info->GetVerticesNumFilePath());
345  std::string vertex_num_path = base_dir + file_path;
346  GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
347  fs->ReadFileToValue<IdType>(vertex_num_path));
348  }
349 
351  VertexIter begin() noexcept {
352  return VertexIter(vertex_info_, prefix_, 0, labels_, is_filtered_,
353  filtered_ids_);
354  }
355 
357  VertexIter end() noexcept {
358  if (is_filtered_)
359  return VertexIter(vertex_info_, prefix_, filtered_ids_.size(), labels_,
360  is_filtered_, filtered_ids_);
361  return VertexIter(vertex_info_, prefix_, vertex_num_, labels_, is_filtered_,
362  filtered_ids_);
363  }
364 
366  VertexIter find(IdType id) {
367  return VertexIter(vertex_info_, prefix_, id, labels_);
368  }
369 
371  size_t size() const noexcept {
372  if (is_filtered_)
373  return filtered_ids_.size();
374  else
375  return vertex_num_;
376  }
377 
379  Result<std::vector<IdType>> filter(
380  const std::vector<std::string>& filter_labels,
381  std::vector<IdType>* new_valid_chunk = nullptr);
382 
383  Result<std::vector<IdType>> filter_by_acero(
384  const std::vector<std::string>& filter_labels) const;
385 
386  Result<std::vector<IdType>> filter(
387  const std::string& property_name,
388  std::shared_ptr<Expression> filter_expression,
389  std::vector<IdType>* new_valid_chunk = nullptr);
390 
401  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabel(
402  const std::string& filter_label,
403  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
404 
405  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabelbyAcero(
406  const std::string& filter_label,
407  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
408 
417  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabel(
418  const std::string& filter_label,
419  const std::shared_ptr<VerticesCollection>& vertices_collection);
420 
431  static Result<std::shared_ptr<VerticesCollection>> verticesWithMultipleLabels(
432  const std::vector<std::string>& filter_labels,
433  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
434 
435  static Result<std::shared_ptr<VerticesCollection>>
436  verticesWithMultipleLabelsbyAcero(
437  const std::vector<std::string>& filter_labels,
438  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
439 
440  static Result<std::shared_ptr<VerticesCollection>> verticesWithProperty(
441  const std::string property_name, const graphar::util::Filter filter,
442  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
443 
444  static Result<std::shared_ptr<VerticesCollection>> verticesWithProperty(
445  const std::string property_name, const graphar::util::Filter filter,
446  const std::shared_ptr<VerticesCollection>& vertices_collection);
447 
456  static Result<std::shared_ptr<VerticesCollection>> verticesWithMultipleLabels(
457  const std::vector<std::string>& filter_labels,
458  const std::shared_ptr<VerticesCollection>& vertices_collection);
459 
466  static Result<std::shared_ptr<VerticesCollection>> Make(
467  const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
468  auto vertex_info = graph_info->GetVertexInfo(type);
469  auto labels = vertex_info->GetLabels();
470  if (!vertex_info) {
471  return Status::KeyError("The vertex ", type, " doesn't exist.");
472  }
473  return std::make_shared<VerticesCollection>(vertex_info,
474  graph_info->GetPrefix());
475  }
476 
477  private:
478  std::shared_ptr<VertexInfo> vertex_info_;
479  std::string prefix_;
480  std::vector<std::string> labels_;
481  bool is_filtered_;
482  std::vector<IdType> filtered_ids_;
483  std::vector<IdType> valid_chunk_;
484  IdType vertex_num_;
485 };
486 
491 class EdgeIter {
492  public:
506  explicit EdgeIter(const std::shared_ptr<EdgeInfo>& edge_info,
507  const std::string& prefix, AdjListType adj_list_type,
508  IdType global_chunk_index, IdType offset,
509  IdType chunk_begin, IdType chunk_end,
510  std::shared_ptr<util::IndexConverter> index_converter)
511  : adj_list_reader_(edge_info, adj_list_type, prefix),
512  global_chunk_index_(global_chunk_index),
513  cur_offset_(offset),
514  chunk_size_(edge_info->GetChunkSize()),
515  src_chunk_size_(edge_info->GetSrcChunkSize()),
516  dst_chunk_size_(edge_info->GetDstChunkSize()),
517  num_row_of_chunk_(0),
518  chunk_begin_(chunk_begin),
519  chunk_end_(chunk_end),
520  adj_list_type_(adj_list_type),
521  index_converter_(index_converter) {
522  vertex_chunk_index_ =
523  index_converter->GlobalChunkIndexToIndexPair(global_chunk_index).first;
524  adj_list_reader_.seek_chunk_index(vertex_chunk_index_);
525  const auto& property_groups = edge_info->GetPropertyGroups();
526  for (const auto& pg : property_groups) {
527  property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix),
528  property_readers_.back().seek_chunk_index(vertex_chunk_index_);
529  }
530  if (adj_list_type == AdjListType::ordered_by_source ||
531  adj_list_type == AdjListType::ordered_by_dest) {
532  offset_reader_ = std::make_shared<AdjListOffsetArrowChunkReader>(
533  edge_info, adj_list_type, prefix);
534  }
535  }
536 
538  EdgeIter(const EdgeIter& other)
539  : adj_list_reader_(other.adj_list_reader_),
540  offset_reader_(other.offset_reader_),
541  property_readers_(other.property_readers_),
542  global_chunk_index_(other.global_chunk_index_),
543  vertex_chunk_index_(other.vertex_chunk_index_),
544  cur_offset_(other.cur_offset_),
545  chunk_size_(other.chunk_size_),
546  src_chunk_size_(other.src_chunk_size_),
547  dst_chunk_size_(other.dst_chunk_size_),
548  num_row_of_chunk_(other.num_row_of_chunk_),
549  chunk_begin_(other.chunk_begin_),
550  chunk_end_(other.chunk_end_),
551  adj_list_type_(other.adj_list_type_),
552  index_converter_(other.index_converter_) {}
553 
556  adj_list_reader_.seek(cur_offset_);
557  for (auto& reader : property_readers_) {
558  reader.seek(cur_offset_);
559  }
560  return Edge(adj_list_reader_, property_readers_);
561  }
562 
564  IdType source();
565 
567  IdType destination();
568 
570  template <typename T>
571  Result<T> property(const std::string& property) noexcept {
572  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
573  for (auto& reader : property_readers_) {
574  reader.seek(cur_offset_);
575  GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
576  column = util::GetArrowColumnByName(chunk_table, property);
577  if (column != nullptr) {
578  break;
579  }
580  }
581  if (column != nullptr) {
582  auto array = util::GetArrowArrayByChunkIndex(column, 0);
583  GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array));
584  return util::ValueGetter<T>::Value(data, 0);
585  }
586  return Status::KeyError("Property with name ", property,
587  " does not exist in the edge.");
588  }
589 
592  if (num_row_of_chunk_ == 0) {
593  adj_list_reader_.seek(cur_offset_);
594  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
595  adj_list_reader_.GetRowNumOfChunk());
596  }
597  auto st = adj_list_reader_.seek(++cur_offset_);
598  if (st.ok() && num_row_of_chunk_ != chunk_size_) {
599  // check the row offset is overflow
600  auto row_offset = cur_offset_ % chunk_size_;
601  if (row_offset >= num_row_of_chunk_) {
602  cur_offset_ = (cur_offset_ / chunk_size_ + 1) * chunk_size_;
603  adj_list_reader_.seek(cur_offset_);
604  st =
605  Status::KeyError("The row offset is overflow, move to next chunk.");
606  }
607  }
608  if (st.ok() && num_row_of_chunk_ == chunk_size_ &&
609  cur_offset_ % chunk_size_ == 0) {
610  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
611  adj_list_reader_.GetRowNumOfChunk());
612  ++global_chunk_index_;
613  // The reader also need to be updated at the boundaries of chunks of size
614  // chunk_size.
615  for (auto& reader : property_readers_) {
616  reader.next_chunk();
617  }
618  }
619  if (st.IsKeyError()) {
620  st = adj_list_reader_.next_chunk();
621  ++global_chunk_index_;
622  ++vertex_chunk_index_;
623  if (!st.IsIndexError()) {
624  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
625  adj_list_reader_.GetRowNumOfChunk());
626  for (auto& reader : property_readers_) {
627  reader.next_chunk();
628  }
629  }
630  cur_offset_ = 0;
631  adj_list_reader_.seek(cur_offset_);
632  }
633  return *this;
634  }
635 
638  EdgeIter ret(*this);
639  this->operator++();
640  return ret;
641  }
642 
644  EdgeIter operator=(const EdgeIter& other) {
645  adj_list_reader_ = other.adj_list_reader_;
646  offset_reader_ = other.offset_reader_;
647  property_readers_ = other.property_readers_;
648  global_chunk_index_ = other.global_chunk_index_;
649  vertex_chunk_index_ = other.vertex_chunk_index_;
650  cur_offset_ = other.cur_offset_;
651  chunk_size_ = other.chunk_size_;
652  src_chunk_size_ = other.src_chunk_size_;
653  dst_chunk_size_ = other.dst_chunk_size_;
654  num_row_of_chunk_ = other.num_row_of_chunk_;
655  chunk_begin_ = other.chunk_begin_;
656  chunk_end_ = other.chunk_end_;
657  adj_list_type_ = other.adj_list_type_;
658  index_converter_ = other.index_converter_;
659  return *this;
660  }
661 
663  bool operator==(const EdgeIter& rhs) const noexcept {
664  return global_chunk_index_ == rhs.global_chunk_index_ &&
665  cur_offset_ == rhs.cur_offset_ &&
666  adj_list_type_ == rhs.adj_list_type_;
667  }
668 
670  bool operator!=(const EdgeIter& rhs) const noexcept {
671  return global_chunk_index_ != rhs.global_chunk_index_ ||
672  cur_offset_ != rhs.cur_offset_ ||
673  adj_list_type_ != rhs.adj_list_type_;
674  }
675 
677  IdType global_chunk_index() const { return global_chunk_index_; }
678 
680  IdType cur_offset() const { return cur_offset_; }
681 
690  bool first_src(const EdgeIter& from, IdType id);
691 
700  bool first_dst(const EdgeIter& from, IdType id);
701 
703  void to_begin() {
704  global_chunk_index_ = chunk_begin_;
705  cur_offset_ = 0;
706  vertex_chunk_index_ =
707  index_converter_->GlobalChunkIndexToIndexPair(global_chunk_index_)
708  .first;
709  refresh();
710  }
711 
713  bool is_end() const { return global_chunk_index_ >= chunk_end_; }
714 
716  bool next_src() {
717  if (is_end())
718  return false;
719  IdType id = this->source();
720  IdType pre_vertex_chunk_index = vertex_chunk_index_;
721  if (adj_list_type_ == AdjListType::ordered_by_source) {
722  this->operator++();
723  if (is_end() || this->source() != id)
724  return false;
725  else
726  return true;
727  }
728  this->operator++();
729  while (!is_end()) {
730  if (this->source() == id) {
731  return true;
732  }
733  if (adj_list_type_ == AdjListType::unordered_by_source) {
734  if (vertex_chunk_index_ > pre_vertex_chunk_index)
735  return false;
736  }
737  this->operator++();
738  }
739  return false;
740  }
741 
746  bool next_dst() {
747  if (is_end())
748  return false;
749  IdType id = this->destination();
750  IdType pre_vertex_chunk_index = vertex_chunk_index_;
751  if (adj_list_type_ == AdjListType::ordered_by_dest) {
752  this->operator++();
753  if (is_end() || this->destination() != id)
754  return false;
755  else
756  return true;
757  }
758  this->operator++();
759  while (!is_end()) {
760  if (this->destination() == id) {
761  return true;
762  }
763  if (adj_list_type_ == AdjListType::unordered_by_dest) {
764  if (vertex_chunk_index_ > pre_vertex_chunk_index)
765  return false;
766  }
767  this->operator++();
768  }
769  return false;
770  }
771 
776  bool next_src(IdType id) {
777  if (is_end())
778  return false;
779  this->operator++();
780  return this->first_src(*this, id);
781  }
782 
787  bool next_dst(IdType id) {
788  if (is_end())
789  return false;
790  this->operator++();
791  return this->first_dst(*this, id);
792  }
793 
794  private:
795  // Refresh the readers to point to the current position.
796  void refresh() {
797  adj_list_reader_.seek_chunk_index(vertex_chunk_index_);
798  adj_list_reader_.seek(cur_offset_);
799  for (auto& reader : property_readers_) {
800  reader.seek_chunk_index(vertex_chunk_index_);
801  }
802  GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
803  adj_list_reader_.GetRowNumOfChunk());
804  }
805 
806  private:
807  AdjListArrowChunkReader adj_list_reader_;
808  std::shared_ptr<AdjListOffsetArrowChunkReader> offset_reader_;
809  std::vector<AdjListPropertyArrowChunkReader> property_readers_;
810  IdType global_chunk_index_;
811  IdType vertex_chunk_index_;
812  IdType cur_offset_;
813  IdType chunk_size_;
814  IdType src_chunk_size_;
815  IdType dst_chunk_size_;
816  IdType num_row_of_chunk_;
817  IdType chunk_begin_, chunk_end_;
818  AdjListType adj_list_type_;
819  std::shared_ptr<util::IndexConverter> index_converter_;
820 
821  friend class OBSEdgeCollection;
822  friend class OBDEdgesCollection;
823  friend class UBSEdgesCollection;
824  friend class UBDEdgesCollection;
825 };
826 
831  public:
832  virtual ~EdgesCollection() {}
833 
835  virtual EdgeIter begin() {
836  if (begin_ == nullptr) {
837  EdgeIter iter(edge_info_, prefix_, adj_list_type_, chunk_begin_, 0,
838  chunk_begin_, chunk_end_, index_converter_);
839  begin_ = std::make_shared<EdgeIter>(iter);
840  }
841  return *begin_;
842  }
843 
845  virtual EdgeIter end() {
846  if (end_ == nullptr) {
847  EdgeIter iter(edge_info_, prefix_, adj_list_type_, chunk_end_, 0,
848  chunk_begin_, chunk_end_, index_converter_);
849  end_ = std::make_shared<EdgeIter>(iter);
850  }
851  return *end_;
852  }
853 
855  virtual size_t size() const noexcept { return edge_num_; }
856 
865  virtual EdgeIter find_src(IdType id, const EdgeIter& from) = 0;
866 
875  virtual EdgeIter find_dst(IdType id, const EdgeIter& from) = 0;
876 
889  static Result<std::shared_ptr<EdgesCollection>> Make(
890  const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
891  const std::string& edge_type, const std::string& dst_type,
892  AdjListType adj_list_type, const IdType vertex_chunk_begin = 0,
893  const IdType vertex_chunk_end =
894  std::numeric_limits<int64_t>::max()) noexcept;
895 
896  protected:
906  explicit EdgesCollection(const std::shared_ptr<EdgeInfo>& edge_info,
907  const std::string& prefix, IdType vertex_chunk_begin,
908  IdType vertex_chunk_end, AdjListType adj_list_type)
909  : edge_info_(edge_info), prefix_(prefix), adj_list_type_(adj_list_type) {
910  GAR_ASSIGN_OR_RAISE_ERROR(
911  auto vertex_chunk_num,
912  util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
913  std::vector<IdType> edge_chunk_nums(vertex_chunk_num, 0);
914  if (vertex_chunk_end == std::numeric_limits<int64_t>::max()) {
915  vertex_chunk_end = vertex_chunk_num;
916  }
917  chunk_begin_ = 0;
918  chunk_end_ = 0;
919  edge_num_ = 0;
920  for (IdType i = 0; i < vertex_chunk_num; ++i) {
921  GAR_ASSIGN_OR_RAISE_ERROR(
922  edge_chunk_nums[i],
923  util::GetEdgeChunkNum(prefix, edge_info, adj_list_type_, i));
924  if (i < vertex_chunk_begin) {
925  chunk_begin_ += edge_chunk_nums[i];
926  chunk_end_ += edge_chunk_nums[i];
927  }
928  if (i >= vertex_chunk_begin && i < vertex_chunk_end) {
929  chunk_end_ += edge_chunk_nums[i];
930  GAR_ASSIGN_OR_RAISE_ERROR(
931  auto chunk_edge_num_,
932  util::GetEdgeNum(prefix, edge_info, adj_list_type_, i));
933  edge_num_ += chunk_edge_num_;
934  }
935  }
936  index_converter_ =
937  std::make_shared<util::IndexConverter>(std::move(edge_chunk_nums));
938  }
939 
940  std::shared_ptr<EdgeInfo> edge_info_;
941  std::string prefix_;
942  AdjListType adj_list_type_;
943  IdType chunk_begin_, chunk_end_;
944  std::shared_ptr<util::IndexConverter> index_converter_;
945  std::shared_ptr<EdgeIter> begin_, end_;
946  IdType edge_num_;
947 };
948 
954  using Base = EdgesCollection;
955 
956  public:
966  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
967  IdType vertex_chunk_begin = 0,
968  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
969  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
970  AdjListType::ordered_by_source) {}
971 
980  EdgeIter find_src(IdType id, const EdgeIter& from) override {
981  auto result =
982  util::GetAdjListOffsetOfVertex(edge_info_, prefix_, adj_list_type_, id);
983  if (!result.status().ok()) {
984  return this->end();
985  }
986  auto begin_offset = result.value().first;
987  auto end_offset = result.value().second;
988  if (begin_offset >= end_offset) {
989  return this->end();
990  }
991  auto begin_global_chunk_index =
992  index_converter_->IndexPairToGlobalChunkIndex(
993  id / edge_info_->GetSrcChunkSize(),
994  begin_offset / edge_info_->GetChunkSize());
995  auto end_global_chunk_index = index_converter_->IndexPairToGlobalChunkIndex(
996  id / edge_info_->GetSrcChunkSize(),
997  end_offset / edge_info_->GetChunkSize());
998  if (begin_global_chunk_index > from.global_chunk_index_) {
999  return EdgeIter(edge_info_, prefix_, adj_list_type_,
1000  begin_global_chunk_index, begin_offset, chunk_begin_,
1001  chunk_end_, index_converter_);
1002  } else if (end_global_chunk_index < from.global_chunk_index_) {
1003  return this->end();
1004  } else {
1005  if (begin_offset > from.cur_offset_) {
1006  return EdgeIter(edge_info_, prefix_, adj_list_type_,
1007  begin_global_chunk_index, begin_offset, chunk_begin_,
1008  chunk_end_, index_converter_);
1009  } else if (end_offset <= from.cur_offset_) {
1010  return this->end();
1011  } else {
1012  return EdgeIter(edge_info_, prefix_, adj_list_type_,
1013  from.global_chunk_index_, from.cur_offset_,
1014  chunk_begin_, chunk_end_, index_converter_);
1015  }
1016  }
1017  return this->end();
1018  }
1019 
1028  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
1029  EdgeIter iter(from);
1030  auto end = this->end();
1031  while (iter != end) {
1032  auto edge = *iter;
1033  if (edge.destination() == id) {
1034  break;
1035  }
1036  ++iter;
1037  }
1038  return iter;
1039  }
1040 };
1041 
1046  using Base = EdgesCollection;
1047 
1048  public:
1058  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
1059  IdType vertex_chunk_begin = 0,
1060  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
1061  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
1062  AdjListType::ordered_by_dest) {}
1063 
1072  EdgeIter find_src(IdType id, const EdgeIter& from) override {
1073  EdgeIter iter(from);
1074  auto end = this->end();
1075  while (iter != end) {
1076  auto edge = *iter;
1077  if (edge.source() == id) {
1078  break;
1079  }
1080  ++iter;
1081  }
1082  return iter;
1083  }
1084 
1093  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
1094  auto result =
1095  util::GetAdjListOffsetOfVertex(edge_info_, prefix_, adj_list_type_, id);
1096  if (!result.status().ok()) {
1097  return this->end();
1098  }
1099  auto begin_offset = result.value().first;
1100  auto end_offset = result.value().second;
1101  if (begin_offset >= end_offset) {
1102  return this->end();
1103  }
1104  auto begin_global_chunk_index =
1105  index_converter_->IndexPairToGlobalChunkIndex(
1106  id / edge_info_->GetDstChunkSize(),
1107  begin_offset / edge_info_->GetChunkSize());
1108  auto end_global_chunk_index = index_converter_->IndexPairToGlobalChunkIndex(
1109  id / edge_info_->GetDstChunkSize(),
1110  end_offset / edge_info_->GetChunkSize());
1111  if (begin_global_chunk_index > from.global_chunk_index_) {
1112  return EdgeIter(edge_info_, prefix_, adj_list_type_,
1113  begin_global_chunk_index, begin_offset, chunk_begin_,
1114  chunk_end_, index_converter_);
1115  } else if (end_global_chunk_index < from.global_chunk_index_) {
1116  return this->end();
1117  } else {
1118  if (begin_offset >= from.cur_offset_) {
1119  return EdgeIter(edge_info_, prefix_, adj_list_type_,
1120  begin_global_chunk_index, begin_offset, chunk_begin_,
1121  chunk_end_, index_converter_);
1122  } else if (end_offset <= from.cur_offset_) {
1123  return this->end();
1124  } else {
1125  return EdgeIter(edge_info_, prefix_, adj_list_type_,
1126  from.global_chunk_index_, from.cur_offset_,
1127  chunk_begin_, chunk_end_, index_converter_);
1128  }
1129  }
1130  return this->end();
1131  }
1132 };
1133 
1138  using Base = EdgesCollection;
1139 
1140  public:
1150  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
1151  IdType vertex_chunk_begin = 0,
1152  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
1153  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
1154  AdjListType::unordered_by_source) {}
1155 
1164  EdgeIter find_src(IdType id, const EdgeIter& from) override {
1165  EdgeIter iter(from);
1166  auto end = this->end();
1167  while (iter != end) {
1168  auto edge = *iter;
1169  if (edge.source() == id) {
1170  break;
1171  }
1172  ++iter;
1173  }
1174  return iter;
1175  }
1176 
1185  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
1186  EdgeIter iter(from);
1187  auto end = this->end();
1188  while (iter != end) {
1189  auto edge = *iter;
1190  if (edge.destination() == id) {
1191  break;
1192  }
1193  ++iter;
1194  }
1195  return iter;
1196  }
1197 };
1198 
1203  using Base = EdgesCollection;
1204 
1205  public:
1215  const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
1216  IdType vertex_chunk_begin = 0,
1217  IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
1218  : Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
1219  AdjListType::unordered_by_dest) {}
1220 
1229  EdgeIter find_src(IdType id, const EdgeIter& from) override {
1230  EdgeIter iter(from);
1231  auto end = this->end();
1232  while (iter != end) {
1233  auto edge = *iter;
1234  if (edge.source() == id) {
1235  break;
1236  }
1237  ++iter;
1238  }
1239  return iter;
1240  }
1241 
1250  EdgeIter find_dst(IdType id, const EdgeIter& from) override {
1251  EdgeIter iter(from);
1252  auto end = this->end();
1253  while (iter != end) {
1254  auto edge = *iter;
1255  if (edge.destination() == id) {
1256  break;
1257  }
1258  ++iter;
1259  }
1260  return iter;
1261  }
1262 };
1263 } // namespace graphar
The Arrow chunk reader for adjacency-list topology chunks.
Definition: chunk_reader.h:277
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Status next_chunk()
Sets chunk position indicator to next chunk.
Edge contains information of certain edge.
Definition: graph_reader.h:109
IdType source() const noexcept
Get source id of the edge.
Definition: graph_reader.h:126
Edge(AdjListArrowChunkReader &adj_list_reader, std::vector< AdjListPropertyArrowChunkReader > &property_readers)
IdType destination() const noexcept
Get destination id of the edge.
Definition: graph_reader.h:133
bool IsValid(const std::string &property) const
Return true if value at the property is valid (not null).
Definition: graph_reader.h:150
Result< T > property(const std::string &property) const
Get the property value of the edge.
EdgeInfo is a class to describe the edge information, including the source vertex type,...
Definition: graph_info.h:408
The iterator for traversing a type of edges.
Definition: graph_reader.h:491
EdgeIter & operator++()
Definition: graph_reader.h:591
IdType cur_offset() const
Definition: graph_reader.h:680
bool operator!=(const EdgeIter &rhs) const noexcept
Definition: graph_reader.h:670
EdgeIter(const EdgeIter &other)
Definition: graph_reader.h:538
bool first_dst(const EdgeIter &from, IdType id)
bool first_src(const EdgeIter &from, IdType id)
EdgeIter operator=(const EdgeIter &other)
Definition: graph_reader.h:644
bool next_src(IdType id)
Definition: graph_reader.h:776
IdType global_chunk_index() const
Definition: graph_reader.h:677
bool is_end() const
Definition: graph_reader.h:713
bool next_dst(IdType id)
Definition: graph_reader.h:787
EdgeIter operator++(int)
Definition: graph_reader.h:637
bool operator==(const EdgeIter &rhs) const noexcept
Definition: graph_reader.h:663
Result< T > property(const std::string &property) noexcept
Definition: graph_reader.h:571
EdgeIter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, IdType global_chunk_index, IdType offset, IdType chunk_begin, IdType chunk_end, std::shared_ptr< util::IndexConverter > index_converter)
Definition: graph_reader.h:506
EdgesCollection is designed for reading a collection of edges.
Definition: graph_reader.h:830
virtual EdgeIter find_dst(IdType id, const EdgeIter &from)=0
static Result< std::shared_ptr< EdgesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &src_type, const std::string &edge_type, const std::string &dst_type, AdjListType adj_list_type, const IdType vertex_chunk_begin=0, const IdType vertex_chunk_end=std::numeric_limits< int64_t >::max()) noexcept
Construct an EdgesCollection from graph info and edge type.
virtual EdgeIter find_src(IdType id, const EdgeIter &from)=0
virtual size_t size() const noexcept
Definition: graph_reader.h:855
virtual EdgeIter begin()
Definition: graph_reader.h:835
EdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin, IdType vertex_chunk_end, AdjListType adj_list_type)
Initialize the EdgesCollection with a range of chunks.
Definition: graph_reader.h:906
virtual EdgeIter end()
Definition: graph_reader.h:845
Ordered By Destination EdgesCollection implementation.
EdgeIter find_src(IdType id, const EdgeIter &from) override
EdgeIter find_dst(IdType id, const EdgeIter &from) override
OBDEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the OBDEdgesCollection with a range of chunks.
Ordered By Source EdgesCollection implementation.
Definition: graph_reader.h:953
OBSEdgeCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the OBSEdgeCollection with a range of chunks.
Definition: graph_reader.h:965
EdgeIter find_src(IdType id, const EdgeIter &from) override
Definition: graph_reader.h:980
EdgeIter find_dst(IdType id, const EdgeIter &from) override
static Status KeyError(Args &&... args)
Definition: status.h:172
Unordered By Destination EdgesCollection implementation.
EdgeIter find_src(IdType id, const EdgeIter &from) override
EdgeIter find_dst(IdType id, const EdgeIter &from) override
UBDEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the UBDEdgesCollection with a range of chunks.
Unordered By Source EdgesCollection implementation.
EdgeIter find_dst(IdType id, const EdgeIter &from) override
UBSEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the UBSEdgesCollection with a range of chunks.
EdgeIter find_src(IdType id, const EdgeIter &from) override
Vertex contains information of certain vertex.
Definition: graph_reader.h:48
bool IsValid(const std::string &property) const
Return true if the value of the property is valid (not null).
Definition: graph_reader.h:89
IdType id() const noexcept
Get the id of the vertex.
Definition: graph_reader.h:65
Result< T > property(const std::string &property) const
Get the property value of the vertex.
Result< T > label() const
Get the label of the vertex.
Vertex(IdType id, std::vector< VertexPropertyArrowChunkReader > &readers)
Definition: graph_reader.cc:79
The iterator for traversing a type of vertices.
Definition: graph_reader.h:171
Result< std::vector< std::string > > label() noexcept
Result< bool > hasLabel(const std::string &label) noexcept
VertexIter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, IdType offset, const std::vector< std::string > &labels, const bool &is_filtered=false, const std::vector< IdType > &filtered_ids={}) noexcept
Definition: graph_reader.h:180
bool operator==(const VertexIter &rhs) const noexcept
Definition: graph_reader.h:299
Result< T > property(const std::string &property) noexcept
Definition: graph_reader.h:233
Vertex operator*() noexcept
Definition: graph_reader.h:208
VertexIter operator++(int)
Definition: graph_reader.h:279
VertexIter operator+(IdType offset)
Definition: graph_reader.h:286
VertexIter(const VertexIter &other)
Definition: graph_reader.h:199
bool operator!=(const VertexIter &rhs) const noexcept
Definition: graph_reader.h:304
VertexIter & operator+=(IdType offset)
Definition: graph_reader.h:293
The arrow chunk reader for vertex property group.
Definition: chunk_reader.h:43
VerticesCollection is designed for reading a collection of vertices.
Definition: graph_reader.h:321
static Result< std::shared_ptr< VerticesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Construct a VerticesCollection from graph info and vertex type.
Definition: graph_reader.h:466
VertexIter find(IdType id)
Definition: graph_reader.h:366
static Result< std::shared_ptr< VerticesCollection > > verticesWithMultipleLabels(const std::vector< std::string > &filter_labels, const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Query vertices with multiple labels.
static Result< std::shared_ptr< VerticesCollection > > verticesWithLabel(const std::string &filter_label, const std::shared_ptr< GraphInfo > &graph_info, const std::string &type)
Query vertices with a specific label.
size_t size() const noexcept
Definition: graph_reader.h:371
VertexIter begin() noexcept
Definition: graph_reader.h:351
VertexIter end() noexcept
Definition: graph_reader.h:357
Result< std::vector< IdType > > filter(const std::vector< std::string > &filter_labels, std::vector< IdType > *new_valid_chunk=nullptr)
VerticesCollection(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, const bool is_filtered=false, std::vector< IdType > filtered_ids={})
Initialize the VerticesCollection.
Definition: graph_reader.h:330