Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
edges_builder.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include "arrow/api.h"
21 
22 #include "graphar/convert_to_arrow_type.h"
23 #include "graphar/general_params.h"
24 #include "graphar/high-level/edges_builder.h"
25 #include "graphar/result.h"
26 
27 namespace graphar::builder {
28 
30  // construct the writer
31  EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, writer_options_,
32  validate_level_);
33  // construct empty edge collections for vertex chunks without edges
34  IdType num_vertex_chunks =
35  (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
36  for (IdType i = 0; i < num_vertex_chunks; i++)
37  if (edges_.find(i) == edges_.end()) {
38  std::vector<Edge> empty_chunk_edges;
39  edges_[i] = empty_chunk_edges;
40  }
41  // dump the offsets
42  if (adj_list_type_ == AdjListType::ordered_by_source ||
43  adj_list_type_ == AdjListType::ordered_by_dest) {
44  for (auto& chunk_edges : edges_) {
45  IdType vertex_chunk_index = chunk_edges.first;
46  // sort the edges
47  if (adj_list_type_ == AdjListType::ordered_by_source)
48  sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_src);
49  if (adj_list_type_ == AdjListType::ordered_by_dest)
50  sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_dst);
51  // construct and write offset chunk
52  GAR_ASSIGN_OR_RAISE(
53  auto offset_table,
54  getOffsetTable(vertex_chunk_index, chunk_edges.second));
55  GAR_RETURN_NOT_OK(
56  writer.WriteOffsetChunk(offset_table, vertex_chunk_index));
57  }
58  }
59  // dump the vertex num
60  GAR_RETURN_NOT_OK(writer.WriteVerticesNum(num_vertices_));
61  // dump the edge nums
62  IdType vertex_chunk_num =
63  (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
64  for (IdType vertex_chunk_index = 0; vertex_chunk_index < vertex_chunk_num;
65  vertex_chunk_index++) {
66  if (edges_.find(vertex_chunk_index) == edges_.end()) {
67  GAR_RETURN_NOT_OK(writer.WriteEdgesNum(vertex_chunk_index, 0));
68  } else {
69  GAR_RETURN_NOT_OK(writer.WriteEdgesNum(
70  vertex_chunk_index, edges_[vertex_chunk_index].size()));
71  }
72  }
73  // dump the edges
74  for (auto& chunk_edges : edges_) {
75  IdType vertex_chunk_index = chunk_edges.first;
76  // convert to table
77  GAR_ASSIGN_OR_RAISE(auto input_table, convertToTable(chunk_edges.second));
78  // write table
79  GAR_RETURN_NOT_OK(writer.WriteTable(input_table, vertex_chunk_index, 0));
80  chunk_edges.second.clear();
81  }
82  is_saved_ = true;
83  return Status::OK();
84 }
85 
86 Status EdgesBuilder::validate(const Edge& e,
87  ValidateLevel validate_level) const {
88  // use the builder's validate level
89  if (validate_level == ValidateLevel::default_validate)
90  validate_level = validate_level_;
91  // no validate
92  if (validate_level == ValidateLevel::no_validate)
93  return Status::OK();
94 
95  // weak validate
96  // can not add new edges after dumping
97  if (is_saved_) {
98  return Status::Invalid(
99  "The edge builder has been saved, can not add "
100  "new edges any more");
101  }
102  // adj list type not exits in edge info
103  if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
104  return Status::KeyError(
105  "Adj list type ", AdjListTypeToString(adj_list_type_),
106  " does not exist in the ", edge_info_->GetEdgeType(), " edge info.");
107  }
108 
109  // strong validate
110  if (validate_level == ValidateLevel::strong_validate) {
111  for (auto& property : e.GetProperties()) {
112  // check if the property is contained
113  if (!edge_info_->HasProperty(property.first)) {
114  return Status::KeyError("Property with name ", property.first,
115  " is not contained in the ",
116  edge_info_->GetEdgeType(), " edge info.");
117  }
118  // check if the property type is correct
119  auto type = edge_info_->GetPropertyType(property.first).value();
120  bool invalid_type = false;
121  switch (type->id()) {
122  case Type::BOOL:
123  if (property.second.type() !=
124  typeid(typename TypeToArrowType<Type::BOOL>::CType)) {
125  invalid_type = true;
126  }
127  break;
128  case Type::INT32:
129  if (property.second.type() !=
130  typeid(typename TypeToArrowType<Type::INT32>::CType)) {
131  invalid_type = true;
132  }
133  break;
134  case Type::INT64:
135  if (property.second.type() !=
136  typeid(typename TypeToArrowType<Type::INT64>::CType)) {
137  invalid_type = true;
138  }
139  break;
140  case Type::FLOAT:
141  if (property.second.type() !=
142  typeid(typename TypeToArrowType<Type::FLOAT>::CType)) {
143  invalid_type = true;
144  }
145  break;
146  case Type::DOUBLE:
147  if (property.second.type() !=
148  typeid(typename TypeToArrowType<Type::DOUBLE>::CType)) {
149  invalid_type = true;
150  }
151  break;
152  case Type::STRING:
153  if (property.second.type() !=
154  typeid(typename TypeToArrowType<Type::STRING>::CType)) {
155  invalid_type = true;
156  }
157  break;
158  case Type::DATE:
159  // date is stored as int32_t
160  if (property.second.type() !=
161  typeid(typename TypeToArrowType<Type::DATE>::CType::c_type)) {
162  invalid_type = true;
163  }
164  break;
165  case Type::TIMESTAMP:
166  // timestamp is stored as int64_t
167  if (property.second.type() !=
168  typeid(typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type)) {
169  invalid_type = true;
170  }
171  break;
172  default:
173  return Status::TypeError("Unsupported property type.");
174  }
175  if (invalid_type) {
176  return Status::TypeError(
177  "Invalid data type for property ", property.first + ", defined as ",
178  type->ToTypeName(), ", but got ", property.second.type().name());
179  }
180  }
181  }
182  return Status::OK();
183 }
184 
185 template <Type type>
186 Status EdgesBuilder::tryToAppend(
187  const std::string& property_name,
188  std::shared_ptr<arrow::Array>& array, // NOLINT
189  const std::vector<Edge>& edges) {
190  using CType = typename TypeToArrowType<type>::CType;
191  arrow::MemoryPool* pool = arrow::default_memory_pool();
192  typename TypeToArrowType<type>::BuilderType builder(pool);
193  for (const auto& e : edges) {
194  if (e.Empty() || (!e.ContainProperty(property_name))) {
195  RETURN_NOT_ARROW_OK(builder.AppendNull());
196  } else {
197  RETURN_NOT_ARROW_OK(
198  builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
199  }
200  }
201  array = builder.Finish().ValueOrDie();
202  return Status::OK();
203 }
204 
205 template <>
206 Status EdgesBuilder::tryToAppend<Type::TIMESTAMP>(
207  const std::string& property_name,
208  std::shared_ptr<arrow::Array>& array, // NOLINT
209  const std::vector<Edge>& edges) {
210  using CType = typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type;
211  arrow::MemoryPool* pool = arrow::default_memory_pool();
212  typename TypeToArrowType<Type::TIMESTAMP>::BuilderType builder(
213  arrow::timestamp(arrow::TimeUnit::MILLI), pool);
214  for (const auto& e : edges) {
215  if (e.Empty() || (!e.ContainProperty(property_name))) {
216  RETURN_NOT_ARROW_OK(builder.AppendNull());
217  } else {
218  RETURN_NOT_ARROW_OK(
219  builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
220  }
221  }
222  array = builder.Finish().ValueOrDie();
223  return Status::OK();
224 }
225 
226 template <>
227 Status EdgesBuilder::tryToAppend<Type::DATE>(
228  const std::string& property_name,
229  std::shared_ptr<arrow::Array>& array, // NOLINT
230  const std::vector<Edge>& edges) {
231  using CType = typename TypeToArrowType<Type::DATE>::CType::c_type;
232  arrow::MemoryPool* pool = arrow::default_memory_pool();
233  typename TypeToArrowType<Type::DATE>::BuilderType builder(pool);
234  for (const auto& e : edges) {
235  if (e.Empty() || (!e.ContainProperty(property_name))) {
236  RETURN_NOT_ARROW_OK(builder.AppendNull());
237  } else {
238  RETURN_NOT_ARROW_OK(
239  builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
240  }
241  }
242  array = builder.Finish().ValueOrDie();
243  return Status::OK();
244 }
245 
246 Status EdgesBuilder::appendToArray(
247  const std::shared_ptr<DataType>& type, const std::string& property_name,
248  std::shared_ptr<arrow::Array>& array, // NOLINT
249  const std::vector<Edge>& edges) {
250  switch (type->id()) {
251  case Type::BOOL:
252  return tryToAppend<Type::BOOL>(property_name, array, edges);
253  case Type::INT32:
254  return tryToAppend<Type::INT32>(property_name, array, edges);
255  case Type::INT64:
256  return tryToAppend<Type::INT64>(property_name, array, edges);
257  case Type::FLOAT:
258  return tryToAppend<Type::FLOAT>(property_name, array, edges);
259  case Type::DOUBLE:
260  return tryToAppend<Type::DOUBLE>(property_name, array, edges);
261  case Type::STRING:
262  return tryToAppend<Type::STRING>(property_name, array, edges);
263  case Type::DATE:
264  return tryToAppend<Type::DATE>(property_name, array, edges);
265  case Type::TIMESTAMP:
266  return tryToAppend<Type::TIMESTAMP>(property_name, array, edges);
267  default:
268  return Status::TypeError("Unsupported property type.");
269  }
270  return Status::OK();
271 }
272 
273 Status EdgesBuilder::tryToAppend(
274  int src_or_dest,
275  std::shared_ptr<arrow::Array>& array, // NOLINT
276  const std::vector<Edge>& edges) {
277  arrow::MemoryPool* pool = arrow::default_memory_pool();
278  typename arrow::TypeTraits<arrow::Int64Type>::BuilderType builder(pool);
279  for (const auto& e : edges) {
280  RETURN_NOT_ARROW_OK(builder.Append(std::any_cast<int64_t>(
281  src_or_dest == 1 ? e.GetSource() : e.GetDestination())));
282  }
283  array = builder.Finish().ValueOrDie();
284  return Status::OK();
285 }
286 
287 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::convertToTable(
288  const std::vector<Edge>& edges) {
289  const auto& property_groups = edge_info_->GetPropertyGroups();
290  std::vector<std::shared_ptr<arrow::Array>> arrays;
291  std::vector<std::shared_ptr<arrow::Field>> schema_vector;
292  // add src
293  std::shared_ptr<arrow::Array> array;
294  schema_vector.push_back(arrow::field(
295  GeneralParams::kSrcIndexCol, DataType::DataTypeToArrowDataType(int64())));
296  GAR_RETURN_NOT_OK(tryToAppend(1, array, edges));
297  arrays.push_back(array);
298  // add dst
299  schema_vector.push_back(arrow::field(
300  GeneralParams::kDstIndexCol, DataType::DataTypeToArrowDataType(int64())));
301  GAR_RETURN_NOT_OK(tryToAppend(0, array, edges));
302  arrays.push_back(array);
303  // add properties
304  for (auto& property_group : property_groups) {
305  for (auto& property : property_group->GetProperties()) {
306  // add a column to schema
307  schema_vector.push_back(arrow::field(
308  property.name, DataType::DataTypeToArrowDataType(property.type)));
309  // add a column to data
310  std::shared_ptr<arrow::Array> array;
311  GAR_RETURN_NOT_OK(
312  appendToArray(property.type, property.name, array, edges));
313  arrays.push_back(array);
314  }
315  }
316  auto schema = std::make_shared<arrow::Schema>(schema_vector);
317  return arrow::Table::Make(schema, arrays);
318 }
319 
320 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::getOffsetTable(
321  IdType vertex_chunk_index, const std::vector<Edge>& edges) {
322  arrow::Int64Builder builder;
323  IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
324  end_index = begin_index + vertex_chunk_size_;
325  RETURN_NOT_ARROW_OK(builder.Append(0));
326 
327  std::vector<std::shared_ptr<arrow::Array>> arrays;
328  std::vector<std::shared_ptr<arrow::Field>> schema_vector;
329  schema_vector.push_back(arrow::field(
330  GeneralParams::kOffsetCol, DataType::DataTypeToArrowDataType(int64())));
331 
332  size_t index = 0;
333  for (IdType i = begin_index; i < end_index; i++) {
334  while (index < edges.size()) {
335  int64_t x = (adj_list_type_ == AdjListType::ordered_by_source
336  ? edges[index].GetSource()
337  : edges[index].GetDestination());
338  if (x <= i) {
339  index++;
340  } else {
341  break;
342  }
343  }
344  RETURN_NOT_ARROW_OK(builder.Append(index));
345  }
346  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto array, builder.Finish());
347  arrays.push_back(array);
348  auto schema = std::make_shared<arrow::Schema>(schema_vector);
349  return arrow::Table::Make(schema, arrays);
350 }
351 
352 } // namespace graphar::builder
The writer for edge (adj list, offset and property group) chunks.
Definition: chunk_writer.h:320
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
Definition: status.h:123
static Status TypeError(Args &&... args)
Definition: status.h:178
static Status KeyError(Args &&... args)
Definition: status.h:172
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157
Edge is designed for constructing edges builder.
Definition: edges_builder.h:45
const std::any & GetProperty(const std::string &property) const
Get a property of the edge.
Definition: edges_builder.h:95
const std::unordered_map< std::string, std::any > & GetProperties() const
Get all properties of the edge.
IdType GetSource() const noexcept
Get source id of the edge.
Definition: edges_builder.h:68
bool Empty() const noexcept
Check if the edge is empty.
Definition: edges_builder.h:61
IdType GetDestination() const noexcept
Get destination id of the edge.
Definition: edges_builder.h:75
bool ContainProperty(const std::string &property) const
Check if the edge contains a property.
Status Dump()
Dump the collection into files.