[SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format #55828
pnikic-db wants to merge 4 commits into
Conversation
cloud-fan
left a comment
Summary
This PR adds a new SQL/Scala/Python window function counter_diff that converts monotonically increasing cumulative counters into delta form, with built-in handling of counter resets (value decrease, or optional start_time advance) and negative-value validation.
Prior state and problem. counter_diff is expressible today as `c - lag(c) IGNORE NULLS OVER (...)` plus user-written CASE/error logic, but the SQL gets verbose and is easy to get wrong, especially when NULL counters and start_time interact (which row defines the baseline?). This PR consolidates that into one window function with consistent semantics.
Design approach. A new AggregateWindowFunction (CounterDiffBase + two case-class subclasses, dispatched by CounterDiffExpressionBuilder via a sentinel DefaultStartTime and an eq-identity check). It inherits the fixed ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame and carries prevCounter/currCounter (plus prevStartTime/currStartTime for the 2-arg form) in the agg buffer. Rows with NULL counter are excluded from baseline tracking so the next non-NULL row diffs against the most recent non-NULL prev. This is more efficient than expanding into multiple lag window columns via an analyzer rewrite, at the cost of carrying a custom expression class.
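The buffer-based algorithm this paragraph describes can be sketched in a few lines of plain Python. This is an illustration of the intended 1-arg semantics only, not the PR's Scala code; the function name and edge-case behavior are inferred from this summary:

```python
# Pure-Python sketch of the 1-arg counter_diff buffer semantics: track the
# most recent non-NULL counter as the baseline, emit NULL on the first row
# and on resets, and error on negative values. Hypothetical names.
def counter_diff(counters):
    """Return one delta per input row of a cumulative counter column."""
    prev = None  # most recent non-NULL counter (the baseline)
    out = []
    for curr in counters:
        if curr is None:
            out.append(None)  # NULL rows are excluded from baseline tracking
            continue
        if curr < 0:
            raise ValueError("COUNTER_DIFF_NEGATIVE_COUNTER_VALUE")
        if prev is None or curr < prev:
            out.append(None)  # first non-NULL row, or counter reset
        else:
            out.append(curr - prev)
        prev = curr  # every non-NULL row becomes the new baseline
    return out
```

Note how the NULL at index 2 below does not disturb the baseline, so the next non-NULL row diffs against the last non-NULL value, exactly as described above.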
Key design decisions.
- Two distinct case classes (`CounterDiff`/`CounterDiffWithStartTime`) for the 1-arg / 2-arg forms, justified by the different buffer sizes (2 attrs vs 4). Centralizing into a single class with an `Option[Expression]` startTime would reduce duplication, but at the cost of conditional buffer construction; defensible either way.
- Negative counter is a hard runtime error (`COUNTER_DIFF_NEGATIVE_COUNTER_VALUE`) with no permissive variant. The 2-arg form additionally errors on `start_time` decrease (`COUNTER_DIFF_START_TIME_DECREASED`). The asymmetry between start_time decrease (error) and advance (NULL reset) is intentional.
- DECIMAL subtraction uses a new internal `DecimalSubtractNoOverflowCheck` (mirror of `DecimalAddNoOverflowCheck`) that keeps the result in the input precision/scale. This is sound because the `curr < 0` and `curr < prev` guards fire before the subtraction, so the result is always in `[0, curr]` and fits.
- A NULL `start_time` on a non-NULL-counter row is treated as "skip the reset check on this row": all start_time comparisons evaluate to NULL and fall through to the regular diff path. A NULL counter on a row whose `start_time` changes absorbs that change, and the next non-NULL row sees the cumulative shift from the last non-NULL row. The golden file covers these subtle interactions.
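The 2-arg interactions above can likewise be sketched in plain Python. Again this is a hedged illustration of the semantics as summarized here, not the Scala implementation; names are hypothetical, and exactly which row's start_time serves as the comparison baseline is inferred from the description:

```python
# Sketch of the 2-arg counter_diff semantics: a start_time advance resets
# the baseline (NULL delta), a decrease is a hard error, and a NULL
# start_time skips the reset check on that row.
def counter_diff_with_start_time(rows):
    """rows: iterable of (counter, start_time) pairs; returns the deltas."""
    prev_counter, prev_st = None, None
    out = []
    for curr, st in rows:
        if curr is None:
            out.append(None)  # NULL-counter rows do not update the baseline
            continue
        if curr < 0:
            raise ValueError("COUNTER_DIFF_NEGATIVE_COUNTER_VALUE")
        reset = False
        if st is not None and prev_st is not None:
            if st < prev_st:
                raise ValueError("COUNTER_DIFF_START_TIME_DECREASED")
            reset = st > prev_st  # start_time advanced: counter restarted
        if prev_counter is None or reset or curr < prev_counter:
            out.append(None)
        else:
            out.append(curr - prev_counter)
        prev_counter = curr
        if st is not None:  # NULL start_time leaves the baseline untouched
            prev_st = st
    return out
```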
Implementation sketch. New sql/catalyst/.../CounterDiff.scala carries the expression classes and the builder. FunctionRegistry adds the entry after the other window functions. Public-API additions go to sql/api/.../functions.scala, python/pyspark/sql/functions/builtin.py, and python/pyspark/sql/connect/functions/builtin.py. Tests: SQL golden file counter-diff.sql, a test in DataFrameWindowFunctionsSuite, and a Python test in test_functions.py. The expression-schema doc and Python rst/__init__ are also updated.
OSS Spark applicability. PR is filed against apache/spark directly — N/A.
General notes (couldn't anchor to a diff line)
python/pyspark/sql/tests/connect/test_connect_function.py::test_window_functions (lines 728-749) runs every window function side-by-side through Connect and classic. Worth adding `(CF.counter_diff("c"), SF.counter_diff("c"))` and a 2-arg variant there to cover the Connect path that python/pyspark/sql/connect/functions/builtin.py adds.
    expressions match {
      case Seq(value) =>
        CounterDiff(value)
      case Seq(value, startTime) if startTime eq DefaultStartTime =>
The dispatch between CounterDiff and CounterDiffWithStartTime relies on eq reference identity against the singleton DefaultStartTime. This works because FunctionRegistry calls rearrange and build back-to-back with no intervening transform, and defaultRearrange inserts the same instance verbatim. Worth a short code comment so a future reader (and anyone adding a transform between rearrange and build) understands the invariant: eq rather than == is intentional, because Literal.create(null, NullType) is structurally equal to a user-typed counter_diff(c, NULL) argument but should NOT collapse to the 1-arg form.
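For readers more familiar with Python, this is the same invariant that sentinel default arguments guard with `is` rather than `==`: a user-supplied value can be structurally equal to the sentinel without being it. A minimal analogue (illustrative only, names hypothetical):

```python
# Python analogue of the eq-vs-== dispatch discussed above. The sentinel must
# be detected by identity, because an explicit None argument (like a
# user-typed counter_diff(c, NULL) in the Scala builder) may compare equal to
# a structurally similar value but must not collapse to the 1-arg form.
_DEFAULT_START_TIME = object()  # unique sentinel instance

def build(value, start_time=_DEFAULT_START_TIME):
    if start_time is _DEFAULT_START_TIME:  # identity check, not equality
        return ("CounterDiff", value)
    return ("CounterDiffWithStartTime", value, start_time)
```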
A comment has been added which explains the missing single-parameter form, the use of DefaultStartTime and eq, and the signature. If the comment should go into more detail, please let me know.
* Add counter_diff Connect + classic test
* Remove "optional" from counter_diff with start time doc comment
* Fix incorrect counter_diff parameter name in PySpark docstring
* Remove redundant "nullable" override for counter_diff
* Improve expression description for counter_diff for the NULL counter case
* Remove single-parameter case from counter_diff match and explain reasoning
cloud-fan
left a comment
Status
6 addressed, 0 remaining, 3 new (all 3 late catches from my prior review).
All six findings from the prior review are addressed in Address counter_diff PR comments. The three follow-up nits below are all things I should have flagged in the first pass — apologies for the late catches. None are blockers, so approving; happy to leave them as drive-by improvements.
    /**
     * The SQL representation of the single-parameter form of the counter_diff function.
     */
    override def sql: String = s"${prettyName}(${counter.sql})"
(Late catch — apologies for not flagging this last round.) This override produces the same string as the default Expression.sql (which returns ${prettyName}(${children.map(_.sql).mkString(", ")})). Since children = Seq(counter), the default already yields counter_diff(<counter.sql>). Other window functions only override sql when they need to inject non-child state — e.g. EWM exposes $alpha/$ignoreNA, NthValue appends " ignore nulls". counter_diff has no such extras; this line can be dropped.
Removed the sql override from both counter_diff expressions. Thanks!
    override def sql: String =
      s"${prettyName}(${counter.sql}, ${startTime.sql})"
Same as the 1-arg form above — children = Seq(counter, startTime) so the default Expression.sql already produces counter_diff(<counter.sql>, <startTime.sql>). timeZoneId is not a child, but it doesn't appear in this SQL string anyway. The override is redundant.
Removed the sql override from both counter_diff expressions. Thanks!
    /**
     * A subtract expression for decimal values which is only used internally.
     *
     * Note that, this expression does not check overflow which is different from `Subtract`.
     */
(Late catch.) Compared to the sibling DecimalAddNoOverflowCheck Scaladoc just above, this one is missing the rationale for why skipping the overflow check is safe. The Add version explains the UnsafeRowWriter safety net; the new Subtract class has a different safety argument (callers pre-filter left >= right >= 0), and right now that argument only lives at the call site in CounterDiff.scala. A reader landing on this class in isolation can't tell why it's sound. Worth expanding, e.g.:
    /**
     * A subtract expression for decimal values which is only used internally.
     *
     * Note that, this expression does not check overflow which is different from `Subtract`.
     * It is the caller's responsibility to ensure that the result fits in the declared
     * precision and scale. For example, `counter_diff` only invokes this on operands that
     * have already been validated to satisfy `left >= right >= 0`, so the result is
     * non-negative and bounded above by `left`.
     */
I've added the explanation you've proposed to the Scaladoc, which clearly explains why skipping the overflow check is safe for counter_diff. Thanks!
    rows = (
        df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
        .orderBy("t")
        .collect()
    )
    self.assertEqual(
        [(r.t, r.d) for r in rows],
        [(1, None), (2, 100), (3, 200), (4, None), (5, 100)],
    )
(Late catch.) This 2-arg branch doesn't actually exercise the 2-arg form. At row 4 (2024-01-02, c=50) the start_time advances AND the counter decreases, so both reset paths fire together — the 2-arg form returns NULL at row 4 because of either condition, and the expected output is byte-identical to the 1-arg form's expected output above. If you removed all start_time logic from CounterDiffWithStartTime, this test would still pass.
Compare to DataFrameWindowFunctionsSuite.scala:835-857, where the two forms produce different outputs (Row(2, null) only in the 2-arg form because the start_time advances between rows 1 and 2 while the counter is still increasing) — that test does discriminate.
Suggestion: shift the start_time advance to a row where the counter does not decrease, so the 2-arg form returns NULL while the 1-arg form returns a positive diff:
    rows = (
        df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
        .orderBy("t")
        .collect()
    )
    self.assertEqual(
        [(r.t, r.d) for r in rows],
        [(1, None), (2, 100), (3, None), (4, None), (5, 100)],
    )
...combined with shifting row 3's st to datetime.datetime(2024, 1, 2) in the data fixture so the start_time advance happens between rows 2 and 3 (while the counter is still increasing from 200 to 400). test_connect_function.py::test_window_functions separately covers Connect↔Spark equivalence on a fuller dataset, but the unit test for the function itself should still be the place that distinguishes the two forms.
I've changed the test to include both reset types separately: one for counter value decrease and one for start time increase. Thanks!
* Remove sql overrides
* Provide reason for adding DecimalSubtractNoOverflowCheck
* Improve counter_diff with start time PySpark test
* Improve NULL start time description and golden file test
* Improve golden file test comments
What changes were proposed in this pull request?
This pull request proposes the addition of a new window function, counter_diff, which would be used to convert cumulative counters to delta format by computing the differences between consecutive values. The function would include special handling for counter resets, when the cumulative value gets reset to zero.

Syntax

counter_diff(value [, start_time]) OVER (PARTITION BY partition_exprs ORDER BY order_exprs)

Arguments

- value: A cumulative counter. Must be numeric and non-negative.
- start_time: An optional timestamp parameter which indicates when the counter was last set to zero. It is used to better detect counter resets.
- partition_exprs: Used to separate independent counters. Good partitioning columns would be the metric name, as well as any attributes tied to the metric.
- order_exprs: Used to order the rows. Should be the observation timestamp in ascending order.

Example
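As a stand-in example, the conversion the function performs can be shown with plain Python (not PySpark; the data is made up for illustration):

```python
# Illustration of what counter_diff computes within one partition, ordered by
# time. Cumulative HTTP request counts, with a reset to zero between the
# third and fourth observations:
cumulative = [100, 250, 400, 30, 90]

deltas = []
prev = None
for curr in cumulative:
    # A decrease signals a counter reset, so no delta can be derived there.
    deltas.append(curr - prev if prev is not None and curr >= prev else None)
    prev = curr

# deltas == [None, 150, 150, None, 60]
```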
Why are the changes needed?
Counters are metrics with monotonically increasing values. One example is the number of HTTP requests processed on a server. With each request, the counter increases. Counters can be represented in two temporalities, cumulative or delta:

- The cumulative representation is typically better for storage and transmission, as it handles missed observations better.
- The delta representation, however, is required for performing analytics on the metric, as deltas can be aggregated and bucketized.

counter_diff reduces the gap between these two representations. Given a cumulative counter, it computes the differences between consecutive values, resulting in the equivalent delta representation for the counter, which can be used in further analysis.

Does this PR introduce any user-facing change?
The counter_diff window function is a new function.

How was this patch tested?
A new test, counter-diff.sql, has been added with various SQL queries involving counter_diff and their expected outputs.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code with Claude Opus 4.7