22 #include "graphar/writer_util.h"
24 #include "arrow/adapters/orc/adapter.h"
26 #include <arrow/compute/api.h>
27 #include "arrow/api.h"
28 #include "arrow/csv/api.h"
29 #include "arrow/dataset/api.h"
30 #include "parquet/arrow/reader.h"
31 #if defined(ARROW_VERSION) && ARROW_VERSION <= 12000000
32 #include "arrow/dataset/file_json.h"
34 #include "arrow/filesystem/api.h"
35 #include "arrow/filesystem/s3fs.h"
36 #include "arrow/ipc/writer.h"
37 #include "parquet/arrow/writer.h"
38 #include "simple-uri-parser/uri_parser.h"
40 #include "graphar/expression.h"
41 #include "graphar/filesystem.h"
42 #include "graphar/fwd.h"
43 #include "graphar/general_params.h"
45 namespace graphar::detail {
// Widens the offsets buffer of a variable-length Arrow array.
//
// The ArrayData of `in` is copied, and buffers[1] is read as an array of
// U::offset_type. Each entry is converted to T::offset_type and packed into a
// new buffer. `out` receives a new array of type `to_type` built from the
// copied ArrayData with that buffer swapped in.
//
// Callers in this file instantiate it as <StringArray, LargeStringArray> and
// <BinaryArray, LargeBinaryArray>. Errors from building the buffer or from
// validating the result are returned as Status.
//
// NOTE(review): this text is a source listing with the original line numbers
// embedded. Original lines 56-57, 63, 66 and 73-75 are missing, so the block
// is not compilable as shown.
46 template <
typename U,
typename T>
47 static Status CastToLargeOffsetArray(
48 const std::shared_ptr<arrow::Array>& in,
49 const std::shared_ptr<arrow::DataType>& to_type,
50 std::shared_ptr<arrow::Array>& out) {
// Work on a copy of the descriptor so that reassigning `type` and
// `buffers[1]` below does not touch `in`. Presumably a shallow copy (the
// buffers themselves are shared); see arrow::ArrayData::Copy.
51 auto array_data = in->data()->Copy();
52 auto offset = array_data->buffers[1];
53 using from_offset_type =
typename U::offset_type;
54 using to_string_offset_type =
typename T::offset_type;
55 auto raw_value_offsets_ =
// NOTE(review): original lines 56-57 (the start of a conditional expression
// whose ':' branch follows) are missing from this listing. Confirm how a null
// `offset` is handled: `offset->size()` below dereferences it regardless.
58 :
reinterpret_cast<const from_offset_type*
>(offset->data());
// The entry count is derived from the buffer's byte size, not from the array
// length, so every offset slot present in the buffer is converted.
59 std::vector<to_string_offset_type> to_offset(offset->size() /
60 sizeof(from_offset_type));
61 for (
size_t i = 0; i < to_offset.size(); ++i) {
62 to_offset[i] = raw_value_offsets_[i];
64 std::shared_ptr<arrow::Buffer> buffer;
65 arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
// NOTE(review): original line 66 is missing from this listing. That line
// opened the call that wraps this Append (hence the extra ')').
67 buffer_builder.Append(to_offset.data(), to_offset.size()));
68 RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
69 array_data->type = to_type;
70 array_data->buffers[1] = buffer;
71 out = arrow::MakeArray(array_data);
// Full (data-inspecting) validation of the rebuilt array; any failure is
// returned to the caller.
72 RETURN_NOT_ARROW_OK(out->ValidateFull());
// Chunked variant of the offset-widening conversion.
//
// Every chunk of `in` is converted with the arrow::Array overload of
// CastToLargeOffsetArray<U, T>, and `out` receives a chunked array built from
// the converted chunks. The first chunk that fails aborts the conversion, and
// its Status is returned.
//
// NOTE(review): arrow::ChunkedArray::Make is called without an explicit type,
// which Arrow rejects when `chunks` is empty. The caller in this file builds
// empty columns with ChunkedArray::MakeEmpty instead; confirm that no other
// caller passes a zero-chunk array. The closing lines (original 87, 89+) are
// missing from this listing.
76 template <
typename U,
typename T>
77 static Status CastToLargeOffsetArray(
78 const std::shared_ptr<arrow::ChunkedArray>& in,
79 const std::shared_ptr<arrow::DataType>& to_type,
80 std::shared_ptr<arrow::ChunkedArray>& out) {
81 std::vector<std::shared_ptr<arrow::Array>> chunks;
82 for (
auto const& chunk : in->chunks()) {
83 std::shared_ptr<arrow::Array> array;
84 auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
85 GAR_RETURN_NOT_OK(status);
86 chunks.emplace_back(array);
88 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
// Short alias for the Arrow Dataset namespace used below.
94 namespace ds = arrow::dataset;
// Returns an Arrow Dataset file format object for `type`: one of
// CsvFileFormat, ParquetFileFormat, JsonFileFormat or OrcFileFormat.
//
// NOTE(review): the switch/case lines (original 98-99, 101, 103, 105-106 and
// 108 onward) are missing from this listing. Which FileType selects which
// format, and what is returned for other values, is therefore not visible here.
96 std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
97 const FileType type)
const {
100 return std::make_shared<ds::CsvFileFormat>();
102 return std::make_shared<ds::ParquetFileFormat>();
104 return std::make_shared<ds::JsonFileFormat>();
107 return std::make_shared<ds::OrcFileFormat>();
// Reads `path` as a Parquet file into an arrow::Table.
//
// When `column_indices` is empty all columns are read; otherwise only the
// listed column indices are read. A ReadTable failure is returned as
// Status::Invalid carrying the path and the Arrow error text.
//
// NOTE(review): the first signature line (original 114) and lines 120, 122,
// 131-132 and 137 onward are missing from this listing.
// NOTE(review): `file_type` is not referenced in the visible lines.
// NOTE(review): the file is opened with FileReaderBuilder::OpenFile(path)
// rather than through `arrow_fs_`, so this presumably only works for paths on
// the local filesystem; confirm.
115 const std::string& path, FileType file_type,
116 const std::vector<int>& column_indices)
const noexcept {
117 parquet::arrow::FileReaderBuilder builder;
118 auto open_file_status = builder.OpenFile(path);
119 if (!open_file_status.ok()) {
121 open_file_status.ToString());
123 builder.memory_pool(arrow::default_memory_pool());
124 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto reader, builder.Build());
125 std::shared_ptr<arrow::Table> table;
126 if (column_indices.empty()) {
127 arrow::Status read_status = reader->ReadTable(&table);
128 if (!read_status.ok()) {
129 return Status::Invalid(
"Failed to read table from file: ", path,
" - ",
130 read_status.ToString());
133 arrow::Status read_status = reader->ReadTable(column_indices, &table);
134 if (!read_status.ok()) {
135 return Status::Invalid(
"Failed to read table from file: ", path,
" - ",
136 read_status.ToString());
// Reads `path` through `arrow_fs_` with the Arrow Dataset API and materializes
// the scan into an arrow::Table.
//
// The file format comes from GetFileFormat(file_type). `options.filter`, when
// set, is evaluated to an Arrow expression and applied as a scan filter.
// `options.columns`, when set, is applied as a projection. Afterwards every
// STRING column is re-typed to large_utf8 and every BINARY column to
// large_binary.
//
// NOTE(review): the signature start (original 142) and the `options` parameter
// line (144) are missing from this listing. So are body lines 154-155, 159,
// 162-163, 166-167, 174, 176-178, 181, 195-196, 198 and 202 onward.
143 const std::string& path, FileType file_type,
145 std::shared_ptr<ds::FileFormat> format = GetFileFormat(file_type);
146 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
147 auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
148 arrow_fs_, {path}, format,
149 arrow::dataset::FileSystemFactoryOptions()));
150 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto dataset, factory->Finish());
151 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto scan_builder, dataset->NewScan());
// Presumably needed because Arrow >= 21 requires compute kernels to be
// registered explicitly via arrow::compute::Initialize() before filters and
// projections can run; confirm against the Arrow 21 release notes.
152 #if ARROW_VERSION >= 21000000
153 RETURN_NOT_ARROW_OK(arrow::compute::Initialize());
156 if (options.filter) {
157 GAR_ASSIGN_OR_RAISE(
auto filter, options.filter->Evaluate());
158 RETURN_NOT_ARROW_OK(scan_builder->Filter(filter));
160 if (options.columns) {
161 RETURN_NOT_ARROW_OK(scan_builder->Project(*options.columns));
164 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto scanner, scan_builder->Finish());
165 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto table, scanner->ToTable());
// Normalize 32-bit-offset string/binary columns to their large (64-bit-offset)
// counterparts, one column at a time.
168 for (
int i = 0; i < table->num_columns(); ++i) {
169 std::shared_ptr<arrow::DataType> type = table->column(i)->type();
170 if (type->id() == arrow::Type::STRING) {
171 type = arrow::large_utf8();
172 }
else if (type->id() == arrow::Type::BINARY) {
173 type = arrow::large_binary();
// NOTE(review): presumably skips columns whose type is unchanged. The
// statement for this branch (original lines 176-178) is missing from this
// listing.
175 if (type->Equals(table->column(i)->type())) {
179 auto field = table->field(i)->WithType(type);
180 std::shared_ptr<arrow::ChunkedArray> chunked_array;
// An empty table has nothing to convert, so an empty column of the new type is
// built directly.
182 if (table->num_rows() == 0) {
183 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
184 chunked_array, arrow::ChunkedArray::MakeEmpty(type));
185 }
else if (type->Equals(arrow::large_utf8())) {
186 auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
187 arrow::LargeStringArray>(
188 table->column(i), type, chunked_array);
189 GAR_RETURN_NOT_OK(status);
190 }
else if (type->Equals(arrow::large_binary())) {
191 auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
192 arrow::LargeBinaryArray>(
193 table->column(i), type, chunked_array);
194 GAR_RETURN_NOT_OK(status);
// NOTE(review): if the branch above skips unchanged columns, only large_utf8
// and large_binary types reach this point, which would make this fallback
// unreachable. Confirm.
197 chunked_array = table->column(i);
// Replace column i with the converted data and re-typed field.
// arrow::Table::SetColumn(i, field, chunked_array) would do this in one call.
199 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
200 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
201 table, table->AddColumn(i, field, chunked_array));
// Reads sizeof(T) raw bytes from the start of `path` into `ret`.
//
// The file is opened through `arrow_fs_` as an input stream. This mirrors
// WriteValueToFile<T>, which writes the same raw byte representation.
//
// NOTE(review): the signature (original 207-208, including the declaration of
// `ret`) and the lines after 212 are missing from this listing. It is therefore
// not visible whether `bytes` (the count actually read) is checked against
// sizeof(T) for files shorter than T.
206 template <
typename T>
209 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto input,
210 arrow_fs_->OpenInputStream(path));
211 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto bytes,
212 input->Read(
sizeof(T), &ret));
// Reads the whole file at `path` through `arrow_fs_` and returns its contents
// as a std::string.
//
// The signature lines (original 217-219) are missing from this listing.
// Presumably this is the std::string specialization of ReadFileToValue;
// confirm.
220 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto access_file,
221 arrow_fs_->OpenInputFile(path));
// Query the size first, then read the entire file with a single positional
// read from offset 0.
222 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto bytes, access_file->GetSize());
223 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto buffer,
224 access_file->ReadAt(0, bytes));
225 return buffer->ToString();
// Writes the raw in-memory bytes of `value` (sizeof(T) bytes) to `path`
// through `arrow_fs_`, then closes the stream.
//
// The parent directory is created first on a best-effort basis: the Status of
// CreateDir is discarded via ARROW_UNUSED.
//
// NOTE(review): the output is T's object representation, so the file is
// presumably only portable between machines with the same byte order and type
// layout.
// NOTE(review): if `path` contains no '/', find_last_of returns npos and
// substr(0, npos) is the whole path. CreateDir is then attempted on `path`
// itself, which would likely make OpenOutputStream(path) fail.
// NOTE(review): the signature line naming the function (original 229) and
// line 231 are missing from this listing.
228 template <
typename T>
230 const std::string& path)
const noexcept {
232 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
233 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto ofstream,
234 arrow_fs_->OpenOutputStream(path));
235 RETURN_NOT_ARROW_OK(ofstream->Write(&value,
sizeof(T)));
236 RETURN_NOT_ARROW_OK(ofstream->Close());
// Writes the characters of `value` to `path` through `arrow_fs_`, then closes
// the stream.
//
// Exactly value.size() bytes are written, with no terminating NUL. The parent
// directory is created first on a best-effort basis: the Status of CreateDir
// is discarded via ARROW_UNUSED.
//
// The first signature line (original 241) and line 243 are missing from this
// listing. Presumably this is the std::string specialization of
// WriteValueToFile.
//
// NOTE(review): if `path` contains no '/', find_last_of returns npos and the
// substr is the whole path, so CreateDir is attempted on `path` itself.
242 const std::string& path)
const noexcept {
244 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
245 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto ofstream,
246 arrow_fs_->OpenOutputStream(path));
247 RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size()));
248 RETURN_NOT_ARROW_OK(ofstream->Close());
// Writes `table` to `path` through `arrow_fs_` in the format given by
// `file_type`:
//   - CSV via arrow::csv::MakeCSVWriter,
//   - Parquet via parquet::arrow::WriteTable,
//   - ORC via arrow::adapters::orc::ORCFileWriter.
// Per-format writer options come from `options`. The visible fallback reports
// "Unsupported file type: <type> for wrting.".
//
// The parent directory is created first on a best-effort basis: the Status of
// CreateDir is discarded via ARROW_UNUSED.
//
// NOTE(review): `options` is dereferenced without a null check, so callers
// must pass a non-null WriterOptions.
// NOTE(review): the Parquet branch passes 64 * 1024 * 1024 as WriteTable's
// chunk_size (maximum rows per row group). Its local `schema` is unused in the
// visible lines.
// NOTE(review): the error message spells "writing" as "wrting".
// NOTE(review): the first signature line (252), the switch header (260), the
// CSV `writer` declaration (263) and lines 268-269, 276-278 and 285-289 are
// missing from this listing. Confirm that `output_stream` itself is closed
// after the CSV/ORC writers finish.
253 const std::shared_ptr<arrow::Table>& table, FileType file_type,
254 const std::string& path,
255 const std::shared_ptr<WriterOptions>& options)
const noexcept {
257 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
258 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto output_stream,
259 arrow_fs_->OpenOutputStream(path));
261 case FileType::CSV: {
262 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
264 arrow::csv::MakeCSVWriter(output_stream.get(), table->schema(),
265 options->getCsvOption()));
266 RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
267 RETURN_NOT_ARROW_OK(writer->Close());
270 case FileType::PARQUET: {
271 auto schema = table->schema();
272 RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
273 *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
274 options->getParquetWriterProperties(),
275 options->getArrowWriterProperties()));
279 case FileType::ORC: {
280 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
281 auto writer, arrow::adapters::orc::ORCFileWriter::Open(
282 output_stream.get(), options->getOrcOption()));
283 RETURN_NOT_ARROW_OK(writer->Write(*table));
284 RETURN_NOT_ARROW_OK(writer->Close());
290 "Unsupported file type: ", FileTypeToString(file_type),
" for wrting.");
// Writes `table` to `path` through `arrow_fs_` as Parquet with these settings:
//   - ZSTD compression,
//   - parquet::Encoding::RLE as the default column encoding,
//   - default Arrow writer properties,
//   - a chunk_size of 64 * 1024 * 1024 rows per row group.
//
// The parent directory is created first on a best-effort basis: the Status of
// CreateDir is discarded via ARROW_UNUSED.
//
// NOTE(review): `schema` and `column_num` are unused in the visible lines.
// NOTE(review): Parquet defines RLE as a value encoding only for BOOLEAN
// columns, so this presumably assumes label tables hold boolean columns;
// confirm.
// NOTE(review): the first signature line (original 295), lines 297-298 and
// the lines after 309 are missing from this listing.
296 const std::shared_ptr<arrow::Table>& table,
const std::string& path)
const
299 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
300 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto output_stream,
301 arrow_fs_->OpenOutputStream(path));
302 auto schema = table->schema();
303 auto column_num = schema->num_fields();
304 parquet::WriterProperties::Builder builder;
305 builder.compression(arrow::Compression::type::ZSTD);
306 builder.encoding(parquet::Encoding::RLE);
307 RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
308 *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
309 builder.build(), parquet::default_arrow_writer_properties()));
// Copies `src_path` to `dst_path` with arrow_fs_->CopyFile, after attempting
// to create the parent directory of `dst_path`.
//
// The lines that open the function and wrap the CreateDir call (original 313
// and 315-316), and those after 318, are missing from this listing.
314 const std::string& dst_path)
const noexcept {
317 arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of(
"/"))));
318 RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
// Returns, as IdType, the number of entries arrow_fs_->GetFileInfo reports
// under `dir_path`.
//
// Subdirectories are descended into when `recursive` is true. A missing
// `dir_path` is an error because allow_not_found is false.
//
// NOTE(review): a FileSelector listing includes directories as well as regular
// files, so directories are counted too. Filter on FileInfo::IsFile() if only
// files are intended.
// NOTE(review): the first signature line (original 322) is missing from this
// listing.
323 bool recursive)
const noexcept {
324 arrow::fs::FileSelector file_selector;
325 file_selector.base_dir = dir_path;
326 file_selector.allow_not_found =
false;
327 file_selector.recursive = recursive;
328 arrow::fs::FileInfoVector file_infos;
329 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(file_infos,
330 arrow_fs_->GetFileInfo(file_selector));
331 return static_cast<IdType
>(file_infos.size());
// Out-of-line definition of FileSystem's destructor. It has an empty body and
// does no explicit work of its own.
334 FileSystem::~FileSystem() {}
// Creates a GraphAr FileSystem for `uri_string`. When `out_path` is non-null,
// it also stores in *out_path the path to use with that filesystem.
//
// - A string starting with '/' is treated as a local absolute path. Arrow's
//   FileSystemFromUriOrPath resolves the filesystem, and *out_path is then
//   overwritten with `uri_string` unchanged.
// - Otherwise Arrow resolves the filesystem and simple-uri-parser extracts the
//   path:
//     - file, hdfs or no scheme: uri.path;
//     - s3 or gs: authority host + path (presumably bucket + key).
//
// NOTE(review): several lines are missing from this listing (339, 341, 343,
// 346, 348-349, 354-355, 360 and 362-366). They include the declaration
// receiving the first Arrow call, the handling of a URI parse error, and the
// handling of other schemes.
336 Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
337 const std::string& uri_string, std::string* out_path) {
338 if (uri_string.length() >= 1 && uri_string[0] ==
'/') {
340 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
342 arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
344 if (out_path !=
nullptr) {
345 *out_path = uri_string;
347 return std::make_shared<FileSystem>(arrow_fs);
350 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
351 auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
352 auto uri = uri::parse_uri(uri_string);
353 if (uri.error != uri::Error::None) {
356 if (out_path !=
nullptr) {
357 if (uri.scheme ==
"file" || uri.scheme ==
"hdfs" || uri.scheme.empty()) {
358 *out_path = uri.path;
359 }
else if (uri.scheme ==
"s3" || uri.scheme ==
"gs") {
361 *out_path = uri.authority.host + uri.path;
367 return std::make_shared<FileSystem>(arrow_fs);
// Initializes Arrow's S3 subsystem.
//
// S3GlobalOptions are built either via S3GlobalOptions::Defaults() or as a
// default-constructed object with log_level set to Fatal, depending on the
// ARROW_VERSION / ARROW_S3 guards. arrow::fs::InitializeS3 is called only when
// ARROW_VERSION >= 15000000 and ARROW_S3 is defined.
//
// NOTE(review): the #else/#endif lines (original 375-376, 379 and 383-387) are
// missing from this listing, so the exact branch structure is not visible.
// NOTE(review): the two version guards use different thresholds (> 14000000
// vs >= 15000000). For example, Arrow 14.0.1 (14000001) satisfies the first
// guard but not the second; confirm this is intended.
371 Status InitializeS3() {
372 #if defined(ARROW_VERSION) && ARROW_VERSION > 14000000
373 #if defined(ARROW_S3)
374 auto options = arrow::fs::S3GlobalOptions::Defaults();
377 arrow::fs::S3GlobalOptions options;
378 options.log_level = arrow::fs::S3LogLevel::Fatal;
380 #if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
381 #if defined(ARROW_S3)
382 RETURN_NOT_ARROW_OK(arrow::fs::InitializeS3(options));
// Shuts down Arrow's S3 subsystem via arrow::fs::FinalizeS3. The call is made
// only when ARROW_VERSION >= 15000000 and ARROW_S3 is defined.
//
// The remaining lines (original 392 onward) are missing from this listing.
388 Status FinalizeS3() {
389 #if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
390 #if defined(ARROW_S3)
391 RETURN_NOT_ARROW_OK(arrow::fs::FinalizeS3());
// Explicit instantiations of the templated value I/O helpers for IdType. This
// makes the template definitions in this file available to other translation
// units.
//
// The end of the second declaration (original 403 onward) is missing from
// this listing.
398 template Result<IdType> FileSystem::ReadFileToValue<IdType>(
399 const std::string&)
const noexcept;
401 template Status FileSystem::WriteValueToFile<IdType>(
const IdType&,
402 const std::string&)
const
Result< IdType > GetFileNumOfDir(const std::string &dir_path, bool recursive=false) const noexcept
Status WriteValueToFile(const T &value, const std::string &path) const noexcept
Write a value of type T to a file.
Result< T > ReadFileToValue(const std::string &path) const noexcept
Read a file and convert its bytes to a value of type T.
Status CopyFile(const std::string &src_path, const std::string &dst_path) const noexcept
Result< std::shared_ptr< arrow::Table > > ReadFileToTable(const std::string &path, FileType file_type, const util::FilterOptions &options={}) const noexcept
Read and filter a file as an arrow::Table.
Status WriteLabelTableToFile(const std::shared_ptr< arrow::Table > &table, const std::string &path) const noexcept
Write a label table to a file with parquet type.
Status WriteTableToFile(const std::shared_ptr< arrow::Table > &table, FileType file_type, const std::string &path, const std::shared_ptr< WriterOptions > &options) const noexcept
Write a table to a file with a specific type.
Status outcome object (success or error)
static Status Invalid(Args &&... args)