20 #include "arrow/api.h"
22 #include "graphar/convert_to_arrow_type.h"
23 #include "graphar/general_params.h"
24 #include "graphar/high-level/edges_builder.h"
25 #include "graphar/result.h"
27 namespace graphar::builder {
// Body of the dump routine (its signature line is not part of this excerpt).
// All output goes through a single EdgeChunkWriter built from the builder's
// edge info, prefix, adjacency list type and validate level.
31 EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, validate_level_);
// Number of vertex chunks, rounded up (ceiling division).
33 IdType num_vertex_chunks =
34 (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
// Make sure every vertex chunk index has an entry in edges_, inserting an
// empty edge list where none exists yet.
35 for (IdType i = 0; i < num_vertex_chunks; i++)
36 if (edges_.find(i) == edges_.end()) {
37 std::vector<Edge> empty_chunk_edges;
38 edges_[i] = empty_chunk_edges;
// Only the two ordered adjacency list types are handled here: each chunk's
// edges are sorted with cmp_src or cmp_dst and an offset table is computed
// with getOffsetTable().
// NOTE(review): the statement that consumes the offset table is on a line not
// shown here; presumably it is passed to writer.WriteOffsetChunk - confirm.
41 if (adj_list_type_ == AdjListType::ordered_by_source ||
42 adj_list_type_ == AdjListType::ordered_by_dest) {
43 for (
auto& chunk_edges : edges_) {
44 IdType vertex_chunk_index = chunk_edges.first;
46 if (adj_list_type_ == AdjListType::ordered_by_source)
47 sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_src);
48 if (adj_list_type_ == AdjListType::ordered_by_dest)
49 sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_dst);
53 getOffsetTable(vertex_chunk_index, chunk_edges.second));
// Record the edge count of every vertex chunk: 0 when the chunk has no entry
// in edges_, otherwise the size of its edge list.
61 IdType vertex_chunk_num =
62 (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
63 for (IdType vertex_chunk_index = 0; vertex_chunk_index < vertex_chunk_num;
64 vertex_chunk_index++) {
65 if (edges_.find(vertex_chunk_index) == edges_.end()) {
66 GAR_RETURN_NOT_OK(writer.
WriteEdgesNum(vertex_chunk_index, 0));
69 vertex_chunk_index, edges_[vertex_chunk_index].size()));
// Convert each chunk's edges into an Arrow table, write it with a start chunk
// index of 0, then clear the in-memory edge list of that chunk.
73 for (
auto& chunk_edges : edges_) {
74 IdType vertex_chunk_index = chunk_edges.first;
76 GAR_ASSIGN_OR_RAISE(
auto input_table, convertToTable(chunk_edges.second));
78 GAR_RETURN_NOT_OK(writer.
WriteTable(input_table, vertex_chunk_index, 0));
79 chunk_edges.second.clear();
86 ValidateLevel validate_level)
const {
// Validation routine (the start of its signature is outside this excerpt).
// The checks that run depend on the effective validate level.
// default_validate means "use the builder's own validate_level_".
88 if (validate_level == ValidateLevel::default_validate)
89 validate_level = validate_level_;
// NOTE(review): the action taken for no_validate is on a line not shown here;
// presumably the function returns success without further checks - confirm.
91 if (validate_level == ValidateLevel::no_validate)
98 "The edge builder has been saved, can not add "
99 "new edges any more");
// The builder's adjacency list type must be declared in the edge info.
102 if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
104 "Adj list type ", AdjListTypeToString(adj_list_type_),
105 " does not exist in the ", edge_info_->GetEdgeLabel(),
" edge info.");
// strong_validate additionally inspects properties: each one must be known to
// the edge info, and the runtime type of its value (property.second.type())
// must equal the C type that TypeToArrowType maps to the declared data type.
109 if (validate_level == ValidateLevel::strong_validate) {
112 if (!edge_info_->HasProperty(property.first)) {
114 " is not contained in the ",
115 edge_info_->GetEdgeLabel(),
" edge info.");
// Declared data type of this property, as reported by the edge info.
118 auto type = edge_info_->GetPropertyType(property.first).value();
119 bool invalid_type =
false;
120 switch (type->id()) {
122 if (property.second.type() !=
123 typeid(
typename TypeToArrowType<Type::BOOL>::CType)) {
128 if (property.second.type() !=
129 typeid(
typename TypeToArrowType<Type::INT32>::CType)) {
134 if (property.second.type() !=
135 typeid(
typename TypeToArrowType<Type::INT64>::CType)) {
140 if (property.second.type() !=
141 typeid(
typename TypeToArrowType<Type::FLOAT>::CType)) {
146 if (property.second.type() !=
147 typeid(
typename TypeToArrowType<Type::DOUBLE>::CType)) {
152 if (property.second.type() !=
153 typeid(
typename TypeToArrowType<Type::STRING>::CType)) {
// DATE and TIMESTAMP compare against the nested c_type of the mapped CType
// rather than against CType itself.
159 if (property.second.type() !=
160 typeid(
typename TypeToArrowType<Type::DATE>::CType::c_type)) {
164 case Type::TIMESTAMP:
166 if (property.second.type() !=
167 typeid(
typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type)) {
176 "Invalid data type for property ", property.first +
", defined as ",
177 type->ToTypeName(),
", but got ", property.second.type().name());
// Builds one Arrow array holding the values of property_name for the given
// edges, using the builder type that TypeToArrowType maps to `type`.
// property_name: name of the property to read from each edge.
// array: output parameter, assigned the finished array.
// edges: edges whose property values are appended in order.
// NOTE(review): the template header and the condition that chooses between
// AppendNull() and Append() are on lines not shown in this excerpt.
185 Status EdgesBuilder::tryToAppend(
186 const std::string& property_name,
187 std::shared_ptr<arrow::Array>& array,
188 const std::vector<Edge>& edges) {
189 using CType =
typename TypeToArrowType<type>::CType;
190 arrow::MemoryPool* pool = arrow::default_memory_pool();
191 typename TypeToArrowType<type>::BuilderType builder(pool);
// Per edge: append either a null or the property value extracted from the
// edge's std::any with std::any_cast<CType>.
192 for (
const auto& e : edges) {
194 RETURN_NOT_ARROW_OK(builder.AppendNull());
197 builder.Append(std::any_cast<CType>(e.
GetProperty(property_name))));
// NOTE(review): ValueOrDie() does not hand a Finish() failure back as a
// Status; getOffsetTable uses GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN for the
// same call - consider aligning.
200 array = builder.Finish().ValueOrDie();
// TIMESTAMP specialization of tryToAppend: builds an Arrow timestamp array
// for property_name over the given edges and assigns it to `array`.
// Values are read as the nested c_type of the mapped CType.
// NOTE(review): the condition that chooses between AppendNull() and Append()
// is on a line not shown in this excerpt.
205 Status EdgesBuilder::tryToAppend<Type::TIMESTAMP>(
206 const std::string& property_name,
207 std::shared_ptr<arrow::Array>& array,
208 const std::vector<Edge>& edges) {
209 using CType =
typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type;
210 arrow::MemoryPool* pool = arrow::default_memory_pool();
// The builder is given an explicit data type: timestamp with millisecond unit.
211 typename TypeToArrowType<Type::TIMESTAMP>::BuilderType builder(
212 arrow::timestamp(arrow::TimeUnit::MILLI), pool);
213 for (
const auto& e : edges) {
215 RETURN_NOT_ARROW_OK(builder.AppendNull());
218 builder.Append(std::any_cast<CType>(e.
GetProperty(property_name))));
// NOTE(review): ValueOrDie() does not hand a Finish() failure back as a Status.
221 array = builder.Finish().ValueOrDie();
// DATE specialization of tryToAppend: builds an Arrow array for property_name
// over the given edges and assigns it to `array`.
// Values are read as the nested c_type of the mapped CType.
// NOTE(review): the condition that chooses between AppendNull() and Append()
// is on a line not shown in this excerpt.
226 Status EdgesBuilder::tryToAppend<Type::DATE>(
227 const std::string& property_name,
228 std::shared_ptr<arrow::Array>& array,
229 const std::vector<Edge>& edges) {
230 using CType =
typename TypeToArrowType<Type::DATE>::CType::c_type;
231 arrow::MemoryPool* pool = arrow::default_memory_pool();
232 typename TypeToArrowType<Type::DATE>::BuilderType builder(pool);
233 for (
const auto& e : edges) {
235 RETURN_NOT_ARROW_OK(builder.AppendNull());
238 builder.Append(std::any_cast<CType>(e.
GetProperty(property_name))));
// NOTE(review): ValueOrDie() does not hand a Finish() failure back as a Status.
241 array = builder.Finish().ValueOrDie();
// Runtime-to-compile-time dispatch: switches on type->id() and forwards to
// the tryToAppend instantiation for that data type.
// type: declared data type of the property.
// property_name: name of the property to read from each edge.
// array: output parameter filled by the selected tryToAppend.
// edges: edges whose property values are converted.
// NOTE(review): most case labels and the fall-through/default handling are on
// lines not shown in this excerpt.
245 Status EdgesBuilder::appendToArray(
246 const std::shared_ptr<DataType>& type,
const std::string& property_name,
247 std::shared_ptr<arrow::Array>& array,
248 const std::vector<Edge>& edges) {
249 switch (type->id()) {
251 return tryToAppend<Type::BOOL>(property_name, array, edges);
253 return tryToAppend<Type::INT32>(property_name, array, edges);
255 return tryToAppend<Type::INT64>(property_name, array, edges);
257 return tryToAppend<Type::FLOAT>(property_name, array, edges);
259 return tryToAppend<Type::DOUBLE>(property_name, array, edges);
261 return tryToAppend<Type::STRING>(property_name, array, edges);
263 return tryToAppend<Type::DATE>(property_name, array, edges);
264 case Type::TIMESTAMP:
265 return tryToAppend<Type::TIMESTAMP>(property_name, array, edges);
// Overload that builds an Int64 Arrow array with one entry per edge and
// assigns it to `array`.
// NOTE(review): the first parameter and the expression being appended are on
// lines not shown here. convertToTable calls this with 1 for the source index
// column and 0 for the destination index column, so the first argument
// presumably selects between GetSource() and GetDestination() - confirm.
272 Status EdgesBuilder::tryToAppend(
274 std::shared_ptr<arrow::Array>& array,
275 const std::vector<Edge>& edges) {
276 arrow::MemoryPool* pool = arrow::default_memory_pool();
277 typename arrow::TypeTraits<arrow::Int64Type>::BuilderType builder(pool);
278 for (
const auto& e : edges) {
279 RETURN_NOT_ARROW_OK(builder.Append(std::any_cast<int64_t>(
// NOTE(review): ValueOrDie() does not hand a Finish() failure back as a Status.
282 array = builder.Finish().ValueOrDie();
// Converts a list of edges into an Arrow table.
// Column order: source index column, destination index column, then one
// column per property of every property group in the edge info.
// edges: the edges to convert; each contributes one entry per column.
// Returns the table built from the collected fields and arrays; errors from
// the append helpers are propagated through GAR_RETURN_NOT_OK.
286 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::convertToTable(
287 const std::vector<Edge>& edges) {
288 const auto& property_groups = edge_info_->GetPropertyGroups();
289 std::vector<std::shared_ptr<arrow::Array>> arrays;
290 std::vector<std::shared_ptr<arrow::Field>> schema_vector;
// Source index column (int64); tryToAppend is called with 1 for this column.
292 std::shared_ptr<arrow::Array> array;
293 schema_vector.push_back(arrow::field(
294 GeneralParams::kSrcIndexCol, DataType::DataTypeToArrowDataType(int64())));
295 GAR_RETURN_NOT_OK(tryToAppend(1, array, edges));
296 arrays.push_back(array);
// Destination index column (int64); tryToAppend is called with 0 here.
298 schema_vector.push_back(arrow::field(
299 GeneralParams::kDstIndexCol, DataType::DataTypeToArrowDataType(int64())));
300 GAR_RETURN_NOT_OK(tryToAppend(0, array, edges));
301 arrays.push_back(array);
// One field and one array per declared property, in property-group order.
303 for (
auto& property_group : property_groups) {
304 for (
auto& property : property_group->GetProperties()) {
306 schema_vector.push_back(arrow::field(
307 property.name, DataType::DataTypeToArrowDataType(property.type)));
// A fresh local array per property; it shadows the outer `array`.
309 std::shared_ptr<arrow::Array> array;
311 appendToArray(property.type, property.name, array, edges));
312 arrays.push_back(array);
315 auto schema = std::make_shared<arrow::Schema>(schema_vector);
316 return arrow::Table::Make(schema, arrays);
// Builds the single-column offset table for one vertex chunk.
// vertex_chunk_index: index of the vertex chunk; the covered vertex ids are
//   [vertex_chunk_index * vertex_chunk_size_, that value + vertex_chunk_size_).
// edges: the edges of this chunk. The dump routine sorts them by source or
//   destination before calling this function.
// Returns a table with one int64 column named GeneralParams::kOffsetCol.
// NOTE(review): the declaration of `index` and the condition that advances it
// inside the while loop are on lines not shown in this excerpt.
319 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::getOffsetTable(
320 IdType vertex_chunk_index,
const std::vector<Edge>& edges) {
321 arrow::Int64Builder builder;
322 IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
323 end_index = begin_index + vertex_chunk_size_;
// The first offset is always 0.
324 RETURN_NOT_ARROW_OK(builder.Append(0));
326 std::vector<std::shared_ptr<arrow::Array>> arrays;
327 std::vector<std::shared_ptr<arrow::Field>> schema_vector;
328 schema_vector.push_back(arrow::field(
329 GeneralParams::kOffsetCol, DataType::DataTypeToArrowDataType(int64())));
// Walk the vertex ids of the chunk while scanning the edges; x is the edge's
// source id for ordered_by_source and its destination id otherwise.
// The current value of `index` is appended to the builder; this appears to
// happen once per vertex id - confirm against the full source.
332 for (IdType i = begin_index; i < end_index; i++) {
333 while (index < edges.size()) {
334 int64_t x = (adj_list_type_ == AdjListType::ordered_by_source
335 ? edges[index].GetSource()
336 : edges[index].GetDestination());
343 RETURN_NOT_ARROW_OK(builder.Append(index));
// Unlike the tryToAppend helpers, a Finish() failure is returned as an error.
345 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto array, builder.Finish());
346 arrays.push_back(array);
347 auto schema = std::make_shared<arrow::Schema>(schema_vector);
348 return arrow::Table::Make(schema, arrays);
The writer for edge (adj list, offset and property group) chunks.
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
static Status TypeError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
Edge is the edge representation used when building edges with the edges builder.
const std::any & GetProperty(const std::string &property) const
Get a property of the edge.
const std::unordered_map< std::string, std::any > & GetProperties() const
Get all properties of the edge.
IdType GetSource() const noexcept
Get source id of the edge.
bool Empty() const noexcept
Check if the edge is empty.
IdType GetDestination() const noexcept
Get destination id of the edge.
bool ContainProperty(const std::string &property) const
Check if the edge contains a property.
Status Dump()
Dump the collection into files.