20 #include "graphar/high-level/graph_reader.h"
21 #include "graphar/convert_to_arrow_type.h"
22 #include "graphar/types.h"
27 Status CastToAny(std::shared_ptr<arrow::Array> array,
29 if (array->IsNull(0)) {
33 using ArrayType =
typename TypeToArrowType<type>::ArrayType;
34 auto column = std::dynamic_pointer_cast<ArrayType>(array);
35 any = column->GetView(0);
40 Status CastToAny<Type::STRING>(std::shared_ptr<arrow::Array> array,
42 using ArrayType =
typename TypeToArrowType<Type::STRING>::ArrayType;
43 auto column = std::dynamic_pointer_cast<ArrayType>(array);
44 any = column->GetString(0);
48 Status TryToCastToAny(
const std::shared_ptr<DataType>& type,
49 std::shared_ptr<arrow::Array> array,
53 return CastToAny<Type::BOOL>(array, any);
55 return CastToAny<Type::INT32>(array, any);
57 return CastToAny<Type::INT64>(array, any);
59 return CastToAny<Type::FLOAT>(array, any);
61 return CastToAny<Type::DOUBLE>(array, any);
63 return CastToAny<Type::STRING>(array, any);
65 return CastToAny<Type::DATE>(array, any);
67 return CastToAny<Type::TIMESTAMP>(array, any);
75 std::vector<VertexPropertyArrowChunkReader>& readers)
78 for (
auto& reader : readers) {
79 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk_table, reader.GetChunk());
80 auto schema = chunk_table->schema();
81 for (
int i = 0; i < schema->num_fields(); ++i) {
82 auto field = chunk_table->field(i);
83 if (field->type()->id() == arrow::Type::LIST) {
84 auto list_array = std::dynamic_pointer_cast<arrow::ListArray>(
85 chunk_table->column(i)->chunk(0));
86 list_properties_[field->name()] = list_array->value_slice(0);
88 auto type = DataType::ArrowDataTypeToDataType(field->type());
89 GAR_RAISE_ERROR_NOT_OK(TryToCastToAny(type,
90 chunk_table->column(i)->chunk(0),
91 properties_[field->name()]));
99 if constexpr (std::is_final<T>::value) {
100 auto it = list_properties_.find(
property);
101 if (it == list_properties_.end()) {
105 auto array = std::dynamic_pointer_cast<
108 const typename T::ValueType* values = array->raw_values();
109 return T(values, array->length());
111 if (properties_.find(
property) == properties_.end()) {
113 " does not exist in the vertex.");
116 if (!properties_.at(
property).has_value())
118 T ret = std::any_cast<T>(properties_.at(
property));
120 }
catch (
const std::bad_any_cast& e) {
122 property,
" is not matched ", e.what());
129 if (properties_.find(
property) == properties_.end()) {
131 " does not exist in the vertex.");
134 if (!properties_.at(
property).has_value())
136 Date ret(std::any_cast<Date::c_type>(properties_.at(
property)));
138 }
catch (
const std::bad_any_cast& e) {
140 " is not matched ", e.what());
146 if (properties_.find(
property) == properties_.end()) {
148 " does not exist in the vertex.");
151 if (!properties_.at(
property).has_value())
153 Timestamp ret(std::any_cast<Timestamp::c_type>(properties_.at(
property)));
155 }
catch (
const std::bad_any_cast& e) {
157 " is not matched ", e.what());
163 auto it = list_properties_.find(
property);
164 if (it == list_properties_.end()) {
167 auto array = std::dynamic_pointer_cast<arrow::StringArray>(it->second);
168 return StringArray(array->raw_value_offsets(), array->raw_data(),
174 std::vector<AdjListPropertyArrowChunkReader>& property_readers) {
176 GAR_ASSIGN_OR_RAISE_ERROR(
auto adj_list_chunk_table,
178 src_id_ = std::dynamic_pointer_cast<arrow::Int64Array>(
179 adj_list_chunk_table->column(0)->chunk(0))
181 dst_id_ = std::dynamic_pointer_cast<arrow::Int64Array>(
182 adj_list_chunk_table->column(1)->chunk(0))
184 for (
auto& reader : property_readers) {
186 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk_table, reader.GetChunk());
187 auto schema = chunk_table->schema();
188 for (
int i = 0; i < schema->num_fields(); ++i) {
189 auto field = chunk_table->field(i);
190 if (field->type()->id() == arrow::Type::LIST) {
191 auto list_array = std::dynamic_pointer_cast<arrow::ListArray>(
192 chunk_table->column(i)->chunk(0));
193 list_properties_[field->name()] = list_array->value_slice(0);
195 auto type = DataType::ArrowDataTypeToDataType(field->type());
196 GAR_RAISE_ERROR_NOT_OK(TryToCastToAny(type,
197 chunk_table->column(i)->chunk(0),
198 properties_[field->name()]));
204 template <
typename T>
206 if constexpr (std::is_final<T>::value) {
207 auto it = list_properties_.find(
property);
208 if (it == list_properties_.end()) {
212 auto array = std::dynamic_pointer_cast<
215 const typename T::ValueType* values = array->raw_values();
216 return T(values, array->length());
218 if (properties_.find(
property) == properties_.end()) {
220 " does not exist in the edge.");
223 if (!properties_.at(
property).has_value())
225 T ret = std::any_cast<T>(properties_.at(
property));
227 }
catch (
const std::bad_any_cast& e) {
229 property,
" is not matched ", e.what());
236 if (properties_.find(
property) == properties_.end()) {
238 " does not exist in the edge.");
241 if (!properties_.at(
property).has_value())
243 Date ret(std::any_cast<Date::c_type>(properties_.at(
property)));
245 }
catch (
const std::bad_any_cast& e) {
247 " is not matched ", e.what());
252 Result<Timestamp>
Edge::property(
const std::string& property)
const {
253 if (properties_.find(
property) == properties_.end()) {
255 " does not exist in the edge.");
258 if (!properties_.at(
property).has_value())
260 Timestamp ret(std::any_cast<Timestamp::c_type>(properties_.at(
property)));
262 }
catch (
const std::bad_any_cast& e) {
264 " is not matched ", e.what());
269 Result<StringArray>
Edge::property(
const std::string& property)
const {
270 auto it = list_properties_.find(
property);
271 if (it == list_properties_.end()) {
274 auto array = std::dynamic_pointer_cast<arrow::StringArray>(it->second);
275 return StringArray(array->raw_value_offsets(), array->raw_data(),
279 #define INSTANTIATE_PROPERTY(T) \
280 template Result<T> Vertex::property<T>(const std::string& name) const; \
281 template Result<T> Edge::property<T>(const std::string& name) const;
283 INSTANTIATE_PROPERTY(
bool)
284 INSTANTIATE_PROPERTY(
const bool&)
285 INSTANTIATE_PROPERTY(int32_t)
286 INSTANTIATE_PROPERTY(
const int32_t&)
287 INSTANTIATE_PROPERTY(Int32Array)
288 INSTANTIATE_PROPERTY(int64_t)
289 INSTANTIATE_PROPERTY(
const int64_t&)
290 INSTANTIATE_PROPERTY(Int64Array)
291 INSTANTIATE_PROPERTY(
float)
292 INSTANTIATE_PROPERTY(
const float&)
293 INSTANTIATE_PROPERTY(FloatArray)
294 INSTANTIATE_PROPERTY(
double)
295 INSTANTIATE_PROPERTY(
const double&)
296 INSTANTIATE_PROPERTY(DoubleArray)
297 INSTANTIATE_PROPERTY(std::string)
298 INSTANTIATE_PROPERTY(
const std::string&)
301 adj_list_reader_.seek(cur_offset_);
302 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk, adj_list_reader_.GetChunk());
303 auto src_column = chunk->column(0);
304 return std::dynamic_pointer_cast<arrow::Int64Array>(src_column->chunk(0))
309 adj_list_reader_.
seek(cur_offset_);
310 GAR_ASSIGN_OR_RAISE_ERROR(
auto chunk, adj_list_reader_.
GetChunk());
311 auto src_column = chunk->column(1);
312 return std::dynamic_pointer_cast<arrow::Int64Array>(src_column->chunk(0))
321 if (adj_list_type_ == AdjListType::ordered_by_dest ||
322 adj_list_type_ == AdjListType::unordered_by_dest) {
323 if (from.global_chunk_index_ >= chunk_end_) {
326 if (from.global_chunk_index_ == global_chunk_index_) {
327 cur_offset_ = from.cur_offset_;
328 }
else if (from.global_chunk_index_ < chunk_begin_) {
331 global_chunk_index_ = from.global_chunk_index_;
332 cur_offset_ = from.cur_offset_;
333 vertex_chunk_index_ = from.vertex_chunk_index_;
345 if (adj_list_type_ == AdjListType::unordered_by_source) {
346 IdType expect_chunk_index =
347 index_converter_->IndexPairToGlobalChunkIndex(
id / src_chunk_size_, 0);
348 if (expect_chunk_index > chunk_end_)
350 if (from.global_chunk_index_ >= chunk_end_) {
353 bool need_refresh =
false;
354 if (from.global_chunk_index_ == global_chunk_index_) {
355 cur_offset_ = from.cur_offset_;
356 }
else if (from.global_chunk_index_ < chunk_begin_) {
359 global_chunk_index_ = from.global_chunk_index_;
360 cur_offset_ = from.cur_offset_;
361 vertex_chunk_index_ = from.vertex_chunk_index_;
364 if (global_chunk_index_ < expect_chunk_index) {
365 global_chunk_index_ = expect_chunk_index;
367 vertex_chunk_index_ =
id / src_chunk_size_;
375 if (vertex_chunk_index_ >
id / src_chunk_size_)
383 auto st = offset_reader_->seek(
id);
387 auto maybe_offset_chunk = offset_reader_->GetChunk();
388 if (!maybe_offset_chunk.status().ok()) {
392 std::dynamic_pointer_cast<arrow::Int64Array>(maybe_offset_chunk.value());
393 auto begin_offset =
static_cast<IdType
>(offset_array->Value(0));
394 auto end_offset =
static_cast<IdType
>(offset_array->Value(1));
395 if (begin_offset >= end_offset) {
398 auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex();
399 auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex(
400 vertex_chunk_index_of_id, begin_offset / chunk_size_);
401 auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex(
402 vertex_chunk_index_of_id, end_offset / chunk_size_);
403 if (begin_global_index <= from.global_chunk_index_ &&
404 from.global_chunk_index_ <= end_global_index) {
405 if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) {
406 global_chunk_index_ = from.global_chunk_index_;
407 cur_offset_ = from.cur_offset_;
408 vertex_chunk_index_ = from.vertex_chunk_index_;
411 }
else if (from.cur_offset_ <= begin_offset) {
412 global_chunk_index_ = begin_global_index;
413 cur_offset_ = begin_offset;
414 vertex_chunk_index_ = vertex_chunk_index_of_id;
420 }
else if (from.global_chunk_index_ < begin_global_index) {
421 global_chunk_index_ = begin_global_index;
422 cur_offset_ = begin_offset;
423 vertex_chunk_index_ = vertex_chunk_index_of_id;
436 if (adj_list_type_ == AdjListType::ordered_by_source ||
437 adj_list_type_ == AdjListType::unordered_by_source) {
438 if (from.global_chunk_index_ >= chunk_end_) {
441 if (from.global_chunk_index_ == global_chunk_index_) {
442 cur_offset_ = from.cur_offset_;
443 }
else if (from.global_chunk_index_ < chunk_begin_) {
446 global_chunk_index_ = from.global_chunk_index_;
447 cur_offset_ = from.cur_offset_;
448 vertex_chunk_index_ = from.vertex_chunk_index_;
460 if (adj_list_type_ == AdjListType::unordered_by_dest) {
461 IdType expect_chunk_index =
462 index_converter_->IndexPairToGlobalChunkIndex(
id / dst_chunk_size_, 0);
463 if (expect_chunk_index > chunk_end_)
465 if (from.global_chunk_index_ >= chunk_end_) {
468 bool need_refresh =
false;
469 if (from.global_chunk_index_ == global_chunk_index_) {
470 cur_offset_ = from.cur_offset_;
471 }
else if (from.global_chunk_index_ < chunk_begin_) {
474 global_chunk_index_ = from.global_chunk_index_;
475 cur_offset_ = from.cur_offset_;
476 vertex_chunk_index_ = from.vertex_chunk_index_;
479 if (global_chunk_index_ < expect_chunk_index) {
480 global_chunk_index_ = expect_chunk_index;
482 vertex_chunk_index_ =
id / dst_chunk_size_;
490 if (vertex_chunk_index_ >
id / dst_chunk_size_)
498 auto st = offset_reader_->seek(
id);
502 auto maybe_offset_chunk = offset_reader_->GetChunk();
503 if (!maybe_offset_chunk.status().ok()) {
507 std::dynamic_pointer_cast<arrow::Int64Array>(maybe_offset_chunk.value());
508 auto begin_offset =
static_cast<IdType
>(offset_array->Value(0));
509 auto end_offset =
static_cast<IdType
>(offset_array->Value(1));
510 if (begin_offset >= end_offset) {
513 auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex();
514 auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex(
515 vertex_chunk_index_of_id, begin_offset / chunk_size_);
516 auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex(
517 vertex_chunk_index_of_id, end_offset / chunk_size_);
518 if (begin_global_index <= from.global_chunk_index_ &&
519 from.global_chunk_index_ <= end_global_index) {
520 if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) {
521 global_chunk_index_ = from.global_chunk_index_;
522 cur_offset_ = from.cur_offset_;
523 vertex_chunk_index_ = from.vertex_chunk_index_;
526 }
else if (from.cur_offset_ <= begin_offset) {
527 global_chunk_index_ = begin_global_index;
528 cur_offset_ = begin_offset;
529 vertex_chunk_index_ = vertex_chunk_index_of_id;
535 }
else if (from.global_chunk_index_ < begin_global_index) {
536 global_chunk_index_ = begin_global_index;
537 cur_offset_ = begin_offset;
538 vertex_chunk_index_ = vertex_chunk_index_of_id;
547 const std::shared_ptr<GraphInfo>& graph_info,
const std::string& src_label,
548 const std::string& edge_label,
const std::string& dst_label,
549 AdjListType adj_list_type,
const IdType vertex_chunk_begin,
550 const IdType vertex_chunk_end) noexcept {
551 auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
554 dst_label,
" doesn't exist.");
556 if (!edge_info->HasAdjacentListType(adj_list_type)) {
558 AdjListTypeToString(adj_list_type),
561 switch (adj_list_type) {
562 case AdjListType::ordered_by_source:
563 return std::make_shared<OBSEdgeCollection>(
564 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
566 case AdjListType::ordered_by_dest:
567 return std::make_shared<OBDEdgesCollection>(
568 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
570 case AdjListType::unordered_by_source:
571 return std::make_shared<UBSEdgesCollection>(
572 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
574 case AdjListType::unordered_by_dest:
575 return std::make_shared<UBDEdgesCollection>(
576 edge_info, graph_info->GetPrefix(), vertex_chunk_begin,
The arrow chunk reader for adj list topology chunk.
Status seek(IdType offset)
Sets chunk position indicator for reader by edge index.
Result< std::shared_ptr< arrow::Table > > GetChunk()
Return the current chunk of chunk position indicator as arrow::Table, if the chunk is empty,...
Edge(AdjListArrowChunkReader &adj_list_reader, std::vector< AdjListPropertyArrowChunkReader > &property_readers)
Result< T > property(const std::string &property) const
Get the property value of the edge.
The iterator for traversing a type of edges.
bool first_dst(const EdgeIter &from, IdType id)
bool first_src(const EdgeIter &from, IdType id)
static Result< std::shared_ptr< EdgesCollection > > Make(const std::shared_ptr< GraphInfo > &graph_info, const std::string &src_label, const std::string &edge_label, const std::string &dst_label, AdjListType adj_list_type, const IdType vertex_chunk_begin=0, const IdType vertex_chunk_end=std::numeric_limits< int64_t >::max()) noexcept
Construct an EdgesCollection from graph info and edge label.
static Status TypeError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
Result< T > property(const std::string &property) const
Get the property value of the vertex.
Vertex(IdType id, std::vector< VertexPropertyArrowChunkReader > &readers)