Skip to content

Implement data file writers #441

@wgtmac

Description

@wgtmac

The main goal of this issue is to track the progress of file writer implementations.

Background

We already have file format writers as below:

To create a writer, users can call `WriterFactory` to create one for a specific file format. Users can also register their own writer implementations via `WriterFactoryRegistry`.

Tentative Design

Now we need to support writers for data files as well as v2 equality delete and position delete files. The following class interfaces and hierarchy are modeled on the Java implementation but have been simplified considerably. They are subject to change during the implementation stage.

FileWriter

Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/FileWriter.java

/// \brief Base interface for data file writers.
///
/// Declares the operations every writer exposes: writing batches, reporting
/// the number of bytes written, closing, and returning the metadata of the
/// files that were produced. All operations are pure virtual.
class ICEBERG_EXPORT FileWriter {
 public:
  virtual ~FileWriter() = default;

  /// \brief Write a batch of records.
  ///
  /// \param data The batch of records to write.
  /// NOTE(review): ownership of `data` after the call (released by the writer
  /// or left to the caller) is not specified by this interface; confirm.
  /// \return Status of the write.
  virtual Status Write(ArrowArray* data) = 0;

  /// \brief Get the current number of bytes written.
  virtual Result<int64_t> Length() const = 0;

  /// \brief Close the writer and finalize the file.
  virtual Status Close() = 0;

  /// \brief File metadata for all files produced by the writer.
  struct ICEBERG_EXPORT WriteResult {
    // Usually a writer produces a single data or delete file.
    // A position delete writer may produce multiple file-scoped delete files.
    // In the future, multiple files can be produced if file rolling is supported.
    std::vector<std::shared_ptr<DataFile>> data_files;
  };

  /// \brief Get file metadata for all files produced by this writer.
  ///
  /// Pure virtual like the rest of the interface; every concrete writer
  /// declares an override.
  /// NOTE(review): presumably only meaningful after Close(); confirm.
  virtual Result<WriteResult> Metadata() = 0;
};

DataWriter

Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/DataWriter.java

/// \brief Options for creating a DataWriter.
///
/// Passed by const reference to DataWriter::Make().
/// NOTE(review): the per-field notes below are inferred from field names and
/// types only; confirm them against the writer implementation.
struct ICEBERG_EXPORT DataWriterOptions {
  /// Presumably the location of the data file to write.
  std::string path;
  /// Presumably the schema of the records being written.
  std::shared_ptr<Schema> schema;
  /// Presumably the partition spec the written file belongs to.
  std::shared_ptr<PartitionSpec> spec;
  /// Presumably the partition values of the written file.
  PartitionValues partition;
  /// File format to write; defaults to Parquet.
  FileFormatType format = FileFormatType::kParquet;
  /// Presumably the FileIO used to create the output file.
  std::shared_ptr<FileIO> io;
  /// Optional sort order id; unset by default.
  std::optional<int32_t> sort_order_id;
  /// Writer properties. NOTE(review): whether null is accepted is not shown here.
  std::shared_ptr<WriterProperties> properties;
};

/// \brief FileWriter that produces Iceberg data files.
///
/// Instances are created through Make(); the constructor is private. The
/// class holds a single pointer to a forward-declared Impl.
class ICEBERG_EXPORT DataWriter : public FileWriter {
 public:
  ~DataWriter() override;

  /// \brief Create a DataWriter configured by `options`.
  static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);

  // FileWriter interface.
  Status Write(ArrowArray* data) override;
  Status Close() override;
  Result<int64_t> Length() const override;
  Result<WriteResult> Metadata() override;

 private:
  class Impl;

  /// Takes ownership of the implementation object.
  explicit DataWriter(std::unique_ptr<Impl> impl);

  std::unique_ptr<Impl> impl_;
};

PositionDeleteWriter

Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java

/// \brief Options for creating a PositionDeleteWriter.
///
/// Passed by const reference to PositionDeleteWriter::Make().
/// NOTE(review): the per-field notes below are inferred from field names and
/// types only; confirm them against the writer implementation.
struct ICEBERG_EXPORT PositionDeleteWriterOptions {
  /// Presumably the location of the delete file to write.
  std::string path;
  /// Presumably the table schema; its exact role for position deletes is not
  /// shown here.
  std::shared_ptr<Schema> schema;
  /// Presumably the partition spec the written file belongs to.
  std::shared_ptr<PartitionSpec> spec;
  /// Presumably the partition values of the written file.
  PartitionValues partition;
  /// File format to write; defaults to Parquet.
  FileFormatType format = FileFormatType::kParquet;
  /// Presumably the FileIO used to create the output file.
  std::shared_ptr<FileIO> io;
  std::shared_ptr<Schema> row_schema;  // Optional row data schema
  /// Writer properties. NOTE(review): whether null is accepted is not shown here.
  std::shared_ptr<WriterProperties> properties;
};

