Apache GraphAr C++ Library
The C++ Library for Apache GraphAr
filesystem.cc
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifdef ARROW_ORC
21 #include "arrow/adapters/orc/adapter.h"
22 #endif
23 #include "arrow/api.h"
24 #include "arrow/csv/api.h"
25 #include "arrow/dataset/api.h"
26 #include "arrow/filesystem/api.h"
27 #include "arrow/filesystem/s3fs.h"
28 #include "arrow/ipc/writer.h"
29 #include "parquet/arrow/writer.h"
30 #include "simple-uri-parser/uri_parser.h"
31 
32 #include "graphar/expression.h"
33 #include "graphar/filesystem.h"
34 #include "graphar/fwd.h"
35 
36 namespace graphar::detail {
37 template <typename U, typename T>
38 static Status CastToLargeOffsetArray(
39  const std::shared_ptr<arrow::Array>& in,
40  const std::shared_ptr<arrow::DataType>& to_type,
41  std::shared_ptr<arrow::Array>& out) { // NOLINT(runtime/references)
42  auto array_data = in->data()->Copy();
43  auto offset = array_data->buffers[1];
44  using from_offset_type = typename U::offset_type;
45  using to_string_offset_type = typename T::offset_type;
46  auto raw_value_offsets_ =
47  offset == NULLPTR
48  ? NULLPTR
49  : reinterpret_cast<const from_offset_type*>(offset->data());
50  std::vector<to_string_offset_type> to_offset(offset->size() /
51  sizeof(from_offset_type));
52  for (size_t i = 0; i < to_offset.size(); ++i) {
53  to_offset[i] = raw_value_offsets_[i];
54  }
55  std::shared_ptr<arrow::Buffer> buffer;
56  arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
57  RETURN_NOT_ARROW_OK(
58  buffer_builder.Append(to_offset.data(), to_offset.size()));
59  RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
60  array_data->type = to_type;
61  array_data->buffers[1] = buffer;
62  out = arrow::MakeArray(array_data);
63  RETURN_NOT_ARROW_OK(out->ValidateFull());
64  return Status::OK();
65 }
66 
67 template <typename U, typename T>
68 static Status CastToLargeOffsetArray(
69  const std::shared_ptr<arrow::ChunkedArray>& in,
70  const std::shared_ptr<arrow::DataType>& to_type,
71  std::shared_ptr<arrow::ChunkedArray>& out) { // NOLINT(runtime/references)
72  std::vector<std::shared_ptr<arrow::Array>> chunks;
73  for (auto const& chunk : in->chunks()) {
74  std::shared_ptr<arrow::Array> array;
75  auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
76  GAR_RETURN_NOT_OK(status);
77  chunks.emplace_back(array);
78  }
79  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
80  return Status::OK();
81 }
82 } // namespace graphar::detail
83 
84 namespace graphar {
85 namespace ds = arrow::dataset;
86 
87 std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
88  const FileType type) const {
89  switch (type) {
90  case CSV:
91  return std::make_shared<ds::CsvFileFormat>();
92  case PARQUET:
93  return std::make_shared<ds::ParquetFileFormat>();
94  case JSON:
95  return std::make_shared<ds::JsonFileFormat>();
96 #ifdef ARROW_ORC
97  case ORC:
98  return std::make_shared<ds::OrcFileFormat>();
99 #endif
100  default:
101  return nullptr;
102  }
103 }
104 
105 Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
106  const std::string& path, FileType file_type,
107  const util::FilterOptions& options) const noexcept {
108  std::shared_ptr<ds::FileFormat> format = GetFileFormat(file_type);
109  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
110  auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
111  arrow_fs_, {path}, format,
112  arrow::dataset::FileSystemFactoryOptions()));
113  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto dataset, factory->Finish());
114  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scan_builder, dataset->NewScan());
115 
116  // Apply the row filter and select the specified columns
117  if (options.filter) {
118  GAR_ASSIGN_OR_RAISE(auto filter, options.filter->Evaluate());
119  RETURN_NOT_ARROW_OK(scan_builder->Filter(filter));
120  }
121  if (options.columns) {
122  RETURN_NOT_ARROW_OK(scan_builder->Project(*options.columns));
123  }
124 
125  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scanner, scan_builder->Finish());
126  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto table, scanner->ToTable());
127  // cast string array to large string array as we need concatenate chunks in
128  // some places, e.g., in vineyard
129  for (int i = 0; i < table->num_columns(); ++i) {
130  std::shared_ptr<arrow::DataType> type = table->column(i)->type();
131  if (type->id() == arrow::Type::STRING) {
132  type = arrow::large_utf8();
133  } else if (type->id() == arrow::Type::BINARY) {
134  type = arrow::large_binary();
135  }
136  if (type->Equals(table->column(i)->type())) {
137  continue;
138  }
139  // do casting
140  auto field = table->field(i)->WithType(type);
141  std::shared_ptr<arrow::ChunkedArray> chunked_array;
142 
143  if (table->num_rows() == 0) {
144  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
145  chunked_array, arrow::ChunkedArray::MakeEmpty(type));
146  } else if (type->Equals(arrow::large_utf8())) {
147  auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
148  arrow::LargeStringArray>(
149  table->column(i), type, chunked_array);
150  GAR_RETURN_NOT_OK(status);
151  } else if (type->Equals(arrow::large_binary())) {
152  auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
153  arrow::LargeBinaryArray>(
154  table->column(i), type, chunked_array);
155  GAR_RETURN_NOT_OK(status);
156  } else {
157  // noop
158  chunked_array = table->column(i);
159  }
160  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
161  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
162  table, table->AddColumn(i, field, chunked_array));
163  }
164  return table;
165 }
166 
167 template <typename T>
168 Result<T> FileSystem::ReadFileToValue(const std::string& path) const noexcept {
169  T ret;
170  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto input,
171  arrow_fs_->OpenInputStream(path));
172  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto bytes,
173  input->Read(sizeof(T), &ret));
174  ARROW_UNUSED(bytes);
175  return ret;
176 }
177 
178 template <>
179 Result<std::string> FileSystem::ReadFileToValue(const std::string& path) const
180  noexcept {
181  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto access_file,
182  arrow_fs_->OpenInputFile(path));
183  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto bytes, access_file->GetSize());
184  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto buffer,
185  access_file->ReadAt(0, bytes));
186  return buffer->ToString();
187 }
188 
189 template <typename T>
191  const std::string& path) const noexcept {
192  // try to create the directory, oss filesystem may not support this, ignore
193  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
194  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
195  arrow_fs_->OpenOutputStream(path));
196  RETURN_NOT_ARROW_OK(ofstream->Write(&value, sizeof(T)));
197  RETURN_NOT_ARROW_OK(ofstream->Close());
198  return Status::OK();
199 }
200 
201 template <>
202 Status FileSystem::WriteValueToFile(const std::string& value,
203  const std::string& path) const noexcept {
204  // try to create the directory, oss filesystem may not support this, ignore
205  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
206  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto ofstream,
207  arrow_fs_->OpenOutputStream(path));
208  RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size()));
209  RETURN_NOT_ARROW_OK(ofstream->Close());
210  return Status::OK();
211 }
212 
213 Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
214  FileType file_type,
215  const std::string& path) const noexcept {
216  // try to create the directory, oss filesystem may not support this, ignore
217  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
218  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
219  arrow_fs_->OpenOutputStream(path));
220  switch (file_type) {
221  case FileType::CSV: {
222  auto write_options = arrow::csv::WriteOptions::Defaults();
223  write_options.include_header = true;
224  write_options.quoting_style = arrow::csv::QuotingStyle::Needed;
225  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
226  auto writer, arrow::csv::MakeCSVWriter(output_stream.get(),
227  table->schema(), write_options));
228  RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
229  RETURN_NOT_ARROW_OK(writer->Close());
230  break;
231  }
232  case FileType::PARQUET: {
233  parquet::WriterProperties::Builder builder;
234  builder.compression(arrow::Compression::type::ZSTD); // enable compression
235  RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
236  *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
237  builder.build(), parquet::default_arrow_writer_properties()));
238  break;
239  }
240 #ifdef ARROW_ORC
241  case FileType::ORC: {
242  auto writer_options = arrow::adapters::orc::WriteOptions();
243  writer_options.compression = arrow::Compression::type::ZSTD;
244  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
245  auto writer, arrow::adapters::orc::ORCFileWriter::Open(
246  output_stream.get(), writer_options));
247  RETURN_NOT_ARROW_OK(writer->Write(*table));
248  RETURN_NOT_ARROW_OK(writer->Close());
249  break;
250  }
251 #endif
252  default:
253  return Status::Invalid(
254  "Unsupported file type: ", FileTypeToString(file_type), " for wrting.");
255  }
256  return Status::OK();
257 }
258 
259 Status FileSystem::CopyFile(const std::string& src_path,
260  const std::string& dst_path) const noexcept {
261  // try to create the directory, oss filesystem may not support this, ignore
262  ARROW_UNUSED(
263  arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of("/"))));
264  RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
265  return Status::OK();
266 }
267 
268 Result<IdType> FileSystem::GetFileNumOfDir(const std::string& dir_path,
269  bool recursive) const noexcept {
270  arrow::fs::FileSelector file_selector;
271  file_selector.base_dir = dir_path;
272  file_selector.allow_not_found = false; // if dir_path not exist, return error
273  file_selector.recursive = recursive;
274  arrow::fs::FileInfoVector file_infos;
275  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(file_infos,
276  arrow_fs_->GetFileInfo(file_selector));
277  return static_cast<IdType>(file_infos.size());
278 }
279 
280 FileSystem::~FileSystem() {}
281 
282 Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
283  const std::string& uri_string, std::string* out_path) {
284  if (uri_string.length() >= 1 && uri_string[0] == '/') {
285  // if the uri_string is an absolute path, we need to create a local file
286  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
287  auto arrow_fs,
288  arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
289  // arrow would delete the last slash, so use uri string
290  if (out_path != nullptr) {
291  *out_path = uri_string;
292  }
293  return std::make_shared<FileSystem>(arrow_fs);
294  }
295 
296  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
297  auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
298  auto uri = uri::parse_uri(uri_string);
299  if (uri.error != uri::Error::None) {
300  return Status::Invalid("Failed to parse URI: ", uri_string);
301  }
302  if (out_path != nullptr) {
303  if (uri.scheme == "file" || uri.scheme == "hdfs" || uri.scheme.empty()) {
304  *out_path = uri.path;
305  } else if (uri.scheme == "s3" || uri.scheme == "gs") {
306  // bucket name is the host, path is the path
307  *out_path = uri.authority.host + uri.path;
308  } else {
309  return Status::Invalid("Unrecognized filesystem type in URI: ",
310  uri_string);
311  }
312  }
313  return std::make_shared<FileSystem>(arrow_fs);
314 }
315 
316 Status InitializeS3() {
317  RETURN_NOT_ARROW_OK(
318  arrow::fs::InitializeS3(arrow::fs::S3GlobalOptions::Defaults()));
319  return Status::OK();
320 }
321 
322 Status FinalizeS3() {
323  RETURN_NOT_ARROW_OK(arrow::fs::FinalizeS3());
324  return Status::OK();
325 }
326 
328 template Result<IdType> FileSystem::ReadFileToValue<IdType>(
329  const std::string&) const noexcept;
331 template Status FileSystem::WriteValueToFile<IdType>(const IdType&,
332  const std::string&) const
333  noexcept;
334 } // namespace graphar
Result< IdType > GetFileNumOfDir(const std::string &dir_path, bool recursive=false) const noexcept
Definition: filesystem.cc:268
Status WriteValueToFile(const T &value, const std::string &path) const noexcept
Write a value of type T to a file.
Definition: filesystem.cc:190
Status WriteTableToFile(const std::shared_ptr< arrow::Table > &table, FileType file_type, const std::string &path) const noexcept
Write a table to a file with a specific type.
Definition: filesystem.cc:213
Result< T > ReadFileToValue(const std::string &path) const noexcept
Read a file and convert its bytes to a value of type T.
Definition: filesystem.cc:168
Status CopyFile(const std::string &src_path, const std::string &dst_path) const noexcept
Definition: filesystem.cc:259
Result< std::shared_ptr< arrow::Table > > ReadFileToTable(const std::string &path, FileType file_type, const util::FilterOptions &options={}) const noexcept
Read and filter a file as an arrow::Table.
Definition: filesystem.cc:105
Status outcome object (success or error)
Definition: status.h:123
static Status Invalid(Args &&... args)
Definition: status.h:188
static Status OK()
Definition: status.h:157