refactor(pubsub): move worker logic into OutstandingPublishes #4051
Conversation
Codecov Report
✅ All modified and coverable lines are covered by tests.

Coverage Diff
            main     #4051    +/-
Coverage    95.27%   95.28%
Files       173      173
Lines       6624     6635     +11
Hits        6311     6322     +11
Misses      313      313
suzmue
left a comment
Can you also update the commit message description to say a bit about the change you are making (what has changed and why)? Thanks!
src/pubsub/src/publisher/worker.rs
Outdated
    // all of those resolve as well.
    let mut flushing = std::mem::take(&mut inflight);
    while flushing.next().await.is_some() {}
    outstanding.flush(self.client.clone(), self.topic_name.clone());
This appears to change the behavior: the worker no longer waits for all of the batches to return, which is a regression. Flush only sends the existing batch; it doesn't wait for any of those sends to resolve.
Notably, the tests don't catch this, so it would be great to add a test for it too.
I think I misunderstood flush. In my head, flush signals the publisher to send out all the pending batches/publishes, but does not wait for them to finish.
If the contract of flush is to await, are we waiting for all pending publishes, or just the pending batch?
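To make the "await" reading of flush concrete, here is a minimal std-only model. It is a sketch, not the code in this PR: threads stand in for tokio tasks, and `Worker`, `send_pending`, and `acked` are hypothetical names chosen for illustration. It also shows the kind of assertion a regression test for this behavior could make.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the worker state discussed above.
struct Worker {
    /// Messages accepted but not yet sent.
    pending: Vec<String>,
    /// One handle per batch that has been sent but has not yet resolved.
    inflight: Vec<JoinHandle<()>>,
    /// Counts messages whose simulated publish has completed.
    acked: Arc<AtomicUsize>,
}

impl Worker {
    fn new() -> Self {
        Worker {
            pending: Vec::new(),
            inflight: Vec::new(),
            acked: Arc::new(AtomicUsize::new(0)),
        }
    }

    fn publish(&mut self, msg: &str) {
        self.pending.push(msg.to_string());
    }

    /// Sends the pending batch without waiting for the result.
    fn send_pending(&mut self) {
        if self.pending.is_empty() {
            return;
        }
        let batch = std::mem::take(&mut self.pending);
        let acked = Arc::clone(&self.acked);
        self.inflight.push(thread::spawn(move || {
            // Simulated network round trip.
            thread::sleep(Duration::from_millis(20));
            acked.fetch_add(batch.len(), Ordering::SeqCst);
        }));
    }

    /// Flush under the "await" contract: send what is pending, then block
    /// until every in-flight batch, old and new, has resolved.
    fn flush(&mut self) {
        self.send_pending();
        for handle in std::mem::take(&mut self.inflight) {
            handle.join().expect("publish task panicked");
        }
    }

    fn acked(&self) -> usize {
        self.acked.load(Ordering::SeqCst)
    }
}
```

Under the "signal only" reading, `flush` would stop after `send_pending`, and a test that checks `acked()` immediately after `flush()` returns would be able to tell the two contracts apart.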
    },
    Some(ToWorker::Flush(tx)) => {
        // TODO(#4012): To guarantee ordering, we should wait for the
        // inflight batch to complete so that messages are published in order.
This comment feels like it belongs in OutstandingPublishes, not here, since this code is no longer responsible for deciding when to send the message.
See comment regarding moving inflight back into Worker.
In that case, I think having the comment here makes more sense.
I still don't think this main loop keeps track of which batches have outstanding publishes. Since this is a TODO, I am fine leaving the comment here, but it seems likely that this sort of decision should be handled by OutstandingPublishes.
It does not handle outstanding publishes yet, as this is simply a refactoring.
As for where to await the inflight batch, I actually think it is better handled by the Worker, as that avoids spawning additional tasks in OutstandingPublishes to push pending batches along.
Let's discuss offline, outside of this PR.
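For reference in that discussion, the ordering idea in the TODO (wait for the inflight batch before sending the next one) can be modeled with the standard library alone. This is only a sketch under assumptions: `OrderedSender`, `wait_inflight`, and the `delay_ms` parameter are hypothetical, threads stand in for tokio tasks, and nothing here is taken from the PR's implementation.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical ordered sender: at most one batch is in flight at a time.
struct OrderedSender {
    inflight: Option<JoinHandle<()>>,
    /// Records the order in which the simulated server received messages.
    received: Arc<Mutex<Vec<u32>>>,
}

impl OrderedSender {
    fn new() -> Self {
        OrderedSender {
            inflight: None,
            received: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Blocks until the previous batch, if any, has completed.
    fn wait_inflight(&mut self) {
        if let Some(handle) = self.inflight.take() {
            handle.join().expect("publish task panicked");
        }
    }

    /// Sends a batch only after the previous one has resolved.
    /// `delay_ms` simulates network latency for this batch.
    fn send(&mut self, batch: Vec<u32>, delay_ms: u64) {
        self.wait_inflight();
        let received = Arc::clone(&self.received);
        self.inflight = Some(thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            received.lock().unwrap().extend(batch);
        }));
    }

    fn received(&self) -> Vec<u32> {
        self.received.lock().unwrap().clone()
    }
}
```

Because `send` joins the previous handle first, a slow first batch cannot be overtaken by a fast second one. The cost is that the caller of `send` blocks; an async worker would await the handle inside its loop instead.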
src/pubsub/src/publisher/worker.rs
Outdated
    message_limit: u32,
    byte_threshold: u32,
    pending_batch: Batch,
    inflight: Option<tokio::task::JoinHandle<()>>,
inflight is unused?
I moved inflight back up to Worker. I think having the task control logic purely in Worker is easier to follow than having it distributed across Worker and OutstandingPublishes.
PTAL.
Co-authored-by: Suzy Mueller <[email protected]>
@suzmue, thanks for the review. PTAL.
        msg: BundledMessage,
        inflight: &mut FuturesUnordered<tokio::task::JoinHandle<()>>,
    ) {
        self.pending_batch.push(msg);
This can be in a future PR, but what if adding the msg pushes the size past the limit on the size of a single message (not just the configured max size, but the actual max size we are allowed to send)? I think we should:
- flush the existing batch
- start a new batch with the message.
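A minimal sketch of those two steps, assuming a byte-count limit. `MAX_BYTES`, `Batcher`, and this simplified `Batch` are hypothetical names for illustration; the value 100 is made up and is not the real service limit, and the PR's own `Batch` type may look quite different.

```rust
/// Hypothetical hard size limit in bytes. The real limit is not stated
/// in this thread; 100 is chosen only to keep the example small.
const MAX_BYTES: usize = 100;

/// Simplified stand-in for a batch of messages.
#[derive(Default)]
struct Batch {
    messages: Vec<Vec<u8>>,
    bytes: usize,
}

impl Batch {
    fn push(&mut self, msg: Vec<u8>) {
        self.bytes += msg.len();
        self.messages.push(msg);
    }

    fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }
}

#[derive(Default)]
struct Batcher {
    pending: Batch,
    /// Batches handed off for sending, in order.
    sent: Vec<Batch>,
}

impl Batcher {
    /// Moves the pending batch to `sent` and starts an empty one.
    fn flush(&mut self) {
        if !self.pending.is_empty() {
            let full = std::mem::take(&mut self.pending);
            self.sent.push(full);
        }
    }

    /// Adds a message, flushing first if it would push the pending batch
    /// past the hard limit.
    fn add(&mut self, msg: Vec<u8>) {
        if self.pending.bytes + msg.len() > MAX_BYTES {
            self.flush();
        }
        self.pending.push(msg);
    }
}
```

With a limit of 100, adding messages of 60, 30, and then 20 bytes leaves one sent batch of 90 bytes and a pending batch of 20. The sketch does not handle a single message that is itself larger than the limit; that case would need its own error path.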
Added a TODO.
Refactor by:
Towards: #4012