Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -31,6 +32,7 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.NoSuchStatementException;
Expand Down Expand Up @@ -105,9 +107,10 @@ public ExecuteResult execute(
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}

Map<Integer, Timestamp> rawTimestamps = getRawTimestamps(statementHandle);
new AvaticaParameterBinder(
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator())
.bind(typedValues);
.bind(typedValues, 0, rawTimestamps);

if (statementHandle.signature == null
|| statementHandle.signature.statementType == StatementType.IS_DML) {
Expand Down Expand Up @@ -149,11 +152,12 @@ public ExecuteBatchResult executeBatch(
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}

Map<Integer, Timestamp> rawTimestamps = getRawTimestamps(statementHandle);
final AvaticaParameterBinder binder =
new AvaticaParameterBinder(
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator());
for (int i = 0; i < parameterValuesList.size(); i++) {
binder.bind(parameterValuesList.get(i), i);
binder.bind(parameterValuesList.get(i), i, rawTimestamps);
}

// Update query
Expand All @@ -173,6 +177,14 @@ public Frame fetch(
String.format("%s does not use frames.", this), AvaticaConnection.HELPER.unsupported());
}

private Map<Integer, Timestamp> getRawTimestamps(StatementHandle statementHandle) {
AvaticaStatement avaticaStmt = connection.statementMap.get(statementHandle.id);
if (avaticaStmt instanceof ArrowFlightPreparedStatement) {
return ((ArrowFlightPreparedStatement) avaticaStmt).getRawTimestamps();
}
return Collections.emptyMap();
}

private PreparedStatement prepareForHandle(final String query, StatementHandle handle) {
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.util.Preconditions;
Expand All @@ -30,6 +35,7 @@ public class ArrowFlightPreparedStatement extends AvaticaPreparedStatement
implements ArrowFlightInfoStatement {

private final ArrowFlightSqlClientHandler.PreparedStatement preparedStatement;
private final Map<Integer, Timestamp> rawTimestamps = new HashMap<>();

private ArrowFlightPreparedStatement(
final ArrowFlightConnection connection,
Expand Down Expand Up @@ -74,6 +80,60 @@ public synchronized void close() throws SQLException {
super.close();
}

@Override
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
if (x != null) {
rawTimestamps.put(parameterIndex, (Timestamp) x.clone());
} else {
rawTimestamps.remove(parameterIndex);
}
super.setTimestamp(parameterIndex, x);
}

@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
if (x != null) {
rawTimestamps.put(parameterIndex, (Timestamp) x.clone());
} else {
rawTimestamps.remove(parameterIndex);
}
super.setTimestamp(parameterIndex, x, cal);
}

@Override
public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
rawTimestamps.remove(parameterIndex);
super.setObject(parameterIndex, x, targetSqlType);
}

@Override
public void setObject(int parameterIndex, Object x) throws SQLException {
rawTimestamps.remove(parameterIndex);
super.setObject(parameterIndex, x);
}

@Override
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength)
throws SQLException {
rawTimestamps.remove(parameterIndex);
super.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
}

@Override
public void clearParameters() throws SQLException {
rawTimestamps.clear();
super.clearParameters();
}

/**
* Returns the raw java.sql.Timestamp objects set via setTimestamp(), keyed by 1-based parameter
* index. These preserve sub-millisecond precision (getNanos()) that Avatica's TypedValue
* serialization discards.
*/
Map<Integer, Timestamp> getRawTimestamps() {
return Collections.unmodifiableMap(rawTimestamps);
}

