Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ int main() {

std::unordered_map<int32_t, int64_t> earliest_offsets;
check("list_earliest_offsets",
admin.ListOffsets(table_path, all_bucket_ids, fluss::OffsetQuery::Earliest(),
admin.ListOffsets(table_path, all_bucket_ids, fluss::OffsetSpec::Earliest(),
earliest_offsets));
std::cout << "Earliest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : earliest_offsets) {
Expand All @@ -295,7 +295,7 @@ int main() {

std::unordered_map<int32_t, int64_t> latest_offsets;
check("list_latest_offsets", admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Latest(), latest_offsets));
fluss::OffsetSpec::Latest(), latest_offsets));
std::cout << "Latest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : latest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
Expand All @@ -310,7 +310,7 @@ int main() {
std::unordered_map<int32_t, int64_t> timestamp_offsets;
check("list_timestamp_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::FromTimestamp(timestamp_ms), timestamp_offsets));
fluss::OffsetSpec::Timestamp(timestamp_ms), timestamp_offsets));
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl;
for (const auto& [bucket_id, offset] : timestamp_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset << std::endl;
Expand Down Expand Up @@ -507,7 +507,7 @@ int main() {
admin.CreatePartition(partitioned_table_path, {{"region", "EU"}}, true));
std::cout << "Created partitions: US, EU" << std::endl;

// List partitions
// List all partitions
std::vector<fluss::PartitionInfo> partition_infos;
check("list_partition_infos",
admin.ListPartitionInfos(partitioned_table_path, partition_infos));
Expand All @@ -516,6 +516,13 @@ int main() {
<< std::endl;
}

// List partitions with partial spec filter
std::vector<fluss::PartitionInfo> us_partition_infos;
check("list_partition_infos_with_spec",
admin.ListPartitionInfos(partitioned_table_path, {{"region", "US"}}, us_partition_infos));
std::cout << " Filtered (region=US): " << us_partition_infos.size() << " partition(s)"
<< std::endl;

// Write data to partitioned table
fluss::Table partitioned_table;
check("get_partitioned_table", conn.GetTable(partitioned_table_path, partitioned_table));
Expand Down
22 changes: 13 additions & 9 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,19 @@ enum class DatumType {

constexpr int64_t EARLIEST_OFFSET = -2;

enum class OffsetSpec {
enum class OffsetType {
Earliest = 0,
Latest = 1,
Timestamp = 2,
};

struct OffsetQuery {
OffsetSpec spec;
struct OffsetSpec {
OffsetType type;
int64_t timestamp{0};

static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
static OffsetQuery FromTimestamp(int64_t ts) { return {OffsetSpec::Timestamp, ts}; }
static OffsetSpec Earliest() { return {OffsetType::Earliest, 0}; }
static OffsetSpec Latest() { return {OffsetType::Latest, 0}; }
static OffsetSpec Timestamp(int64_t ts) { return {OffsetType::Timestamp, ts}; }
};

struct Result {
Expand Down Expand Up @@ -1000,15 +1000,19 @@ class Admin {
Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out);

Result ListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out);
const OffsetSpec& offset_spec, std::unordered_map<int32_t, int64_t>& out);

Result ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out);

Result ListPartitionInfos(const TablePath& table_path, std::vector<PartitionInfo>& out);

Result ListPartitionInfos(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
std::vector<PartitionInfo>& out);

Result CreatePartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_exists = false);
Expand All @@ -1035,7 +1039,7 @@ class Admin {

private:
Result DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out,
const OffsetSpec& offset_spec, std::unordered_map<int32_t, int64_t>& out,
const std::string* partition_name = nullptr);

friend class Connection;
Expand Down
45 changes: 38 additions & 7 deletions bindings/cpp/src/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o

// function for common list offsets functionality
Result Admin::DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out,
const std::string* partition_name) {
if (!Available()) {
Expand All @@ -122,8 +122,8 @@ Result Admin::DoListOffsets(const TablePath& table_path, const std::vector<int32
}

ffi::FfiOffsetQuery ffi_query;
ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
ffi_query.timestamp = offset_query.timestamp;
ffi_query.offset_type = static_cast<int32_t>(offset_spec.type);
ffi_query.timestamp = offset_spec.timestamp;

ffi::FfiListOffsetsResult ffi_result;
if (partition_name != nullptr) {
Expand All @@ -145,16 +145,16 @@ Result Admin::DoListOffsets(const TablePath& table_path, const std::vector<int32
}

Result Admin::ListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out) {
return DoListOffsets(table_path, bucket_ids, offset_query, out);
return DoListOffsets(table_path, bucket_ids, offset_spec, out);
}

Result Admin::ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out) {
return DoListOffsets(table_path, bucket_ids, offset_query, out, &partition_name);
return DoListOffsets(table_path, bucket_ids, offset_spec, out, &partition_name);
}

Result Admin::ListPartitionInfos(const TablePath& table_path, std::vector<PartitionInfo>& out) {
Expand All @@ -177,6 +177,37 @@ Result Admin::ListPartitionInfos(const TablePath& table_path, std::vector<Partit
return result;
}

Result Admin::ListPartitionInfos(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
std::vector<PartitionInfo>& out) {
if (!Available()) {
return utils::make_client_error("Admin not available");
}

auto ffi_path = utils::to_ffi_table_path(table_path);

rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
for (const auto& [key, value] : partition_spec) {
ffi::FfiPartitionKeyValue kv;
kv.key = rust::String(key);
kv.value = rust::String(value);
rust_spec.push_back(std::move(kv));
}

auto ffi_result = admin_->list_partition_infos_with_spec(ffi_path, std::move(rust_spec));

auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
out.reserve(ffi_result.partition_infos.size());
for (const auto& pi : ffi_result.partition_infos) {
out.push_back({pi.partition_id, std::string(pi.partition_name)});
}
}

return result;
}

Result Admin::CreatePartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_exists) {
Expand Down
78 changes: 54 additions & 24 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ mod ffi {
self: &Admin,
table_path: &FfiTablePath,
) -> FfiListPartitionInfosResult;
fn list_partition_infos_with_spec(
self: &Admin,
table_path: &FfiTablePath,
partition_spec: Vec<FfiPartitionKeyValue>,
) -> FfiListPartitionInfosResult;
fn create_partition(
self: &Admin,
table_path: &FfiTablePath,
Expand Down Expand Up @@ -735,30 +740,20 @@ impl Admin {
&self,
table_path: &ffi::FfiTablePath,
) -> ffi::FfiListPartitionInfosResult {
let path = fcore::metadata::TablePath::new(
table_path.database_name.clone(),
table_path.table_name.clone(),
);
let result = RUNTIME.block_on(async { self.inner.list_partition_infos(&path).await });
match result {
Ok(infos) => {
let partition_infos: Vec<ffi::FfiPartitionInfo> = infos
.into_iter()
.map(|info| ffi::FfiPartitionInfo {
partition_id: info.get_partition_id(),
partition_name: info.get_partition_name(),
})
.collect();
ffi::FfiListPartitionInfosResult {
result: ok_result(),
partition_infos,
}
}
Err(e) => ffi::FfiListPartitionInfosResult {
result: err_from_core_error(&e),
partition_infos: vec![],
},
}
self.do_list_partition_infos(table_path, None)
}

fn list_partition_infos_with_spec(
&self,
table_path: &ffi::FfiTablePath,
partition_spec: Vec<ffi::FfiPartitionKeyValue>,
) -> ffi::FfiListPartitionInfosResult {
let spec_map: std::collections::HashMap<String, String> = partition_spec
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect();
let spec = fcore::metadata::PartitionSpec::new(spec_map);
self.do_list_partition_infos(table_path, Some(&spec))
}
fn create_partition(
&self,
Expand Down Expand Up @@ -939,6 +934,41 @@ impl Admin {
},
}
}

fn do_list_partition_infos(
&self,
table_path: &ffi::FfiTablePath,
partial_partition_spec: Option<&fcore::metadata::PartitionSpec>,
) -> ffi::FfiListPartitionInfosResult {
let path = fcore::metadata::TablePath::new(
table_path.database_name.clone(),
table_path.table_name.clone(),
);
let result = RUNTIME.block_on(async {
self.inner
.list_partition_infos_with_spec(&path, partial_partition_spec)
.await
});
match result {
Ok(infos) => {
let partition_infos: Vec<ffi::FfiPartitionInfo> = infos
.into_iter()
.map(|info| ffi::FfiPartitionInfo {
partition_id: info.get_partition_id(),
partition_name: info.get_partition_name(),
})
.collect();
ffi::FfiListPartitionInfosResult {
result: ok_result(),
partition_infos,
}
}
Err(e) => ffi::FfiListPartitionInfosResult {
result: err_from_core_error(&e),
partition_infos: vec![],
},
}
}
}

