190 changes: 184 additions & 6 deletions paimon-python/pypaimon/tests/reader_base_test.py
@@ -546,6 +546,84 @@ def test_primary_key_value_stats_excludes_system_fields(self):
self.assertFalse(is_system_field,
f"value_stats_cols should not contain system field: {field_name}")

def test_value_stats_empty_when_stats_disabled(self):
catalog = CatalogFactory.create({
"warehouse": self.warehouse
})
catalog.create_database("test_db_stats_disabled", True)

pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('price', pa.float64()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
primary_keys=['id'],
options={'metadata.stats-mode': 'none', 'bucket': '2'} # Stats disabled
)
catalog.create_table("test_db_stats_disabled.test_stats_disabled", schema, False)
table = catalog.get_table("test_db_stats_disabled.test_stats_disabled")

test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'price': [10.5, 20.3, 30.7],
}, schema=pa_schema)

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_arrow(test_data)
commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
writer.close()

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
False
)

self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
file_meta = manifest_entries[0].file

self.assertEqual(
file_meta.value_stats_cols, [],
"value_stats_cols should be empty list [] when stats are disabled"
)

self.assertEqual(
file_meta.value_stats.min_values.arity, 0,
"value_stats.min_values should be empty (arity=0) when stats are disabled"
)
self.assertEqual(
file_meta.value_stats.max_values.arity, 0,
"value_stats.max_values should be empty (arity=0) when stats are disabled"
)
self.assertEqual(
len(file_meta.value_stats.null_counts), 0,
"value_stats.null_counts should be empty when stats are disabled"
)

empty_stats = SimpleStats.empty_stats()
self.assertEqual(
file_meta.value_stats.min_values.arity, len(empty_stats.min_values),
"value_stats.min_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
)
self.assertEqual(
file_meta.value_stats.max_values.arity, len(empty_stats.max_values),
"value_stats.max_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
)
self.assertEqual(
len(file_meta.value_stats.null_counts), len(empty_stats.null_counts),
"value_stats.null_counts should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
)
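
Note: the comparisons against SimpleStats.empty_stats() above assume an "empty" stats object with zero-arity min/max rows and an empty null-count list. A minimal standalone sketch of that shape (hypothetical names, not the pypaimon implementation):

```python
# Hypothetical sketch only -- illustrates the empty-stats shape the test compares
# against; GenericRowSketch / SimpleStatsSketch are stand-in names, not pypaimon APIs.
from dataclasses import dataclass
from typing import Any, List


@dataclass
class GenericRowSketch:
    values: List[Any]
    fields: List[Any]

    @property
    def arity(self) -> int:
        return len(self.values)


@dataclass
class SimpleStatsSketch:
    min_values: GenericRowSketch
    max_values: GenericRowSketch
    null_counts: List[int]

    @classmethod
    def empty_stats(cls) -> "SimpleStatsSketch":
        # Zero-arity min/max rows and no per-column null counts.
        return cls(GenericRowSketch([], []), GenericRowSketch([], []), [])


assert SimpleStatsSketch.empty_stats().min_values.arity == 0
```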

def test_types(self):
data_fields = [
DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
@@ -776,20 +854,55 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols,
self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)

def _test_append_only_schema_match_case(self, table, pa_schema):
    """Test that for append-only tables, data.schema matches table.fields.

    This verifies the assumption in data_writer.py that for append-only tables,
    PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields
    as self.table.fields (same count and same field names).
    """
    from pypaimon.schema.data_types import PyarrowFieldParser
self.assertFalse(table.is_primary_key_table,
"Table should be append-only (no primary keys)")

test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'price': [10.5, 20.3, 30.7],
'category': ['A', 'B', 'C']
}, schema=pa_schema)

data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema)
table_fields = table.fields

self.assertEqual(
len(data_fields_from_schema), len(table_fields),
f"Field count mismatch: data.schema has {len(data_fields_from_schema)} fields, "
f"but table.fields has {len(table_fields)} fields"
)

data_field_names = {field.name for field in data_fields_from_schema}
table_field_names = {field.name for field in table_fields}
self.assertEqual(
data_field_names, table_field_names,
f"Field names mismatch: data.schema has {data_field_names}, "
f"but table.fields has {table_field_names}"
)

def test_primary_key_value_stats(self):
pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('price', pa.float64()),
('category', pa.string())
])
schema = Schema.from_pyarrow_schema(
pa_schema,
primary_keys=['id'],
options={'metadata.stats-mode': 'full', 'bucket': '2'}
)
self.catalog.create_table('default.test_pk_value_stats', schema, False)
table = self.catalog.get_table('default.test_pk_value_stats')

test_data = pa.Table.from_pydict({
'id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'price': [10.5, 20.3, 30.7, 40.1, 50.9],
'category': ['A', 'B', 'C', 'D', 'E']
}, schema=pa_schema)

write_builder = table.new_batch_write_builder()
Expand Down Expand Up @@ -831,6 +944,71 @@ def _test_append_only_schema_match_case(self, table, pa_schema):
file_meta = manifest_entries[0].file
self.assertIsNone(file_meta.value_stats_cols,
"value_stats_cols should be None when all table fields are included")
self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
file_meta = manifest_entries[0].file

key_stats = file_meta.key_stats
self.assertIsNotNone(key_stats, "key_stats should not be None")
self.assertGreater(key_stats.min_values.arity, 0, "key_stats should contain key fields")
self.assertEqual(key_stats.min_values.arity, 1, "key_stats should contain exactly 1 key field (id)")

value_stats = file_meta.value_stats
self.assertIsNotNone(value_stats, "value_stats should not be None")

if file_meta.value_stats_cols is None:
expected_value_fields = ['name', 'price', 'category']
self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields),
f"value_stats should contain at least {len(expected_value_fields)} value fields")
else:
self.assertNotIn('id', file_meta.value_stats_cols,
"Key field 'id' should NOT be in value_stats_cols")

expected_value_fields = ['name', 'price', 'category']
self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)),
f"value_stats_cols should contain value fields: {expected_value_fields}, "
f"but got: {file_meta.value_stats_cols}")

expected_arity = len(file_meta.value_stats_cols)
self.assertEqual(value_stats.min_values.arity, expected_arity,
f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
f"but got {value_stats.min_values.arity}")
self.assertEqual(value_stats.max_values.arity, expected_arity,
f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
f"but got {value_stats.max_values.arity}")
self.assertEqual(len(value_stats.null_counts), expected_arity,
f"value_stats null_counts should have {expected_arity} elements, "
f"but got {len(value_stats.null_counts)}")

self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols),
f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match "
f"value_stats_cols length ({len(file_meta.value_stats_cols)})")

for field_name in file_meta.value_stats_cols:
is_system_field = (field_name.startswith('_KEY_') or
field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
self.assertFalse(is_system_field,
f"value_stats_cols should not contain system field: {field_name}")

value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
{'_VALUE_STATS_COLS': file_meta.value_stats_cols},
table.fields
)
min_value_stats = GenericRowDeserializer.from_bytes(
value_stats.min_values.data,
value_stats_fields
).values
max_value_stats = GenericRowDeserializer.from_bytes(
value_stats.max_values.data,
value_stats_fields
).values

self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values")
self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values")

actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits())
self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows")
actual_ids = sorted(actual_data.column('id').to_pylist())
self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present")

def test_split_target_size(self):
"""Test source.split.target-size configuration effect on split generation."""
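The body of test_split_target_size is collapsed in this view. As a rough, hypothetical illustration of the configuration it exercises (the import path and the size value are assumptions, not taken from this PR):

```python
# Illustration only: declare a table whose scan splits should target roughly 16 MB.
import pyarrow as pa

from pypaimon import Schema  # import path assumed

pa_schema = pa.schema([('id', pa.int64()), ('name', pa.string())])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={'source.split.target-size': '16mb'}  # option key as named by the test
)
```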
19 changes: 3 additions & 16 deletions paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -276,14 +276,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
# Column stats (only for normal columns)
metadata_stats_enabled = self.options.metadata_stats_enabled()
stats_columns = self.normal_columns if metadata_stats_enabled else []
column_stats = {
field.name: self._get_column_stats(data, field.name)
for field in stats_columns
}

min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
value_stats = self._collect_value_stats(data, stats_columns)

self.sequence_generator.start = self.sequence_generator.current

@@ -293,14 +286,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
row_count=data.num_rows,
min_key=GenericRow([], []),
max_key=GenericRow([], []),
key_stats=SimpleStats(
GenericRow([], []),
GenericRow([], []),
[]),
value_stats=SimpleStats(
GenericRow(min_value_stats, stats_columns),
GenericRow(max_value_stats, stats_columns),
value_null_counts),
key_stats=SimpleStats.empty_stats(),
value_stats=value_stats,
min_sequence_number=-1,
max_sequence_number=-1,
schema_id=self.table.table_schema.id,
57 changes: 39 additions & 18 deletions paimon-python/pypaimon/write/writer/data_writer.py
@@ -201,17 +201,14 @@ def _write_data_to_file(self, data: pa.Table):
field.name: self._get_column_stats(data, field.name)
for field in stats_fields
}
data_fields = stats_fields if value_stats_enabled else []
min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
key_fields = self.trimmed_primary_keys_fields
min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
if not all(count == 0 for count in key_null_counts):
key_stats = self._collect_value_stats(data, key_fields, column_stats)
if not all(count == 0 for count in key_stats.null_counts):
raise RuntimeError("Primary key should not be null")

value_fields = stats_fields if value_stats_enabled else []
value_stats = self._collect_value_stats(data, value_fields, column_stats)

min_seq = self.sequence_generator.start
max_seq = self.sequence_generator.current
self.sequence_generator.start = self.sequence_generator.current
@@ -221,16 +218,8 @@ def _write_data_to_file(self, data: pa.Table):
row_count=data.num_rows,
min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
key_stats=SimpleStats(
GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
key_null_counts,
),
value_stats=SimpleStats(
GenericRow(min_value_stats, data_fields),
GenericRow(max_value_stats, data_fields),
value_null_counts,
),
key_stats=key_stats,
value_stats=value_stats,
min_sequence_number=min_seq,
max_sequence_number=max_seq,
schema_id=self.table.table_schema.id,
@@ -278,6 +267,27 @@ def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:

return best_split

def _collect_value_stats(self, data: pa.Table, fields: List,
column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats:
if not fields:
return SimpleStats.empty_stats()

if column_stats is None or not column_stats:
column_stats = {
field.name: self._get_column_stats(data, field.name)
for field in fields
}

min_stats = [column_stats[field.name]['min_values'] for field in fields]
max_stats = [column_stats[field.name]['max_values'] for field in fields]
null_counts = [column_stats[field.name]['null_counts'] for field in fields]

return SimpleStats(
GenericRow(min_stats, fields),
GenericRow(max_stats, fields),
null_counts
)

@staticmethod
def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
column_array = record_batch.column(column_name)
@@ -287,6 +297,17 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
"max_values": None,
"null_counts": column_array.null_count,
}

column_type = column_array.type
supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type))

if not supports_minmax:
return {
"min_values": None,
"max_values": None,
"null_counts": column_array.null_count,
}

min_values = pc.min(column_array).as_py()
max_values = pc.max(column_array).as_py()
null_counts = column_array.null_count
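
The guard added above skips min/max aggregation for nested and map columns and records only the null count. A small standalone check of the underlying pyarrow behavior (illustrative, not part of the PR):

```python
# Illustration only: pyarrow min/max kernels apply to flat columns; nested columns
# are detected with pa.types.is_nested and fall back to null min/max stats.
import pyarrow as pa
import pyarrow.compute as pc

flat = pa.array([1, 2, None, 4])
print(pc.min(flat).as_py(), pc.max(flat).as_py(), flat.null_count)  # 1 4 1

nested = pa.array([[1, 2], None, [3]])
print(pa.types.is_nested(nested.type), nested.null_count)  # True 1
```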