
MDT Test framework without writing data files #17796

Open

vamsikarnika wants to merge 20 commits into apache:master from vamsikarnika:mdt_stats_tool

Conversation

@vamsikarnika (Collaborator):

Describe the issue this Pull Request addresses

Summary and Changelog

Impact

Risk Level

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

github-actions bot added the size:XL (PR with lines of changes > 1000) label on Jan 7, 2026.
initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
break;
case PARTITION_STATS:
// For PARTITION_STATS, COLUMN_STATS should also be enabled
Contributor:

Let's get this taken care of. Let's not comment out any code; instead, introduce a config and disable it in our benchmarking script.
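A rough sketch of what the benchmarking script could set instead, assuming the standard Hudi metadata config keys (the partition-stats key in particular should be verified against HoodieMetadataConfig; the helper class is hypothetical):

import java.util.HashMap;
import java.util.Map;

public class BenchmarkWriteOptions { // hypothetical helper, not part of this PR
  // Disable partition stats via config in the benchmarking script instead of
  // commenting out the PARTITION_STATS branch in core code.
  public static Map<String, String> metadataOptions() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.metadata.enable", "true");
    opts.put("hoodie.metadata.index.column.stats.enable", "true");
    // Key assumed from HoodieMetadataConfig; verify before relying on it.
    opts.put("hoodie.metadata.index.partition.stats.enable", "false");
    return opts;
  }
}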

Contributor:

we can try reverting this commit if that helps

#14165

Contributor:

done

}

if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath())) {
checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
Contributor:

Same here. Let's revert these changes.

Contributor:

reverted

} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
log.warn("Skipping reconcile markers for instant: {}", instantTs);
Contributor:

were we not able to understand why this was deleting the data or log files?

Contributor:

This is P1, and it's on me (Siva).


import scala.collection.JavaConverters;

public class HoodieMDTStats implements Closeable {
Contributor:

Let's rename this to MetadataBenchmarkingTool.

Contributor:

done

@Parameter(names = {"--num-partitions", "-np"}, description = "Target Base path for the table", required = true)
public Integer numPartitions = 1;

@Parameter(names = {"--files-per-commit", "-fpc"}, description = "Number of files to create per commit. If not specified or >= num-files, all files will be in one commit", required = false)
Contributor:

Let's do incremental commits differently. For instance, to bootstrap, we can offer a top-level config for the number of files, and for incremental batches we can offer a different config.

Something like: add 1M files in the first batch, and then 5000 files in each incremental batch.

We don't need this right now, but we should pick it up in a few days, once we get the benchmarking tool into a ready-to-use state.
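A rough sketch of how the two configs could look; the class, parameter names, and defaults are assumptions for illustration, not part of this PR:

import com.beust.jcommander.Parameter;
import java.io.Serializable;

public class IncrementalFilesConfig implements Serializable {
  // Files laid out by the initial bootstrap commit (e.g. 1M files in the first batch).
  @Parameter(names = {"--num-initial-files"},
      description = "Number of files to create in the initial bootstrap commit")
  public Integer numInitialFiles = 1_000_000;

  // Files added by each incremental commit after bootstrap (e.g. 5000 per batch).
  @Parameter(names = {"--num-files-per-incremental-batch"},
      description = "Number of files to add in each incremental commit after bootstrap")
  public Integer numFilesPerIncrementalBatch = 5000;
}

In practice these would presumably live as fields on the tool's existing nested Config class alongside the other @Parameter fields.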

Contributor:

For our first deliverable, 1 commit would suffice, but we need to ensure that all files in the MDT are HFiles (i.e. base files) and not log files.

We can enhance this to support more commits next week.

Contributor:

Let's try to get this working.

Initial setup: just create the data table without any metadata table.

From the tests: generate the data and ingest it into the MDT, which will also initialize it. That way, we get HFiles in the MDT directly.
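A rough sketch of that flow under assumptions, using the standard Hudi config builders (the helper class and table name are hypothetical, and the builder methods should be double-checked against the Hudi version in use):

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class MdtBootstrapConfigs { // hypothetical helper, not part of this PR
  // Step 1: lay out the data table with the metadata table disabled.
  public static HoodieWriteConfig dataTableOnly(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable("mdt_benchmark") // table name assumed
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
        .build();
  }

  // Step 2: the first ingestion with metadata enabled initializes FILES/COLUMN_STATS,
  // so the MDT gets HFile base files directly.
  public static HoodieWriteConfig withMetadata(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable("mdt_benchmark") // table name assumed
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(true)
            .build())
        .build();
  }
}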

Contributor:

Let's keep this as P1.

spark.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode", "strict");

// Create schema with the columns used for data skipping
StructType dataSchema = new StructType()
Contributor:

I see we have declared the schema in 3 places. Can we declare it once and reuse it wherever required?
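For example, a single shared constant could replace the three copies; the holder class name and the exact column set here are assumptions based on the columns discussed in this thread:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public final class BenchmarkSchema { // hypothetical holder, not part of this PR
  // Columns used for data skipping in this thread: dt (partition column), tenantId, age.
  public static final StructType DATA_SCHEMA = new StructType()
      .add("dt", DataTypes.StringType)
      .add("tenantId", DataTypes.LongType)
      .add("age", DataTypes.IntegerType);

  private BenchmarkSchema() {
  }
}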

LOG.info("DEBUG: Resolved filter tree:\n{}", filter1.treeString());

dataFilters.add(filter1);
// Expression filter2 = org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
Contributor:

Let's clean this up.

scala.collection.Seq<Expression> partitionFiltersSeq = partitionFiltersList;

// Call filterFileSlices
scala.collection.Seq<scala.Tuple2<scala.Option<org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath>,
Contributor:

where is the timer here?
are we not interested in measuring the read latency (just the planning) in this case?
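One way to get both numbers would be a small timing helper around the planning call; this is only a sketch with assumed names:

import java.util.function.Supplier;
import org.slf4j.Logger;

public final class BenchmarkTimer { // hypothetical helper, not part of this PR
  // Runs a step, logs how long it took, and returns its result.
  public static <T> T timed(String label, Supplier<T> step, Logger log) {
    long startNs = System.nanoTime();
    T result = step.get();
    log.info("{} took {} ms", label, (System.nanoTime() - startNs) / 1_000_000);
    return result;
  }

  private BenchmarkTimer() {
  }
}

The existing filterFileSlices call, and separately the actual read of the pruned file slices, could then be wrapped in timed(...) so that both planning and read latency get reported.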

LOG.info(String.join("", Collections.nCopies(100, "-")));

int totalFileSlices = 0;
for (int j = 0; j < filteredSlices.size(); j++) {
Contributor:

Let's just print the total file slices we get after filtering. For verbose output, we should add an additional top-level config.
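A rough sketch of that reporting, as a helper method the tool could add; the verbose flag is an assumed top-level config, not something in this PR:

// Summary by default, per-entry detail only when verbose.
static void reportFilteredSlices(org.slf4j.Logger log, scala.collection.Seq<?> filteredSlices,
                                 int totalFileSlices, boolean verbose) {
  log.info("Total file slices after filtering: {}", totalFileSlices);
  if (verbose) {
    for (int j = 0; j < filteredSlices.size(); j++) {
      log.info("Filtered entry {}: {}", j, filteredSlices.apply(j));
    }
  }
}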

}

public static class Config implements Serializable {
@Parameter(names = {"--table-base-path", "-tbp"}, description = "Number of columns to index", required = true)
Contributor:

Let's fix the description to be in line with the config, for all configs.
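For instance, the two parameters quoted above might read as below; the field name for the base path is assumed, and the exact wording is of course up to the author:

// Sketch only: descriptions that match what each config actually controls.
@Parameter(names = {"--table-base-path", "-tbp"}, description = "Target base path for the table", required = true)
public String tableBasePath;

@Parameter(names = {"--num-partitions", "-np"}, description = "Number of partitions to create in the table", required = true)
public Integer numPartitions = 1;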

@nsivabalan (Contributor) left a comment:

On the query side, let's see if we can support queries like the one below.

numColumnsToIndex = 1
numPartitions = 100

query: select count(*) from tbl where dt >= '2025-01-01' and dt <= '2025-01-31' and tenantId = '100000000'

We can keep this P1.

Ideally, tenantId within each partition will be clustered, but the spread of each tenant could be different. For example, for 2025-01-01 with 10k file groups:

  • t1: 2 fgs
  • t2 ... t10: fg3
  • t11: fg3 ... fg10
  • ...

For Friday, let's just focus on the general benchmarking script deliverable.
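A rough sketch of how the tool might build those predicates for pruning, reusing the resolveExpr pattern that already appears (commented out) in this PR; spark, dataSchema, and dataFilters are assumed from the surrounding code, the resolveExpr signature is an assumption, and dt may instead belong in the partition filters depending on how the table is partitioned:

// Sketch only: resolve the date-range and tenantId predicates against the table schema.
Expression dateRangeFilter = org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
    spark, "dt >= '2025-01-01' and dt <= '2025-01-31'", dataSchema);
Expression tenantFilter = org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr(
    spark, "tenantId = '100000000'", dataSchema);
dataFilters.add(dateRangeFilter);
dataFilters.add(tenantFilter);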


filesPartitionExists,
metadataMetaClient.getTableConfig().getMetadataPartitions());

if (!filesPartitionExists) {
Contributor:

OK. If this helps with initializing FILES directly with the first commit from the benchmarking tool, we can leave it as is.

// Generate column stats records
@SuppressWarnings("rawtypes")
Map<String, Map<String, HoodieColumnRangeMetadata<Comparable>>> expectedStats = new HashMap<>();
List<HoodieRecord<HoodieMetadataPayload>> columnStatsRecords = generateColumnStatsRecordsForCommitMetadata(
Contributor:

Make this P1. Let's focus on the other feedback and come back to this later.

Comparable minValue;
Comparable maxValue;

if (colIdx == 0) {
Contributor:

The cardinality of tenantId is 25 to 30k, so let's rename salary -> tenantId and generate a random long within 30k values.

And from the top-level config, let's accept 1 or 2 as numColumnsToIndex:
if 1 -> tenantId
if 2 -> tenantId & (either salary or age)
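A rough sketch of what that could look like; the cardinality cap, helper names, and the use of age as the second column are taken from this thread, but the code itself is only illustrative:

// Sketch only, not part of this PR.
private static final int TENANT_ID_CARDINALITY = 30_000;

static long randomTenantId() {
  // tenantId cardinality is roughly 25-30k, so draw a random long below the cap.
  return java.util.concurrent.ThreadLocalRandom.current().nextLong(TENANT_ID_CARDINALITY);
}

static java.util.List<String> columnsToIndex(int numColumnsToIndex) {
  // 1 -> tenantId only; 2 -> tenantId plus a second column (age is suggested just below).
  return numColumnsToIndex == 1
      ? java.util.Collections.singletonList("tenantId")
      : java.util.Arrays.asList("tenantId", "age");
}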

Contributor:

Vamsi's suggestion: use age (so that we have one column with high cardinality and one with low cardinality).

@nsivabalan (Contributor):

Deliverable by Friday:

  • Focus on just 1 commit to the MDT. We need HFiles in the latest file slices of the MDT (FILES and col stats) so that we can measure the best possible read latencies for query pruning.
  • Ensure we can support date and tenantId predicates in queries.
  • Generate col stats records using the Spark engine context.
  • The benchmarking script should be able to run either the write or the read benchmarks.
  • Let's validate 1M files and 360 partitions. If we run into scale issues, at least try to find the inflection point; for example, can we do 100k files?
  • Resources: driver: 6 or gb; executors: 4 core 8gb; if not, 3 core 9gb.
  • Disabling partition stats and the other feedback comments: Pavithran and Vamsi to sync up.

@apache apache deleted a comment from hudi-bot Feb 10, 2026
@hudi-bot (Collaborator):

CI report:

Bot commands: @hudi-bot supports the following:
  • @hudi-bot run azure (re-run the last Azure build)
