Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
edges_builder.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include "arrow/api.h"
21 
22 #include "graphar/convert_to_arrow_type.h"
23 #include "graphar/general_params.h"
24 #include "graphar/high-level/edges_builder.h"
25 #include "graphar/result.h"
26 
27 namespace graphar::builder {
28 
30  // construct the writer
31  EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, validate_level_);
32  // construct empty edge collections for vertex chunks without edges
33  IdType num_vertex_chunks =
34  (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
35  for (IdType i = 0; i < num_vertex_chunks; i++)
36  if (edges_.find(i) == edges_.end()) {
37  std::vector<Edge> empty_chunk_edges;
38  edges_[i] = empty_chunk_edges;
39  }
40  // dump the offsets
41  if (adj_list_type_ == AdjListType::ordered_by_source ||
42  adj_list_type_ == AdjListType::ordered_by_dest) {
43  for (auto& chunk_edges : edges_) {
44  IdType vertex_chunk_index = chunk_edges.first;
45  // sort the edges
46  if (adj_list_type_ == AdjListType::ordered_by_source)
47  sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_src);
48  if (adj_list_type_ == AdjListType::ordered_by_dest)
49  sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_dst);
50  // construct and write offset chunk
51  GAR_ASSIGN_OR_RAISE(
52  auto offset_table,
53  getOffsetTable(vertex_chunk_index, chunk_edges.second));
54  GAR_RETURN_NOT_OK(
55  writer.WriteOffsetChunk(offset_table, vertex_chunk_index));
56  }
57  }
58  // dump the vertex num
59  GAR_RETURN_NOT_OK(writer.WriteVerticesNum(num_vertices_));
60  // dump the edge nums
61  IdType vertex_chunk_num =
62  (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
63  for (IdType vertex_chunk_index = 0; vertex_chunk_index < vertex_chunk_num;
64  vertex_chunk_index++) {
65  if (edges_.find(vertex_chunk_index) == edges_.end()) {
66  GAR_RETURN_NOT_OK(writer.WriteEdgesNum(vertex_chunk_index, 0));
67  } else {
68  GAR_RETURN_NOT_OK(writer.WriteEdgesNum(
69  vertex_chunk_index, edges_[vertex_chunk_index].size()));
70  }
71  }
72  // dump the edges
73  for (auto& chunk_edges : edges_) {
74  IdType vertex_chunk_index = chunk_edges.first;
75  // convert to table
76  GAR_ASSIGN_OR_RAISE(auto input_table, convertToTable(chunk_edges.second));
77  // write table
78  GAR_RETURN_NOT_OK(writer.WriteTable(input_table, vertex_chunk_index, 0));
79  chunk_edges.second.clear();
80  }
81  is_saved_ = true;
82  return Status::OK();
83 }
84 
85 Status EdgesBuilder::validate(const Edge& e,
86  ValidateLevel validate_level) const {
87  // use the builder's validate level
88  if (validate_level == ValidateLevel::default_validate)
89  validate_level = validate_level_;
90  // no validate
91  if (validate_level == ValidateLevel::no_validate)
92  return Status::OK();
93 
94  // weak validate
95  // can not add new edges after dumping
96  if (is_saved_) {
97  return Status::Invalid(
98  "The edge builder has been saved, can not add "
99  "new edges any more");
100  }
101  // adj list type not exits in edge info
102  if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
103  return Status::KeyError(
104  "Adj list type ", AdjListTypeToString(adj_list_type_),
105  " does not exist in the ", edge_info_->GetEdgeLabel(), " edge info.");
106  }
107 
108  // strong validate
109  if (validate_level == ValidateLevel::strong_validate) {
110  for (auto& property : e.GetProperties()) {
111  // check if the property is contained
112  if (!edge_info_->HasProperty(property.first)) {
113  return Status::KeyError("Property with name ", property.first,
114  " is not contained in the ",
115  edge_info_->GetEdgeLabel(), " edge info.");
116  }
117  // check if the property type is correct
118  auto type = edge_info_->GetPropertyType(property.first).value();
119  bool invalid_type = false;
120  switch (type->id()) {
121  case Type::BOOL:
122  if (property.second.type() !=
123  typeid(typename TypeToArrowType<Type::BOOL>::CType)) {
124  invalid_type = true;
125  }
126  break;
127  case Type::INT32:
128  if (property.second.type() !=
129  typeid(typename TypeToArrowType<Type::INT32>::CType)) {
130  invalid_type = true;
131  }
132  break;
133  case Type::INT64:
134  if (property.second.type() !=
135  typeid(typename TypeToArrowType<Type::INT64>::CType)) {
136  invalid_type = true;
137  }
138  break;
139  case Type::FLOAT:
140  if (property.second.type() !=
141  typeid(typename TypeToArrowType<Type::FLOAT>::CType)) {
142  invalid_type = true;
143  }
144  break;
145  case Type::DOUBLE:
146  if (property.second.type() !=
147  typeid(typename TypeToArrowType<Type::DOUBLE>::CType)) {
148  invalid_type = true;
149  }
150  break;
151  case Type::STRING:
152  if (property.second.type() !=
153  typeid(typename TypeToArrowType<Type::STRING>::CType)) {
154  invalid_type = true;
155  }
156  break;
157  case Type::DATE:
158  // date is stored as int32_t
159  if (property.second.type() !=
160  typeid(typename TypeToArrowType<Type::DATE>::CType::c_type)) {
161  invalid_type = true;
162  }
163  break;
164  case Type::TIMESTAMP:
165  // timestamp is stored as int64_t
166  if (property.second.type() !=
167  typeid(typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type)) {
168  invalid_type = true;
169  }
170  break;
171  default:
172  return Status::TypeError("Unsupported property type.");
173  }
174  if (invalid_type) {
175  return Status::TypeError(
176  "Invalid data type for property ", property.first + ", defined as ",
177  type->ToTypeName(), ", but got ", property.second.type().name());
178  }
179  }
180  }
181  return Status::OK();
182 }
183 
184 template <Type type>
185 Status EdgesBuilder::tryToAppend(
186  const std::string& property_name,
187  std::shared_ptr<arrow::Array>& array, // NOLINT
188  const std::vector<Edge>& edges) {
189  using CType = typename TypeToArrowType<type>::CType;
190  arrow::MemoryPool* pool = arrow::default_memory_pool();
191  typename TypeToArrowType<type>::BuilderType builder(pool);
192  for (const auto& e : edges) {
193  if (e.Empty() || (!e.ContainProperty(property_name))) {
194  RETURN_NOT_ARROW_OK(builder.AppendNull());
195  } else {
196  RETURN_NOT_ARROW_OK(
197  builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
198  }
199  }
200  array = builder.Finish().ValueOrDie();
201  return Status::OK();
202 }
203 
204 template <>
205 Status EdgesBuilder::tryToAppend<Type::TIMESTAMP>(
206  const std::string& property_name,
207  std::shared_ptr<arrow::Array>& array, // NOLINT
208  const std::vector<Edge>& edges) {
209  using CType = typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type;
210  arrow::MemoryPool* pool = arrow::default_memory_pool();
211  typename TypeToArrowType<Type::TIMESTAMP>::BuilderType builder(
212  arrow::timestamp(arrow::TimeUnit::MILLI), pool);
213  for (const auto& e : edges) {
214  if (e.Empty() || (!e.ContainProperty(property_name))) {
215  RETURN_NOT_ARROW_OK(builder.AppendNull());
216  } else {
217  RETURN_NOT_ARROW_OK(
218  builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
219  }
220  }
221  array = builder.Finish().ValueOrDie();
222  return Status::OK();
223 }
224 
225 template <>
226 Status EdgesBuilder::tryToAppend<Type::DATE>(
227  const std::string& property_name,
228  std::shared_ptr<arrow::Array>& array, // NOLINT
229  const std::vector<Edge>& edges) {
230  using CType = typename TypeToArrowType<Type::DATE>::CType::c_type;
231  arrow::MemoryPool* pool = arrow::default_memory_pool();
232  typename TypeToArrowType<Type::DATE>::BuilderType builder(pool);
233  for (const auto& e : edges) {
234  if (e.Empty() || (!e.ContainProperty(property_name))) {
235  RETURN_NOT_ARROW_OK(builder.AppendNull());
236  } else {
237  RETURN_NOT_ARROW_OK(
238  builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
239  }
240  }
241  array = builder.Finish().ValueOrDie();
242  return Status::OK();
243 }
244 
245 Status EdgesBuilder::appendToArray(
246  const std::shared_ptr<DataType>& type, const std::string& property_name,
247  std::shared_ptr<arrow::Array>& array, // NOLINT
248  const std::vector<Edge>& edges) {
249  switch (type->id()) {
250  case Type::BOOL:
251  return tryToAppend<Type::BOOL>(property_name, array, edges);
252  case Type::INT32:
253  return tryToAppend<Type::INT32>(property_name, array, edges);
254  case Type::INT64:
255  return tryToAppend<Type::INT64>(property_name, array, edges);
256  case Type::FLOAT:
257  return tryToAppend<Type::FLOAT>(property_name, array, edges);
258  case Type::DOUBLE:
259  return tryToAppend<Type::DOUBLE>(property_name, array, edges);
260  case Type::STRING:
261  return tryToAppend<Type::STRING>(property_name, array, edges);
262  case Type::DATE:
263  return tryToAppend<Type::DATE>(property_name, array, edges);
264  case Type::TIMESTAMP:
265  return tryToAppend<Type::TIMESTAMP>(property_name, array, edges);
266  default:
267  return Status::TypeError("Unsupported property type.");
268  }
269  return Status::OK();
270 }
271 
272 Status EdgesBuilder::tryToAppend(
273  int src_or_dest,
274  std::shared_ptr<arrow::Array>& array, // NOLINT
275  const std::vector<Edge>& edges) {
276  arrow::MemoryPool* pool = arrow::default_memory_pool();
277  typename arrow::TypeTraits<arrow::Int64Type>::BuilderType builder(pool);
278  for (const auto& e : edges) {
279  RETURN_NOT_ARROW_OK(builder.Append(std::any_cast<int64_t>(
280  src_or_dest == 1 ? e.GetSource() : e.GetDestination())));
281  }
282  array = builder.Finish().ValueOrDie();
283  return Status::OK();
284 }
285 
286 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::convertToTable(
287  const std::vector<Edge>& edges) {
288  const auto& property_groups = edge_info_->GetPropertyGroups();
289  std::vector<std::shared_ptr<arrow::Array>> arrays;
290  std::vector<std::shared_ptr<arrow::Field>> schema_vector;
291  // add src
292  std::shared_ptr<arrow::Array> array;
293  schema_vector.push_back(arrow::field(
294  GeneralParams::kSrcIndexCol, DataType::DataTypeToArrowDataType(int64())));
295  GAR_RETURN_NOT_OK(tryToAppend(1, array, edges));
296  arrays.push_back(array);
297  // add dst
298  schema_vector.push_back(arrow::field(
299  GeneralParams::kDstIndexCol, DataType::DataTypeToArrowDataType(int64())));
300  GAR_RETURN_NOT_OK(tryToAppend(0, array, edges));
301  arrays.push_back(array);
302  // add properties
303  for (auto& property_group : property_groups) {
304  for (auto& property : property_group->GetProperties()) {
305  // add a column to schema
306  schema_vector.push_back(arrow::field(
307  property.name, DataType::DataTypeToArrowDataType(property.type)));
308  // add a column to data
309  std::shared_ptr<arrow::Array> array;
310  GAR_RETURN_NOT_OK(
311  appendToArray(property.type, property.name, array, edges));
312  arrays.push_back(array);
313  }
314  }
315  auto schema = std::make_shared<arrow::Schema>(schema_vector);
316  return arrow::Table::Make(schema, arrays);
317 }
318 
319 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::getOffsetTable(
320  IdType vertex_chunk_index, const std::vector<Edge>& edges) {
321  arrow::Int64Builder builder;
322  IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
323  end_index = begin_index + vertex_chunk_size_;
324  RETURN_NOT_ARROW_OK(builder.Append(0));
325 
326  std::vector<std::shared_ptr<arrow::Array>> arrays;
327  std::vector<std::shared_ptr<arrow::Field>> schema_vector;
328  schema_vector.push_back(arrow::field(
329  GeneralParams::kOffsetCol, DataType::DataTypeToArrowDataType(int64())));
330 
331  size_t index = 0;
332  for (IdType i = begin_index; i < end_index; i++) {
333  while (index < edges.size()) {
334  int64_t x = (adj_list_type_ == AdjListType::ordered_by_source
335  ? edges[index].GetSource()
336  : edges[index].GetDestination());
337  if (x <= i) {
338  index++;
339  } else {
340  break;
341  }
342  }
343  RETURN_NOT_ARROW_OK(builder.Append(index));
344  }
345  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto array, builder.Finish());
346  arrays.push_back(array);
347  auto schema = std::make_shared<arrow::Schema>(schema_vector);
348  return arrow::Table::Make(schema, arrays);
349 }
350 
351 } // namespace graphar::builder
The writer for edge (adj list, offset and property group) chunks.
Definition: chunk_writer.h:257
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
Definition: status.h:123
static Status TypeError(Args &&... args)
Definition: status.h:178
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
Edge is designed for constructing an edges builder.
Definition: edges_builder.h:45
const std::any & GetProperty(const std::string &property) const
Get a property of the edge.
Definition: edges_builder.h:95
const std::unordered_map< std::string, std::any > & GetProperties() const
Get all properties of the edge.
IdType GetSource() const noexcept
Get source id of the edge.
Definition: edges_builder.h:68
bool Empty() const noexcept
Check if the edge is empty.
Definition: edges_builder.h:61
IdType GetDestination() const noexcept
Get destination id of the edge.
Definition: edges_builder.h:75
bool ContainProperty(const std::string &property) const
Check if the edge contains a property.
Status Dump()
Dump the collection into files.