Rebalance shards when ingester status changes #6185
ncoiffier-celonis wants to merge 11 commits into quickwit-oss:main
Conversation
Technically, the ingest router should just retry when that happens, and there should be a path for the router to open a new shard if the ingester being decommissioned was the only one to have shard(s) for this index. Is that not what you observed?
@ncoiffier-celonis I'm giving you write access to the repo so next time you can push directly to this repo rather than a fork. It makes it easier for me to check out your changes locally. Though I learnt how to use
guilload
left a comment
I think we're close but we need to fix a few issues.
```rust
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn for_test_with_ingester_status(
```
nit: seems a bit overkill to me
Addressed with 0c1c82b: I unified ClusterNode::for_test_with_ingester_status into ClusterNode::for_test
```diff
 pub struct IngestController {
-    ingester_pool: IngesterPool,
+    pub(crate) ingester_pool: IngesterPool,
+    pub(crate) stats: IngestControllerStats,
```
```rust
let Some(mailbox) = weak_mailbox.upgrade() else {
    return;
};
let mut trigger_rebalance = false;
```
@ncoiffier-celonis please review this tricky logic thoroughly. I'm the initial author of this change and now I'm also reviewing it so I'm more likely to miss something. I could use a second pair of eyes.
Yeah, this logic definitely needs a comment. Here, we're considering both indexers and ingesters. Indexers run indexing pipelines: when they're ready, they are ready to index, so we want to rebuild an indexing plan. Same thing when they leave.
In addition, we're considering ingesters. Technically, all indexers are ingesters and vice versa, because we didn't want to expose users to a new service ("service" as in metastore, janitor, control plane, etc., not microservice as in router, ingester, debug info, etc.).
Ingesters have two levels of readiness. The first is the same as for indexers: "I'm up and running, I can connect to the metastore." The second: "I have loaded my WAL."
So we want to rebalance when the ingester is ready ready, which, from the perspective of the stream of events, can happen as:
- Add(ready, ready)
OR
- Add(ready, not ready)
- Update(ready, ready)
The logic below tries to implement that.
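That detection can be sketched roughly as follows. All names here are illustrative stand-ins, not Quickwit's actual types: a two-level readiness (node-level "ready" plus the ingester's own WAL status) and a predicate over the event stream.

```rust
/// Illustrative two-level readiness: the ingester is "ready ready" only once
/// its WAL is loaded.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum IngesterStatus {
    Initializing, // node is up, WAL not loaded yet
    Ready,        // WAL loaded
}

pub enum ClusterEvent {
    Add { node_ready: bool, ingester: IngesterStatus },
    Update { prev: IngesterStatus, new: IngesterStatus },
}

/// True when an ingester just became "ready ready": either Add(ready, ready),
/// or the Update(ready, ready) that follows an Add(ready, not ready).
pub fn triggers_rebalance(event: &ClusterEvent) -> bool {
    match event {
        ClusterEvent::Add {
            node_ready: true,
            ingester: IngesterStatus::Ready,
        } => true,
        ClusterEvent::Update {
            prev: IngesterStatus::Initializing,
            new: IngesterStatus::Ready,
        } => true,
        _ => false,
    }
}
```

This is only a sketch of the state transitions described above; the real code also has to handle node removals and the indexer-side plan rebuild.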
```rust
}

#[tokio::test]
async fn test_wait_for_ingester_decommission_elapsed_timeout_not_zero() {
```
```rust
// Ingest docs with auto-commit. With a 5s commit timeout, these documents
// sit uncommitted in the ingesters' WAL - exactly the in-flight state we
// want to exercise during draining.
ingest(
```
How do we know the shard for this index is always going to be created on the indexer that we're about to shut down?
```rust
/// Tests that the graceful shutdown sequence works correctly in a multi-indexer
/// cluster: shutting down one indexer does NOT cause 500 errors or data loss,
/// and the cluster eventually rebalances. See #6158.
#[tokio::test]
```
Very very nice! Let's make sure this is not flaky, though. Run it 1,000 times! This is how I do it (fish):

```fish
while true
    c t --manifest-path quickwit/Cargo.toml -p quickwit-integration-tests --nocapture -- test_graceful_shutdown_no_data_loss
end
```

```rust
    Ok((ingest_router, ingest_router_service, ingester_opt))
}

fn setup_ingester_pool(
```
Same here, we need to be extremely careful about this convoluted logic.
Now that I've thought more about this, I think we have an issue with this logic. This creates a pool of write-only ingesters, which is great for the logic in quickwit-ingest, but in quickwit-indexing, the source also holds an ingester pool, and we still want to be able to read and truncate from ingesters while they are in the retiring and decommissioning statuses. I don't think we want to actually create and manage distinct pools, so we should maybe restrict this pool to exclude initializing ingesters and push the additional filtering logic wherever needed (router, control plane).
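One way to read that suggestion: keep a single shared pool that only excludes ingesters that are not usable at all, and apply the stricter filters at each call site. The variant names below are illustrative, not necessarily Quickwit's actual `IngesterStatus` enum.

```rust
/// Illustrative status set; the real Quickwit enum may differ.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum IngesterStatus {
    Initializing,
    Ready,
    Retiring,
    Decommissioning,
    Failed,
}

/// The single shared pool could exclude only ingesters that nobody can use.
pub fn belongs_in_pool(status: IngesterStatus) -> bool {
    !matches!(status, IngesterStatus::Initializing | IngesterStatus::Failed)
}

/// Router / control plane apply the stricter write-side filter...
pub fn is_writable(status: IngesterStatus) -> bool {
    matches!(status, IngesterStatus::Ready)
}

/// ...while sources can still read and truncate from retiring or
/// decommissioning ingesters.
pub fn is_readable(status: IngesterStatus) -> bool {
    matches!(
        status,
        IngesterStatus::Ready | IngesterStatus::Retiring | IngesterStatus::Decommissioning
    )
}
```

The design point is that the pool membership predicate is the weakest filter, and each consumer narrows it for its own needs instead of maintaining separate pools.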
quickwit/quickwit-serve/src/lib.rs
```rust
if let Some(ingester) = &ingester_opt {
    if let Ok(status) = try_get_ingester_status(ingester).await {
        status != IngesterStatus::Failed
    } else {
        // If we couldn't get the ingester status, it's not looking good, so we set
        // the node to not ready.
        false
    }
} else {
    true
}
```
Feels like this logic does not need to be duplicated. Brainfart on my end? WDYT?
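If the duplication is real, one option is to factor the check into a single shared helper. This is a hypothetical sketch: the real check is async and `Ingester`, `IngesterStatus`, and `try_get_ingester_status` are stand-ins for the actual Quickwit types (here, a failed status fetch is modeled as `None`).

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum IngesterStatus {
    Ready,
    Failed,
}

pub struct Ingester {
    /// `None` models an RPC failure when fetching the status.
    pub status: Option<IngesterStatus>,
}

fn try_get_ingester_status(ingester: &Ingester) -> Result<IngesterStatus, ()> {
    ingester.status.ok_or(())
}

/// Single readiness check that both call sites could share.
pub fn is_node_ready(ingester_opt: Option<&Ingester>) -> bool {
    match ingester_opt {
        Some(ingester) => match try_get_ingester_status(ingester) {
            Ok(status) => status != IngesterStatus::Failed,
            // If we couldn't get the ingester status, it's not looking good,
            // so we report the node as not ready.
            Err(()) => false,
        },
        // Nodes that don't run an ingester are considered ready.
        None => true,
    }
}
```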
```rust
    );
    Some(change)
}
ClusterChange::Add(node) | ClusterChange::Update { updated: node, .. }
```
This is going to update the pool each time a chitchat key value changes. This is not buying us anything and will generate some noisy logs.
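One hypothetical way to avoid reacting to every chitchat key-value change is to project each node down to the fields the pool actually depends on, and only update when that projection changes. The `NodeView` struct and its fields below are illustrative, not Quickwit's real types.

```rust
/// Illustrative projection of a cluster node onto the fields the ingester
/// pool cares about; everything else (arbitrary chitchat key-values) is
/// deliberately left out.
#[derive(PartialEq, Debug)]
pub struct NodeView {
    pub grpc_advertise_addr: String,
    pub ingester_status: u8, // stand-in for the real status enum
}

/// Only touch the pool (and log) when a relevant field changed, instead of
/// on every gossip update.
pub fn pool_update_needed(prev: &NodeView, current: &NodeView) -> bool {
    prev != current
}
```

Because `NodeView` only contains pool-relevant fields, equality comparison is exactly the "did anything we care about change?" test.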
@nadav-govari, I need your eyes on this because:
force-pushed from 81e493d to 6bebf0d
(I took the liberty to force-push after signing all the individual commits, no code change)
Description
Attempt to fix #6158
Following @guilload's suggestion here, this PR rebalances shards away from an ingester when its status changes (addressing the `no open shard found on ingester` error).

With this approach, even if we have some 10s propagation delay before decommissioning, it is still possible to fail to ingest some documents if chitchat takes longer than expected to gossip the ingester status to the control plane.
Any feedback is welcome!!
How was this PR tested?
In addition to the unit and integration tests, I've run it against a local cluster with 2 indexers and observed that the number of errors reported in #6158 decreased from a few hundred to none.
Other approaches
This PR is largely identical to the branch `guilload/ingester-status`, rebased on `main` and with some additional bugfixes: