Skip to content

Commit ddf31fd

Browse files
committed
fix ext func caching
1 parent a9adc70 commit ddf31fd

2 files changed

Lines changed: 64 additions & 12 deletions

File tree

native-engine/auron-planner/src/planner.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -906,17 +906,12 @@ impl PhysicalPlanner {
906906
let fun_name = &e.name;
907907
let fun = datafusion_ext_functions::create_auron_ext_function(
908908
fun_name,
909-
self.partition_id,
910-
)?;
911-
Arc::new(create_udf(
912-
&format!("spark_ext_function_{fun_name}"),
913909
args.iter()
914910
.map(|e| e.data_type(input_schema))
915911
.collect::<Result<Vec<_>, _>>()?,
916912
convert_required!(e.return_type)?,
917-
Volatility::Volatile,
918-
fun,
919-
))
913+
)?;
914+
Arc::new(ScalarUDF::from(fun))
920915
} else {
921916
let scalar_udf: Arc<ScalarUDF> = scalar_function.into();
922917
scalar_udf

native-engine/datafusion-ext-functions/src/lib.rs

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,15 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use std::sync::Arc;
16+
use std::{any::Any, fmt, sync::Arc};
1717

18-
use datafusion::{common::Result, logical_expr::ScalarFunctionImplementation};
18+
use datafusion::{
19+
arrow::datatypes::DataType,
20+
logical_expr::{
21+
ColumnarValue, ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDFImpl, Signature,
22+
Volatility,
23+
},
24+
};
1925
use datafusion_ext_commons::df_unimplemented_err;
2026

2127
mod brickhouse;
@@ -38,12 +44,13 @@ mod spark_unscaled_value;
3844
#[allow(clippy::panic)] // Temporarily allow panic to refactor to Result later
3945
pub fn create_auron_ext_function(
4046
name: &str,
41-
spark_partition_id: usize,
42-
) -> Result<ScalarFunctionImplementation> {
47+
input_types: Vec<DataType>,
48+
return_type: DataType,
49+
) -> datafusion::common::Result<AuronExtFunction> {
4350
// auron ext functions, if used for spark should be start with 'Spark_',
4451
// if used for flink should be start with 'Flink_',
4552
// same to other engines.
46-
Ok(match name {
53+
let fun: ScalarFunctionImplementation = match name {
4754
"Placeholder" => Arc::new(|_| panic!("placeholder() should never be called")),
4855
"Spark_NullIf" => Arc::new(spark_null_if::spark_null_if),
4956
"Spark_NullIfZero" => Arc::new(spark_null_if::spark_null_if_zero),
@@ -86,5 +93,55 @@ pub fn create_auron_ext_function(
8693
}
8794
"Spark_IsNaN" => Arc::new(spark_isnan::spark_isnan),
8895
_ => df_unimplemented_err!("spark ext function not implemented: {name}")?,
96+
};
97+
98+
Ok(AuronExtFunction {
99+
name: name.to_string(),
100+
signature: Signature::exact(input_types, Volatility::Volatile),
101+
return_type,
102+
fun,
89103
})
90104
}
105+
106+
pub struct AuronExtFunction {
107+
name: String,
108+
signature: Signature,
109+
return_type: DataType,
110+
fun: ScalarFunctionImplementation,
111+
}
112+
113+
impl fmt::Debug for AuronExtFunction {
114+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115+
f.debug_struct("AuronExtFunction")
116+
.field("name", &self.name)
117+
.field("signature", &self.signature)
118+
.field("return_type", &self.return_type)
119+
.field("fun", &"<FUNC>")
120+
.finish()
121+
}
122+
}
123+
124+
impl ScalarUDFImpl for AuronExtFunction {
125+
fn as_any(&self) -> &dyn Any {
126+
self
127+
}
128+
129+
fn name(&self) -> &str {
130+
&self.name
131+
}
132+
133+
fn signature(&self) -> &Signature {
134+
&self.signature
135+
}
136+
137+
fn return_type(&self, _arg_types: &[DataType]) -> datafusion::common::Result<DataType> {
138+
Ok(self.return_type.clone())
139+
}
140+
141+
fn invoke_with_args(
142+
&self,
143+
args: ScalarFunctionArgs,
144+
) -> datafusion::common::Result<ColumnarValue> {
145+
(self.fun)(&args.args)
146+
}
147+
}

0 commit comments

Comments
 (0)