-
Notifications
You must be signed in to change notification settings - Fork 76
Description
The main goal of this issue is to track the progress of file writer implementations.
Background
We already have file format writers as below:
- writer interface: https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/file_writer.h
- avro writer: https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_writer.h
- parquet writer: https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/parquet/parquet_writer.h
To create a writer, users can call `WriterFactory` to create one for a specific file format. Users can also register their own writer implementations via `WriterFactoryRegistry`.
Tentative Design
Now we need to support writers for data files as well as v2 equality delete and position delete files. The following class interfaces and hierarchy are modeled on the Java implementation but have been simplified a lot. They are subject to change during the implementation stage.
FileWriter
Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/FileWriter.java
/// \brief Base interface for data file writers.
///
/// Every member function is pure virtual; concrete writers implement the
/// full Write / Length / Close / Metadata set.
class ICEBERG_EXPORT FileWriter {
 public:
  virtual ~FileWriter() = default;

  /// \brief Write a batch of records.
  /// \param data The batch to write.
  ///   NOTE(review): ownership of `data` after the call is not specified
  ///   by this interface -- confirm and document.
  virtual Status Write(ArrowArray* data) = 0;

  /// \brief Get the current number of bytes written.
  virtual Result<int64_t> Length() const = 0;

  /// \brief Close the writer and finalize the file.
  virtual Status Close() = 0;

  /// \brief File metadata for all files produced by the writer.
  struct ICEBERG_EXPORT WriteResult {
    // Usually a writer produces a single data or delete file.
    // A position delete writer may produce multiple file-scoped delete files.
    // In the future, multiple files can be produced if file rolling is supported.
    std::vector<std::shared_ptr<DataFile>> data_files;
  };

  /// \brief Get file metadata for all files produced by this writer.
  ///
  /// Declared pure virtual (and terminated with `;`) like the rest of the
  /// interface, matching the `Metadata() override` declarations in the
  /// concrete writers.
  virtual Result<WriteResult> Metadata() = 0;
};
DataWriter
Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/DataWriter.java
/// \brief Options for creating a DataWriter.
///
/// Plain aggregate; every field is public and may be set directly.
struct ICEBERG_EXPORT DataWriterOptions {
  /// Path of the file to write.
  std::string path;
  /// Schema used by the writer.
  std::shared_ptr<Schema> schema;
  /// Partition spec associated with the written file.
  std::shared_ptr<PartitionSpec> spec;
  /// Partition values for the written file.
  PartitionValues partition;
  /// File format to write; Parquet unless overridden.
  FileFormatType format{FileFormatType::kParquet};
  /// FileIO instance (presumably used to open `path` -- confirm).
  std::shared_ptr<FileIO> io;
  /// Sort order id, if any; left empty when not set.
  std::optional<int32_t> sort_order_id;
  /// Writer properties.
  std::shared_ptr<WriterProperties> properties;
};
/// \brief Writer for Iceberg data files.
///
/// Instances are obtained through Make(); the constructor is private and the
/// implementation is held behind an opaque `Impl`.
class ICEBERG_EXPORT DataWriter : public FileWriter {
 public:
  /// \brief Create a DataWriter from the given options.
  static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);

  ~DataWriter() override;

  /// \brief Write a batch of records.
  Status Write(ArrowArray* data) override;

  /// \brief Get the current number of bytes written.
  Result<int64_t> Length() const override;

  /// \brief Close the writer and finalize the file.
  Status Close() override;

  /// \brief Get file metadata for all files produced by this writer.
  Result<WriteResult> Metadata() override;

 private:
  class Impl;

  explicit DataWriter(std::unique_ptr<Impl> impl);

  std::unique_ptr<Impl> impl_;
};
PositionDeleteWriter
/// \brief Options for creating a PositionDeleteWriter.
///
/// Plain aggregate; every field is public and may be set directly.
struct ICEBERG_EXPORT PositionDeleteWriterOptions {
  /// Path of the file to write.
  std::string path;
  /// Schema used by the writer.
  std::shared_ptr<Schema> schema;
  /// Partition spec associated with the written file.
  std::shared_ptr<PartitionSpec> spec;
  /// Partition values for the written file.
  PartitionValues partition;
  /// File format to write; Parquet unless overridden.
  FileFormatType format{FileFormatType::kParquet};
  /// FileIO instance (presumably used to open `path` -- confirm).
  std::shared_ptr<FileIO> io;
  /// Optional row data schema.
  std::shared_ptr<Schema> row_schema;
  /// Writer properties.
  std::shared_ptr<WriterProperties> properties;
};
/// \brief Writer for Iceberg position delete files.
///
/// Instances are obtained through Make(); the constructor is private and the
/// implementation is held behind an opaque `Impl`.
class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
 public:
  /// \brief Create a PositionDeleteWriter from the given options.
  static Result<std::unique_ptr<PositionDeleteWriter>> Make(
      const PositionDeleteWriterOptions& options);

  ~PositionDeleteWriter() override;

  /// \brief Write a batch of records.
  Status Write(ArrowArray* data) override;

  /// \brief Record a single delete for position `pos` in `file_path`.
  ///   NOTE(review): presumably `pos` is the row ordinal within the data
  ///   file -- confirm.
  Status WriteDelete(std::string_view file_path, int64_t pos);

  /// \brief Get the current number of bytes written.
  Result<int64_t> Length() const override;

  /// \brief Close the writer and finalize the file.
  Status Close() override;

  /// \brief Get file metadata for all files produced by this writer.
  Result<WriteResult> Metadata() override;

 private:
  class Impl;

  explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);

  std::unique_ptr<Impl> impl_;
};
EqualityDeleteWriter
/// \brief Options for creating an EqualityDeleteWriter.
///
/// Plain aggregate; every field is public and may be set directly.
struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
  /// Path of the file to write.
  std::string path;
  /// Schema used by the writer.
  std::shared_ptr<Schema> schema;
  /// Partition spec associated with the written file.
  std::shared_ptr<PartitionSpec> spec;
  /// Partition values for the written file.
  PartitionValues partition;
  /// File format to write; Parquet unless overridden.
  FileFormatType format{FileFormatType::kParquet};
  /// FileIO instance (presumably used to open `path` -- confirm).
  std::shared_ptr<FileIO> io;
  /// Field ids that make up the equality condition.
  std::vector<int32_t> equality_field_ids;
  /// Sort order id, if any; left empty when not set.
  std::optional<int32_t> sort_order_id;
  /// Writer properties.
  std::shared_ptr<WriterProperties> properties;
};
/// \brief Writer for Iceberg equality delete files.
///
/// Instances are obtained through Make(); the constructor is private and the
/// implementation is held behind an opaque `Impl`.
class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
 public:
  /// \brief Create an EqualityDeleteWriter from the given options.
  static Result<std::unique_ptr<EqualityDeleteWriter>> Make(
      const EqualityDeleteWriterOptions& options);

  ~EqualityDeleteWriter() override;

  /// \brief Write a batch of records.
  Status Write(ArrowArray* data) override;

  /// \brief Get the current number of bytes written.
  Result<int64_t> Length() const override;

  /// \brief Close the writer and finalize the file.
  Status Close() override;

  /// \brief Get file metadata for all files produced by this writer.
  Result<WriteResult> Metadata() override;

  /// \brief Equality field ids of this writer (presumably those supplied in
  /// EqualityDeleteWriterOptions -- confirm).
  const std::vector<int32_t>& equality_field_ids() const;

 private:
  class Impl;

  explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);

  std::unique_ptr<Impl> impl_;
};
FileWriterFactory
/// \brief Factory for creating Iceberg file writers.
///
/// Holds the schema, partition spec, FileIO and writer properties passed at
/// construction and exposes one factory method per writer kind. The
/// implementation is held behind an opaque `Impl`.
class ICEBERG_EXPORT FileWriterFactory {
 public:
  /// \brief Construct a factory.
  /// \param properties Writer properties; defaults to null.
  FileWriterFactory(std::shared_ptr<Schema> schema,
                    std::shared_ptr<PartitionSpec> spec,
                    std::shared_ptr<FileIO> io,
                    std::shared_ptr<WriterProperties> properties = nullptr);

