From b4ad2b9b0ff3fdb716dcbd5de244da9e3f7adaa0 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 13 Feb 2026 19:22:02 +0000 Subject: [PATCH 1/3] chore: cleanup after docs review --- bindings/cpp/examples/example.cpp | 15 ++++-- bindings/cpp/include/fluss.hpp | 22 +++++---- bindings/cpp/src/admin.cpp | 45 ++++++++++++++--- bindings/cpp/src/lib.rs | 78 +++++++++++++++++++++--------- bindings/python/example/example.py | 28 ++++++----- bindings/python/fluss/__init__.pyi | 46 ++++++++++++------ bindings/python/src/admin.rs | 54 ++++++--------------- bindings/python/src/config.rs | 14 +++--- bindings/python/src/lib.rs | 52 ++++++++++++++++---- 9 files changed, 230 insertions(+), 124 deletions(-) diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 2c7f5545..2b7f331e 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -286,7 +286,7 @@ int main() { std::unordered_map earliest_offsets; check("list_earliest_offsets", - admin.ListOffsets(table_path, all_bucket_ids, fluss::OffsetQuery::Earliest(), + admin.ListOffsets(table_path, all_bucket_ids, fluss::OffsetSpec::Earliest(), earliest_offsets)); std::cout << "Earliest offsets:" << std::endl; for (const auto& [bucket_id, offset] : earliest_offsets) { @@ -295,7 +295,7 @@ int main() { std::unordered_map latest_offsets; check("list_latest_offsets", admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::Latest(), latest_offsets)); + fluss::OffsetSpec::Latest(), latest_offsets)); std::cout << "Latest offsets:" << std::endl; for (const auto& [bucket_id, offset] : latest_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; @@ -310,7 +310,7 @@ int main() { std::unordered_map timestamp_offsets; check("list_timestamp_offsets", admin.ListOffsets(table_path, all_bucket_ids, - fluss::OffsetQuery::FromTimestamp(timestamp_ms), timestamp_offsets)); + fluss::OffsetSpec::Timestamp(timestamp_ms), timestamp_offsets)); std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl; for (const auto& [bucket_id, offset] : timestamp_offsets) { std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl; @@ -507,7 +507,7 @@ int main() { admin.CreatePartition(partitioned_table_path, {{"region", "EU"}}, true)); std::cout << "Created partitions: US, EU" << std::endl; - // List partitions + // List all partitions std::vector partition_infos; check("list_partition_infos", admin.ListPartitionInfos(partitioned_table_path, partition_infos)); @@ -516,6 +516,13 @@ int main() { << std::endl; } + // List partitions with partial spec filter + std::vector us_partition_infos; + check("list_partition_infos_with_spec", + admin.ListPartitionInfos(partitioned_table_path, {{"region", "US"}}, us_partition_infos)); + std::cout << " Filtered (region=US): " << us_partition_infos.size() << " partition(s)" + << std::endl; + // Write data to partitioned table fluss::Table partitioned_table; check("get_partitioned_table", conn.GetTable(partitioned_table_path, partitioned_table)); diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index dd298823..30a8636b 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -305,19 +305,19 @@ enum class DatumType { constexpr int64_t EARLIEST_OFFSET = -2; -enum class OffsetSpec { +enum class OffsetType { Earliest = 0, Latest = 1, Timestamp = 2, }; -struct OffsetQuery { - OffsetSpec spec; +struct OffsetSpec { + OffsetType type; int64_t timestamp{0}; - static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; } - static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; } - static OffsetQuery FromTimestamp(int64_t ts) { return {OffsetSpec::Timestamp, ts}; } + static OffsetSpec Earliest() { return {OffsetType::Earliest, 0}; } + static OffsetSpec Latest() { return {OffsetType::Latest, 0}; } + static OffsetSpec Timestamp(int64_t ts) { return {OffsetType::Timestamp, ts}; } }; struct Result { @@ -1000,15 +1000,19 @@ class Admin { Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out); Result ListOffsets(const TablePath& table_path, const std::vector& bucket_ids, - const OffsetQuery& offset_query, std::unordered_map& out); + const OffsetSpec& offset_spec, std::unordered_map& out); Result ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name, const std::vector& bucket_ids, - const OffsetQuery& offset_query, + const OffsetSpec& offset_spec, std::unordered_map& out); Result ListPartitionInfos(const TablePath& table_path, std::vector& out); + Result ListPartitionInfos(const TablePath& table_path, + const std::unordered_map& partition_spec, + std::vector& out); + Result CreatePartition(const TablePath& table_path, const std::unordered_map& partition_spec, bool ignore_if_exists = false); @@ -1035,7 +1039,7 @@ class Admin { private: Result DoListOffsets(const TablePath& table_path, const std::vector& bucket_ids, - const OffsetQuery& offset_query, std::unordered_map& out, + const OffsetSpec& offset_spec, std::unordered_map& out, const std::string* partition_name = nullptr); friend class Connection; diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp index 0fb15b27..8deb182d 100644 --- a/bindings/cpp/src/admin.cpp +++ b/bindings/cpp/src/admin.cpp @@ -107,7 +107,7 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o // function for common list offsets functionality Result Admin::DoListOffsets(const TablePath& table_path, const std::vector& bucket_ids, - const OffsetQuery& offset_query, + const OffsetSpec& offset_spec, std::unordered_map& out, const std::string* partition_name) { if (!Available()) { @@ -122,8 +122,8 @@ Result Admin::DoListOffsets(const TablePath& table_path, const std::vector(offset_query.spec); - ffi_query.timestamp = offset_query.timestamp; + ffi_query.offset_type = static_cast(offset_spec.type); + ffi_query.timestamp = offset_spec.timestamp; ffi::FfiListOffsetsResult ffi_result; if (partition_name != nullptr) { @@ -145,16 +145,16 @@ Result Admin::DoListOffsets(const TablePath& table_path, const std::vector& bucket_ids, - const OffsetQuery& offset_query, + const OffsetSpec& offset_spec, std::unordered_map& out) { - return DoListOffsets(table_path, bucket_ids, offset_query, out); + return DoListOffsets(table_path, bucket_ids, offset_spec, out); } Result Admin::ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name, const std::vector& bucket_ids, - const OffsetQuery& offset_query, + const OffsetSpec& offset_spec, std::unordered_map& out) { - return DoListOffsets(table_path, bucket_ids, offset_query, out, &partition_name); + return DoListOffsets(table_path, bucket_ids, offset_spec, out, &partition_name); } Result Admin::ListPartitionInfos(const TablePath& table_path, std::vector& out) { @@ -177,6 +177,37 @@ Result Admin::ListPartitionInfos(const TablePath& table_path, std::vector& partition_spec, + std::vector& out) { + if (!Available()) { + return utils::make_client_error("Admin not available"); + } + + auto ffi_path = utils::to_ffi_table_path(table_path); + + rust::Vec rust_spec; + for (const auto& [key, value] : partition_spec) { + ffi::FfiPartitionKeyValue kv; + kv.key = rust::String(key); + kv.value = rust::String(value); + rust_spec.push_back(std::move(kv)); + } + + auto ffi_result = admin_->list_partition_infos_with_spec(ffi_path, std::move(rust_spec)); + + auto result = utils::from_ffi_result(ffi_result.result); + if (result.Ok()) { + out.clear(); + out.reserve(ffi_result.partition_infos.size()); + for (const auto& pi : ffi_result.partition_infos) { + out.push_back({pi.partition_id, std::string(pi.partition_name)}); + } + } + + return result; +} + Result Admin::CreatePartition(const TablePath& table_path, const std::unordered_map& partition_spec, bool ignore_if_exists) { diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 8a5bdfdd..fab8edff 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -302,6 +302,11 @@ mod ffi { self: &Admin, table_path: &FfiTablePath, ) -> FfiListPartitionInfosResult; + fn list_partition_infos_with_spec( + self: &Admin, + table_path: &FfiTablePath, + partition_spec: Vec, + ) -> FfiListPartitionInfosResult; fn create_partition( self: &Admin, table_path: &FfiTablePath, @@ -735,30 +740,20 @@ impl Admin { &self, table_path: &ffi::FfiTablePath, ) -> ffi::FfiListPartitionInfosResult { - let path = fcore::metadata::TablePath::new( - table_path.database_name.clone(), - table_path.table_name.clone(), - ); - let result = RUNTIME.block_on(async { self.inner.list_partition_infos(&path).await }); - match result { - Ok(infos) => { - let partition_infos: Vec = infos - .into_iter() - .map(|info| ffi::FfiPartitionInfo { - partition_id: info.get_partition_id(), - partition_name: info.get_partition_name(), - }) - .collect(); - ffi::FfiListPartitionInfosResult { - result: ok_result(), - partition_infos, - } - } - Err(e) => ffi::FfiListPartitionInfosResult { - result: err_from_core_error(&e), - partition_infos: vec![], - }, - } + self.do_list_partition_infos(table_path, None) + } + + fn list_partition_infos_with_spec( + &self, + table_path: &ffi::FfiTablePath, + partition_spec: Vec, + ) -> ffi::FfiListPartitionInfosResult { + let spec_map: std::collections::HashMap = partition_spec + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect(); + let spec = fcore::metadata::PartitionSpec::new(spec_map); + self.do_list_partition_infos(table_path, Some(&spec)) } fn create_partition( &self, @@ -939,6 +934,41 @@ impl Admin { }, } } + + fn do_list_partition_infos( + &self, + table_path: &ffi::FfiTablePath, + partial_partition_spec: Option<&fcore::metadata::PartitionSpec>, + ) -> ffi::FfiListPartitionInfosResult { + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + let result = RUNTIME.block_on(async { + self.inner + .list_partition_infos_with_spec(&path, partial_partition_spec) + .await + }); + match result { + Ok(infos) => { + let partition_infos: Vec = infos + .into_iter() + .map(|info| ffi::FfiPartitionInfo { + partition_id: info.get_partition_id(), + partition_name: info.get_partition_name(), + }) + .collect(); + ffi::FfiListPartitionInfosResult { + result: ok_result(), + partition_infos, + } + } + Err(e) => ffi::FfiListPartitionInfosResult { + result: err_from_core_error(&e), + partition_infos: vec![], + }, + } + } } // Table implementation diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 4ea3bd6a..6ed8d040 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -30,12 +30,12 @@ async def main(): # Create connection configuration config_spec = { - "bootstrap.servers": "127.0.0.1:9123", + "bootstrap_servers": "127.0.0.1:9123", # Add other configuration options as needed - "writer.request-max-size": "10485760", # 10 MB - "writer.acks": "all", # Wait for all replicas to acknowledge - "writer.retries": "3", # Retry up to 3 times on failure - "writer.batch-size": "1000", # Batch size for writes + "writer_request_max_size": "10485760", # 10 MB + "writer_acks": "all", # Wait for all replicas to acknowledge + "writer_retries": "3", # Retry up to 3 times on failure + "writer_batch_size": "1000", # Batch size for writes } config = fluss.Config(config_spec) @@ -90,11 +90,11 @@ async def main(): # Demo: List offsets print("\n--- Testing list_offsets() ---") try: - # Query latest offsets using OffsetType constant (recommended for type safety) + # Query latest offsets using OffsetSpec factory method offsets = await admin.list_offsets( table_path, bucket_ids=[0], - offset_type=fluss.OffsetType.LATEST + offset_spec=fluss.OffsetSpec.latest() ) print(f"Latest offsets for table (before writes): {offsets}") except Exception as e: @@ -248,11 +248,10 @@ async def main(): # Demo: Check offsets after writes print("\n--- Checking offsets after writes ---") try: - # Query with string constant (alternative API - both strings and constants are supported) offsets = await admin.list_offsets( table_path, bucket_ids=[0], - offset_type="latest" # Can also use "earliest" or "timestamp" + offset_spec=fluss.OffsetSpec.latest() ) print(f"Latest offsets after writing 7 records: {offsets}") except Exception as e: @@ -734,6 +733,13 @@ async def main(): await partitioned_writer.flush() print("\nWrote 4 records (2 to US, 2 to EU)") + # Demo: list_partition_infos with partial spec filter + print("\n--- Testing list_partition_infos with spec ---") + us_partitions = await admin.list_partition_infos( + partitioned_table_path, partition_spec={"region": "US"} + ) + print(f"Filtered partitions (region=US): {us_partitions}") + # Demo: list_partition_offsets print("\n--- Testing list_partition_offsets ---") @@ -743,7 +749,7 @@ async def main(): partitioned_table_path, partition_name="US", bucket_ids=[0], - offset_type="latest" + offset_spec=fluss.OffsetSpec.latest() ) print(f"US partition latest offsets: {us_offsets}") @@ -752,7 +758,7 @@ async def main(): partitioned_table_path, partition_name="EU", bucket_ids=[0], - offset_type="latest" + offset_spec=fluss.OffsetSpec.latest() ) print(f"EU partition latest offsets: {eu_offsets}") diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index daccca85..47eeb808 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -193,16 +193,15 @@ class FlussAdmin: self, table_path: TablePath, bucket_ids: List[int], - offset_type: str, - timestamp: Optional[int] = None, + offset_spec: "OffsetSpec", ) -> Dict[int, int]: """List offsets for the specified buckets. Args: table_path: Path to the table bucket_ids: List of bucket IDs to query - offset_type: "earliest", "latest", or "timestamp" - timestamp: Required when offset_type is "timestamp" + offset_spec: Offset specification (OffsetSpec.earliest(), OffsetSpec.latest(), + or OffsetSpec.timestamp(ts)) Returns: Dict mapping bucket_id -> offset @@ -213,8 +212,7 @@ class FlussAdmin: table_path: TablePath, partition_name: str, bucket_ids: List[int], - offset_type: str, - timestamp: Optional[int] = None, + offset_spec: "OffsetSpec", ) -> Dict[int, int]: """List offsets for buckets in a specific partition. @@ -222,8 +220,8 @@ class FlussAdmin: table_path: Path to the table partition_name: Partition value (e.g., "US" not "region=US") bucket_ids: List of bucket IDs to query - offset_type: "earliest", "latest", or "timestamp" - timestamp: Required when offset_type is "timestamp" + offset_spec: Offset specification (OffsetSpec.earliest(), OffsetSpec.latest(), + or OffsetSpec.timestamp(ts)) Returns: Dict mapping bucket_id -> offset @@ -246,11 +244,15 @@ class FlussAdmin: async def list_partition_infos( self, table_path: TablePath, + partition_spec: Optional[Dict[str, str]] = None, ) -> List["PartitionInfo"]: - """List all partitions for a partitioned table. + """List partitions for a partitioned table. Args: table_path: Path to the table + partition_spec: Optional partial partition spec to filter results. + Dict mapping partition column name to value (e.g., {"region": "US"}). + If None, returns all partitions. Returns: List of PartitionInfo objects @@ -839,12 +841,28 @@ class ErrorCode: INVALID_ALTER_TABLE_EXCEPTION: int DELETION_DISABLED_EXCEPTION: int -class OffsetType: - """Offset type constants for list_offsets().""" +class OffsetSpec: + """Offset specification for list_offsets(), matching Java's OffsetSpec. - EARLIEST: str - LATEST: str - TIMESTAMP: str + Use factory methods to create instances: + OffsetSpec.earliest() + OffsetSpec.latest() + OffsetSpec.timestamp(ts) + """ + + @staticmethod + def earliest() -> "OffsetSpec": + """Create an OffsetSpec for the earliest available offset.""" + ... + @staticmethod + def latest() -> "OffsetSpec": + """Create an OffsetSpec for the latest available offset.""" + ... + @staticmethod + def timestamp(ts: int) -> "OffsetSpec": + """Create an OffsetSpec for the offset at or after the given timestamp.""" + ... + def __repr__(self) -> str: ... # Constant for earliest offset (-2) EARLIEST_OFFSET: int diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index d03ce7a2..9a96bea1 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -16,7 +16,6 @@ // under the License. use crate::*; -use fcore::rpc::message::OffsetSpec; use pyo3::conversion::IntoPyObject; use pyo3_async_runtimes::tokio::future_into_py; use std::sync::Arc; @@ -27,23 +26,6 @@ pub struct FlussAdmin { __admin: Arc, } -/// Parse offset_type string into OffsetSpec -fn parse_offset_spec(offset_type: &str, timestamp: Option) -> PyResult { - match offset_type { - s if s.eq_ignore_ascii_case("earliest") => Ok(OffsetSpec::Earliest), - s if s.eq_ignore_ascii_case("latest") => Ok(OffsetSpec::Latest), - s if s.eq_ignore_ascii_case("timestamp") => { - let ts = timestamp.ok_or_else(|| { - FlussError::new_err("timestamp must be provided when offset_type='timestamp'") - })?; - Ok(OffsetSpec::Timestamp(ts)) - } - _ => Err(FlussError::new_err(format!( - "Invalid offset_type: '{offset_type}'. Must be 'earliest', 'latest', or 'timestamp'" - ))), - } -} - /// Validate bucket IDs are non-negative fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> { for &bucket_id in bucket_ids { @@ -374,25 +356,20 @@ impl FlussAdmin { /// Args: /// table_path: Path to the table /// bucket_ids: List of bucket IDs to query - /// offset_type: Type of offset to retrieve: - /// - "earliest" or OffsetType.EARLIEST: Start of the log - /// - "latest" or OffsetType.LATEST: End of the log - /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given timestamp (requires timestamp arg) - /// timestamp: Required when offset_type is "timestamp", ignored otherwise + /// offset_spec: Offset specification (OffsetSpec.earliest(), OffsetSpec.latest(), + /// or OffsetSpec.timestamp(ts)) /// /// Returns: /// dict[int, int]: Mapping of bucket_id -> offset - #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))] pub fn list_offsets<'py>( &self, py: Python<'py>, table_path: &TablePath, bucket_ids: Vec, - offset_type: &str, - timestamp: Option, + offset_spec: &OffsetSpec, ) -> PyResult> { validate_bucket_ids(&bucket_ids)?; - let offset_spec = parse_offset_spec(offset_type, timestamp)?; + let offset_spec = offset_spec.inner.clone(); let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); @@ -419,26 +396,21 @@ impl FlussAdmin { /// table_path: Path to the table /// partition_name: Partition value (e.g., "US" not "region=US") /// bucket_ids: List of bucket IDs to query - /// offset_type: Type of offset to retrieve: - /// - "earliest" or OffsetType.EARLIEST: Start of the log - /// - "latest" or OffsetType.LATEST: End of the log - /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given timestamp (requires timestamp arg) - /// timestamp: Required when offset_type is "timestamp", ignored otherwise + /// offset_spec: Offset specification (OffsetSpec.earliest(), OffsetSpec.latest(), + /// or OffsetSpec.timestamp(ts)) /// /// Returns: /// dict[int, int]: Mapping of bucket_id -> offset - #[pyo3(signature = (table_path, partition_name, bucket_ids, offset_type, timestamp=None))] pub fn list_partition_offsets<'py>( &self, py: Python<'py>, table_path: &TablePath, partition_name: &str, bucket_ids: Vec, - offset_type: &str, - timestamp: Option, + offset_spec: &OffsetSpec, ) -> PyResult> { validate_bucket_ids(&bucket_ids)?; - let offset_spec = parse_offset_spec(offset_type, timestamp)?; + let offset_spec = offset_spec.inner.clone(); let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); @@ -493,24 +465,30 @@ impl FlussAdmin { }) } - /// List all partitions for a partitioned table. + /// List partitions for a partitioned table. /// /// Args: /// table_path: Path to the table + /// partition_spec: Optional partial partition spec to filter results. + /// Dict mapping partition column name to value (e.g., {"region": "US"}). + /// If None, returns all partitions. /// /// Returns: /// List[PartitionInfo]: List of partition info objects + #[pyo3(signature = (table_path, partition_spec=None))] pub fn list_partition_infos<'py>( &self, py: Python<'py>, table_path: &TablePath, + partition_spec: Option>, ) -> PyResult> { let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); + let core_partition_spec = partition_spec.map(fcore::metadata::PartitionSpec::new); future_into_py(py, async move { let partition_infos = admin - .list_partition_infos(&core_table_path) + .list_partition_infos_with_spec(&core_table_path, core_partition_spec.as_ref()) .await .map_err(|e| FlussError::new_err(format!("Failed to list partitions: {e}")))?; diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 237ab6fa..6ebeab23 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -39,33 +39,33 @@ impl Config { let value: String = item.1.extract()?; match key.as_str() { - "bootstrap.servers" => { + "bootstrap_servers" => { config.bootstrap_servers = value; } - "writer.request-max-size" => { + "writer_request_max_size" => { if let Ok(size) = value.parse::() { config.writer_request_max_size = size; } } - "writer.acks" => { + "writer_acks" => { config.writer_acks = value; } - "writer.retries" => { + "writer_retries" => { if let Ok(retries) = value.parse::() { config.writer_retries = retries; } } - "writer.batch-size" => { + "writer_batch_size" => { if let Ok(size) = value.parse::() { config.writer_batch_size = size; } } - "scanner.remote-log.prefetch-num" => { + "scanner_remote_log_prefetch_num" => { if let Ok(num) = value.parse::() { config.scanner_remote_log_prefetch_num = num; } } - "remote-file.download-thread-num" => { + "remote_file_download_thread_num" => { if let Ok(num) = value.parse::() { config.remote_file_download_thread_num = num; } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 801db2c8..553c8a92 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -50,21 +50,53 @@ static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { .expect("Failed to create Tokio runtime") }); -/// Offset type constants for list_offsets() +/// Offset specification for list_offsets(), matching Java's OffsetSpec. +/// +/// Use factory methods to create instances: +/// OffsetSpec.earliest() +/// OffsetSpec.latest() +/// OffsetSpec.timestamp(ts) #[pyclass] #[derive(Clone)] -pub struct OffsetType; +pub struct OffsetSpec { + pub(crate) inner: fcore::rpc::message::OffsetSpec, +} #[pymethods] -impl OffsetType { - #[classattr] - const EARLIEST: &'static str = "earliest"; +impl OffsetSpec { + /// Create an OffsetSpec for the earliest available offset. + #[staticmethod] + fn earliest() -> Self { + Self { + inner: fcore::rpc::message::OffsetSpec::Earliest, + } + } + + /// Create an OffsetSpec for the latest available offset. + #[staticmethod] + fn latest() -> Self { + Self { + inner: fcore::rpc::message::OffsetSpec::Latest, + } + } - #[classattr] - const LATEST: &'static str = "latest"; + /// Create an OffsetSpec for the offset at or after the given timestamp. + #[staticmethod] + fn timestamp(ts: i64) -> Self { + Self { + inner: fcore::rpc::message::OffsetSpec::Timestamp(ts), + } + } - #[classattr] - const TIMESTAMP: &'static str = "timestamp"; + fn __repr__(&self) -> String { + match &self.inner { + fcore::rpc::message::OffsetSpec::Earliest => "OffsetSpec.earliest()".to_string(), + fcore::rpc::message::OffsetSpec::Latest => "OffsetSpec.latest()".to_string(), + fcore::rpc::message::OffsetSpec::Timestamp(ts) => { + format!("OffsetSpec.timestamp({ts})") + } + } + } } #[pymodule] @@ -92,7 +124,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From e7710d03774beb34e7dcfd2c38270b46200bbc2a Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 13 Feb 2026 21:53:55 +0000 Subject: [PATCH 2/3] address config py comments --- bindings/python/src/config.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 6ebeab23..7dc3ae9b 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -39,33 +39,35 @@ impl Config { let value: String = item.1.extract()?; match key.as_str() { - "bootstrap_servers" => { + "bootstrap_servers" | "bootstrap.servers" => { config.bootstrap_servers = value; } - "writer_request_max_size" => { + "writer_request_max_size" | "client.writer.request-max-size" => { if let Ok(size) = value.parse::() { config.writer_request_max_size = size; } } - "writer_acks" => { + "writer_acks" | "client.writer.acks" => { config.writer_acks = value; } - "writer_retries" => { + "writer_retries" | "client.writer.retries" => { if let Ok(retries) = value.parse::() { config.writer_retries = retries; } } - "writer_batch_size" => { + "writer_batch_size" | "client.writer.batch-size" => { if let Ok(size) = value.parse::() { config.writer_batch_size = size; } } - "scanner_remote_log_prefetch_num" => { + "scanner_remote_log_prefetch_num" + | "client.scanner.remote-log.prefetch-num" => { if let Ok(num) = value.parse::() { config.scanner_remote_log_prefetch_num = num; } } - "remote_file_download_thread_num" => { + "remote_file_download_thread_num" + | "client.remote-file.download-thread-num" => { if let Ok(num) = value.parse::() { config.remote_file_download_thread_num = num; } From c8cc45b061f33f71c58f8574748f5d432a3dd666 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 15 Feb 2026 16:42:15 +0000 Subject: [PATCH 3/3] Address commments 2 --- bindings/python/example/example.py | 10 +++++----- bindings/python/src/config.rs | 16 +++++++--------- website/docs/user-guide/cpp/api-reference.md | 14 +++++++------- website/docs/user-guide/cpp/data-types.md | 2 +- .../user-guide/cpp/example/admin-operations.md | 8 ++++---- .../cpp/example/partitioned-tables.md | 2 +- website/docs/user-guide/python/api-reference.md | 17 +++++++++++------ .../python/example/admin-operations.md | 6 +++--- .../user-guide/python/example/configuration.md | 4 ++-- .../user-guide/python/example/log-tables.md | 2 +- 10 files changed, 42 insertions(+), 39 deletions(-) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 6ed8d040..9c2b7e30 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -30,12 +30,12 @@ async def main(): # Create connection configuration config_spec = { - "bootstrap_servers": "127.0.0.1:9123", + "bootstrap.servers": "127.0.0.1:9123", # Add other configuration options as needed - "writer_request_max_size": "10485760", # 10 MB - "writer_acks": "all", # Wait for all replicas to acknowledge - "writer_retries": "3", # Retry up to 3 times on failure - "writer_batch_size": "1000", # Batch size for writes + "writer.request-max-size": "10485760", # 10 MB + "writer.acks": "all", # Wait for all replicas to acknowledge + "writer.retries": "3", # Retry up to 3 times on failure + "writer.batch-size": "1000", # Batch size for writes } config = fluss.Config(config_spec) diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 7dc3ae9b..237ab6fa 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -39,35 +39,33 @@ impl Config { let value: String = item.1.extract()?; match key.as_str() { - "bootstrap_servers" | "bootstrap.servers" => { + "bootstrap.servers" => { config.bootstrap_servers = value; } - "writer_request_max_size" | "client.writer.request-max-size" => { + "writer.request-max-size" => { if let Ok(size) = value.parse::() { config.writer_request_max_size = size; } } - "writer_acks" | "client.writer.acks" => { + "writer.acks" => { config.writer_acks = value; } - "writer_retries" | "client.writer.retries" => { + "writer.retries" => { if let Ok(retries) = value.parse::() { config.writer_retries = retries; } } - "writer_batch_size" | "client.writer.batch-size" => { + "writer.batch-size" => { if let Ok(size) = value.parse::() { config.writer_batch_size = size; } } - "scanner_remote_log_prefetch_num" - | "client.scanner.remote-log.prefetch-num" => { + "scanner.remote-log.prefetch-num" => { if let Ok(num) = value.parse::() { config.scanner_remote_log_prefetch_num = num; } } - "remote_file_download_thread_num" - | "client.remote-file.download-thread-num" => { + "remote-file.download-thread-num" => { if let Ok(num) = value.parse::() { config.remote_file_download_thread_num = num; } diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index 00ff8086..47c9307d 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -68,8 +68,8 @@ Complete API reference for the Fluss C++ client. | Method | Description | |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------| -| `ListOffsets(const TablePath& path, const std::vector& bucket_ids, const OffsetQuery& query, std::unordered_map& out) -> Result` | Get offsets for buckets | -| `ListPartitionOffsets(const TablePath& path, const std::string& partition_name, const std::vector& bucket_ids, const OffsetQuery& query, std::unordered_map& out) -> Result` | Get offsets for a partition's buckets | +| `ListOffsets(const TablePath& path, const std::vector& bucket_ids, const OffsetSpec& query, std::unordered_map& out) -> Result` | Get offsets for buckets | +| `ListPartitionOffsets(const TablePath& path, const std::string& partition_name, const std::vector& bucket_ids, const OffsetSpec& query, std::unordered_map& out) -> Result` | Get offsets for a partition's buckets | ### Lake Operations @@ -423,13 +423,13 @@ When using `table.NewRow()`, the `Set()` method auto-routes to the correct type | `bucket_id` | `int32_t` | Bucket ID | | `offset` | `int64_t` | Offset value | -## `OffsetQuery` +## `OffsetSpec` | Method | Description | |----------------------------------------------------|-----------------------------------------| -| `OffsetQuery::Earliest()` | Query for the earliest available offset | -| `OffsetQuery::Latest()` | Query for the latest offset | -| `OffsetQuery::FromTimestamp(int64_t timestamp_ms)` | Query offset at a specific timestamp | +| `OffsetSpec::Earliest()` | Query for the earliest available offset | +| `OffsetSpec::Latest()` | Query for the latest offset | +| `OffsetSpec::Timestamp(int64_t timestamp_ms)` | Query offset at a specific timestamp | ## Constants @@ -441,7 +441,7 @@ To start reading from the latest offset (only new records), resolve the current ```cpp std::unordered_map offsets; -admin.ListOffsets(table_path, {0}, fluss::OffsetQuery::Latest(), offsets); +admin.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), offsets); scanner.Subscribe(0, offsets[0]); ``` diff --git a/website/docs/user-guide/cpp/data-types.md b/website/docs/user-guide/cpp/data-types.md index 11712fac..fb01ac28 100644 --- a/website/docs/user-guide/cpp/data-types.md +++ b/website/docs/user-guide/cpp/data-types.md @@ -105,6 +105,6 @@ To start reading from the latest offset, resolve the current offset via `ListOff ```cpp std::unordered_map offsets; -admin.ListOffsets(table_path, {0}, fluss::OffsetQuery::Latest(), offsets); +admin.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), offsets); scanner.Subscribe(0, offsets[0]); ``` diff --git a/website/docs/user-guide/cpp/example/admin-operations.md b/website/docs/user-guide/cpp/example/admin-operations.md index 1a330126..850660ea 100644 --- a/website/docs/user-guide/cpp/example/admin-operations.md +++ b/website/docs/user-guide/cpp/example/admin-operations.md @@ -120,23 +120,23 @@ std::vector bucket_ids = {0, 1, 2}; // Query earliest offsets std::unordered_map earliest_offsets; admin.ListOffsets(table_path, bucket_ids, - fluss::OffsetQuery::Earliest(), earliest_offsets); + fluss::OffsetSpec::Earliest(), earliest_offsets); // Query latest offsets std::unordered_map latest_offsets; admin.ListOffsets(table_path, bucket_ids, - fluss::OffsetQuery::Latest(), latest_offsets); + fluss::OffsetSpec::Latest(), latest_offsets); // Query offsets for a specific timestamp std::unordered_map timestamp_offsets; admin.ListOffsets(table_path, bucket_ids, - fluss::OffsetQuery::FromTimestamp(timestamp_ms), + fluss::OffsetSpec::Timestamp(timestamp_ms), timestamp_offsets); // Query partition offsets std::unordered_map partition_offsets; admin.ListPartitionOffsets(table_path, "partition_name", - bucket_ids, fluss::OffsetQuery::Latest(), + bucket_ids, fluss::OffsetSpec::Latest(), partition_offsets); ``` diff --git a/website/docs/user-guide/cpp/example/partitioned-tables.md b/website/docs/user-guide/cpp/example/partitioned-tables.md index 6a6927f5..371ee3e0 100644 --- a/website/docs/user-guide/cpp/example/partitioned-tables.md +++ b/website/docs/user-guide/cpp/example/partitioned-tables.md @@ -103,7 +103,7 @@ admin.ListPartitionInfos(table_path, partition_infos); std::vector bucket_ids = {0, 1, 2}; std::unordered_map offsets; admin.ListPartitionOffsets(table_path, "2024-01-15$US", - bucket_ids, fluss::OffsetQuery::Latest(), offsets); + bucket_ids, fluss::OffsetSpec::Latest(), offsets); ``` ## Partitioned Primary Key Tables diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 321e25e6..af03058a 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -43,8 +43,8 @@ Supports `with` statement (context manager). | `await get_table_info(table_path) -> TableInfo` | Get table metadata | | `await list_tables(database_name) -> list[str]` | List tables in a database | | `await table_exists(table_path) -> bool` | Check if a table exists | -| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for buckets | -| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's buckets | +| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for buckets | +| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets | | `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition | | `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition | | `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions | @@ -264,14 +264,19 @@ Raised for all Fluss-specific errors (connection failures, table not found, sche | Constant | Value | Description | |------------------------------|---------------|-----------------------------------------------------| | `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest available offset | -| `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()` | -| `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()` | -| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with timestamp | + +## `OffsetSpec` + +| Method | Description | +|-----------------------------|--------------------------------------------------| +| `OffsetSpec.earliest()` | Earliest available offset | +| `OffsetSpec.latest()` | Latest offset | +| `OffsetSpec.timestamp(ts)` | Offset at or after the given timestamp (millis) | To start reading from the latest offset (only new records), resolve the current offset via `list_offsets` before subscribing: ```python -offsets = await admin.list_offsets(table_path, [0], fluss.OffsetType.LATEST) +offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest()) scanner.subscribe(bucket_id=0, start_offset=offsets[0]) ``` diff --git a/website/docs/user-guide/python/example/admin-operations.md b/website/docs/user-guide/python/example/admin-operations.md index 8c62ee78..4561a3fa 100644 --- a/website/docs/user-guide/python/example/admin-operations.md +++ b/website/docs/user-guide/python/example/admin-operations.md @@ -56,13 +56,13 @@ await admin.drop_table(table_path, ignore_if_not_exists=True) ```python # Latest offsets for buckets -offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest") +offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_spec=fluss.OffsetSpec.latest()) # By timestamp -offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000) +offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.timestamp(1704067200000)) # Per-partition offsets -offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest") +offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest()) ``` ## Lake Snapshot diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index c4ef4f3c..9686fc62 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -24,10 +24,10 @@ with await fluss.FlussConnection.create(config) as conn: | Key | Description | Default | |---------------------|-------------------------------------------------------|--------------------| | `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | -| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) | | `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | | `writer.retries` | Number of retries on failure | `2147483647` | -| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | +| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | Remember to close the connection when done: diff --git a/website/docs/user-guide/python/example/log-tables.md b/website/docs/user-guide/python/example/log-tables.md index 63903a4e..6e44e061 100644 --- a/website/docs/user-guide/python/example/log-tables.md +++ b/website/docs/user-guide/python/example/log-tables.md @@ -106,7 +106,7 @@ To only consume new records (skip existing data), first resolve the current late ```python admin = await conn.get_admin() -offsets = await admin.list_offsets(table_path, [0], fluss.OffsetType.LATEST) +offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest()) latest = offsets[0] scanner = await table.new_scan().create_record_batch_log_scanner()