Skip to content

Commit 62960f3

Browse files
committed
feat: add schema update to table metadata builder
1 parent b8edbdf commit 62960f3

10 files changed

Lines changed: 528 additions & 43 deletions

src/iceberg/partition_spec.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,42 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields)
155155
return {};
156156
}
157157

158+
// Checks every partition field name of this spec against the spec itself and `schema`.
//
// A name is rejected when it is empty, when it is reused by another partition field, or
// when it collides with a schema field name. The one permitted collision is an identity
// transform whose source field is the very schema field that carries the same name.
Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {
  std::unordered_set<std::string> seen_names;

  for (const auto& field : fields_) {
    std::string field_name(field.name());

    if (field_name.empty()) {
      return InvalidArgument("Cannot use empty partition name: {}", field_name);
    }
    // insert() reports through `.second` whether the name was newly added; `false`
    // means an earlier partition field already claimed it.
    if (!seen_names.insert(field_name).second) {
      return InvalidArgument("Cannot use partition name more than once: {}", field_name);
    }

    ICEBERG_ASSIGN_OR_RAISE(auto matching_field, schema.FindFieldByName(field_name));
    const bool is_identity =
        field.transform()->transform_type() == TransformType::kIdentity;

    if (!matching_field.has_value()) {
      // No schema field shares this name, so there is nothing to conflict with.
      continue;
    }

    if (!is_identity) {
      // Non-identity transforms may never reuse a schema field name.
      return InvalidArgument(
          "Cannot create partition from name that exists in schema: {}", field_name);
    }

    // Identity transforms may reuse the name only when sourced from that same field.
    if (matching_field.value().get().field_id() != field.source_id()) {
      return InvalidArgument(
          "Cannot create identity partition sourced from different field in schema: {}",
          field_name);
    }
  }

  return {};
}
193+
158194
Result<std::vector<std::reference_wrapper<const PartitionField>>>
159195
PartitionSpec::GetFieldsBySourceId(int32_t source_id) const {
160196
ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this));

src/iceberg/partition_spec.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
7979
/// \return Error status if the partition spec is invalid.
8080
Status Validate(const Schema& schema, bool allow_missing_fields) const;
8181

82+
/// \brief Validates the partition field names are unique within the partition spec and
83+
/// schema.
///
/// A partition field name must be non-empty and must not repeat within the spec. It may
/// match a schema field name only when the partition field uses the identity transform
/// and is sourced from that same schema field.
///
/// \param schema The schema whose field names are checked for conflicts.
/// \return Error status if any partition field name is invalid.
84+
Status ValidatePartitionName(const Schema& schema) const;
85+
8286
/// \brief Get the partition fields by source ID.
8387
/// \param source_id The id of the source field.
8488
/// \return The partition fields by source ID, or NotFound if the source field is not

src/iceberg/schema.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/util/macros.h"
3131
#include "iceberg/util/type_util.h"
3232
#include "iceberg/util/visit_type.h"
33+
#include "table_metadata.h"
3334

3435
namespace iceberg {
3536

@@ -228,4 +229,43 @@ Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
228229
return names;
229230
}
230231

232+
// Returns the largest key of the id-to-field index, or kInitialColumnId when the index
// has no entries.
Result<int32_t> Schema::HighestFieldId() const {
  ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
  const auto& fields_by_id = id_to_field.get();

  if (fields_by_id.empty()) {
    return kInitialColumnId;
  }

  // Seed with the first entry so the scan works for any set of IDs.
  auto highest = fields_by_id.begin();
  for (auto current = fields_by_id.begin(); current != fields_by_id.end(); ++current) {
    if (highest->first < current->first) {
      highest = current;
    }
  }

  return highest->first;
}
245+
246+
bool Schema::SameSchema(const Schema& other) const { return fields_ == other.fields_; }
247+
248+
// Verifies that every field in the id-to-field index has a type permitted by
// `format_version`.
//
// Types listed in TableMetadata::kMinFormatVersions carry a minimum format version; a
// field whose type needs a newer version than `format_version` yields InvalidSchema.
// Types absent from that table are accepted for any version.
Status Schema::Validate(int32_t format_version) const {
  ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));

  for (const auto& entry : id_to_field.get()) {
    const auto& schema_field = entry.second.get();

    // Look up whether this field's type is gated behind a minimum format version.
    const auto gated = TableMetadata::kMinFormatVersions.find(schema_field.type()->type_id());
    if (gated != TableMetadata::kMinFormatVersions.end()) {
      int32_t required_version = gated->second;
      if (format_version < required_version) {
        return InvalidSchema("Invalid type for {}: {} is not supported until v{}",
                             schema_field.name(), *schema_field.type(), required_version);
      }
    }

    // TODO(GuoTao.yu): Check default values when they are supported
  }

  return {};
}
270+
231271
} // namespace iceberg

src/iceberg/schema.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ namespace iceberg {
4646
class ICEBERG_EXPORT Schema : public StructType {
4747
public:
4848
static constexpr int32_t kInitialSchemaId = 0;
49+
static constexpr int32_t kInitialColumnId = 0;
4950
static constexpr int32_t kInvalidColumnId = -1;
5051

5152
explicit Schema(std::vector<SchemaField> fields,
@@ -121,12 +122,30 @@ class ICEBERG_EXPORT Schema : public StructType {
121122
Result<std::unique_ptr<Schema>> Project(
122123
const std::unordered_set<int32_t>& field_ids) const;
123124

125+
124126
/// \brief Return the field IDs of the identifier fields.
125127
const std::vector<int32_t>& IdentifierFieldIds() const;
126128

127129
/// \brief Return the canonical field names of the identifier fields.
128130
Result<std::vector<std::string>> IdentifierFieldNames() const;
129131

132+
/// \brief Get the highest field ID in the schema.
133+
///
/// \return The highest field ID, or kInitialColumnId if the schema has no fields.
134+
Result<int32_t> HighestFieldId() const;
135+
136+
/// \brief Checks whether this schema is equivalent to another schema while ignoring the
137+
/// schema id.
138+
bool SameSchema(const Schema& other) const;
139+
140+
/// \brief Validate the schema for a given format version.
141+
///
142+
/// This validates that the schema does not contain types that were released in later
143+
/// format versions.
144+
///
145+
/// \param format_version The format version to validate against.
146+
/// \return Error status if the schema is invalid.
147+
Status Validate(int32_t format_version) const;
148+
130149
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
131150

132151
private:

0 commit comments

Comments
 (0)