Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
writer_util.h
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #pragma once
21 #include <parquet/properties.h>
22 #include <cstdint>
23 #include <limits>
24 #include <memory>
25 #include <string>
26 #include <unordered_map>
27 #include <utility>
28 #include <vector>
29 #ifdef ARROW_ORC
30 #include "arrow/adapters/orc/adapter.h"
31 #endif
32 #include "arrow/api.h"
33 #include "arrow/csv/api.h"
34 #include "arrow/dataset/api.h"
35 #include "arrow/filesystem/api.h"
36 #include "parquet/arrow/writer.h"
37 
38 namespace graphar {
60  private:
67  class CSVOption {
68  public:
69  arrow::io::IOContext io_context;
70  std::string null_string;
71  std::string eol = "\n";
72  bool include_header = true;
73  char delimiter = ',';
74  int32_t batch_size = 1024;
75  arrow::csv::QuotingStyle quoting_style = arrow::csv::QuotingStyle::Needed;
76  };
84  class ParquetOption {
85  public:
86  std::shared_ptr<::parquet::FileEncryptionProperties> encryption_properties;
87  std::unordered_map<std::string, ::parquet::Encoding::type> column_encoding;
88  std::unordered_map<std::string, arrow::Compression::type>
89  column_compression;
90  std::unordered_map<std::string, int> column_compression_level;
91  std::unordered_map<std::string, size_t> column_max_statistics_size;
92  std::unordered_map<std::string, bool> column_statistics;
93  std::unordered_map<std::string, bool> column_write_page_index;
94  std::vector<::parquet::SortingColumn> sorting_columns;
95  int64_t dictionary_pagesize_limit = 1024 * 1024;
96  int64_t write_batch_size = 1024;
97  int64_t max_row_group_length = 1024 * 1024;
98  int64_t data_pagesize = 1024 * 1024;
99  size_t max_statistics_size = 4096;
100  int compression_level = std::numeric_limits<int>::min();
101  ::parquet::ParquetDataPageVersion data_page_version =
102  ::parquet::ParquetDataPageVersion::V1;
103  ::parquet::ParquetVersion::type version =
104  ::parquet::ParquetVersion::PARQUET_2_6;
105  ::parquet::Encoding::type encoding = ::parquet::Encoding::PLAIN;
106  arrow::Compression::type compression = arrow::Compression::ZSTD;
107  ::arrow::TimeUnit::type coerce_timestamps = ::arrow::TimeUnit::MICRO;
108  ::arrow::internal::Executor* executor = nullptr;
109  bool enable_dictionary = true;
110  bool enable_statistics = true;
111  bool enable_store_decimal_as_integer = false;
112  bool enable_write_page_index = false;
113  bool compliant_nested_types = true;
114  bool use_threads = false;
115  bool enable_deprecated_int96_timestamps = false;
116  bool allow_truncated_timestamps = false;
117  bool store_schema = false;
118  };
/// Value holder for the ORC writer settings edited through ORCOptionBuilder.
///
/// All members are compiled only when ARROW_ORC is defined; without it the
/// class is an empty placeholder so code holding a pointer to it still builds.
class ORCOption {
#ifdef ARROW_ORC
 public:
  std::vector<int64_t> bloom_filter_columns;
  arrow::adapters::orc::FileVersion file_version =
      arrow::adapters::orc::FileVersion(0, 12);
  arrow::adapters::orc::CompressionStrategy compression_strategy{
      arrow::adapters::orc::CompressionStrategy::kSpeed};
  arrow::Compression::type compression{arrow::Compression::UNCOMPRESSED};
  int64_t stripe_size{64 * 1024 * 1024};
  int64_t batch_size{1024};
  int64_t compression_block_size{64 * 1024};
  int64_t row_index_stride{10000};
  double padding_tolerance{0.0};
  double dictionary_key_size_threshold{0.0};
  double bloom_filter_fpp{0.05};
#endif
};
143 
144  public:
145  // Builder for CSVOption
147  public:
148  CSVOptionBuilder() : option_(std::make_shared<CSVOption>()) {}
149  explicit CSVOptionBuilder(std::shared_ptr<WriterOptions> wopt)
150  : writerOptions_(wopt),
151  option_(wopt && wopt->csvOption_ ? wopt->csvOption_
152  : std::make_shared<CSVOption>()) {}
153  CSVOptionBuilder& include_header(bool header) {
154  option_->include_header = header;
155  return *this;
156  }
157  CSVOptionBuilder& batch_size(int32_t bs) {
158  option_->batch_size = bs;
159  return *this;
160  }
161  CSVOptionBuilder& delimiter(char d) {
162  option_->delimiter = d;
163  return *this;
164  }
165  CSVOptionBuilder& null_string(const std::string& ns) {
166  option_->null_string = ns;
167  return *this;
168  }
169  CSVOptionBuilder& io_context(const arrow::io::IOContext& ctx) {
170  option_->io_context = ctx;
171  return *this;
172  }
173  CSVOptionBuilder& eol(const std::string& e) {
174  option_->eol = e;
175  return *this;
176  }
177  CSVOptionBuilder& quoting_style(arrow::csv::QuotingStyle qs) {
178  option_->quoting_style = qs;
179  return *this;
180  }
181  std::shared_ptr<WriterOptions> build() {
182  if (!writerOptions_) {
183  writerOptions_ = std::make_shared<WriterOptions>();
184  }
185  writerOptions_->setCsvOption(option_);
186  return writerOptions_;
187  }
188 
189  private:
// WriterOptions that build() fills in; stays null until build() runs when the
// builder was default-constructed.
190  std::shared_ptr<WriterOptions> writerOptions_;
// CSV settings being edited; both constructors leave this non-null.
191  std::shared_ptr<CSVOption> option_;
192  };
193 
194  // Builder for ParquetOption
196  public:
197  ParquetOptionBuilder() : option_(std::make_shared<ParquetOption>()) {}
198  explicit ParquetOptionBuilder(std::shared_ptr<WriterOptions> wopt)
199  : writerOptions_(wopt),
200  option_(wopt && wopt->parquetOption_
201  ? wopt->parquetOption_
202  : std::make_shared<ParquetOption>()) {}
203  ParquetOptionBuilder& enable_dictionary(bool enable) {
204  option_->enable_dictionary = enable;
205  return *this;
206  }
207  ParquetOptionBuilder& dictionary_pagesize_limit(int64_t limit) {
208  option_->dictionary_pagesize_limit = limit;
209  return *this;
210  }
211  ParquetOptionBuilder& write_batch_size(int64_t batch_size) {
212  option_->write_batch_size = batch_size;
213  return *this;
214  }
215  ParquetOptionBuilder& max_row_group_length(int64_t length) {
216  option_->max_row_group_length = length;
217  return *this;
218  }
219  ParquetOptionBuilder& data_pagesize(int64_t pagesize) {
220  option_->data_pagesize = pagesize;
221  return *this;
222  }
223  ParquetOptionBuilder& data_page_version(
224  ::parquet::ParquetDataPageVersion version) {
225  option_->data_page_version = version;
226  return *this;
227  }
228  ParquetOptionBuilder& version(::parquet::ParquetVersion::type ver) {
229  option_->version = ver;
230  return *this;
231  }
232  ParquetOptionBuilder& encoding(::parquet::Encoding::type enc) {
233  option_->encoding = enc;
234  return *this;
235  }
236  ParquetOptionBuilder& column_encoding(
237  const std::unordered_map<std::string, ::parquet::Encoding::type>&
238  encodings) {
239  option_->column_encoding = encodings;
240  return *this;
241  }
242  ParquetOptionBuilder& compression(arrow::Compression::type comp) {
243  option_->compression = comp;
244  return *this;
245  }
246  ParquetOptionBuilder& column_compression(
247  const std::unordered_map<std::string, arrow::Compression::type>&
248  compressions) {
249  option_->column_compression = compressions;
250  return *this;
251  }
252  ParquetOptionBuilder& compression_level(int level) {
253  option_->compression_level = level;
254  return *this;
255  }
256  ParquetOptionBuilder& column_compression_level(
257  const std::unordered_map<std::string, int>& levels) {
258  option_->column_compression_level = levels;
259  return *this;
260  }
261  ParquetOptionBuilder& max_statistics_size(size_t size) {
262  option_->max_statistics_size = size;
263  return *this;
264  }
265  ParquetOptionBuilder& column_max_statistics_size(
266  const std::unordered_map<std::string, size_t>& sizes) {
267  option_->column_max_statistics_size = sizes;
268  return *this;
269  }
270  ParquetOptionBuilder& encryption_properties(
271  const std::shared_ptr<::parquet::FileEncryptionProperties>& props) {
272  option_->encryption_properties = props;
273  return *this;
274  }
275  ParquetOptionBuilder& enable_statistics(bool enable) {
276  option_->enable_statistics = enable;
277  return *this;
278  }
279  ParquetOptionBuilder& column_statistics(
280  const std::unordered_map<std::string, bool>& stats) {
281  option_->column_statistics = stats;
282  return *this;
283  }
284  ParquetOptionBuilder& sorting_columns(
285  const std::vector<::parquet::SortingColumn>& columns) {
286  option_->sorting_columns = columns;
287  return *this;
288  }
289  ParquetOptionBuilder& enable_store_decimal_as_integer(bool enable) {
290  option_->enable_store_decimal_as_integer = enable;
291  return *this;
292  }
293  ParquetOptionBuilder& enable_write_page_index(bool enable) {
294  option_->enable_write_page_index = enable;
295  return *this;
296  }
297  ParquetOptionBuilder& column_write_page_index(
298  const std::unordered_map<std::string, bool>& indices) {
299  option_->column_write_page_index = indices;
300  return *this;
301  }
302  ParquetOptionBuilder& compliant_nested_types(bool compliant) {
303  option_->compliant_nested_types = compliant;
304  return *this;
305  }
306  ParquetOptionBuilder& use_threads(bool use) {
307  option_->use_threads = use;
308  return *this;
309  }
310  ParquetOptionBuilder& enable_deprecated_int96_timestamps(bool enable) {
311  option_->enable_deprecated_int96_timestamps = enable;
312  return *this;
313  }
314  ParquetOptionBuilder& coerce_timestamps(::arrow::TimeUnit::type unit) {
315  option_->coerce_timestamps = unit;
316  return *this;
317  }
318  ParquetOptionBuilder& allow_truncated_timestamps(bool allow) {
319  option_->allow_truncated_timestamps = allow;
320  return *this;
321  }
322  ParquetOptionBuilder& store_schema(bool store) {
323  option_->store_schema = store;
324  return *this;
325  }
326  ParquetOptionBuilder& executor(::arrow::internal::Executor* exec) {
327  option_->executor = exec;
328  return *this;
329  }
330  std::shared_ptr<WriterOptions> build() {
331  if (!writerOptions_) {
332  writerOptions_ = std::make_shared<WriterOptions>();
333  }
334  writerOptions_->setParquetOption(option_);
335  return writerOptions_;
336  }
337 
338  private:
// WriterOptions that build() fills in; stays null until build() runs when the
// builder was default-constructed.
339  std::shared_ptr<WriterOptions> writerOptions_;
// Parquet settings being edited; both constructors leave this non-null.
340  std::shared_ptr<ParquetOption> option_;
341  };
342 
343  // Builder for ORCOption
345  public:
346  ORCOptionBuilder() : option_(std::make_shared<ORCOption>()) {}
347  explicit ORCOptionBuilder(std::shared_ptr<WriterOptions> wopt)
348  : writerOptions_(wopt),
349  option_(wopt && wopt->orcOption_ ? wopt->orcOption_
350  : std::make_shared<ORCOption>()) {}
351 #ifdef ARROW_ORC
352  ORCOptionBuilder& batch_size(int64_t bs) {
353  option_->batch_size = bs;
354  return *this;
355  }
356  ORCOptionBuilder& file_version(arrow::adapters::orc::FileVersion fv) {
357  option_->file_version = fv;
358  return *this;
359  }
360  ORCOptionBuilder& stripe_size(int64_t ss) {
361  option_->stripe_size = ss;
362  return *this;
363  }
364  ORCOptionBuilder& compression(arrow::Compression::type comp) {
365  option_->compression = comp;
366  return *this;
367  }
368  ORCOptionBuilder& compression_block_size(int64_t cbs) {
369  option_->compression_block_size = cbs;
370  return *this;
371  }
372  ORCOptionBuilder& compression_strategy(
373  arrow::adapters::orc::CompressionStrategy cs) {
374  option_->compression_strategy = cs;
375  return *this;
376  }
377  ORCOptionBuilder& row_index_stride(int64_t ris) {
378  option_->row_index_stride = ris;
379  return *this;
380  }
381  ORCOptionBuilder& padding_tolerance(double pt) {
382  option_->padding_tolerance = pt;
383  return *this;
384  }
385  ORCOptionBuilder& dictionary_key_size_threshold(double dkst) {
386  option_->dictionary_key_size_threshold = dkst;
387  return *this;
388  }
389  ORCOptionBuilder& bloom_filter_columns(const int64_t bfc) {
390  option_->bloom_filter_columns.push_back(bfc);
391  return *this;
392  }
393  ORCOptionBuilder& bloom_filter_fpp(double bffpp) {
394  option_->bloom_filter_fpp = bffpp;
395  return *this;
396  }
397 #endif
398  std::shared_ptr<WriterOptions> build() {
399  if (!writerOptions_) {
400  writerOptions_ = std::make_shared<WriterOptions>();
401  }
402  writerOptions_->setOrcOption(option_);
403  return writerOptions_;
404  }
405 
406  private:
// WriterOptions that build() fills in; stays null until build() runs when the
// builder was default-constructed.
407  std::shared_ptr<WriterOptions> writerOptions_;
// ORC settings being edited; both constructors leave this non-null.
408  std::shared_ptr<ORCOption> option_;
409  };
410 
// Default construction leaves all three per-format option pointers null.
411  WriterOptions() = default;
412  WriterOptions(std::shared_ptr<CSVOption> csv,
413  std::shared_ptr<ParquetOption> parquet,
414  std::shared_ptr<ORCOption> orc)
415  : csvOption_(csv), parquetOption_(parquet), orcOption_(orc) {}
416  static std::shared_ptr<WriterOptions> DefaultWriterOption() {
417  return std::make_shared<WriterOptions>();
418  }
419  void setCsvOption(std::shared_ptr<CSVOption> csv_option) {
420  csvOption_ = csv_option;
421  }
422  void setParquetOption(std::shared_ptr<ParquetOption> parquet_option) {
423  parquetOption_ = parquet_option;
424  }
425  void setOrcOption(std::shared_ptr<ORCOption> orc_option) {
426  orcOption_ = orc_option;
427  }
// Accessors that return the Arrow/Parquet option types. They are only
// declared here; their definitions are not part of the visible header.
// NOTE(review): presumably each one translates the matching stored option
// (and falls back to library defaults when it is null) -- confirm in the
// implementation file.
428  arrow::csv::WriteOptions getCsvOption() const;
429  std::shared_ptr<parquet::WriterProperties> getParquetWriterProperties() const;
430  std::shared_ptr<parquet::ArrowWriterProperties> getArrowWriterProperties()
431  const;
// The ORC accessor exists only in builds with ARROW_ORC defined.
432 #ifdef ARROW_ORC
433  arrow::adapters::orc::WriteOptions getOrcOption() const;
434 #endif
435 
436  private:
// Per-format options. Each is null until set through the three-argument
// constructor or the matching set*Option() call; the builders' build()
// methods go through those setters.
437  std::shared_ptr<CSVOption> csvOption_;
438  std::shared_ptr<ParquetOption> parquetOption_;
439  std::shared_ptr<ORCOption> orcOption_;
440 };
441 
/// Validation level selector, stored in a single `char`.
///
/// The numeric values are spelled out explicitly so they stay stable if
/// enumerators are ever reordered.
/// NOTE(review): what each level actually checks is defined by the writers
/// that consume this enum, which are not visible here -- confirm there.
enum class ValidateLevel : char {
  default_validate = 0,
  no_validate = 1,
  weak_validate = 2,
  strong_validate = 3,
};
458 
459 } // namespace graphar
Provides configuration options for different file format writers (CSV, Parquet, ORC) in GraphAr.
Definition: writer_util.h:59