// Table implementation
Expand Down
18 changes: 12 additions & 6 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ async def main():
# Demo: List offsets
print("\n--- Testing list_offsets() ---")
try:
# Query latest offsets using OffsetType constant (recommended for type safety)
# Query latest offsets using OffsetSpec factory method
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
offset_type=fluss.OffsetType.LATEST
offset_spec=fluss.OffsetSpec.latest()
)
print(f"Latest offsets for table (before writes): {offsets}")
except Exception as e:
Expand Down Expand Up @@ -248,11 +248,10 @@ async def main():
# Demo: Check offsets after writes
print("\n--- Checking offsets after writes ---")
try:
# Query with string constant (alternative API - both strings and constants are supported)
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
offset_type="latest" # Can also use "earliest" or "timestamp"
offset_spec=fluss.OffsetSpec.latest()
)
print(f"Latest offsets after writing 7 records: {offsets}")
except Exception as e:
Expand Down Expand Up @@ -734,6 +733,13 @@ async def main():
await partitioned_writer.flush()
print("\nWrote 4 records (2 to US, 2 to EU)")

# Demo: list_partition_infos with partial spec filter
print("\n--- Testing list_partition_infos with spec ---")
us_partitions = await admin.list_partition_infos(
partitioned_table_path, partition_spec={"region": "US"}
)
print(f"Filtered partitions (region=US): {us_partitions}")

# Demo: list_partition_offsets
print("\n--- Testing list_partition_offsets ---")

Expand All @@ -743,7 +749,7 @@ async def main():
partitioned_table_path,
partition_name="US",
bucket_ids=[0],
offset_type="latest"
offset_spec=fluss.OffsetSpec.latest()
)
print(f"US partition latest offsets: {us_offsets}")

Expand All @@ -752,7 +758,7 @@ async def main():
partitioned_table_path,
partition_name="EU",
bucket_ids=[0],
offset_type="latest"
offset_spec=fluss.OffsetSpec.latest()
)
print(f"EU partition latest offsets: {eu_offsets}")

Expand Down
Loading
Loading