20 #include "arrow/api.h"
22 #include "graphar/convert_to_arrow_type.h"
23 #include "graphar/general_params.h"
24 #include "graphar/high-level/edges_builder.h"
25 #include "graphar/result.h"
27 namespace graphar::builder {
// Body fragment of the routine that dumps the collected edges through an
// EdgeChunkWriter. The signature line is not visible in this listing
// (presumably EdgesBuilder::Dump() -- TODO confirm); gaps in the listing
// numbers mark further lines that are not shown.
//
// Writer is built from the edge info, path prefix, adjacency-list type and
// writer options (remaining constructor arguments are not visible here).
31 EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, writer_options_,
// Number of vertex chunks = ceil(num_vertices_ / vertex_chunk_size_).
34 IdType num_vertex_chunks =
35 (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
// Make sure every vertex chunk index in [0, num_vertex_chunks) has an entry
// in edges_, inserting an empty edge list where none exists yet.
36 for (IdType i = 0; i < num_vertex_chunks; i++)
37 if (edges_.find(i) == edges_.end()) {
38 std::vector<Edge> empty_chunk_edges;
39 edges_[i] = empty_chunk_edges;
// Only the two ordered adjacency-list types go through the sort and
// offset-table step below.
42 if (adj_list_type_ == AdjListType::ordered_by_source ||
43 adj_list_type_ == AdjListType::ordered_by_dest) {
44 for (
auto& chunk_edges : edges_) {
// Key of the edges_ entry is used as the vertex chunk index.
45 IdType vertex_chunk_index = chunk_edges.first;
// Sort this chunk's edges in place with the comparator that matches the
// ordering: cmp_src for ordered_by_source, cmp_dst for ordered_by_dest.
47 if (adj_list_type_ == AdjListType::ordered_by_source)
48 sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_src);
49 if (adj_list_type_ == AdjListType::ordered_by_dest)
50 sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_dst);
// Offset table is computed from the edges sorted above (the head of this
// statement and the write that consumes the table are not visible here).
54 getOffsetTable(vertex_chunk_index, chunk_edges.second));
// Same ceiling division as num_vertex_chunks above, recomputed under a
// second name.
62 IdType vertex_chunk_num =
63 (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
64 for (IdType vertex_chunk_index = 0; vertex_chunk_index < vertex_chunk_num;
65 vertex_chunk_index++) {
// Chunk indices missing from edges_ are written with an edge count of 0.
// NOTE(review): the fill loop above already inserted an entry for every index
// in this range, so this branch looks unreachable -- confirm.
66 if (edges_.find(vertex_chunk_index) == edges_.end()) {
67 GAR_RETURN_NOT_OK(writer.
WriteEdgesNum(vertex_chunk_index, 0));
// Otherwise the size of the chunk's edge list is passed (the head of this
// call is not visible; presumably writer.WriteEdgesNum -- TODO confirm).
70 vertex_chunk_index, edges_[vertex_chunk_index].size()));
// For each vertex chunk: convert its edges to an Arrow table, hand the table
// to the writer with start chunk index 0, then drop the in-memory edges.
74 for (
auto& chunk_edges : edges_) {
75 IdType vertex_chunk_index = chunk_edges.first;
77 GAR_ASSIGN_OR_RAISE(
auto input_table, convertToTable(chunk_edges.second));
79 GAR_RETURN_NOT_OK(writer.
WriteTable(input_table, vertex_chunk_index, 0));
80 chunk_edges.second.clear();
// Tail of a const member function that validates an edge against the builder
// state and edge info. Only the last parameter of the signature is visible in
// this listing (presumably EdgesBuilder::validate -- TODO confirm); several
// conditions, returns and closing braces are likewise not shown.
87 ValidateLevel validate_level)
const {
// default_validate means "use the builder's own level" (validate_level_).
89 if (validate_level == ValidateLevel::default_validate)
90 validate_level = validate_level_;
// no_validate is special-cased here; the statement it guards is not visible
// (presumably an early successful return -- TODO confirm).
92 if (validate_level == ValidateLevel::no_validate)
// Message of the error raised once the builder has been saved (the guarding
// condition and the error constructor are not visible here).
99 "The edge builder has been saved, can not add "
100 "new edges any more");
// The builder's adjacency-list type must be one the edge info declares.
103 if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
105 "Adj list type ", AdjListTypeToString(adj_list_type_),
106 " does not exist in the ", edge_info_->GetEdgeType(),
" edge info.");
// strong_validate adds per-property checks. `property` is the element of a
// loop whose header is not visible; .first is used as the property name and
// .second as a value exposing .type() (presumably std::any -- TODO confirm).
110 if (validate_level == ValidateLevel::strong_validate) {
// Check 1: the property name must be known to the edge info.
113 if (!edge_info_->HasProperty(property.first)) {
115 " is not contained in the ",
116 edge_info_->GetEdgeType(),
" edge info.");
// Check 2: the runtime type of the value must equal the C type mapped from
// the declared property type. .value() is called without a visible error
// check; it follows the HasProperty test above.
119 auto type = edge_info_->GetPropertyType(property.first).value();
120 bool invalid_type =
false;
// One comparison per supported type id. The case labels and the bodies of
// these ifs are mostly not visible (presumably they set invalid_type).
121 switch (type->id()) {
123 if (property.second.type() !=
124 typeid(
typename TypeToArrowType<Type::BOOL>::CType)) {
129 if (property.second.type() !=
130 typeid(
typename TypeToArrowType<Type::INT32>::CType)) {
135 if (property.second.type() !=
136 typeid(
typename TypeToArrowType<Type::INT64>::CType)) {
141 if (property.second.type() !=
142 typeid(
typename TypeToArrowType<Type::FLOAT>::CType)) {
147 if (property.second.type() !=
148 typeid(
typename TypeToArrowType<Type::DOUBLE>::CType)) {
153 if (property.second.type() !=
154 typeid(
typename TypeToArrowType<Type::STRING>::CType)) {
// DATE and TIMESTAMP compare against CType::c_type instead of CType itself.
160 if (property.second.type() !=
161 typeid(
typename TypeToArrowType<Type::DATE>::CType::c_type)) {
165 case Type::TIMESTAMP:
167 if (property.second.type() !=
168 typeid(
typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type)) {
// Type-mismatch message. The property name is joined to the next literal
// with '+' rather than passed as a separate argument; the reported "got"
// type is the implementation-defined std::type_info::name().
177 "Invalid data type for property ", property.first +
", defined as ",
178 type->ToTypeName(),
", but got ", property.second.type().name());
// Generic tryToAppend: builds one Arrow array for the property
// `property_name` from `edges` and stores it in `array`.
// `type` is a template parameter whose declaration line is not visible in
// this listing; the branch conditions, closing braces and final return are
// not shown either.
//   property_name  name of the property read from each edge
//   array          output; receives the finished Arrow array
//   edges          edges supplying one value (or null) each
186 Status EdgesBuilder::tryToAppend(
187 const std::string& property_name,
188 std::shared_ptr<arrow::Array>& array,
189 const std::vector<Edge>& edges) {
// C value type and Arrow builder type are both looked up from `type`.
190 using CType =
typename TypeToArrowType<type>::CType;
191 arrow::MemoryPool* pool = arrow::default_memory_pool();
192 typename TypeToArrowType<type>::BuilderType builder(pool);
193 for (
const auto& e : edges) {
// Null entry for this edge; the guarding condition is not visible here
// (presumably the edge is empty or lacks the property -- TODO confirm).
195 RETURN_NOT_ARROW_OK(builder.AppendNull());
// Value entry: the property's std::any is cast by value to CType;
// std::any_cast throws std::bad_any_cast when the stored type differs.
198 builder.Append(std::any_cast<CType>(e.
GetProperty(property_name))));
// NOTE(review): ValueOrDie() does not turn a Finish() failure into a Status,
// unlike the GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN used in getOffsetTable.
201 array = builder.Finish().ValueOrDie();
// TIMESTAMP specialization of tryToAppend: builds one Arrow timestamp array
// for the property `property_name` from `edges` and stores it in `array`.
// The branch conditions, closing braces and final return are not visible in
// this listing.
206 Status EdgesBuilder::tryToAppend<Type::TIMESTAMP>(
207 const std::string& property_name,
208 std::shared_ptr<arrow::Array>& array,
209 const std::vector<Edge>& edges) {
// Values are read as the mapped type's nested c_type, not the mapped type.
210 using CType =
typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type;
211 arrow::MemoryPool* pool = arrow::default_memory_pool();
// The builder is given an explicit Arrow type: timestamps in milliseconds.
212 typename TypeToArrowType<Type::TIMESTAMP>::BuilderType builder(
213 arrow::timestamp(arrow::TimeUnit::MILLI), pool);
214 for (
const auto& e : edges) {
// Null entry for this edge; the guarding condition is not visible here
// (presumably the edge is empty or lacks the property -- TODO confirm).
216 RETURN_NOT_ARROW_OK(builder.AppendNull());
// Value entry: std::any_cast by value throws std::bad_any_cast when the
// stored type is not CType.
219 builder.Append(std::any_cast<CType>(e.
GetProperty(property_name))));
// NOTE(review): ValueOrDie() does not turn a Finish() failure into a Status.
222 array = builder.Finish().ValueOrDie();
// DATE specialization of tryToAppend: builds one Arrow array for the
// property `property_name` from `edges` and stores it in `array`.
// The branch conditions, closing braces and final return are not visible in
// this listing.
227 Status EdgesBuilder::tryToAppend<Type::DATE>(
228 const std::string& property_name,
229 std::shared_ptr<arrow::Array>& array,
230 const std::vector<Edge>& edges) {
// Values are read as the mapped type's nested c_type, not the mapped type.
231 using CType =
typename TypeToArrowType<Type::DATE>::CType::c_type;
232 arrow::MemoryPool* pool = arrow::default_memory_pool();
233 typename TypeToArrowType<Type::DATE>::BuilderType builder(pool);
234 for (
const auto& e : edges) {
// Null entry for this edge; the guarding condition is not visible here
// (presumably the edge is empty or lacks the property -- TODO confirm).
236 RETURN_NOT_ARROW_OK(builder.AppendNull());
// Value entry: std::any_cast by value throws std::bad_any_cast when the
// stored type is not CType.
239 builder.Append(std::any_cast<CType>(e.
GetProperty(property_name))));
// NOTE(review): ValueOrDie() does not turn a Finish() failure into a Status.
242 array = builder.Finish().ValueOrDie();
// Runtime-to-compile-time dispatch: switches on type->id() and forwards to
// the tryToAppend instantiation for that type, returning its Status.
//   type           declared data type of the property
//   property_name  name of the property read from each edge
//   array          output; filled by the selected tryToAppend
//   edges          edges supplying the values
// Most case labels, and whatever follows the TIMESTAMP case (default branch,
// closing braces), are not visible in this listing.
246 Status EdgesBuilder::appendToArray(
247 const std::shared_ptr<DataType>& type,
const std::string& property_name,
248 std::shared_ptr<arrow::Array>& array,
249 const std::vector<Edge>& edges) {
250 switch (type->id()) {
252 return tryToAppend<Type::BOOL>(property_name, array, edges);
254 return tryToAppend<Type::INT32>(property_name, array, edges);
256 return tryToAppend<Type::INT64>(property_name, array, edges);
258 return tryToAppend<Type::FLOAT>(property_name, array, edges);
260 return tryToAppend<Type::DOUBLE>(property_name, array, edges);
262 return tryToAppend<Type::STRING>(property_name, array, edges);
264 return tryToAppend<Type::DATE>(property_name, array, edges);
265 case Type::TIMESTAMP:
266 return tryToAppend<Type::TIMESTAMP>(property_name, array, edges);
// Non-template tryToAppend overload: builds an Arrow Int64 array with one
// entry per edge and stores it in `array`.
// The first parameter is not visible in this listing; convertToTable calls
// this overload with 1 for the source column and 0 for the destination
// column, so it presumably selects which endpoint id is appended -- TODO
// confirm. The final return and closing braces are not shown either.
273 Status EdgesBuilder::tryToAppend(
275 std::shared_ptr<arrow::Array>& array,
276 const std::vector<Edge>& edges) {
277 arrow::MemoryPool* pool = arrow::default_memory_pool();
278 typename arrow::TypeTraits<arrow::Int64Type>::BuilderType builder(pool);
279 for (
const auto& e : edges) {
// Appends a value obtained through std::any_cast<int64_t>; the casted
// expression continues on lines that are not visible here.
280 RETURN_NOT_ARROW_OK(builder.Append(std::any_cast<int64_t>(
// NOTE(review): ValueOrDie() does not turn a Finish() failure into a Status.
283 array = builder.Finish().ValueOrDie();
// Converts a list of edges into an Arrow table.
// Column order: source index, destination index, then one column per
// property of every property group in the edge info, in iteration order.
//   edges  edges to convert
// Returns the table built by arrow::Table::Make from the collected schema
// and arrays; errors from the helper calls are propagated via
// GAR_RETURN_NOT_OK. Closing braces are not visible in this listing.
287 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::convertToTable(
288 const std::vector<Edge>& edges) {
289 const auto& property_groups = edge_info_->GetPropertyGroups();
// arrays and schema_vector are filled in lock-step: one field per array.
290 std::vector<std::shared_ptr<arrow::Array>> arrays;
291 std::vector<std::shared_ptr<arrow::Field>> schema_vector;
// Source index column (int64). The literal 1 selects the source variant of
// the non-template tryToAppend overload.
293 std::shared_ptr<arrow::Array> array;
294 schema_vector.push_back(arrow::field(
295 GeneralParams::kSrcIndexCol, DataType::DataTypeToArrowDataType(int64())));
296 GAR_RETURN_NOT_OK(tryToAppend(1, array, edges));
297 arrays.push_back(array);
// Destination index column (int64), using the literal 0; `array` is reused
// and overwritten by the call.
299 schema_vector.push_back(arrow::field(
300 GeneralParams::kDstIndexCol, DataType::DataTypeToArrowDataType(int64())));
301 GAR_RETURN_NOT_OK(tryToAppend(0, array, edges));
302 arrays.push_back(array);
// Property columns: field name and Arrow type come from the property
// definition; values are gathered by appendToArray.
304 for (
auto& property_group : property_groups) {
305 for (
auto& property : property_group->GetProperties()) {
307 schema_vector.push_back(arrow::field(
308 property.name, DataType::DataTypeToArrowDataType(property.type)));
// A fresh local array per property (shadows the outer `array`).
310 std::shared_ptr<arrow::Array> array;
312 appendToArray(property.type, property.name, array, edges));
313 arrays.push_back(array);
316 auto schema = std::make_shared<arrow::Schema>(schema_vector);
317 return arrow::Table::Make(schema, arrays);
// Builds the single-column int64 offset table for one vertex chunk.
//   vertex_chunk_index  index of the vertex chunk
//   edges               edges of that chunk; the caller sorts them by source
//                       or destination before calling (see the dump routine)
// Returns a table with one GeneralParams::kOffsetCol column.
// The declaration of `index`, the body of the while loop and the closing
// braces are not visible in this listing.
320 Result<std::shared_ptr<arrow::Table>> EdgesBuilder::getOffsetTable(
321 IdType vertex_chunk_index,
const std::vector<Edge>& edges) {
322 arrow::Int64Builder builder;
// Vertex id range covered by this chunk: [begin_index, end_index).
323 IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
324 end_index = begin_index + vertex_chunk_size_;
// The first offset is always 0.
325 RETURN_NOT_ARROW_OK(builder.Append(0));
327 std::vector<std::shared_ptr<arrow::Array>> arrays;
328 std::vector<std::shared_ptr<arrow::Field>> schema_vector;
329 schema_vector.push_back(arrow::field(
330 GeneralParams::kOffsetCol, DataType::DataTypeToArrowDataType(int64())));
// Walk the vertex ids of the chunk while scanning `edges` with `index`.
333 for (IdType i = begin_index; i < end_index; i++) {
334 while (index < edges.size()) {
// x is the endpoint the edges are ordered by: source for ordered_by_source,
// destination otherwise. How x is compared with i is not visible here
// (presumably index advances while x belongs to vertex i -- TODO confirm).
335 int64_t x = (adj_list_type_ == AdjListType::ordered_by_source
336 ? edges[index].GetSource()
337 : edges[index].GetDestination());
// Current scan position is appended as an offset (presumably once per vertex
// id, after the while loop -- TODO confirm against the elided braces).
344 RETURN_NOT_ARROW_OK(builder.Append(index));
// A Finish() failure is propagated as an error instead of aborting.
346 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto array, builder.Finish());
347 arrays.push_back(array);
348 auto schema = std::make_shared<arrow::Schema>(schema_vector);
349 return arrow::Table::Make(schema, arrays);
The writer for edge (adj list, offset and property group) chunks.
Status WriteOffsetChunk(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, ValidateLevel validate_level=ValidateLevel::default_validate) const
Validate and write the offset chunk for a vertex chunk.
Status WriteEdgesNum(IdType vertex_chunk_index, const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of edges into the file.
Status WriteTable(const std::shared_ptr< arrow::Table > &input_table, IdType vertex_chunk_index, IdType start_chunk_index=0, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write chunks of the adj list and all property groups for the edges of a vertex chunk.
Status WriteVerticesNum(const IdType &count, ValidateLevel validate_level=ValidateLevel::default_validate) const
Write the number of vertices into the file.
Status outcome object (success or error)
static Status TypeError(Args &&... args)
static Status KeyError(Args &&... args)
static Status Invalid(Args &&... args)
Edge is designed for constructing edges builder.
const std::any & GetProperty(const std::string &property) const
Get a property of the edge.
const std::unordered_map< std::string, std::any > & GetProperties() const
Get all properties of the edge.
IdType GetSource() const noexcept
Get source id of the edge.
bool Empty() const noexcept
Check if the edge is empty.
IdType GetDestination() const noexcept
Get destination id of the edge.
bool ContainProperty(const std::string &property) const
Check if the edge contains a property.
Status Dump()
Dump the collection into files.