/// \brief FileWriter that produces Iceberg position delete files.
///
/// Instances are created through Make(); the constructor is private. Besides
/// the batch-oriented FileWriter interface, a single delete can be added with
/// WriteDelete().
class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
 public:
  ~PositionDeleteWriter() override;

  /// \brief Create a PositionDeleteWriter configured by `options`.
  static Result<std::unique_ptr<PositionDeleteWriter>> Make(
      const PositionDeleteWriterOptions& options);

  /// \brief Write one delete entry given a file path and a position.
  /// NOTE(review): presumably `pos` is the row position inside the data file
  /// at `file_path`; confirm.
  Status WriteDelete(std::string_view file_path, int64_t pos);

  // FileWriter interface.
  Status Write(ArrowArray* data) override;
  Status Close() override;
  Result<int64_t> Length() const override;
  Result<WriteResult> Metadata() override;

 private:
  class Impl;

  /// Takes ownership of the implementation object.
  explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);

  std::unique_ptr<Impl> impl_;
};

EqualityDeleteWriter

Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java

/// \brief Options for creating an EqualityDeleteWriter.
///
/// Passed by const reference to EqualityDeleteWriter::Make().
/// NOTE(review): the per-field notes below are inferred from field names and
/// types only; confirm them against the writer implementation.
struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
  /// Presumably the location of the delete file to write.
  std::string path;
  /// Presumably the schema of the delete records being written.
  std::shared_ptr<Schema> schema;
  /// Presumably the partition spec the written file belongs to.
  std::shared_ptr<PartitionSpec> spec;
  /// Presumably the partition values of the written file.
  PartitionValues partition;
  /// File format to write; defaults to Parquet.
  FileFormatType format = FileFormatType::kParquet;
  /// Presumably the FileIO used to create the output file.
  std::shared_ptr<FileIO> io;
  /// Field ids that make up the equality key; empty by default.
  /// NOTE(review): whether an empty list is rejected is not shown here.
  std::vector<int32_t> equality_field_ids;
  /// Optional sort order id; unset by default.
  std::optional<int32_t> sort_order_id;
  /// Writer properties. NOTE(review): whether null is accepted is not shown here.
  std::shared_ptr<WriterProperties> properties;
};

/// \brief FileWriter that produces Iceberg equality delete files.
///
/// Instances are created through Make(); the constructor is private. The
/// class holds a single pointer to a forward-declared Impl.
class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
 public:
  ~EqualityDeleteWriter() override;

  /// \brief Create an EqualityDeleteWriter configured by `options`.
  static Result<std::unique_ptr<EqualityDeleteWriter>> Make(
      const EqualityDeleteWriterOptions& options);

  /// \brief Access the equality field ids of this writer.
  const std::vector<int32_t>& equality_field_ids() const;

  // FileWriter interface.
  Status Write(ArrowArray* data) override;
  Status Close() override;
  Result<int64_t> Length() const override;
  Result<WriteResult> Metadata() override;

 private:
  class Impl;

  /// Takes ownership of the implementation object.
  explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);

  std::unique_ptr<Impl> impl_;
};

FileWriterFactory

Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java

/// \brief Factory that creates data, position delete and equality delete
/// writers.
///
/// The schema, partition spec, FileIO and optional writer properties are given
/// once at construction; each New*Writer() call then supplies the per-file
/// path, format and partition values.
class ICEBERG_EXPORT FileWriterFactory {
 public:
  /// \brief Construct a factory. `properties` defaults to nullptr.
  FileWriterFactory(std::shared_ptr<Schema> schema,
                    std::shared_ptr<PartitionSpec> spec,
                    std::shared_ptr<FileIO> io,
                    std::shared_ptr<WriterProperties> properties = nullptr);

  ~FileWriterFactory();

  /// \brief Set the schema and equality field ids for equality deletes.
  /// NOTE(review): presumably consumed by NewEqualityDeleteWriter(); confirm.
  void SetEqualityDeleteConfig(
      std::shared_ptr<Schema> eq_delete_schema,
      std::vector<int32_t> equality_field_ids);

  /// \brief Set the row schema for position deletes.
  /// NOTE(review): presumably consumed by NewPositionDeleteWriter(); confirm.
  void SetPositionDeleteRowSchema(
      std::shared_ptr<Schema> pos_delete_row_schema);

  /// \brief Create a DataWriter. `sort_order_id` defaults to unset.
  Result<std::unique_ptr<DataWriter>> NewDataWriter(
      const std::string& path,
      FileFormatType format,
      const PartitionValues& partition,
      std::optional<int32_t> sort_order_id = std::nullopt);

  /// \brief Create a PositionDeleteWriter.
  Result<std::unique_ptr<PositionDeleteWriter>> NewPositionDeleteWriter(
      const std::string& path,
      FileFormatType format,
      const PartitionValues& partition);

  /// \brief Create an EqualityDeleteWriter. `sort_order_id` defaults to unset.
  Result<std::unique_ptr<EqualityDeleteWriter>> NewEqualityDeleteWriter(
      const std::string& path,
      FileFormatType format,
      const PartitionValues& partition,
      std::optional<int32_t> sort_order_id = std::nullopt);

 private:
  class Impl;

  std::unique_ptr<Impl> impl_;
};

Tasks

  • Add iceberg/data subdirectory and writer interfaces (iceberg/data/writer.h|cc).
  • Implement data file writer (iceberg/data/data_writer.h|cc).
  • Implement position delete file writer (iceberg/data/position_delete_writer.h|cc).
  • Implement equality delete file writer (iceberg/data/equality_delete_writer.h|cc).
  • Implement file writer factory (kept in iceberg/data/writer.h|cc).

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions