
Conversation

@PhongChuong
Collaborator

@PhongChuong PhongChuong commented Dec 12, 2025

Refactor by:

  • moving batch size logic from Worker to OutstandingPublishes
  • simplifying Batch by passing the initial message size instead of the topic string
  • simplifying Batch by returning a JoinHandle instead of appending to the inflight FuturesUnordered data structure (rough sketch below)

Towards: #4012
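
A rough sketch of the shape the last bullet moves toward (illustrative types and signatures, not the crate's real definitions): publishing a Batch used to append its task to a FuturesUnordered owned elsewhere; after the refactor it simply returns the JoinHandle and the Worker decides how to track it.

use tokio::task::JoinHandle;

struct Batch {
    messages: Vec<Vec<u8>>,
}

impl Batch {
    // Before: fn publish(self, inflight: &mut FuturesUnordered<JoinHandle<()>>)
    // After: the batch spawns its own publish task and hands the handle back.
    fn publish(self) -> JoinHandle<()> {
        tokio::spawn(async move {
            // issue the Publish RPC for self.messages here (elided)
            let _ = self.messages;
        })
    }
}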

@product-auto-label product-auto-label bot added the api: pubsub (Issues related to the Pub/Sub API) label Dec 12, 2025
@codecov

codecov bot commented Dec 12, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 95.28%. Comparing base (07c6f2c) to head (2b79566).
⚠️ Report is 27 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #4051   +/-   ##
=======================================
  Coverage   95.27%   95.28%           
=======================================
  Files         173      173           
  Lines        6624     6635   +11     
=======================================
+ Hits         6311     6322   +11     
  Misses        313      313           

@PhongChuong PhongChuong marked this pull request as ready for review December 12, 2025 00:33
@PhongChuong PhongChuong requested a review from a team as a code owner December 12, 2025 00:33
@PhongChuong PhongChuong requested a review from suzmue December 12, 2025 00:34
Collaborator

@suzmue suzmue left a comment

Can you also update the commit message description to describe a bit about the change you are making (what has changed / why)? Thanks!

// all of those resolve as well.
let mut flushing = std::mem::take(&mut inflight);
while flushing.next().await.is_some() {}
outstanding.flush(self.client.clone(), self.topic_name.clone());
Collaborator

This seems to me like it has changed the behavior so that it is no longer waiting for all of the batches to return, which is a regression. Flush just sends the existing batch, but it doesn't wait for any of those publishes to resolve.

Notably, this doesn't appear to be caught in the tests, so if we can add a test for this too, that would be great.

Collaborator Author

I think there was a misunderstanding about flush on my part. In my head, flush signals the publisher to send out all the pending batches/publishes, but does not await until they finish.

If the contract of flush is to await, then are we waiting for all pending publishes, or just the pending batch?
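
Roughly, the two contracts as I understand them (a sketch with illustrative helpers, not the real API), assuming the Worker tracks in-flight publishes as JoinHandles in a FuturesUnordered:

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

// Contract A: flush only hands the pending batch's task to the runtime and
// returns; earlier publishes may still be in flight when it completes.
fn flush_send_only(pending: JoinHandle<()>, inflight: &mut FuturesUnordered<JoinHandle<()>>) {
    inflight.push(pending);
}

// Contract B: flush also drains every in-flight publish, so the caller knows
// all previously accepted messages have resolved before flush returns.
async fn flush_and_wait(pending: JoinHandle<()>, inflight: &mut FuturesUnordered<JoinHandle<()>>) {
    inflight.push(pending);
    while inflight.next().await.is_some() {}
}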

},
Some(ToWorker::Flush(tx)) => {
// TODO(#4012): To guarantee ordering, we should wait for the
// inflight batch to complete so that messages are published in order.
Collaborator

This comment feels like it belongs within OutstandingPublishes, NOT here, since this is no longer responsible for deciding when to send the message.

Collaborator Author

See comment regarding moving inflight back into Worker.

In that case, I think having the comment here makes more sense.

Collaborator

I still don't think this main loop is keeping track of which batches have outstanding publishes. Since this is a TODO, I am fine leaving the comment here, but it seems likely this sort of decision should be handled by OutstandingPublishes.

Collaborator Author

It does not handle outstanding publishes yet, as this is simply a refactoring.

Regarding where to handle awaiting the inflight batch, I actually think it is better handled by the Worker, as that avoids spawning additional tasks in OutstandingPublishes to push pending batches along.

Let's discuss offline, outside of this PR.

message_limit: u32,
byte_threshold: u32,
pending_batch: Batch,
inflight: Option<tokio::task::JoinHandle<()>>,
Collaborator

inflight is unused?

Collaborator Author

I moved inflight back up to Worker. I think having the task control logic purely in Worker is easier to follow than having it distributed across Worker and OutstandingPublishes.
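
Roughly the split I ended up with (hypothetical, heavily elided names): OutstandingPublishes only decides when a batch is ready and returns the spawned JoinHandle; the Worker's main loop is the single owner of the in-flight task set.

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

struct Worker {
    // All task-tracking state lives here, not in OutstandingPublishes.
    inflight: FuturesUnordered<JoinHandle<()>>,
    // ... command receiver, OutstandingPublishes, client, topic, etc.
}

impl Worker {
    async fn run(mut self) {
        loop {
            tokio::select! {
                // Reap completed publish tasks so the set stays bounded.
                Some(_result) = self.inflight.next() => {}
                // ... other branches receive ToWorker::Publish / ToWorker::Flush
                //     and push any JoinHandle returned by OutstandingPublishes
                //     into self.inflight.
                else => break,
            }
        }
    }
}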
PTAL.

@PhongChuong
Collaborator Author

@suzmue, thanks for the review.
I've added inflight back to Worker, as having all the task control logic within Worker turns out to be better.
I think we need to discuss the Flush contract in a bit more detail regarding what it should await on (the pending batch vs. pending publishes), or whether it should await at all.

PTAL.

msg: BundledMessage,
inflight: &mut FuturesUnordered<tokio::task::JoinHandle<()>>,
) {
self.pending_batch.push(msg);
Collaborator

This can be in a future PR, but what if adding the msg increases the size past the limit for a single publish request (not just the configured max size, but the actual max size we are allowed to send)? I think we should, roughly (see the sketch below):

  1. flush the existing batch, and
  2. start a new batch with the message.
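
Something like this sketch (illustrative stand-in types and an illustrative constant for the hard per-request limit):

// Illustrative hard cap on a single Publish request; not the real constant.
const MAX_REQUEST_BYTES: u32 = 10_000_000;

#[derive(Default)]
struct Batch {
    messages: Vec<Vec<u8>>,
    size: u32,
}

impl Batch {
    // Returns the previous batch if it had to be cut, so the caller can
    // publish it; `msg` always ends up in the (possibly fresh) pending batch.
    fn push(&mut self, msg: Vec<u8>) -> Option<Batch> {
        let msg_size = msg.len() as u32;
        let mut cut = None;
        if !self.messages.is_empty() && self.size + msg_size > MAX_REQUEST_BYTES {
            // 1. flush the existing batch ...
            cut = Some(std::mem::take(self));
        }
        // 2. ... and start a new batch with the message.
        self.size += msg_size;
        self.messages.push(msg);
        cut
    }
}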

Collaborator Author

Added a TODO.

@PhongChuong PhongChuong merged commit 317b8d2 into googleapis:main Dec 17, 2025
29 checks passed
@PhongChuong PhongChuong deleted the orderingRefactor branch December 17, 2025 16:51