diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index d03ce7a2..69659677 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -363,7 +363,7 @@ impl FlussAdmin { admin .drop_table(&core_table_path, ignore_if_not_exists) .await - .map_err(|e| FlussError::new_err(format!("Failed to drop table: {e}")))?; + .map_err(|e| FlussError::from_core_error(&e))?; Python::attach(|py| Ok(py.None())) }) @@ -401,7 +401,7 @@ impl FlussAdmin { let offsets = admin .list_offsets(&core_table_path, &bucket_ids, offset_spec) .await - .map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?; + .map_err(|e| FlussError::from_core_error(&e))?; Python::attach(|py| { let dict = pyo3::types::PyDict::new(py); @@ -448,9 +448,7 @@ impl FlussAdmin { let offsets = admin .list_partition_offsets(&core_table_path, &partition_name, &bucket_ids, offset_spec) .await - .map_err(|e| { - FlussError::new_err(format!("Failed to list partition offsets: {e}")) - })?; + .map_err(|e| FlussError::from_core_error(&e))?; Python::attach(|py| { let dict = pyo3::types::PyDict::new(py); @@ -487,7 +485,7 @@ impl FlussAdmin { admin .create_partition(&core_table_path, &core_partition_spec, ignore_if_exists) .await - .map_err(|e| FlussError::new_err(format!("Failed to create partition: {e}")))?; + .map_err(|e| FlussError::from_core_error(&e))?; Python::attach(|py| Ok(py.None())) }) @@ -512,7 +510,7 @@ impl FlussAdmin { let partition_infos = admin .list_partition_infos(&core_table_path) .await - .map_err(|e| FlussError::new_err(format!("Failed to list partitions: {e}")))?; + .map_err(|e| FlussError::from_core_error(&e))?; Python::attach(|py| { let py_list = pyo3::types::PyList::empty(py); diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index b370682a..a17e57fb 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -24,7 +24,7 @@ use crate::rpc::RpcClient; use parking_lot::RwLock; use std::sync::Arc; -use crate::error::Result; +use crate::error::{Error, FlussError, Result}; use crate::metadata::TablePath; pub struct FlussConnection { @@ -88,7 +88,18 @@ impl FlussConnection { pub async fn get_table(&self, table_path: &TablePath) -> Result> { self.metadata.update_table_metadata(table_path).await?; - let table_info = self.metadata.get_cluster().get_table(table_path)?.clone(); + let table_info = self + .metadata + .get_cluster() + .get_table(table_path) + .map_err(|e| { + if e.api_error() == Some(FlussError::InvalidTableException) { + Error::table_not_exist(format!("Table not found: {table_path}")) + } else { + e + } + })? + .clone(); Ok(FlussTable::new(self, self.metadata.clone(), table_info)) } } diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index af9f274c..59524a63 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -112,6 +112,15 @@ pub enum Error { /// These create `FlussAPIError` with the correct protocol error code, /// consistent with Java where e.g. `InvalidTableException` always carries code 15. impl Error { + pub fn table_not_exist(message: impl Into) -> Self { + Error::FlussAPIError { + api_error: ApiError { + code: FlussError::TableNotExist.code(), + message: message.into(), + }, + } + } + pub fn invalid_table(message: impl Into) -> Self { Error::FlussAPIError { api_error: ApiError {