Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions bindings/python/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl FlussAdmin {
admin
.drop_table(&core_table_path, ignore_if_not_exists)
.await
.map_err(|e| FlussError::new_err(format!("Failed to drop table: {e}")))?;
.map_err(|e| FlussError::from_core_error(&e))?;

Python::attach(|py| Ok(py.None()))
})
Expand Down Expand Up @@ -401,7 +401,7 @@ impl FlussAdmin {
let offsets = admin
.list_offsets(&core_table_path, &bucket_ids, offset_spec)
.await
.map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?;
.map_err(|e| FlussError::from_core_error(&e))?;

Python::attach(|py| {
let dict = pyo3::types::PyDict::new(py);
Expand Down Expand Up @@ -448,9 +448,7 @@ impl FlussAdmin {
let offsets = admin
.list_partition_offsets(&core_table_path, &partition_name, &bucket_ids, offset_spec)
.await
.map_err(|e| {
FlussError::new_err(format!("Failed to list partition offsets: {e}"))
})?;
.map_err(|e| FlussError::from_core_error(&e))?;

Python::attach(|py| {
let dict = pyo3::types::PyDict::new(py);
Expand Down Expand Up @@ -487,7 +485,7 @@ impl FlussAdmin {
admin
.create_partition(&core_table_path, &core_partition_spec, ignore_if_exists)
.await
.map_err(|e| FlussError::new_err(format!("Failed to create partition: {e}")))?;
.map_err(|e| FlussError::from_core_error(&e))?;

Python::attach(|py| Ok(py.None()))
})
Expand All @@ -512,7 +510,7 @@ impl FlussAdmin {
let partition_infos = admin
.list_partition_infos(&core_table_path)
.await
.map_err(|e| FlussError::new_err(format!("Failed to list partitions: {e}")))?;
.map_err(|e| FlussError::from_core_error(&e))?;

Python::attach(|py| {
let py_list = pyo3::types::PyList::empty(py);
Expand Down
15 changes: 13 additions & 2 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;

use crate::error::Result;
use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;

pub struct FlussConnection {
Expand Down Expand Up @@ -88,7 +88,18 @@ impl FlussConnection {

pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
self.metadata.update_table_metadata(table_path).await?;
let table_info = self.metadata.get_cluster().get_table(table_path)?.clone();
let table_info = self
.metadata
.get_cluster()
.get_table(table_path)
.map_err(|e| {
if e.api_error() == Some(FlussError::InvalidTableException) {
Error::table_not_exist(format!("Table not found: {table_path}"))
} else {
e
}
})?
.clone();
Ok(FlussTable::new(self, self.metadata.clone(), table_info))
}
}
9 changes: 9 additions & 0 deletions crates/fluss/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ pub enum Error {
/// These create `FlussAPIError` with the correct protocol error code,
/// consistent with Java where e.g. `InvalidTableException` always carries code 15.
impl Error {
pub fn table_not_exist(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
code: FlussError::TableNotExist.code(),
message: message.into(),
},
}
}

pub fn invalid_table(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {
Expand Down
Loading