@Override
public FlightInfo executeFlightInfoQuery() throws SQLException {
return preparedStatement.executeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
.getType()
.accept(
new AvaticaParameterBinder.BinderVisitor(
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
childVector,
TypedValue.ofSerial(typedValue.componentType, val),
childIndex,
null));
}
}
listVector.setValueCount(index + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
.getType()
.accept(
new AvaticaParameterBinder.BinderVisitor(
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
childVector,
TypedValue.ofSerial(typedValue.componentType, val),
childIndex,
null));
}
}
listVector.endValue(index, values.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
.getType()
.accept(
new AvaticaParameterBinder.BinderVisitor(
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
childVector,
TypedValue.ofSerial(typedValue.componentType, val),
childIndex,
null));
}
}
listVector.endValue(index, values.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.converter.impl;

import java.sql.Timestamp;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
Expand All @@ -28,16 +29,87 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.remote.TypedValue;
import org.checkerframework.checker.nullness.qual.Nullable;

/** AvaticaParameterConverter for Timestamp Arrow types. */
public class TimestampAvaticaParameterConverter extends BaseAvaticaParameterConverter {

public TimestampAvaticaParameterConverter(ArrowType.Timestamp type) {}
private final ArrowType.Timestamp type;

public TimestampAvaticaParameterConverter(ArrowType.Timestamp type) {
this.type = type;
}

/**
* Converts a raw java.sql.Timestamp to the value in the target Arrow time unit, preserving
* sub-millisecond precision from Timestamp.getNanos().
*/
private long convertFromTimestamp(Timestamp ts) {
// Timestamp.getTime() returns epoch millis (truncated, no sub-ms precision).
// Timestamp.getNanos() returns the fractional-second component in nanoseconds (0..999_999_999).
// We reconstruct the full-precision value from epoch seconds + nanos to avoid double-counting.
long epochSeconds = Math.floorDiv(ts.getTime(), 1_000L);
int nanos = ts.getNanos(); // 0..999_999_999, full fractional second
switch (type.getUnit()) {
case SECOND:
return epochSeconds;
case MILLISECOND:
return epochSeconds * 1_000L + nanos / 1_000_000;
case MICROSECOND:
return epochSeconds * 1_000_000L + nanos / 1_000;
case NANOSECOND:
return epochSeconds * 1_000_000_000L + nanos;
default:
throw new UnsupportedOperationException("Unsupported time unit: " + type.getUnit());
}
}

/** Converts an epoch millisecond value from Avatica to the target time unit. */
private long convertFromMillis(long epochMillis) {
switch (type.getUnit()) {
case SECOND:
return Math.floorDiv(epochMillis, 1_000L);
case MILLISECOND:
return epochMillis;
case MICROSECOND:
return epochMillis * 1_000L;
case NANOSECOND:
return epochMillis * 1_000_000L;
default:
throw new UnsupportedOperationException("Unsupported time unit: " + type.getUnit());
}
}

/**
* Bind a timestamp parameter, using the raw java.sql.Timestamp if available for full precision.
*
* @param vector FieldVector to bind to.
* @param typedValue TypedValue from Avatica (epoch millis, may have lost sub-ms precision).
* @param index Vector index to bind the value at.
* @param rawTimestamp Optional raw java.sql.Timestamp preserving sub-millisecond nanos.
* @return true if binding was successful.
*/
public boolean bindParameter(
FieldVector vector, TypedValue typedValue, int index, @Nullable Timestamp rawTimestamp) {
long value;
// Only use the raw timestamp if the TypedValue actually represents a timestamp.
if (rawTimestamp != null && typedValue.type == ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP) {
value = convertFromTimestamp(rawTimestamp);
} else {
value = convertFromMillis((long) typedValue.toLocal());
}
return setTimestampVector(vector, index, value);
}

@Override
public boolean bindParameter(FieldVector vector, TypedValue typedValue, int index) {
long value = (long) typedValue.toLocal();
long value = convertFromMillis((long) typedValue.toLocal());
return setTimestampVector(vector, index, value);
}

private boolean setTimestampVector(FieldVector vector, int index, long value) {
if (vector instanceof TimeStampSecVector) {
((TimeStampSecVector) vector).setSafe(index, value);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*/
package org.apache.arrow.driver.jdbc.utils;

import java.sql.Timestamp;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.converter.impl.BinaryAvaticaParameterConverter;
import org.apache.arrow.driver.jdbc.converter.impl.BinaryViewAvaticaParameterConverter;
Expand Down Expand Up @@ -81,7 +84,7 @@ public AvaticaParameterBinder(
* @param typedValues The parameter values.
*/
public void bind(List<TypedValue> typedValues) {
bind(typedValues, 0);
bind(typedValues, 0, Collections.emptyMap());
}

/**
Expand All @@ -91,6 +94,18 @@ public void bind(List<TypedValue> typedValues) {
* @param index index for parameter.
*/
public void bind(List<TypedValue> typedValues, int index) {
bind(typedValues, index, Collections.emptyMap());
}

/**
* Bind the given Avatica values to the prepared statement at the given index, with optional raw
* java.sql.Timestamp values that preserve sub-millisecond precision.
*
* @param typedValues The parameter values.
* @param index index for parameter.
* @param rawTimestamps Raw java.sql.Timestamp objects keyed by 1-based parameter index.
*/
public void bind(List<TypedValue> typedValues, int index, Map<Integer, Timestamp> rawTimestamps) {
if (preparedStatement.getParameterSchema().getFields().size() != typedValues.size()) {
throw new IllegalStateException(
String.format(
Expand All @@ -99,7 +114,9 @@ public void bind(List<TypedValue> typedValues, int index) {
}

for (int i = 0; i < typedValues.size(); i++) {
bind(parameters.getVector(i), typedValues.get(i), index);
// rawTimestamps uses 1-based JDBC parameter indices
Timestamp rawTs = rawTimestamps.get(i + 1);
bind(parameters.getVector(i), typedValues.get(i), index, rawTs);
}

if (!typedValues.isEmpty()) {
Expand All @@ -114,8 +131,13 @@ public void bind(List<TypedValue> typedValues, int index) {
* @param vector FieldVector to bind to.
* @param typedValue TypedValue to bind to the vector.
* @param index Vector index to bind the value at.
* @param rawTimestamp Optional raw java.sql.Timestamp with sub-millisecond precision.
*/
private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index) {
private void bind(
FieldVector vector,
@Nullable TypedValue typedValue,
int index,
@Nullable Timestamp rawTimestamp) {
try {
if (typedValue == null || typedValue.value == null) {
if (vector.getField().isNullable()) {
Expand All @@ -126,7 +148,7 @@ private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index
} else if (!vector
.getField()
.getType()
.accept(new BinderVisitor(vector, typedValue, index))) {
.accept(new BinderVisitor(vector, typedValue, index, rawTimestamp))) {
throw new UnsupportedOperationException(
String.format("Binding to vector type %s is not yet supported", vector.getClass()));
}
Expand All @@ -146,18 +168,22 @@ public static class BinderVisitor implements ArrowType.ArrowTypeVisitor<Boolean>
private final FieldVector vector;
private final TypedValue typedValue;
private final int index;
@Nullable private final Timestamp rawTimestamp;

/**
* Instantiate a new BinderVisitor.
*
* @param vector FieldVector to bind values to.
* @param value TypedValue to bind.
* @param index Vector index (0-based) to bind the value to.
* @param rawTimestamp Optional raw java.sql.Timestamp preserving sub-millisecond precision.
*/
public BinderVisitor(FieldVector vector, TypedValue value, int index) {
public BinderVisitor(
FieldVector vector, TypedValue value, int index, @Nullable Timestamp rawTimestamp) {
this.vector = vector;
this.typedValue = value;
this.index = index;
this.rawTimestamp = rawTimestamp;
}

@Override
Expand Down Expand Up @@ -266,7 +292,8 @@ public Boolean visit(ArrowType.Time type) {

@Override
public Boolean visit(ArrowType.Timestamp type) {
return new TimestampAvaticaParameterConverter(type).bindParameter(vector, typedValue, index);
return new TimestampAvaticaParameterConverter(type)
.bindParameter(vector, typedValue, index, rawTimestamp);
}

@Override
Expand Down
Loading
Loading