Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
List<Field> fields = new ArrayList<>();
// set schema
fields.addAll(
LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
LanceArrowUtils.toArrowSchema(
tableDescriptor.getSchema().getRowType(),
tableDescriptor.getCustomProperties())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to confirm the return type of getCustomProperties(). If it returns an immutable Map or a Properties type rather than Map<String, String>, will tableProperties.get(fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX) in toArrowField work correctly? Suggest using Map<String, String> in the new method signature and ensuring compatibility at the call site.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getCustomProperties() returns Map<String, String>

    public Map<String, String> getCustomProperties() {
        return customProperties;
    }

.getFields());
try {
LanceDatasetAdapter.createDataset(config.getDatasetUri(), new Schema(fields), params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.Schema;

Expand Down Expand Up @@ -54,31 +55,43 @@ public static VectorSchemaRoot convertToNonShaded(
VectorSchemaRoot.create(nonShadedSchema, nonShadedAllocator);
nonShadedRoot.allocateNew();

List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> shadedVectors =
shadedRoot.getFieldVectors();
List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();
try {
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> shadedVectors =
shadedRoot.getFieldVectors();
List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();

for (int i = 0; i < shadedVectors.size(); i++) {
copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
}
for (int i = 0; i < shadedVectors.size(); i++) {
copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
}

nonShadedRoot.setRowCount(shadedRoot.getRowCount());
return nonShadedRoot;
nonShadedRoot.setRowCount(shadedRoot.getRowCount());
return nonShadedRoot;
} catch (Exception e) {
nonShadedRoot.close();
throw e;
}
}

private static void copyVectorData(
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedVector,
FieldVector nonShadedVector) {

if (shadedVector
instanceof
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
&& nonShadedVector instanceof ListVector) {
copyListVectorData(
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
shadedVector,
(ListVector) nonShadedVector);
return;
instanceof
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector) {
if (nonShadedVector instanceof FixedSizeListVector) {
copyListToFixedSizeListVectorData(
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
shadedVector,
(FixedSizeListVector) nonShadedVector);
return;
} else if (nonShadedVector instanceof ListVector) {
copyListVectorData(
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
shadedVector,
(ListVector) nonShadedVector);
return;
}
}

List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> shadedBuffers =
Expand Down Expand Up @@ -143,4 +156,129 @@ private static void copyListVectorData(
// For ListVector, we need to manually set lastSet to avoid offset buffer recalculation
nonShadedListVector.setLastSet(valueCount - 1);
}

private static void copyListToFixedSizeListVectorData(
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
shadedListVector,
FixedSizeListVector nonShadedFixedSizeListVector) {

int valueCount = shadedListVector.getValueCount();
int expectedListSize = nonShadedFixedSizeListVector.getListSize();
int nonNullCount = valueCount - shadedListVector.getNullCount();
int expectedTotalChildCount = nonNullCount * expectedListSize;

// Validate that backing data vector element count matches expected fixed-size layout.
int totalChildCount = shadedListVector.getDataVector().getValueCount();
if (totalChildCount != expectedTotalChildCount) {
throw new IllegalArgumentException(
String.format(
"Total child elements (%d) does not match expected %d for FixedSizeList conversion.",
totalChildCount, expectedTotalChildCount));
}

// Copy child data from the source ListVector to the target FixedSizeListVector.
//
// In a ListVector, child elements for non-null rows are packed contiguously
// (null rows contribute 0 children). In a FixedSizeListVector, child data is
// stride-based: row i's data starts at index i * listSize, so null rows still
// occupy child slots. When null rows exist, a simple bulk buffer copy won't
// work because the layouts differ — we must remap per-row using the source
// offset buffer.
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedDataVector =
shadedListVector.getDataVector();
FieldVector nonShadedDataVector = nonShadedFixedSizeListVector.getDataVector();

if (shadedDataVector != null && nonShadedDataVector != null) {
if (nonNullCount == valueCount) {
// No null rows — child layouts are identical, use fast bulk copy.
copyVectorData(shadedDataVector, nonShadedDataVector);
} else if (totalChildCount > 0) {
// Null rows present — copy child data row-by-row with offset remapping.
copyChildDataWithOffsetRemapping(
shadedListVector, nonShadedDataVector, valueCount, expectedListSize);
nonShadedDataVector.setValueCount(valueCount * expectedListSize);
}
}

// FixedSizeListVector only has a validity buffer (no offset buffer).
// Copy the validity buffer from the shaded ListVector.
List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> shadedBuffers =
shadedListVector.getFieldBuffers();
List<ArrowBuf> nonShadedBuffers = nonShadedFixedSizeListVector.getFieldBuffers();

// Both ListVector and FixedSizeListVector have validity as their first buffer
if (!shadedBuffers.isEmpty() && !nonShadedBuffers.isEmpty()) {
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf shadedValidityBuf =
shadedBuffers.get(0);
ArrowBuf nonShadedValidityBuf = nonShadedBuffers.get(0);

long size = Math.min(shadedValidityBuf.capacity(), nonShadedValidityBuf.capacity());
if (size > 0) {
ByteBuffer srcBuffer = shadedValidityBuf.nioBuffer(0, (int) size);
srcBuffer.position(0);
srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE));
nonShadedValidityBuf.setBytes(0, srcBuffer);
}
}

nonShadedFixedSizeListVector.setValueCount(valueCount);
}

/**
* Copies child element data from a shaded ListVector to a non-shaded child vector, remapping
* offsets so that row i's data lands at index {@code i * listSize} in the target (the layout
* required by FixedSizeListVector). Null rows in the source are skipped.
*/
private static void copyChildDataWithOffsetRemapping(
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
shadedListVector,
FieldVector nonShadedChildVector,
int valueCount,
int listSize) {

// Source child data buffer (index 1 for fixed-width vectors: [validity, data])
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf srcDataBuf =
shadedListVector.getDataVector().getFieldBuffers().get(1);
// Source offset buffer: offset[i] is the start element index for row i
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf srcOffsetBuf =
shadedListVector.getOffsetBuffer();

// Target child buffers: [0] = validity, [1] = data
ArrowBuf tgtValidityBuf = nonShadedChildVector.getFieldBuffers().get(0);
ArrowBuf tgtDataBuf = nonShadedChildVector.getFieldBuffers().get(1);

// Get element byte width from the non-shaded child vector type.
// Buffer capacity cannot be used because Arrow pads/aligns buffers.
if (!(nonShadedChildVector instanceof org.apache.arrow.vector.BaseFixedWidthVector)) {
throw new UnsupportedOperationException(
"FixedSizeList conversion with null rows only supports fixed-width child vectors, got "
+ nonShadedChildVector.getClass().getSimpleName());
}
int elementByteWidth =
((org.apache.arrow.vector.BaseFixedWidthVector) nonShadedChildVector)
.getTypeWidth();
int rowByteWidth = listSize * elementByteWidth;

for (int i = 0; i < valueCount; i++) {
if (!shadedListVector.isNull(i)) {
int srcElementStart = srcOffsetBuf.getInt((long) i * Integer.BYTES);
int srcByteOffset = srcElementStart * elementByteWidth;
int tgtElementStart = i * listSize;
int tgtByteOffset = tgtElementStart * elementByteWidth;

// Copy the data bytes for this row's child elements
ByteBuffer srcSlice = srcDataBuf.nioBuffer(srcByteOffset, rowByteWidth);
tgtDataBuf.setBytes(tgtByteOffset, srcSlice);

// Set validity bits for each child element in this row
for (int j = 0; j < listSize; j++) {
int bitIndex = tgtElementStart + j;
int byteIndex = bitIndex / 8;
int bitOffset = bitIndex % 8;
byte currentByte = tgtValidityBuf.getByte(byteIndex);
tgtValidityBuf.setByte(byteIndex, currentByte | (1 << bitOffset));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,84 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* Utilities for converting Fluss RowType to non-shaded Arrow Schema. This is needed because Lance
* requires non-shaded Arrow API.
*/
public class LanceArrowUtils {

/** Property suffix for configuring a fixed-size list Arrow type on array columns. */
public static final String FIXED_SIZE_LIST_SIZE_SUFFIX = ".arrow.fixed-size-list.size";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property key format is <column_name>.arrow.fixed-size-list.size. If the column name itself contains . (e.g., a.b), it leads to parsing ambiguity. For example, a.b.arrow.fixed-size-list.size — it's unclear whether this refers to column a.b or column a with some unrelated suffix.

Suggestion:

  1. Explicitly document that column names must not contain .; or
  2. Validate in toArrowField that the column name does not contain ., and throw a meaningful error message if it does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment, I actually look up Lance doc further and found that they do not support dots in columns. Ref: https://docs.lancedb.com/search/filtering#advanced-sql-filters

Updated to throw exception


/** Returns the non-shaded Arrow schema of the specified Fluss RowType. */
public static Schema toArrowSchema(RowType rowType) {
return toArrowSchema(rowType, Collections.emptyMap());
}

/**
* Returns the non-shaded Arrow schema of the specified Fluss RowType, using table properties to
* determine whether array columns should use FixedSizeList instead of List.
*
* <p>When a table property {@code <column>.arrow.fixed-size-list.size} is set, the
* corresponding ARRAY column will be emitted as {@code FixedSizeList<element>(size)} instead of
* {@code List<element>}.
*/
public static Schema toArrowSchema(RowType rowType, Map<String, String> tableProperties) {
List<Field> fields =
rowType.getFields().stream()
.map(f -> toArrowField(f.getName(), f.getType()))
.map(f -> toArrowField(f.getName(), f.getType(), tableProperties))
.collect(Collectors.toList());
return new Schema(fields);
}

private static Field toArrowField(String fieldName, DataType logicalType) {
FieldType fieldType =
new FieldType(logicalType.isNullable(), toArrowType(logicalType), null);
private static Field toArrowField(
String fieldName, DataType logicalType, Map<String, String> tableProperties) {
checkArgument(
!fieldName.contains("."),
"Column name '%s' must not contain periods. "
+ "Lance does not support field names with periods.",
fieldName);
ArrowType arrowType;
if (logicalType instanceof ArrayType && tableProperties != null) {
String sizeStr = tableProperties.get(fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX);
if (sizeStr != null) {
int listSize;
try {
listSize = Integer.parseInt(sizeStr);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
String.format(
"Invalid value '%s' for property '%s', expected a positive integer.",
sizeStr, fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX),
e);
}

checkArgument(
listSize > 0,
"Invalid value '%s' for property '%s'. Expected a positive integer.",
sizeStr,
fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX);
arrowType = new ArrowType.FixedSizeList(listSize);
} else {
arrowType = toArrowType(logicalType);
}
} else {
arrowType = toArrowType(logicalType);
}
FieldType fieldType = new FieldType(logicalType.isNullable(), arrowType, null);
List<Field> children = null;
if (logicalType instanceof ArrayType) {
children =
Collections.singletonList(
toArrowField("element", ((ArrayType) logicalType).getElementType()));
toArrowField(
"element",
((ArrayType) logicalType).getElementType(),
tableProperties));
}
return new Field(fieldName, fieldType, children);
}
Expand Down
Loading