
[SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format#55828

Open
pnikic-db wants to merge 4 commits into
apache:masterfrom
pnikic-db:counter-diff-window-function

Conversation

Contributor

@pnikic-db pnikic-db commented May 12, 2026

What changes were proposed in this pull request?

This pull request proposes a new window function, counter_diff, which converts cumulative counters to delta format by computing the differences between consecutive values. The function includes special handling for counter resets, i.e. cases where the cumulative value is reset to zero.

Syntax

counter_diff(value [, start_time]) OVER (PARTITION BY partition_exprs ORDER BY order_exprs)

Arguments

  • value: A cumulative counter. Must be numeric and non-negative.
  • start_time: An optional timestamp parameter which indicates when the counter was last set to zero. It is used to better detect counter resets.
  • partition_exprs: Used to separate independent counters. Good partitioning columns would be the metric name, as well as any attributes tied to the metric.
  • order_exprs: Used to order the rows. Should be the observation timestamp in ascending order.

Example

SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
FROM VALUES
  ('http_requests', TIMESTAMP '2026-01-01T00:00:00', 100),
  ('http_requests', TIMESTAMP '2026-01-01T00:01:00', 200),
  ('http_requests', TIMESTAMP '2026-01-01T00:02:00', 400)
  AS tab (m, t, c)
m              t                    c    diff
http_requests  2026-01-01 00:00:00  100  NULL
http_requests  2026-01-01 00:01:00  200  100
http_requests  2026-01-01 00:02:00  400  200

Why are the changes needed?

Counters are metrics with monotonically increasing values. One example is the number of HTTP requests processed on a server. With each request, the counter increases. Counters can be represented in two temporalities, cumulative or delta:

  • With delta temporality, each observation represents the increase of the counter since the last observation.
  • With cumulative temporality, each observation represents the total accumulated value of the counter.
    • With cumulative counters, it is possible for the counter to reset to zero, for example when a restart occurs.

The cumulative representation is typically better for storage and transmission, as it handles missed observations better.

  • For delta counters, if a single observation is lost, the increase is lost from the total counter value.
  • For cumulative counters, the observation is lost, but the total counter value does not decrease.

However, the delta representation is required for performing analytics on the metric, as delta values can be aggregated and bucketized.

counter_diff reduces the gap between these two representations. Given a cumulative counter, it computes the differences between consecutive values, resulting in the equivalent delta representation for the counter, which can be used in further analysis.
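The single-argument semantics described above can be sketched in plain Python. This is a simplified model of the behavior the PR describes (reset on a counter decrease yields NULL, NULL inputs are skipped as baselines, negative values are an error), not the actual Spark implementation:

```python
def counter_diff(values):
    """Model of counter_diff semantics on one ordered partition.

    A decrease in the cumulative counter is treated as a reset and
    yields None for that row; None inputs do not update the baseline.
    (Simplified sketch, not the actual Spark implementation.)
    """
    prev = None  # last non-None cumulative value seen
    out = []
    for v in values:
        if v is None:
            out.append(None)      # NULL counter: no diff, baseline unchanged
        elif v < 0:
            raise ValueError("counter value must be non-negative")
        elif prev is None:
            out.append(None)      # first observation: no baseline yet
            prev = v
        elif v < prev:
            out.append(None)      # counter reset detected
            prev = v
        else:
            out.append(v - prev)
            prev = v
    return out

# Matches the example above: counter_diff([100, 200, 400]) -> [None, 100, 200]
```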

Does this PR introduce any user-facing change?

The counter_diff window function is a new function.

How was this patch tested?

A new test, counter-diff.sql, has been added with various SQL queries involving counter_diff and their expected outputs.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code with Claude Opus 4.7

@pnikic-db pnikic-db force-pushed the counter-diff-window-function branch from 96d71a6 to a29e7d4 on May 12, 2026 16:31
Contributor

@cloud-fan cloud-fan left a comment


Summary

This PR adds a new SQL/Scala/Python window function counter_diff that converts monotonically increasing cumulative counters into delta form, with built-in handling of counter resets (value decrease, or optional start_time advance) and negative-value validation.

Prior state and problem. counter_diff is expressible today with c - lag(c) IGNORE NULLS OVER (...) plus user-written CASE/error logic, but the SQL gets verbose and easy to get wrong — especially when NULL counters and start_time interact (which row defines the baseline?). This PR consolidates that into one window function with consistent semantics.

Design approach. A new AggregateWindowFunction (CounterDiffBase + two case-class subclasses, dispatched by CounterDiffExpressionBuilder via a sentinel DefaultStartTime and an eq-identity check). It inherits the fixed ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame and carries prevCounter/currCounter (plus prevStartTime/currStartTime for the 2-arg form) in the agg buffer. Rows with NULL counter are excluded from baseline tracking so the next non-NULL row diffs against the most recent non-NULL prev. This is more efficient than expanding into multiple lag window columns via an analyzer rewrite, at the cost of carrying a custom expression class.

Key design decisions.

  • Two distinct case classes (CounterDiff / CounterDiffWithStartTime) for the 1-arg / 2-arg forms, justified by the different buffer sizes (2 attrs vs 4). Centralizing into a single class with Option[Expression] startTime would reduce duplication but at the cost of conditional buffer construction — defensible either way.
  • Negative counter is a hard runtime error (COUNTER_DIFF_NEGATIVE_COUNTER_VALUE) with no permissive variant. The 2-arg form additionally errors on start_time decrease (COUNTER_DIFF_START_TIME_DECREASED). Asymmetry between start_time decrease (error) vs advance (NULL reset) is intentional.
  • DECIMAL subtraction uses a new internal DecimalSubtractNoOverflowCheck (mirror of DecimalAddNoOverflowCheck) that keeps the result in the input precision/scale. Sound because the curr < 0 and curr < prev guards fire before the subtraction, so the result is always in [0, curr] and fits.
  • NULL start_time on a non-NULL-counter row is treated as "skip the reset check on this row" — all start_time comparisons evaluate to NULL and fall through to the regular diff path. A NULL counter on a row whose start_time changes absorbs that change, and the next non-NULL row sees the cumulative shift from the last non-NULL row. The golden file covers these subtle interactions.
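The two-argument reset rules listed above (start_time advance resets even when the counter keeps increasing, start_time decrease errors, NULL start_time skips the reset check) can be modeled as follows. This is a sketch of the described semantics only; the subtler NULL-counter/start_time interactions covered by the golden file are deliberately not modeled here:

```python
def counter_diff_with_start_time(rows):
    """Model of the two-argument form's reset handling, per the PR
    description and review notes (a sketch, not Spark's implementation).

    Each row is a (counter, start_time) pair. A start_time advance marks
    a reset (None) even when the counter kept increasing; a start_time
    decrease is an error; a None start_time skips the reset check.
    NULL-counter/start_time interactions are simplified here.
    """
    prev_c = None
    prev_st = None
    out = []
    for c, st in rows:
        if c is None:
            out.append(None)          # NULL counter rows produce NULL
            continue
        if c < 0:
            raise ValueError("counter value must be non-negative")
        reset = False
        if st is not None and prev_st is not None:
            if st < prev_st:
                raise ValueError("start_time must not decrease")
            reset = st > prev_st      # counter was restarted
        if prev_c is None or reset or c < prev_c:
            out.append(None)
        else:
            out.append(c - prev_c)
        prev_c = c
        if st is not None:
            prev_st = st
    return out

# A start_time advance resets even though the counter kept increasing:
# counter_diff_with_start_time([(100, 1), (200, 1), (400, 2)])
#   -> [None, 100, None]
```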

Implementation sketch. New sql/catalyst/.../CounterDiff.scala carries the expression classes and the builder. FunctionRegistry adds the entry after the other window functions. Public-API additions go to sql/api/.../functions.scala, python/pyspark/sql/functions/builtin.py, and python/pyspark/sql/connect/functions/builtin.py. Tests: SQL golden file counter-diff.sql, a test in DataFrameWindowFunctionsSuite, and a Python test in test_functions.py. The expression-schema doc and Python rst/__init__ are also updated.

OSS Spark applicability. PR is filed against apache/spark directly — N/A.

General notes (couldn't anchor to a diff line)

  • python/pyspark/sql/tests/connect/test_connect_function.py::test_window_functions (lines 728-749) runs every window function side-by-side through Connect and classic. Worth adding (CF.counter_diff("c"), SF.counter_diff("c")) and a 2-arg variant there to cover the Connect path that python/pyspark/sql/connect/functions/builtin.py adds.

expressions match {
  case Seq(value) =>
    CounterDiff(value)
  case Seq(value, startTime) if startTime eq DefaultStartTime =>
Contributor


The dispatch between CounterDiff and CounterDiffWithStartTime relies on eq reference identity against the singleton DefaultStartTime. This works because FunctionRegistry calls rearrange and build back-to-back with no intervening transform, and defaultRearrange inserts the same instance verbatim. Worth a short code comment so a future reader (and anyone adding a transform between rearrange and build) understands the invariant: eq rather than == is intentional, because Literal.create(null, NullType) is structurally equal to a user-typed counter_diff(c, NULL) argument but should NOT collapse to the 1-arg form.
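The identity-versus-structural-equality distinction the reviewer describes can be illustrated in Python, where `is` plays the role of Scala's `eq`. The `NullLiteral`/`build` names here are hypothetical stand-ins for illustration, not Spark APIs:

```python
class NullLiteral:
    """Stand-in for Literal(null): all instances compare structurally equal."""
    def __eq__(self, other):
        return isinstance(other, NullLiteral)
    def __hash__(self):
        return 0

DEFAULT_START_TIME = NullLiteral()  # the one sentinel instance

def build(value, start_time):
    # `is` (Scala's `eq`): only the injected sentinel collapses to the
    # 1-arg form; a structurally equal user-typed NULL must not.
    if start_time is DEFAULT_START_TIME:
        return ("CounterDiff", value)
    return ("CounterDiffWithStartTime", value, start_time)

user_null = NullLiteral()                # user wrote counter_diff(c, NULL)
assert user_null == DEFAULT_START_TIME   # structurally equal to the sentinel...
assert build("c", user_null)[0] == "CounterDiffWithStartTime"  # ...but not collapsed
assert build("c", DEFAULT_START_TIME) == ("CounterDiff", "c")
```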

Contributor Author


A comment has been added which explains the missing single-parameter form, the use of DefaultStartTime and eq, and the signature. If the comment should go into more detail, please let me know.

Comment thread sql/api/src/main/scala/org/apache/spark/sql/functions.scala Outdated
Comment thread python/pyspark/sql/functions/builtin.py Outdated
* Add counter_diff Connect + classic test
* Remove "optional" from counter_diff with start time doc comment
* Fix incorrect counter_diff parameter name in PySpark docstring
* Remove redundant "nullable" override for counter_diff
* Improve expression description for counter_diff for the NULL counter case
* Remove single-parameter case from counter_diff match and explain reasoning
Contributor

@cloud-fan cloud-fan left a comment


Status

6 addressed, 0 remaining, 3 new (all 3 late catches from my prior review).

All six findings from the prior review are addressed in Address counter_diff PR comments. The three follow-up nits below are all things I should have flagged in the first pass — apologies for the late catches. None are blockers, so approving; happy to leave them as drive-by improvements.

/**
* The SQL representation of the single-parameter form of the counter_diff function.
*/
override def sql: String = s"${prettyName}(${counter.sql})"
Contributor


(Late catch — apologies for not flagging this last round.) This override produces the same string as the default Expression.sql (which returns ${prettyName}(${children.map(_.sql).mkString(", ")})). Since children = Seq(counter), the default already yields counter_diff(<counter.sql>). Other window functions only override sql when they need to inject non-child state — e.g. EWM exposes $alpha/$ignoreNA, NthValue appends " ignore nulls". counter_diff has no such extras; this line can be dropped.

Suggested change
override def sql: String = s"${prettyName}(${counter.sql})"

Contributor Author


Removed the sql override from both counter_diff expressions. Thanks!

Comment on lines +280 to +281
override def sql: String =
  s"${prettyName}(${counter.sql}, ${startTime.sql})"
Contributor


Same as the 1-arg form above — children = Seq(counter, startTime) so the default Expression.sql already produces counter_diff(<counter.sql>, <startTime.sql>). timeZoneId is not a child, but it doesn't appear in this SQL string anyway. The override is redundant.

Suggested change
override def sql: String =
  s"${prettyName}(${counter.sql}, ${startTime.sql})"

Contributor Author


Removed the sql override from both counter_diff expressions. Thanks!

Comment on lines +250 to +254
/**
* A subtract expression for decimal values which is only used internally.
*
* Note that, this expression does not check overflow which is different from `Subtract`.
*/
Contributor


(Late catch.) Compared to the sibling DecimalAddNoOverflowCheck Scaladoc just above, this one is missing the rationale for why skipping the overflow check is safe. The Add version explains the UnsafeRowWriter safety net; the new Subtract class has a different safety argument (callers pre-filter left >= right >= 0), and right now that argument only lives at the call site in CounterDiff.scala. A reader landing on this class in isolation can't tell why it's sound. Worth expanding, e.g.:

Suggested change
/**
* A subtract expression for decimal values which is only used internally.
*
* Note that, this expression does not check overflow which is different from `Subtract`.
*/
/**
* A subtract expression for decimal values which is only used internally.
*
* Note that, this expression does not check overflow which is different from `Subtract`.
* It is the caller's responsibility to ensure that the result fits in the declared
* precision and scale. For example, `counter_diff` only invokes this on operands that
* have already been validated to satisfy `left >= right >= 0`, so the result is
* non-negative and bounded above by `left`.
*/

Contributor Author


I've added the explanation you've proposed to the Scaladoc, which clearly explains why skipping the overflow check is safe for counter_diff. Thanks!

Comment on lines +1976 to +1984
rows = (
    df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
    .orderBy("t")
    .collect()
)
self.assertEqual(
    [(r.t, r.d) for r in rows],
    [(1, None), (2, 100), (3, 200), (4, None), (5, 100)],
)
Contributor


(Late catch.) This 2-arg branch doesn't actually exercise the 2-arg form. At row 4 (2024-01-02, c=50) the start_time advances AND the counter decreases, so both reset paths fire together — the 2-arg form returns NULL at row 4 because of either condition, and the expected output is byte-identical to the 1-arg form's expected output above. If you removed all start_time logic from CounterDiffWithStartTime, this test would still pass.

Compare to DataFrameWindowFunctionsSuite.scala:835-857, where the two forms produce different outputs (Row(2, null) only in the 2-arg form because the start_time advances between rows 1 and 2 while the counter is still increasing) — that test does discriminate.

Suggestion: shift the start_time advance to a row where the counter does not decrease, so the 2-arg form returns NULL while the 1-arg form returns a positive diff:

Suggested change
rows = (
    df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
    .orderBy("t")
    .collect()
)
self.assertEqual(
    [(r.t, r.d) for r in rows],
    [(1, None), (2, 100), (3, 200), (4, None), (5, 100)],
)
rows = (
    df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
    .orderBy("t")
    .collect()
)
self.assertEqual(
    [(r.t, r.d) for r in rows],
    [(1, None), (2, 100), (3, None), (4, None), (5, 100)],
)

...combined with shifting row 3's st to datetime.datetime(2024, 1, 2) in the data fixture so the start_time advance happens between rows 2 and 3 (while the counter is still increasing from 200 to 400). test_connect_function.py::test_window_functions separately covers Connect↔Spark equivalence on a fuller dataset, but the unit test for the function itself should still be the place that distinguishes the two forms.

Contributor Author


I've changed the test to include both reset types separately: one for counter value decrease and one for start time increase. Thanks!

pnikic-db added 2 commits May 14, 2026 12:08
* Remove sql overrides
* Provide reason for adding DecimalSubtractNoOverflowCheck
* Improve counter_diff with start time PySpark test
* Improve NULL start time description and golden file test
* Improve golden file test comments