MDT Test framework without writing data files #17796
vamsikarnika wants to merge 20 commits into apache:master from
Conversation
```java
    initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
    break;
  case PARTITION_STATS:
    // For PARTITION_STATS, COLUMN_STATS should also be enabled
```
Let's get this taken care of. Let's not comment out any code, but introduce a config and disable it in our benchmarking script.
We can try reverting this commit if that helps.
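A minimal sketch of the config the reviewer suggests. The config key and class name here are invented for illustration and are not actual Hudi configs:

```java
import java.util.Properties;

// Hypothetical sketch: a flag that lets the benchmarking script skip the
// COLUMN_STATS prerequisite check for PARTITION_STATS instead of commenting
// the check out. The key name below is made up for illustration.
public class PartitionStatsCheckConfig {
  public static final String ENFORCE_COL_STATS_KEY =
      "hoodie.metadata.index.partition.stats.enforce.column.stats";

  private final Properties props;

  public PartitionStatsCheckConfig(Properties props) {
    this.props = props;
  }

  // Defaults to true so normal writers keep the validation;
  // the benchmarking script would set it to false.
  public boolean enforceColumnStatsCheck() {
    return Boolean.parseBoolean(props.getProperty(ENFORCE_COL_STATS_KEY, "true"));
  }
}
```

With this shape, the existing `checkState(...)` call would be wrapped in `if (config.enforceColumnStatsCheck()) { ... }` rather than deleted.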
```java
  }

  if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath())) {
    checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
```
Same here. Let's revert these changes.
```java
  } catch (IOException ioe) {
    throw new HoodieIOException(ioe.getMessage(), ioe);
  }
  log.warn("Skipping reconcile markers for instant: {}", instantTs);
```
Were we not able to understand why this was deleting the data or log files?
This is P1, and on me (Siva).
```java
import scala.collection.JavaConverters;

public class HoodieMDTStats implements Closeable {
```
Suggestion: rename to `MetadataBenchmarkingTool`.
```java
  @Parameter(names = {"--num-partitions", "-np"}, description = "Target Base path for the table", required = true)
  public Integer numPartitions = 1;

  @Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number of files to create per commit. If not specified or >= num-files, all files will be in one commit", required = false)
```
Let's do incremental commits differently. For instance: to bootstrap, we can offer a top-level config for the number of files, and for incremental commits we can offer a different config. Something like: add 1M files in the first batch, then 5,000 files in each incremental batch. We don't need this right now, but in a few days, once we get the benchmarking tool into a ready-to-use state.
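The two-level sizing described above could be sketched like this. The field names are made up for illustration; the real tool would expose them as JCommander `@Parameter` fields like the existing configs:

```java
// Hypothetical sketch of the reviewer's suggestion: one file count for the
// bootstrap commit and a separate, smaller count for each incremental commit.
public class IncrementalCommitPlan {
  public final long bootstrapNumFiles;         // e.g. 1,000,000 files in the first batch
  public final long incrementalFilesPerCommit; // e.g. 5,000 files per incremental batch
  public final int numIncrementalCommits;

  public IncrementalCommitPlan(long bootstrapNumFiles, long incrementalFilesPerCommit,
                               int numIncrementalCommits) {
    this.bootstrapNumFiles = bootstrapNumFiles;
    this.incrementalFilesPerCommit = incrementalFilesPerCommit;
    this.numIncrementalCommits = numIncrementalCommits;
  }

  // Files written by commit i, where commit 0 is the bootstrap commit.
  public long filesInCommit(int i) {
    return i == 0 ? bootstrapNumFiles : incrementalFilesPerCommit;
  }

  // Total files after the bootstrap plus all incremental commits.
  public long totalFiles() {
    return bootstrapNumFiles + incrementalFilesPerCommit * numIncrementalCommits;
  }
}
```

This replaces the single `--files-per-commit` knob with an explicit bootstrap/incremental split.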
For our first deliverable, one commit would suffice, but we need to ensure all files in MDT are HFiles (i.e. base files) and not log files. We can enhance this for more commits next week.
Let's try to get this working. Initial setup: just create the data table without any metadata table. From tests: generate data, then ingest into MDT, which will also initialize it; that way, we get HFiles in MDT directly.
```java
  spark.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode", "strict");

  // Create schema with the columns used for data skipping
  StructType dataSchema = new StructType()
```
I see we have declared the schema in three places. Can we declare it once and reuse it wherever required?
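One way to do that is a single shared constant that the data generator, writer, and query path all reference. A minimal sketch, assuming Spark's `StructType` API; the class and field names are illustrative, not from the PR:

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch: declare the benchmark schema once and reuse it,
// instead of re-declaring it in three places.
public final class BenchmarkSchema {
  public static final StructType DATA_SCHEMA = new StructType()
      .add("tenantId", DataTypes.LongType, false)
      .add("age", DataTypes.IntegerType, true)
      .add("salary", DataTypes.LongType, true);

  private BenchmarkSchema() {
    // utility holder, not instantiable
  }
}
```

Each call site then uses `BenchmarkSchema.DATA_SCHEMA` rather than building its own `StructType`.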
```java
  LOG.info("DEBUG: Resolved filter tree:\n{}", filter1.treeString());

  dataFilters.add(filter1);
  // Expression filter2 = org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
```
```java
  scala.collection.Seq<Expression> partitionFiltersSeq = partitionFiltersList;

  // Call filterFileSlices
  scala.collection.Seq<scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
```
Where is the timer here? Are we not interested in measuring the read latency (not just the planning) in this case?
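One way to address this is a small timing helper wrapped around both the planning call and the subsequent read, so each latency is reported separately. A sketch, with an invented helper name:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: time an operation with System.nanoTime() and append
// the elapsed milliseconds to a report, so the benchmark can report both
// planning latency and read latency.
public final class LatencyTimer {
  public static <T> T timed(String label, Supplier<T> op, StringBuilder report) {
    long start = System.nanoTime();
    T result = op.get();
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    report.append(label).append(" took ").append(elapsedMs).append(" ms\n");
    return result;
  }

  private LatencyTimer() {
  }
}
```

Usage would look like `LatencyTimer.timed("planning", () -> fileIndex.filterFileSlices(...), report)` followed by a second `timed("read", ...)` call around the actual scan.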
```java
  LOG.info(String.join("", Collections.nCopies(100, "-")));

  int totalFileSlices = 0;
  for (int j = 0; j < filteredSlices.size(); j++) {
```
Let's just print the total file slices we get after filtering. For verbose output, we should add an additional top-level config.
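The suggested split between a one-line summary and a verbose dump could look like this; the class name and `--verbose` flag are illustrative, not from the PR:

```java
import java.util.List;

// Hypothetical sketch: always report the total file-slice count after
// filtering, and gate the per-slice dump behind a top-level verbose flag.
public final class ResultPrinter {
  public static String summarize(List<List<String>> filteredSlicesPerPartition, boolean verbose) {
    StringBuilder out = new StringBuilder();
    int total = 0;
    for (List<String> slices : filteredSlicesPerPartition) {
      total += slices.size();
      if (verbose) {
        // Per-slice detail only when --verbose is set.
        slices.forEach(s -> out.append("  slice: ").append(s).append('\n'));
      }
    }
    out.append("Total file slices after filtering: ").append(total).append('\n');
    return out.toString();
  }

  private ResultPrinter() {
  }
}
```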
```java
  }

  public static class Config implements Serializable {
    @Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of columns to index", required = true)
```
Let's fix the description to be in line with the config, for all configs.
On the query side, let's see if we can support the query below. We can keep this P1.

numColumnsToIndex = 1
numPartitions = 100
query: select count(*) from tbl where dt >= '2025-01-01' and dt <= '2025-01-31' and tenantId = '100000000'

Ideally, tenantId within each partition will be clustered, but the spread of each tenant could be different. For example, 2025-01-01 has 10k file groups:
- t1: 2 fgs
- t2 ... t10: fg3
- t11: fg3 ... fg10
- ...

For Friday, let's just focus on the general benchmarking-script deliverable.
```java
      filesPartitionExists,
      metadataMetaClient.getTableConfig().getMetadataPartitions());

  if (!filesPartitionExists) {
```
OK. If this helps with initializing FILES directly with the first commit from the benchmarking tool, we can leave it as is.
```java
  // Generate column stats records
  @SuppressWarnings("rawtypes")
  Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> expectedStats = new HashMap<>();
  List<HoodieRecord<HoodieMetadataPayload>> columnStatsRecords = generateColumnStatsRecordsForCommitMetadata(
```
Make this P1; let's focus on the other feedback and come back to this later.
```java
  Comparable minValue;
  Comparable maxValue;

  if (colIdx == 0) {
```
The cardinality of tenantId is 25 to 30k, so let's rename salary -> tenantId and generate a random long within 30k values. And from the top-level config, let's accept 1 or 2 as numColumnsToIndex: if 1 -> tenantId; if 2 -> tenantId and either salary or age.
Vamsi's suggestion: use age (so that we have one column with high cardinality and one with low cardinality).
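The generation rule agreed in this thread could be sketched as follows; the class name is invented, but the cardinality cap and the 1-or-2 column rule come from the comments above:

```java
import java.util.Random;

// Hypothetical sketch of the column-generation rule discussed here:
// tenantId is a random long with cardinality capped at 30k, and the
// top-level numColumnsToIndex of 1 means tenantId only, while 2 means
// tenantId plus age (one high-cardinality, one low-cardinality column).
public final class ColumnGenerator {
  private static final int TENANT_ID_CARDINALITY = 30_000;

  // Random tenantId in [0, 30k), matching the stated cardinality.
  public static long nextTenantId(Random random) {
    return random.nextInt(TENANT_ID_CARDINALITY);
  }

  public static String[] columnsToIndex(int numColumnsToIndex) {
    if (numColumnsToIndex == 1) {
      return new String[] {"tenantId"};
    } else if (numColumnsToIndex == 2) {
      return new String[] {"tenantId", "age"};
    }
    throw new IllegalArgumentException("numColumnsToIndex must be 1 or 2");
  }

  private ColumnGenerator() {
  }
}
```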
Deliverable by Friday:
Describe the issue this Pull Request addresses
Summary and Changelog
Impact
Risk Level
Documentation Update
Contributor's checklist