#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/api.h"
#include "arrow/csv/api.h"
#include "arrow/dataset/api.h"
#include "arrow/filesystem/api.h"
#include "arrow/filesystem/s3fs.h"
#include "arrow/ipc/writer.h"
#include "parquet/arrow/writer.h"
#include "simple-uri-parser/uri_parser.h"

#include "graphar/expression.h"
#include "graphar/filesystem.h"
#include "graphar/fwd.h"
36 namespace graphar::detail {
37 template <
typename U,
typename T>
38 static Status CastToLargeOffsetArray(
39 const std::shared_ptr<arrow::Array>& in,
40 const std::shared_ptr<arrow::DataType>& to_type,
41 std::shared_ptr<arrow::Array>& out) {
42 auto array_data = in->data()->Copy();
43 auto offset = array_data->buffers[1];
44 using from_offset_type =
typename U::offset_type;
45 using to_string_offset_type =
typename T::offset_type;
46 auto raw_value_offsets_ =
49 :
reinterpret_cast<const from_offset_type*
>(offset->data());
50 std::vector<to_string_offset_type> to_offset(offset->size() /
51 sizeof(from_offset_type));
52 for (
size_t i = 0; i < to_offset.size(); ++i) {
53 to_offset[i] = raw_value_offsets_[i];
55 std::shared_ptr<arrow::Buffer> buffer;
56 arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
58 buffer_builder.Append(to_offset.data(), to_offset.size()));
59 RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
60 array_data->type = to_type;
61 array_data->buffers[1] = buffer;
62 out = arrow::MakeArray(array_data);
63 RETURN_NOT_ARROW_OK(out->ValidateFull());
67 template <
typename U,
typename T>
68 static Status CastToLargeOffsetArray(
69 const std::shared_ptr<arrow::ChunkedArray>& in,
70 const std::shared_ptr<arrow::DataType>& to_type,
71 std::shared_ptr<arrow::ChunkedArray>& out) {
72 std::vector<std::shared_ptr<arrow::Array>> chunks;
73 for (
auto const& chunk : in->chunks()) {
74 std::shared_ptr<arrow::Array> array;
75 auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
76 GAR_RETURN_NOT_OK(status);
77 chunks.emplace_back(array);
79 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
85 namespace ds = arrow::dataset;
87 std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
88 const FileType type)
const {
91 return std::make_shared<ds::CsvFileFormat>();
93 return std::make_shared<ds::ParquetFileFormat>();
95 return std::make_shared<ds::JsonFileFormat>();
98 return std::make_shared<ds::OrcFileFormat>();
106 const std::string& path, FileType file_type,
108 std::shared_ptr<ds::FileFormat> format = GetFileFormat(file_type);
109 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
110 auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
111 arrow_fs_, {path}, format,
112 arrow::dataset::FileSystemFactoryOptions()));
113 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto dataset, factory->Finish());
114 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto scan_builder, dataset->NewScan());
117 if (options.filter) {
118 GAR_ASSIGN_OR_RAISE(
auto filter, options.filter->Evaluate());
119 RETURN_NOT_ARROW_OK(scan_builder->Filter(filter));
121 if (options.columns) {
122 RETURN_NOT_ARROW_OK(scan_builder->Project(*options.columns));
125 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto scanner, scan_builder->Finish());
126 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto table, scanner->ToTable());
129 for (
int i = 0; i < table->num_columns(); ++i) {
130 std::shared_ptr<arrow::DataType> type = table->column(i)->type();
131 if (type->id() == arrow::Type::STRING) {
132 type = arrow::large_utf8();
133 }
else if (type->id() == arrow::Type::BINARY) {
134 type = arrow::large_binary();
136 if (type->Equals(table->column(i)->type())) {
140 auto field = table->field(i)->WithType(type);
141 std::shared_ptr<arrow::ChunkedArray> chunked_array;
143 if (table->num_rows() == 0) {
144 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
145 chunked_array, arrow::ChunkedArray::MakeEmpty(type));
146 }
else if (type->Equals(arrow::large_utf8())) {
147 auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
148 arrow::LargeStringArray>(
149 table->column(i), type, chunked_array);
150 GAR_RETURN_NOT_OK(status);
151 }
else if (type->Equals(arrow::large_binary())) {
152 auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
153 arrow::LargeBinaryArray>(
154 table->column(i), type, chunked_array);
155 GAR_RETURN_NOT_OK(status);
158 chunked_array = table->column(i);
160 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
161 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
162 table, table->AddColumn(i, field, chunked_array));
167 template <
typename T>
170 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto input,
171 arrow_fs_->OpenInputStream(path));
172 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto bytes,
173 input->Read(
sizeof(T), &ret));
181 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto access_file,
182 arrow_fs_->OpenInputFile(path));
183 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto bytes, access_file->GetSize());
184 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto buffer,
185 access_file->ReadAt(0, bytes));
186 return buffer->ToString();
189 template <
typename T>
191 const std::string& path)
const noexcept {
193 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
194 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto ofstream,
195 arrow_fs_->OpenOutputStream(path));
196 RETURN_NOT_ARROW_OK(ofstream->Write(&value,
sizeof(T)));
197 RETURN_NOT_ARROW_OK(ofstream->Close());
203 const std::string& path)
const noexcept {
205 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
206 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto ofstream,
207 arrow_fs_->OpenOutputStream(path));
208 RETURN_NOT_ARROW_OK(ofstream->Write(value.c_str(), value.size()));
209 RETURN_NOT_ARROW_OK(ofstream->Close());
215 const std::string& path)
const noexcept {
217 ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of(
"/"))));
218 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto output_stream,
219 arrow_fs_->OpenOutputStream(path));
221 case FileType::CSV: {
222 auto write_options = arrow::csv::WriteOptions::Defaults();
223 write_options.include_header =
true;
224 write_options.quoting_style = arrow::csv::QuotingStyle::Needed;
225 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
226 auto writer, arrow::csv::MakeCSVWriter(output_stream.get(),
227 table->schema(), write_options));
228 RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
229 RETURN_NOT_ARROW_OK(writer->Close());
232 case FileType::PARQUET: {
233 parquet::WriterProperties::Builder builder;
234 builder.compression(arrow::Compression::type::ZSTD);
235 RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
236 *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
237 builder.build(), parquet::default_arrow_writer_properties()));
241 case FileType::ORC: {
242 auto writer_options = arrow::adapters::orc::WriteOptions();
243 writer_options.compression = arrow::Compression::type::ZSTD;
244 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
245 auto writer, arrow::adapters::orc::ORCFileWriter::Open(
246 output_stream.get(), writer_options));
247 RETURN_NOT_ARROW_OK(writer->Write(*table));
248 RETURN_NOT_ARROW_OK(writer->Close());
254 "Unsupported file type: ", FileTypeToString(file_type),
" for wrting.");
260 const std::string& dst_path)
const noexcept {
263 arrow_fs_->CreateDir(dst_path.substr(0, dst_path.find_last_of(
"/"))));
264 RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
269 bool recursive)
const noexcept {
270 arrow::fs::FileSelector file_selector;
271 file_selector.base_dir = dir_path;
272 file_selector.allow_not_found =
false;
273 file_selector.recursive = recursive;
274 arrow::fs::FileInfoVector file_infos;
275 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(file_infos,
276 arrow_fs_->GetFileInfo(file_selector));
277 return static_cast<IdType
>(file_infos.size());
280 FileSystem::~FileSystem() {}
282 Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
283 const std::string& uri_string, std::string* out_path) {
284 if (uri_string.length() >= 1 && uri_string[0] ==
'/') {
286 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
288 arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
290 if (out_path !=
nullptr) {
291 *out_path = uri_string;
293 return std::make_shared<FileSystem>(arrow_fs);
296 GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
297 auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
298 auto uri = uri::parse_uri(uri_string);
299 if (uri.error != uri::Error::None) {
302 if (out_path !=
nullptr) {
303 if (uri.scheme ==
"file" || uri.scheme ==
"hdfs" || uri.scheme.empty()) {
304 *out_path = uri.path;
305 }
else if (uri.scheme ==
"s3" || uri.scheme ==
"gs") {
307 *out_path = uri.authority.host + uri.path;
313 return std::make_shared<FileSystem>(arrow_fs);
316 Status InitializeS3() {
318 arrow::fs::InitializeS3(arrow::fs::S3GlobalOptions::Defaults()));
322 Status FinalizeS3() {
323 RETURN_NOT_ARROW_OK(arrow::fs::FinalizeS3());
328 template Result<IdType> FileSystem::ReadFileToValue<IdType>(
329 const std::string&)
const noexcept;
331 template Status FileSystem::WriteValueToFile<IdType>(
const IdType&,
332 const std::string&)
const
// Declarations referenced by the definitions above:
//
//   Result<IdType> GetFileNumOfDir(const std::string& dir_path,
//                                  bool recursive = false) const noexcept;
//   Status WriteValueToFile(const T& value,
//                           const std::string& path) const noexcept;
//       Write a value of type T to a file.
//   Status WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
//                           FileType file_type,
//                           const std::string& path) const noexcept;
//       Write a table to a file with a specific type.
//   Result<T> ReadFileToValue(const std::string& path) const noexcept;
//       Read a file and convert its bytes to a value of type T.
//   Status CopyFile(const std::string& src_path,
//                   const std::string& dst_path) const noexcept;
//   Result<std::shared_ptr<arrow::Table>> ReadFileToTable(
//       const std::string& path, FileType file_type,
//       const util::FilterOptions& options = {}) const noexcept;
//       Read and filter a file as an arrow::Table.
//   Status: outcome object (success or error).
//   static Status Invalid(Args&&... args);