31 #include "graphar/arrow/chunk_reader.h"
32 #include "graphar/filesystem.h"
33 #include "graphar/graph_info.h"
34 #include "graphar/reader_util.h"
35 #include "graphar/types.h"
36 #include "graphar/util.h"
59 std::vector<VertexPropertyArrowChunkReader>& readers);
/** Get the id of the vertex: returns the value stored in id_. */
66 inline IdType
id() const noexcept {
return id_; }
84 if (properties_.find(
property) != properties_.end()) {
85 return properties_.at(
property).has_value();
87 if (list_properties_.find(
property) != list_properties_.end()) {
90 throw std::invalid_argument(
"Property with name " +
property +
91 " does not exist in the vertex.");
96 std::map<std::string, std::any> properties_;
97 std::map<std::string, std::shared_ptr<arrow::Array>> list_properties_;
112 std::vector<AdjListPropertyArrowChunkReader>&
/** Get the source id of the edge: returns the value stored in src_id_. */
120 inline IdType
source() const noexcept {
return src_id_; }
135 template <
typename T>
145 if (properties_.find(
property) != properties_.end()) {
146 return properties_.at(
property).has_value();
148 if (list_properties_.find(
property) != list_properties_.end()) {
151 throw std::invalid_argument(
"Property with name " +
property +
152 " does not exist in the edge.");
156 IdType src_id_, dst_id_;
157 std::map<std::string, std::any> properties_;
158 std::map<std::string, std::shared_ptr<arrow::Array>> list_properties_;
174 explicit VertexIter(
const std::shared_ptr<VertexInfo>& vertex_info,
175 const std::string& prefix, IdType offset) noexcept {
176 for (
const auto& pg : vertex_info->GetPropertyGroups()) {
177 readers_.emplace_back(vertex_info, pg, prefix);
179 cur_offset_ = offset;
184 : readers_(other.readers_), cur_offset_(other.cur_offset_) {}
188 for (
auto& reader : readers_) {
189 reader.seek(cur_offset_);
191 return Vertex(cur_offset_, readers_);
/**
 * Return the iterator's current offset (cur_offset_) as the id.
 *
 * NOTE(review): this accessor only reads cur_offset_ but is not
 * const-qualified; confirm whether `const` can be added.
 */
195 IdType
id() {
return cur_offset_; }
198 template <
typename T>
200 std::shared_ptr<arrow::ChunkedArray> column(
nullptr);
201 for (
auto& reader : readers_) {
202 reader.seek(cur_offset_);
203 GAR_ASSIGN_OR_RAISE(
auto chunk_table, reader.GetChunk());
204 column = util::GetArrowColumnByName(chunk_table,
property);
205 if (column !=
nullptr) {
209 if (column !=
nullptr) {
210 auto array = util::GetArrowArrayByChunkIndex(column, 0);
211 GAR_ASSIGN_OR_RAISE(
auto data, util::GetArrowArrayData(array));
215 " does not exist in the vertex.");
234 ret.cur_offset_ += offset;
240 cur_offset_ += offset;
246 return cur_offset_ == rhs.cur_offset_;
251 return cur_offset_ != rhs.cur_offset_;
255 std::vector<VertexPropertyArrowChunkReader> readers_;
272 const std::string& prefix)
273 : vertex_info_(std::move(vertex_info)), prefix_(prefix) {
275 std::string base_dir;
276 GAR_ASSIGN_OR_RAISE_ERROR(
auto fs,
277 FileSystemFromUriOrPath(prefix, &base_dir));
278 GAR_ASSIGN_OR_RAISE_ERROR(
auto file_path,
279 vertex_info->GetVerticesNumFilePath());
280 std::string vertex_num_path = base_dir + file_path;
281 GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
282 fs->ReadFileToValue<IdType>(vertex_num_path));
290 return VertexIter(vertex_info_, prefix_, vertex_num_);
/** Return vertex_num_ as a size_t. */
297 size_t size() const noexcept {
return vertex_num_; }
305 static Result<std::shared_ptr<VerticesCollection>>
Make(
306 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& label) {
307 auto vertex_info = graph_info->GetVertexInfo(label);
311 return std::make_shared<VerticesCollection>(vertex_info,
312 graph_info->GetPrefix());
316 std::shared_ptr<VertexInfo> vertex_info_;
340 explicit EdgeIter(
const std::shared_ptr<EdgeInfo>& edge_info,
341 const std::string& prefix, AdjListType adj_list_type,
343 IdType chunk_begin, IdType chunk_end,
344 std::shared_ptr<util::IndexConverter> index_converter)
345 : adj_list_reader_(edge_info, adj_list_type, prefix),
348 chunk_size_(edge_info->GetChunkSize()),
349 src_chunk_size_(edge_info->GetSrcChunkSize()),
350 dst_chunk_size_(edge_info->GetDstChunkSize()),
351 num_row_of_chunk_(0),
352 chunk_begin_(chunk_begin),
353 chunk_end_(chunk_end),
354 adj_list_type_(adj_list_type),
355 index_converter_(index_converter) {
356 vertex_chunk_index_ =
359 const auto& property_groups = edge_info->GetPropertyGroups();
360 for (
const auto& pg : property_groups) {
361 property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix),
362 property_readers_.back().seek_chunk_index(vertex_chunk_index_);
364 if (adj_list_type == AdjListType::ordered_by_source ||
365 adj_list_type == AdjListType::ordered_by_dest) {
366 offset_reader_ = std::make_shared<AdjListOffsetArrowChunkReader>(
367 edge_info, adj_list_type, prefix);
373 : adj_list_reader_(other.adj_list_reader_),
374 offset_reader_(other.offset_reader_),
375 property_readers_(other.property_readers_),
376 global_chunk_index_(other.global_chunk_index_),
377 vertex_chunk_index_(other.vertex_chunk_index_),
378 cur_offset_(other.cur_offset_),
379 chunk_size_(other.chunk_size_),
380 src_chunk_size_(other.src_chunk_size_),
381 dst_chunk_size_(other.dst_chunk_size_),
382 num_row_of_chunk_(other.num_row_of_chunk_),
383 chunk_begin_(other.chunk_begin_),
384 chunk_end_(other.chunk_end_),
385 adj_list_type_(other.adj_list_type_),
386 index_converter_(other.index_converter_) {}
390 adj_list_reader_.
seek(cur_offset_);
391 for (
auto& reader : property_readers_) {
392 reader.seek(cur_offset_);
394 return Edge(adj_list_reader_, property_readers_);
404 template <
typename T>
406 std::shared_ptr<arrow::ChunkedArray> column(
nullptr);
407 for (
auto& reader : property_readers_) {
408 reader.seek(cur_offset_);
409 GAR_ASSIGN_OR_RAISE(
auto chunk_table, reader.GetChunk());
410 column = util::GetArrowColumnByName(chunk_table,
property);
411 if (column !=
nullptr) {
415 if (column !=
nullptr) {
416 auto array = util::GetArrowArrayByChunkIndex(column, 0);
417 GAR_ASSIGN_OR_RAISE(
auto data, util::GetArrowArrayData(array));
421 " does not exist in the edge.");
426 if (num_row_of_chunk_ == 0) {
427 adj_list_reader_.
seek(cur_offset_);
428 GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
431 auto st = adj_list_reader_.
seek(++cur_offset_);
432 if (st.ok() && num_row_of_chunk_ != chunk_size_) {
434 auto row_offset = cur_offset_ % chunk_size_;
435 if (row_offset >= num_row_of_chunk_) {
436 cur_offset_ = (cur_offset_ / chunk_size_ + 1) * chunk_size_;
437 adj_list_reader_.
seek(cur_offset_);
442 if (st.ok() && num_row_of_chunk_ == chunk_size_ &&
443 cur_offset_ % chunk_size_ == 0) {
444 GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
446 ++global_chunk_index_;
448 if (st.IsKeyError()) {
450 ++global_chunk_index_;
451 ++vertex_chunk_index_;
452 if (!st.IsIndexError()) {
453 GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
455 for (
auto& reader : property_readers_) {
460 adj_list_reader_.
seek(cur_offset_);
474 adj_list_reader_ = other.adj_list_reader_;
475 offset_reader_ = other.offset_reader_;
476 property_readers_ = other.property_readers_;
477 global_chunk_index_ = other.global_chunk_index_;
478 vertex_chunk_index_ = other.vertex_chunk_index_;
479 cur_offset_ = other.cur_offset_;
480 chunk_size_ = other.chunk_size_;
481 src_chunk_size_ = other.src_chunk_size_;
482 dst_chunk_size_ = other.dst_chunk_size_;
483 num_row_of_chunk_ = other.num_row_of_chunk_;
484 chunk_begin_ = other.chunk_begin_;
485 chunk_end_ = other.chunk_end_;
486 adj_list_type_ = other.adj_list_type_;
487 index_converter_ = other.index_converter_;
493 return global_chunk_index_ == rhs.global_chunk_index_ &&
494 cur_offset_ == rhs.cur_offset_ &&
495 adj_list_type_ == rhs.adj_list_type_;
500 return global_chunk_index_ != rhs.global_chunk_index_ ||
501 cur_offset_ != rhs.cur_offset_ ||
502 adj_list_type_ != rhs.adj_list_type_;
533 global_chunk_index_ = chunk_begin_;
535 vertex_chunk_index_ =
536 index_converter_->GlobalChunkIndexToIndexPair(global_chunk_index_)
/** Return true once global_chunk_index_ is at or beyond chunk_end_. */
542 bool is_end()
const {
return global_chunk_index_ >= chunk_end_; }
548 IdType
id = this->
source();
549 IdType pre_vertex_chunk_index = vertex_chunk_index_;
550 if (adj_list_type_ == AdjListType::ordered_by_source) {
559 if (this->
source() ==
id) {
562 if (adj_list_type_ == AdjListType::unordered_by_source) {
563 if (vertex_chunk_index_ > pre_vertex_chunk_index)
579 IdType pre_vertex_chunk_index = vertex_chunk_index_;
580 if (adj_list_type_ == AdjListType::ordered_by_dest) {
592 if (adj_list_type_ == AdjListType::unordered_by_dest) {
593 if (vertex_chunk_index_ > pre_vertex_chunk_index)
627 adj_list_reader_.
seek(cur_offset_);
628 for (
auto& reader : property_readers_) {
629 reader.seek_chunk_index(vertex_chunk_index_);
631 GAR_ASSIGN_OR_RAISE_ERROR(num_row_of_chunk_,
636 AdjListArrowChunkReader adj_list_reader_;
637 std::shared_ptr<AdjListOffsetArrowChunkReader> offset_reader_;
638 std::vector<AdjListPropertyArrowChunkReader> property_readers_;
639 IdType global_chunk_index_;
640 IdType vertex_chunk_index_;
643 IdType src_chunk_size_;
644 IdType dst_chunk_size_;
645 IdType num_row_of_chunk_;
646 IdType chunk_begin_, chunk_end_;
647 AdjListType adj_list_type_;
648 std::shared_ptr<util::IndexConverter> index_converter_;
650 friend class OBSEdgeCollection;
651 friend class OBDEdgesCollection;
652 friend class UBSEdgesCollection;
653 friend class UBDEdgesCollection;
665 if (begin_ ==
nullptr) {
666 EdgeIter iter(edge_info_, prefix_, adj_list_type_, chunk_begin_, 0,
667 chunk_begin_, chunk_end_, index_converter_);
668 begin_ = std::make_shared<EdgeIter>(iter);
675 if (end_ ==
nullptr) {
676 EdgeIter iter(edge_info_, prefix_, adj_list_type_, chunk_end_, 0,
677 chunk_begin_, chunk_end_, index_converter_);
678 end_ = std::make_shared<EdgeIter>(iter);
/** Return edge_num_ as a size_t. Declared virtual. */
684 virtual size_t size() const noexcept {
return edge_num_; }
/**
 * Construct an EdgesCollection from graph info and edge label.
 *
 * Declaration only; the definition is not part of this listing.
 *
 * @param graph_info The graph info to look the edge up in.
 * @param src_label Label of the source vertex type.
 * @param edge_label Label of the edge type.
 * @param dst_label Label of the destination vertex type.
 * @param adj_list_type The adjacency list type to read.
 * @param vertex_chunk_begin First vertex chunk of the range; defaults to 0.
 * @param vertex_chunk_end End of the vertex chunk range; defaults to the
 *        maximum int64_t value. NOTE(review): the EdgesCollection
 *        constructor in this listing replaces that maximum with the actual
 *        vertex chunk count; presumably the value passed here reaches it,
 *        confirm against the definition.
 * @return A Result wrapping a shared pointer to the EdgesCollection.
 */
718 static Result<std::shared_ptr<EdgesCollection>>
Make(
719 const std::shared_ptr<GraphInfo>& graph_info,
720 const std::string& src_label,
const std::string& edge_label,
721 const std::string& dst_label, AdjListType adj_list_type,
722 const IdType vertex_chunk_begin = 0,
723 const IdType vertex_chunk_end =
724 std::numeric_limits<int64_t>::max()) noexcept;
737 const std::
string& prefix, IdType vertex_chunk_begin,
738 IdType vertex_chunk_end, AdjListType adj_list_type)
739 : edge_info_(std::move(edge_info)),
741 adj_list_type_(adj_list_type) {
742 GAR_ASSIGN_OR_RAISE_ERROR(
743 auto vertex_chunk_num,
744 util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
745 std::vector<IdType> edge_chunk_nums(vertex_chunk_num, 0);
746 if (vertex_chunk_end == std::numeric_limits<int64_t>::max()) {
747 vertex_chunk_end = vertex_chunk_num;
752 for (IdType i = 0; i < vertex_chunk_num; ++i) {
753 GAR_ASSIGN_OR_RAISE_ERROR(
755 util::GetEdgeChunkNum(prefix, edge_info, adj_list_type_, i));
756 if (i < vertex_chunk_begin) {
757 chunk_begin_ += edge_chunk_nums[i];
758 chunk_end_ += edge_chunk_nums[i];
760 if (i >= vertex_chunk_begin && i < vertex_chunk_end) {
761 chunk_end_ += edge_chunk_nums[i];
762 GAR_ASSIGN_OR_RAISE_ERROR(
763 auto chunk_edge_num_,
764 util::GetEdgeNum(prefix, edge_info, adj_list_type_, i));
765 edge_num_ += chunk_edge_num_;
769 std::make_shared<util::IndexConverter>(std::move(edge_chunk_nums));
772 std::shared_ptr<EdgeInfo> edge_info_;
774 AdjListType adj_list_type_;
775 IdType chunk_begin_, chunk_end_;
776 std::shared_ptr<util::IndexConverter> index_converter_;
777 std::shared_ptr<EdgeIter> begin_, end_;
798 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
799 IdType vertex_chunk_begin = 0,
800 IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
801 :
Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
802 AdjListType::ordered_by_source) {}
814 util::GetAdjListOffsetOfVertex(edge_info_, prefix_, adj_list_type_,
id);
815 if (!result.status().ok()) {
818 auto begin_offset = result.value().first;
819 auto end_offset = result.value().second;
820 if (begin_offset >= end_offset) {
823 auto begin_global_chunk_index =
824 index_converter_->IndexPairToGlobalChunkIndex(
825 id / edge_info_->GetSrcChunkSize(),
826 begin_offset / edge_info_->GetChunkSize());
827 auto end_global_chunk_index = index_converter_->IndexPairToGlobalChunkIndex(
828 id / edge_info_->GetSrcChunkSize(),
829 end_offset / edge_info_->GetChunkSize());
830 if (begin_global_chunk_index > from.global_chunk_index_) {
831 return EdgeIter(edge_info_, prefix_, adj_list_type_,
832 begin_global_chunk_index, begin_offset, chunk_begin_,
833 chunk_end_, index_converter_);
834 }
else if (end_global_chunk_index < from.global_chunk_index_) {
837 if (begin_offset > from.cur_offset_) {
838 return EdgeIter(edge_info_, prefix_, adj_list_type_,
839 begin_global_chunk_index, begin_offset, chunk_begin_,
840 chunk_end_, index_converter_);
841 }
else if (end_offset <= from.cur_offset_) {
844 return EdgeIter(edge_info_, prefix_, adj_list_type_,
845 from.global_chunk_index_, from.cur_offset_,
846 chunk_begin_, chunk_end_, index_converter_);
863 while (iter !=
end) {
865 if (edge.destination() ==
id) {
890 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
891 IdType vertex_chunk_begin = 0,
892 IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
893 :
Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
894 AdjListType::ordered_by_dest) {}
907 while (iter !=
end) {
909 if (edge.source() ==
id) {
927 util::GetAdjListOffsetOfVertex(edge_info_, prefix_, adj_list_type_,
id);
928 if (!result.status().ok()) {
931 auto begin_offset = result.value().first;
932 auto end_offset = result.value().second;
933 if (begin_offset >= end_offset) {
936 auto begin_global_chunk_index =
937 index_converter_->IndexPairToGlobalChunkIndex(
938 id / edge_info_->GetDstChunkSize(),
939 begin_offset / edge_info_->GetChunkSize());
940 auto end_global_chunk_index = index_converter_->IndexPairToGlobalChunkIndex(
941 id / edge_info_->GetDstChunkSize(),
942 end_offset / edge_info_->GetChunkSize());
943 if (begin_global_chunk_index > from.global_chunk_index_) {
944 return EdgeIter(edge_info_, prefix_, adj_list_type_,
945 begin_global_chunk_index, begin_offset, chunk_begin_,
946 chunk_end_, index_converter_);
947 }
else if (end_global_chunk_index < from.global_chunk_index_) {
950 if (begin_offset >= from.cur_offset_) {
951 return EdgeIter(edge_info_, prefix_, adj_list_type_,
952 begin_global_chunk_index, begin_offset, chunk_begin_,
953 chunk_end_, index_converter_);
954 }
else if (end_offset <= from.cur_offset_) {
957 return EdgeIter(edge_info_, prefix_, adj_list_type_,
958 from.global_chunk_index_, from.cur_offset_,
959 chunk_begin_, chunk_end_, index_converter_);
982 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
983 IdType vertex_chunk_begin = 0,
984 IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
985 :
Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
986 AdjListType::unordered_by_source) {}
999 while (iter !=
end) {
1001 if (edge.source() ==
id) {
1020 while (iter !=
end) {
1022 if (edge.destination() ==
id) {
1047 const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
1048 IdType vertex_chunk_begin = 0,
1049 IdType vertex_chunk_end = std::numeric_limits<int64_t>::max())
1050 :
Base(edge_info, prefix, vertex_chunk_begin, vertex_chunk_end,
1051 AdjListType::unordered_by_dest) {}
1064 while (iter !=
end) {
1066 if (edge.source() ==
id) {
1085 while (iter !=
end) {
1087 if (edge.destination() ==
id) {
The arrow chunk reader for adj list topology chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
Result< IdType > GetRowNumOfChunk()
Get the number of rows of the current chunk table.
Status seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index=0)
Sets chunk position to the specific vertex chunk and edge chunk.
Status next_chunk()
Sets chunk position indicator to next chunk.
Edge contains the information of a certain edge.
IdType source() const noexcept
Get source id of the edge.
Edge(AdjListArrowChunkReader &adj_list_reader, std::vector< AdjListPropertyArrowChunkReader > &property_readers)
IdType destination() const noexcept
Get destination id of the edge.
bool IsValid(const std::string &property) const
Return true if the value of the property is valid (not null).
Result< T > property(const std::string &property) const
Get the property value of the edge.
EdgeInfo is a class to describe the edge information, including the source vertex label,...
The iterator for traversing a type of edges.
IdType cur_offset() const
bool operator!=(const EdgeIter &rhs) const noexcept
EdgeIter(const EdgeIter &other)
bool first_dst(const EdgeIter &from, IdType id)
bool first_src(const EdgeIter &from, IdType id)
EdgeIter operator=(const EdgeIter &other)
IdType global_chunk_index() const
bool operator==(const EdgeIter &rhs) const noexcept
Result< T > property(const std::string &property) noexcept
EdgeIter(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, AdjListType adj_list_type, IdType global_chunk_index, IdType offset, IdType chunk_begin, IdType chunk_end, std::shared_ptr< util::IndexConverter > index_converter)
EdgesCollection is designed for reading a collection of edges.
virtual EdgeIter find_dst(IdType id, const EdgeIter &from)=0
virtual EdgeIter find_src(IdType id, const EdgeIter &from)=0
static Result< std::shared_ptr< EdgesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &src_label, const std::string &edge_label, const std::string &dst_label, AdjListType adj_list_type, const IdType vertex_chunk_begin=0, const IdType vertex_chunk_end=std::numeric_limits< int64_t >::max()) noexcept
Construct an EdgesCollection from graph info and edge label.
virtual size_t size() const noexcept
EdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin, IdType vertex_chunk_end, AdjListType adj_list_type)
Initialize the EdgesCollection with a range of chunks.
Ordered By Destination EdgesCollection implementation.
EdgeIter find_src(IdType id, const EdgeIter &from) override
EdgeIter find_dst(IdType id, const EdgeIter &from) override
OBDEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the OBDEdgesCollection with a range of chunks.
Ordered By Source EdgesCollection implementation.
OBSEdgeCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the OBSEdgeCollection with a range of chunks.
EdgeIter find_src(IdType id, const EdgeIter &from) override
EdgeIter find_dst(IdType id, const EdgeIter &from) override
static Status KeyError(Args &&... args)
Unordered By Destination EdgesCollection implementation.
EdgeIter find_src(IdType id, const EdgeIter &from) override
EdgeIter find_dst(IdType id, const EdgeIter &from) override
UBDEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the UBDEdgesCollection with a range of chunks.
Unordered By Source EdgesCollection implementation.
EdgeIter find_dst(IdType id, const EdgeIter &from) override
UBSEdgesCollection(const std::shared_ptr< EdgeInfo > &edge_info, const std::string &prefix, IdType vertex_chunk_begin=0, IdType vertex_chunk_end=std::numeric_limits< int64_t >::max())
Initialize the UBSEdgesCollection with a range of chunks.
EdgeIter find_src(IdType id, const EdgeIter &from) override
Vertex contains the information of a certain vertex.
bool IsValid(const std::string &property) const
Return true if the value of the property is valid (not null).
IdType id() const noexcept
Get the id of the vertex.
Result< T > property(const std::string &property) const
Get the property value of the vertex.
Vertex(IdType id, std::vector< VertexPropertyArrowChunkReader > &readers)
The iterator for traversing a type of vertices.
VertexIter & operator++() noexcept
bool operator==(const VertexIter &rhs) const noexcept
VertexIter(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix, IdType offset) noexcept
Result< T > property(const std::string &property) noexcept
Vertex operator*() noexcept
VertexIter operator++(int)
VertexIter operator+(IdType offset)
VertexIter(const VertexIter &other)
bool operator!=(const VertexIter &rhs) const noexcept
VertexIter & operator+=(IdType offset)
VerticesCollection is designed for reading a collection of vertices.
VertexIter find(IdType id)
size_t size() const noexcept
VertexIter begin() noexcept
VertexIter end() noexcept
static Result< std::shared_ptr< VerticesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &label)
Construct a VerticesCollection from graph info and vertex label.
VerticesCollection(const std::shared_ptr< VertexInfo > &vertex_info, const std::string &prefix)
Initialize the VerticesCollection.