Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
filesystem.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <iostream>
21 #include <memory>
22 #include "graphar/writer_util.h"
23 #ifdef ARROW_ORC
24 #include "arrow/adapters/orc/adapter.h"
25 #endif
26 #include <arrow/compute/api.h>
27 #include "arrow/api.h"
28 #include "arrow/csv/api.h"
29 #include "arrow/dataset/api.h"
30 #include "parquet/arrow/reader.h"
31 #if defined(ARROW_VERSION) && ARROW_VERSION <= 12000000
32 #include "arrow/dataset/file_json.h"
33 #endif
34 #include "arrow/filesystem/api.h"
35 #include "arrow/filesystem/s3fs.h"
36 #include "arrow/ipc/writer.h"
37 #include "parquet/arrow/writer.h"
38 #include "simple-uri-parser/uri_parser.h"
39 
40 #include "graphar/expression.h"
41 #include "graphar/filesystem.h"
42 #include "graphar/fwd.h"
43 #include "graphar/general_params.h"
44 
45 namespace graphar::detail {
46 template <typename U, typename T>
47 static Status CastToLargeOffsetArray(
48  const std::shared_ptr<arrow::Array>& in,
49  const std::shared_ptr<arrow::DataType>& to_type,
50  std::shared_ptr<arrow::Array>& out) { // NOLINT(runtime/references)
51  auto array_data = in->data()->Copy();
52  auto offset = array_data->buffers[1];
53  using from_offset_type = typename U::offset_type;
54  using to_string_offset_type = typename T::offset_type;
55  auto raw_value_offsets_ =
56  offset == NULLPTR
57  ? NULLPTR
58  : reinterpret_cast<const from_offset_type*>(offset->data());
59  std::vector<to_string_offset_type> to_offset(offset->size() /
60  sizeof(from_offset_type));
61  for (size_t i = 0; i < to_offset.size(); ++i) {
62  to_offset[i] = raw_value_offsets_[i];
63  }
64  std::shared_ptr<arrow::Buffer> buffer;
65  arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
66  RETURN_NOT_ARROW_OK(
67  buffer_builder.Append(to_offset.data(), to_offset.size()));
68  RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
69  array_data->type = to_type;
70  array_data->buffers[1] = buffer;
71  out = arrow::MakeArray(array_data);
72  RETURN_NOT_ARROW_OK(out->ValidateFull());
73  return Status::OK();
74 }
75 
76 template <typename U, typename T>
77 static Status CastToLargeOffsetArray(
78  const std::shared_ptr<arrow::ChunkedArray>& in,
79  const std::shared_ptr<arrow::DataType>& to_type,
80  std::shared_ptr<arrow::ChunkedArray>& out) { // NOLINT(runtime/references)
81  std::vector<std::shared_ptr<arrow::Array>> chunks;
82  for (auto const& chunk : in->chunks()) {
83  std::shared_ptr<arrow::Array> array;
84  auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
85  GAR_RETURN_NOT_OK(status);
86  chunks.emplace_back(array);
87  }
88  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
89  return Status::OK();
90 }
91 } // namespace graphar::detail
92 
93 namespace graphar {
94 namespace ds = arrow::dataset;
95 
96 std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
97  const FileType type) const {
98  switch (type) {
99  case CSV:
100  return std::make_shared<ds::CsvFileFormat>();
101  case PARQUET:
102  return std::make_shared<ds::ParquetFileFormat>();
103  case JSON:
104  return std::make_shared<ds::JsonFileFormat>();
105 #ifdef ARROW_ORC
106  case ORC:
107  return std::make_shared<ds::OrcFileFormat>();
108 #endif
109  default:
110  return nullptr;
111  }
112 }
113 
114 Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
115  const std::string& path, FileType file_type,
116  const std::vector<int>& column_indices) const noexcept {
117  parquet::arrow::FileReaderBuilder builder;
118  auto open_file_status = builder.OpenFile(path);
119  if (!open_file_status.ok()) {
120  return Status::Invalid("Failed to open file: ", path, " - ",
121  open_file_status.ToString());
122  }
123  builder.memory_pool(arrow::default_memory_pool());
124  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto reader, builder.Build());
125  std::shared_ptr<arrow::Table> table;
126  if (column_indices.empty()) {
127  arrow::Status read_status = reader->ReadTable(&table);
128  if (!read_status.ok()) {
129  return Status::Invalid("Failed to read table from file: ", path, " - ",
130  read_status.ToString());
131  }
132  } else {
133  arrow::Status read_status = reader->ReadTable(column_indices, &table);
134  if (!read_status.ok()) {
135  return Status::Invalid("Failed to read table from file: ", path, " - ",
136  read_status.ToString());
137  }
138  }
139  return table;
140 }
141 
142 Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
143  const std::string& path, FileType file_type,
144  const util::FilterOptions& options) const noexcept {
145  std::shared_ptr<ds::FileFormat> format = GetFileFormat(file_type);
146  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
147  auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
148  arrow_fs_, {path}, format,
149  arrow::dataset::FileSystemFactoryOptions()));
150  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto dataset, factory->Finish());
151  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scan_builder, dataset->NewScan());
152 #if ARROW_VERSION >= 21000000
153  RETURN_NOT_ARROW_OK(arrow::compute::Initialize());
154 #endif
155  // Apply the row filter and select the specified columns
156  if (options.filter) {
157  GAR_ASSIGN_OR_RAISE(auto filter, options.filter->Evaluate());
158  RETURN_NOT_ARROW_OK(scan_builder->Filter(filter));
159  }
160  if (options.columns) {
161  RETURN_NOT_ARROW_OK(scan_builder->Project(*options.columns));
162  }
163 
164  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scanner, scan_builder->Finish());
165  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto table, scanner->ToTable());
166  // cast string array to large string array as we need concatenate chunks in
167  // some places, e.g., in vineyard
168  for (int i = 0; i < table->num_columns(); ++i) {
169  std::shared_ptr<arrow::DataType> type = table->column(i)->type();
170  if (type->id() == arrow::Type::STRING) {
171  type = arrow::large_utf8();
172  } else if (type->id() == arrow::Type::BINARY) {
173  type = arrow::large_binary();
174  }
175  if (type->Equals(table->column(i)->type())) {
176  continue;
177  }
178  // do casting
179  auto field = table->field(i)->WithType(type);
180  std::shared_ptr<arrow::ChunkedArray> chunked_array;
181 
182  if (table->num_rows() == 0) {
183  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
184  chunked_array, arrow::ChunkedArray::MakeEmpty(type));
185  } else if (type->Equals(arrow::large_utf8())) {
186  auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
187  arrow::LargeStringArray>(
188  table->column(i), type, chunked_array);
189  GAR_RETURN_NOT_OK(status);
190  } else if (type->Equals(arrow::large_binary())) {
191  auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
192  arrow::LargeBinaryArray>(
193  table->column(i), type, chunked_array);
194  GAR_RETURN_NOT_OK(status);
195  } else {
196  // noop
197  chunked_array = table->column(i);
198  }
199  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
200  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
201  table, table->AddColumn(i, field, chunked_array));
202  }
203  return table;
204 }
205 
206 template <typename T>
207 Result<T> FileSystem::ReadFileToValue(const std::string& path) const noexcept {
208  T ret;
209  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto input,
210  arrow_fs_->OpenInputStream(path));
211  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto bytes,
212  input->Read(sizeof(T), &ret));
213  ARROW_UNUSED(bytes);
214  return ret;
215 }
216 
217 template <>
218 Result<std::string> FileSystem::ReadFileToValue(const std::string& path) const
219  noexcept {
220  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto access_file,
221  arrow_fs_->OpenInputFile(path));
222  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto bytes, access_file->GetSize());
223  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto buffer,
224  access_file->ReadAt(0, bytes));
225  return buffer->ToString();
226 }
227 
228 template <typename T>
230  const std::string& path) const noexcept {
231  // try to create the directory, oss filesystem may not support this, ignore
232  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
233  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
234  arrow_fs_->OpenOutputStream(path));
235  RETURN_NOT_ARROW_OK(ofstream->Write(&value, sizeof(T)));
236  RETURN_NOT_ARROW_OK(ofstream->Close());
237  return Status::OK();
238 }
239 
240 template <>
241 Status FileSystem::WriteValueToFile(const std::string& value,
242  const std::string& path) const noexcept {
243  // try to create the directory, oss filesystem may not support this, ignore
244  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
245  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
246  arrow_fs_->OpenOutputStream(path));
247  RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size()));
248  RETURN_NOT_ARROW_OK(ofstream->Close());
249  return Status::OK();
250 }
251 
253  const std::shared_ptr<arrow::Table>& table, FileType file_type,
254  const std::string& path,
255  const std::shared_ptr<WriterOptions>& options) const noexcept {
256  // try to create the directory, oss filesystem may not support this, ignore
257  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
258  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
259  arrow_fs_->OpenOutputStream(path));
260  switch (file_type) {
261  case FileType::CSV: {
262  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
263  auto writer,
264  arrow::csv::MakeCSVWriter(output_stream.get(), table->schema(),
265  options->getCsvOption()));
266  RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
267  RETURN_NOT_ARROW_OK(writer->Close());
268  break;
269  }
270  case FileType::PARQUET: {
271  auto schema = table->schema();
272  RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
273  *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
274  options->getParquetWriterProperties(),
275  options->getArrowWriterProperties()));
276  break;
277  }
278 #ifdef ARROW_ORC
279  case FileType::ORC: {
280  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
281  auto writer, arrow::adapters::orc::ORCFileWriter::Open(
282  output_stream.get(), options->getOrcOption()));
283  RETURN_NOT_ARROW_OK(writer->Write(*table));
284  RETURN_NOT_ARROW_OK(writer->Close());
285  break;
286  }
287 #endif
288  default:
289  return Status::Invalid(
290  "Unsupported file type: ", FileTypeToString(file_type), " for wrting.");
291  }
292  return Status::OK();
293 }
294 
296  const std::shared_ptr<arrow::Table>& table, const std::string& path) const
297  noexcept {
298  // try to create the directory, oss filesystem may not support this, ignore
299  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
300  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
301  arrow_fs_->OpenOutputStream(path));
302  auto schema = table->schema();
303  auto column_num = schema->num_fields();
304  parquet::WriterProperties::Builder builder;
305  builder.compression(arrow::Compression::type::ZSTD); // enable compression
306  builder.encoding(parquet::Encoding::RLE);
307  RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
308  *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
309  builder.build(), parquet::default_arrow_writer_properties()));
310  return Status::OK();
311 }
312 
313 Status FileSystem::CopyFile(const std::string& src_path,
314  const std::string& dst_path) const noexcept {
315  // try to create the directory, oss filesystem may not support this, ignore
316  ARROW_UNUSED(
317  arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of("/"))));
318  RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
319  return Status::OK();
320 }
321 
322 Result<IdType> FileSystem::GetFileNumOfDir(const std::string& dir_path,
323  bool recursive) const noexcept {
324  arrow::fs::FileSelector file_selector;
325  file_selector.base_dir = dir_path;
326  file_selector.allow_not_found = false; // if dir_path not exist, return error
327  file_selector.recursive = recursive;
328  arrow::fs::FileInfoVector file_infos;
329  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(file_infos,
330  arrow_fs_->GetFileInfo(file_selector));
331  return static_cast<IdType>(file_infos.size());
332 }
333 
334 FileSystem::~FileSystem() {}
335 
336 Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
337  const std::string& uri_string, std::string* out_path) {
338  if (uri_string.length() >= 1 && uri_string[0] == '/') {
339  // if the uri_string is an absolute path, we need to create a local file
340  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
341  auto arrow_fs,
342  arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
343  // arrow would delete the last slash, so use uri string
344  if (out_path != nullptr) {
345  *out_path = uri_string;
346  }
347  return std::make_shared<FileSystem>(arrow_fs);
348  }
349 
350  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
351  auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
352  auto uri = uri::parse_uri(uri_string);
353  if (uri.error != uri::Error::None) {
354  return Status::Invalid("Failed to parse URI: ", uri_string);
355  }
356  if (out_path != nullptr) {
357  if (uri.scheme == "file" || uri.scheme == "hdfs" || uri.scheme.empty()) {
358  *out_path = uri.path;
359  } else if (uri.scheme == "s3" || uri.scheme == "gs") {
360  // bucket name is the host, path is the path
361  *out_path = uri.authority.host + uri.path;
362  } else {
363  return Status::Invalid("Unrecognized filesystem type in URI: ",
364  uri_string);
365  }
366  }
367  return std::make_shared<FileSystem>(arrow_fs);
368 }
369 
370 // arrow::fs::InitializeS3 and arrow::fs::FinalizeS3 need arrow_version >= 15
371 Status InitializeS3() {
372 #if defined(ARROW_VERSION) && ARROW_VERSION > 14000000
373 #if defined(ARROW_S3)
374  auto options = arrow::fs::S3GlobalOptions::Defaults();
375 #endif
376 #else
377  arrow::fs::S3GlobalOptions options;
378  options.log_level = arrow::fs::S3LogLevel::Fatal;
379 #endif
380 #if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
381 #if defined(ARROW_S3)
382  RETURN_NOT_ARROW_OK(arrow::fs::InitializeS3(options));
383 #endif
384 #endif
385  return Status::OK();
386 }
387 
388 Status FinalizeS3() {
389 #if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
390 #if defined(ARROW_S3)
391  RETURN_NOT_ARROW_OK(arrow::fs::FinalizeS3());
392 #endif
393 #endif
394  return Status::OK();
395 }
396 
// Explicit instantiation of the generic ReadFileToValue template for IdType,
// so the definition in this file is emitted for that type.
398 template Result<IdType> FileSystem::ReadFileToValue<IdType>(
399  const std::string&) const noexcept;
// Explicit instantiation of the generic WriteValueToFile template for IdType,
// so the definition in this file is emitted for that type.
401 template Status FileSystem::WriteValueToFile<IdType>(const IdType&,
402  const std::string&) const
403  noexcept;
404 } // namespace graphar
Result< IdType > GetFileNumOfDir(const std::string &dir_path, bool recursive=false) const noexcept
Definition: filesystem.cc:322
Status WriteValueToFile(const T &value, const std::string &path) const noexcept
Write a value of type T to a file.
Definition: filesystem.cc:229
Result< T > ReadFileToValue(const std::string &path) const noexcept
Read a file and convert its bytes to a value of type T.
Definition: filesystem.cc:207
Status CopyFile(const std::string &src_path, const std::string &dst_path) const noexcept
Definition: filesystem.cc:313
Result< std::shared_ptr< arrow::Table > > ReadFileToTable(const std::string &path, FileType file_type, const util::FilterOptions &options={}) const noexcept
Read and filter a file as an arrow::Table.
Definition: filesystem.cc:142
Status WriteLabelTableToFile(const std::shared_ptr< arrow::Table > &table, const std::string &path) const noexcept
Write a label table to a file with parquet type.
Definition: filesystem.cc:295
Status WriteTableToFile(const std::shared_ptr< arrow::Table > &table, FileType file_type, const std::string &path, const std::shared_ptr< WriterOptions > &options) const noexcept
Write a table to a file with a specific type.
Definition: filesystem.cc:252
Status outcome object (success or error)
Definition: status.h:123
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157