@@ -890,9 +890,9 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
890890 Pair ::first , Collectors .mapping (Pair ::second , Collectors .toList ())));
891891 }
892892
893- private List <TransformSpec > getSortTransformSpec (Table table ) {
894- return table .sortOrder ().fields ().stream (). map ( s ->
895- IcebergTableUtil .getTransformSpec (table , s .transform ().toString (), s .sourceId ()))
// Builds one TransformSpec per field of the table's write sort order.
// Each sort field is converted with IcebergTableUtil.getTransformSpec, passing the table,
// the string form of the field's transform, and the field's source column id.
// Returns the specs in the same order as table.sortOrder().fields().
893+ private List <TransformSpec > getWriteSortTransformSpecs (Table table ) {
894+ return table .sortOrder ().fields ().stream ()
895+ . map ( s -> IcebergTableUtil .getTransformSpec (table , s .transform ().toString (), s .sourceId ()))
896896 .toList ();
897897 }
898898
@@ -913,11 +913,16 @@ public DynamicPartitionCtx createDPContext(
913913 hiveConf .getVar (ConfVars .DEFAULT_PARTITION_NAME ),
914914 hiveConf .getIntVar (ConfVars .DYNAMIC_PARTITION_MAX_PARTS_PER_NODE ));
915915
916+ // Add Iceberg partition transforms as custom partition expressions.
917+ // These are used for both distribution (deciding reducers) and sorting within reducers.
916918 if (table .spec ().isPartitioned () &&
917- hiveConf .getIntVar (ConfVars .HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD ) >= 0 ) {
918- addCustomSortExpr (table , hmsTable , writeOperation , dpCtx , getPartitionTransformSpec (hmsTable ));
919+ hiveConf .getIntVar (ConfVars .HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD ) >= 0 ) {
920+ addCustomPartitionTransformExpressions (table , hmsTable , writeOperation , dpCtx ,
921+ getPartitionTransformSpec (hmsTable ));
919922 }
920923
924+ // Add write sort order expressions as custom sort expressions.
925+ // These are used ONLY for sorting within reducers, NOT for distribution.
921926 SortOrder sortOrder = table .sortOrder ();
922927 if (sortOrder .isSorted ()) {
923928 List <Integer > customSortPositions = Lists .newLinkedList ();
@@ -943,7 +948,8 @@ public DynamicPartitionCtx createDPContext(
943948 }
944949 }
945950
946- addCustomSortExpr (table , hmsTable , writeOperation , dpCtx , getSortTransformSpec (table ));
951+ addCustomWriteSortExpressions (table , hmsTable , writeOperation , dpCtx ,
952+ getWriteSortTransformSpecs (table ));
947953 }
948954
949955 // Even if table has no explicit sort order, honor z-order if configured
@@ -999,21 +1005,43 @@ private void addZOrderCustomExpr(Map<String, String> props, DynamicPartitionCtx
9991005 }
10001006 }
10011007
1002- private void addCustomSortExpr (Table table , org .apache .hadoop .hive .ql .metadata .Table hmsTable ,
1003- Operation writeOperation , DynamicPartitionCtx dpCtx ,
1004- List <TransformSpec > transformSpecs ) {
// Converts the given transform specs into custom transform expressions and registers them on
// dpCtx through addCustomPartitionExpressions.
// Each expression is created by IcebergTransformSortFunctionUtil.getCustomTransformExpr from the
// spec and a column position: the column's index in the table schema plus the write-row offset.
1008+ private void addCustomPartitionTransformExpressions (Table table ,
1009+ org .apache .hadoop .hive .ql .metadata .Table hmsTable , Operation writeOperation ,
1010+ DynamicPartitionCtx dpCtx , List <TransformSpec > transformSpecs ) {
// Column name -> position among the schema's top-level columns.
1011+ Map <String , Integer > fieldOrderMap = buildFieldOrderMap (table );
// Number of positions to shift the schema index by for this write operation.
1012+ int offset = getWriteRowOffset (hmsTable , writeOperation );
1013+
// NOTE(review): fieldOrderMap.get(...) returns an Integer that is unboxed by "+ offset"; a spec
// whose column name is not a key of the map would raise NullPointerException here. Confirm that
// every spec passed in refers to a top-level schema column.
1014+ dpCtx .addCustomPartitionExpressions (transformSpecs .stream ()
1015+ .map (spec -> IcebergTransformSortFunctionUtil .getCustomTransformExpr (
1016+ spec , fieldOrderMap .get (spec .getColumnName ()) + offset ))
1017+ .collect (Collectors .toList ()));
1018+ }
1019+
// Converts the given transform specs into custom transform expressions and registers them on
// dpCtx through addCustomSortExpressions.
// The expressions are built exactly as in addCustomPartitionTransformExpressions (schema index of
// the spec's column plus the write-row offset); only the dpCtx registration method differs.
1020+ private void addCustomWriteSortExpressions (Table table ,
1021+ org .apache .hadoop .hive .ql .metadata .Table hmsTable , Operation writeOperation ,
1022+ DynamicPartitionCtx dpCtx , List <TransformSpec > transformSpecs ) {
// Column name -> position among the schema's top-level columns.
1023+ Map <String , Integer > fieldOrderMap = buildFieldOrderMap (table );
// Number of positions to shift the schema index by for this write operation.
1024+ int offset = getWriteRowOffset (hmsTable , writeOperation );
1025+
// NOTE(review): fieldOrderMap.get(...) returns an Integer that is unboxed by "+ offset"; a spec
// whose column name is not a key of the map would raise NullPointerException here. Confirm that
// every sort field refers to a top-level schema column.
1026+ dpCtx .addCustomSortExpressions (transformSpecs .stream ()
1027+ .map (spec -> IcebergTransformSortFunctionUtil .getCustomTransformExpr (
1028+ spec , fieldOrderMap .get (spec .getColumnName ()) + offset ))
1029+ .collect (Collectors .toList ()));
1030+ }
1031+
// Returns a map from each column name in table.schema().columns() to its 0-based position in
// that list. Only the names returned by columns() are keys; fields nested inside them are not
// added by this loop.
1032+ private Map <String , Integer > buildFieldOrderMap (Table table ) {
10051033 List <Types .NestedField > fields = table .schema ().columns ();
// Pre-sized for one entry per column.
10061034 Map <String , Integer > fieldOrderMap = Maps .newHashMapWithExpectedSize (fields .size ());
10071035 for (int i = 0 ; i < fields .size (); ++i ) {
10081036 fieldOrderMap .put (fields .get (i ).name (), i );
10091037 }
1038+ return fieldOrderMap ;
1039+ }
10101040
1011- int offset = (shouldOverwrite (hmsTable , writeOperation ) ?
1012- ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns (hmsTable , writeOperation )).size ();
1013-
1014- dpCtx .addCustomSortExpressions (transformSpecs .stream ().map (spec ->
1015- IcebergTransformSortFunctionUtil .getCustomSortExprs (spec , fieldOrderMap .get (spec .getColumnName ()) + offset )
1016- ).collect (Collectors .toList ()));
// Returns the offset that callers add to a schema column index when building custom expressions.
// The value is the size of ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA when shouldOverwrite(hmsTable,
// writeOperation) is true, otherwise the size of acidSelectColumns(hmsTable, writeOperation).
// NOTE(review): presumably this is the count of columns that precede the data columns in the
// row being written - confirm against shouldOverwrite / acidSelectColumns.
1041+ private int getWriteRowOffset (org .apache .hadoop .hive .ql .metadata .Table hmsTable , Operation writeOperation ) {
1042+ return (shouldOverwrite (hmsTable , writeOperation ) ?
1043+ ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA :
1044+ acidSelectColumns (hmsTable , writeOperation )).size ();
10171045 }
10181046
10191047 @ Override
0 commit comments