  ~FileWriterFactory();

  /// \brief Set the schema and equality field ids for equality delete writers.
  ///   NOTE(review): presumably must be called before
  ///   NewEqualityDeleteWriter() -- confirm.
  void SetEqualityDeleteConfig(std::shared_ptr<Schema> eq_delete_schema,
                               std::vector<int32_t> equality_field_ids);

  /// \brief Set the row schema for position delete writers.
  void SetPositionDeleteRowSchema(std::shared_ptr<Schema> pos_delete_row_schema);

  /// \brief Create a writer for a data file at `path`.
  /// \param sort_order_id Optional sort order id; empty by default.
  Result<std::unique_ptr<DataWriter>> NewDataWriter(
      const std::string& path, FileFormatType format,
      const PartitionValues& partition,
      std::optional<int32_t> sort_order_id = std::nullopt);

  /// \brief Create a writer for a position delete file at `path`.
  Result<std::unique_ptr<PositionDeleteWriter>> NewPositionDeleteWriter(
      const std::string& path, FileFormatType format,
      const PartitionValues& partition);

  /// \brief Create a writer for an equality delete file at `path`.
  /// \param sort_order_id Optional sort order id; empty by default.
  Result<std::unique_ptr<EqualityDeleteWriter>> NewEqualityDeleteWriter(
      const std::string& path, FileFormatType format,
      const PartitionValues& partition,
      std::optional<int32_t> sort_order_id = std::nullopt);

 private:
  class Impl;

  std::unique_ptr<Impl> impl_;
};
Tasks
- Add `iceberg/data` subdirectory and writer interfaces (`iceberg/data/writer.h|cc`).
- Implement data file writer (`iceberg/data/data_writer.h|cc`).
- Implement position delete file writer (`iceberg/data/position_delete_writer.h|cc`).
- Implement equality delete file writer (`iceberg/data/equality_delete_writer.h|cc`).
- Implement file writer factory (still use `iceberg/data/writer.h|cc`).