Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
c2662a2
Gossip ingester status
guilload Jan 24, 2026
aab1d8f
Update ingester pool when status changes
guilload Jan 25, 2026
99584f4
Rebalance shards when IngesterStatus changes
ncoiffier-celonis Feb 26, 2026
a1d2a6d
Fix timeout_after being 0, causing to not wait for ingester status pr…
ncoiffier-celonis Feb 26, 2026
03bbb79
Also refresh the ingester pool when an ingester status has changed
ncoiffier-celonis Feb 26, 2026
c7c3609
Add integration test
ncoiffier-celonis Feb 16, 2026
eb38d13
Make setup_ingester_pool and setup_indexer_pool a bit more uniform
ncoiffier-celonis Mar 2, 2026
5ecffd5
make fix
ncoiffier-celonis Mar 2, 2026
6bebf0d
Instrument rebalance_shards calls
ncoiffier-celonis Mar 3, 2026
0c1c82b
Unified ClusterNode::for_test_with_ingester_status into ClusterNode::…
ncoiffier-celonis Mar 5, 2026
7716f56
Remove duplicated readiness check on ingester status
ncoiffier-celonis Mar 5, 2026
e02cc5e
Refactor the ClusterSandbox to add the possibility to dynamically add…
ncoiffier-celonis Mar 5, 2026
8815224
Ensure the shard is created on the indexer that we shutdown in test_g…
ncoiffier-celonis Mar 5, 2026
678bef5
Don't trigger a rebalance for Add(ready, IngesterStatus::Initializing…
ncoiffier-celonis Mar 6, 2026
ad6f9de
Don't refresh the indexer pool when an indexer updates its ingester st…
ncoiffier-celonis Mar 6, 2026
84880a1
The ingester_pool should contain all ingesters, not only the ready ones
ncoiffier-celonis Mar 6, 2026
c21aaee
Debug message when no shards to rebalance
ncoiffier-celonis Mar 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use quickwit_indexing::models::{
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::{IndexId, PipelineUid, SourceId, SplitId};
Expand Down Expand Up @@ -936,8 +937,9 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
enabled_services: HashSet::new(),
gossip_advertise_addr: config.gossip_advertise_addr,
grpc_advertise_addr: config.grpc_advertise_addr,
indexing_cpu_capacity: CpuCapacity::zero(),
indexing_tasks: Vec::new(),
indexing_cpu_capacity: CpuCapacity::zero(),
ingester_status: IngesterStatus::default(),
availability_zone: None,
};
let client_grpc_config = make_client_grpc_config(&config.grpc_config)?;
Expand Down
24 changes: 15 additions & 9 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use crate::member::NodeStateExt;
#[derive(Debug, Clone)]
pub enum ClusterChange {
Add(ClusterNode),
Update(ClusterNode),
Update {
previous: ClusterNode,
updated: ClusterNode,
},
Remove(ClusterNode),
}

Expand Down Expand Up @@ -246,7 +249,10 @@ async fn compute_cluster_change_events_on_updated(
);
Some(ClusterChange::Remove(updated_node))
} else if previous_node.is_ready() && updated_node.is_ready() {
Some(ClusterChange::Update(updated_node))
Some(ClusterChange::Update {
previous: previous_node,
updated: updated_node,
})
} else {
None
}
Expand Down Expand Up @@ -681,16 +687,16 @@ pub(crate) mod tests {
.await
.unwrap();

let ClusterChange::Update(node) = event else {
// The pattern destructures `ClusterChange::Update`, so the failure message must name
// `Update` as the expected variant (it previously said `Remove`, a copy-paste leftover).
let ClusterChange::Update { updated, .. } = event else {
    panic!("expected `ClusterChange::Update` event, got `{event:?}`");
};
assert_eq!(node.chitchat_id(), &updated_chitchat_id);
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
assert_eq!(updated.chitchat_id(), &updated_chitchat_id);
assert_eq!(updated.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!updated.is_self_node());
assert!(updated.is_ready());
assert_eq!(
previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
&node
&updated
);
}
{
Expand Down Expand Up @@ -1009,7 +1015,7 @@ pub(crate) mod tests {
.await;
assert_eq!(events.len(), 1);

let ClusterChange::Update(_node) = events[0].clone() else {
let ClusterChange::Update { .. } = events[0].clone() else {
panic!(
"Expected `ClusterChange::Update` event, got `{:?}`",
events[0]
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ pub async fn create_cluster_for_test_with_id(
self_node_readiness: bool,
) -> anyhow::Result<Cluster> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
use quickwit_proto::ingest::ingester::IngesterStatus;
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], gossip_advertise_port).into();
let self_node = ClusterMember {
node_id,
Expand All @@ -707,6 +708,7 @@ pub async fn create_cluster_for_test_with_id(
grpc_advertise_addr: grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
indexing_tasks: Vec::new(),
indexing_cpu_capacity: PIPELINE_FULL_CAPACITY,
ingester_status: IngesterStatus::default(),
availability_zone: None,
};
let failure_detector_config = create_failure_detector_config_for_test();
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use quickwit_common::tower::ClientGrpcConfig;
use quickwit_config::service::QuickwitService;
use quickwit_config::{GrpcConfig, NodeConfig, TlsConfig};
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::tonic::transport::{Certificate, ClientTlsConfig, Identity};
use time::OffsetDateTime;

Expand Down Expand Up @@ -143,6 +144,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
grpc_advertise_addr: node_config.grpc_advertise_addr,
indexing_tasks,
indexing_cpu_capacity,
ingester_status: IngesterStatus::default(),
availability_zone: node_config.availability_zone.clone(),
};
let failure_detector_config = FailureDetectorConfig {
Expand Down
19 changes: 18 additions & 1 deletion quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::str::FromStr;

use anyhow::Context;
use chitchat::{ChitchatId, NodeState, Version};
use quickwit_common::shared_consts::INGESTER_STATUS_KEY;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeId;
use tracing::{error, warn};

Expand Down Expand Up @@ -47,6 +49,8 @@ pub(crate) trait NodeStateExt {

fn size_bytes(&self) -> usize;

fn ingester_status(&self) -> IngesterStatus;

fn availability_zone(&self) -> Option<String>;
}

Expand Down Expand Up @@ -79,6 +83,12 @@ impl NodeStateExt for NodeState {
.sum()
}

/// Reads the ingester status stored under `INGESTER_STATUS_KEY` in this node state.
///
/// Falls back to `IngesterStatus::default()` when the key is absent or when
/// `IngesterStatus::from_json_str_name` yields no status for the stored value.
fn ingester_status(&self) -> IngesterStatus {
    let Some(status_str) = self.get(INGESTER_STATUS_KEY) else {
        return IngesterStatus::default();
    };
    IngesterStatus::from_json_str_name(status_str).unwrap_or_default()
}

/// Returns the value stored under `AVAILABILITY_ZONE_KEY` as an owned string, or `None`
/// when the key is not set.
fn availability_zone(&self) -> Option<String> {
    let availability_zone = self.get(AVAILABILITY_ZONE_KEY)?;
    Some(availability_zone.to_string())
}
Expand Down Expand Up @@ -108,6 +118,10 @@ pub struct ClusterMember {
pub indexing_tasks: Vec<IndexingTask>,
/// Indexing cpu capacity of the node expressed in milli cpu.
pub indexing_cpu_capacity: CpuCapacity,
/// Status of the ingester service running on the node. `IngesterStatus::Unspecified` if the
/// node is not an ingester.
pub ingester_status: IngesterStatus,
/// Whether the node is ready to serve requests.
pub is_ready: bool,
/// Availability zone the node is running in, if enabled.
pub availability_zone: Option<String>,
Expand Down Expand Up @@ -159,10 +173,12 @@ pub(crate) fn build_cluster_member(
.map(|enabled_services_str| {
parse_enabled_services_str(enabled_services_str, &chitchat_id.node_id)
})?;
let availability_zone = node_state.availability_zone();
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state);
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
let ingester_status = node_state.ingester_status();
let availability_zone = node_state.availability_zone();

let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
generation_id: chitchat_id.generation_id.into(),
Expand All @@ -172,6 +188,7 @@ pub(crate) fn build_cluster_member(
grpc_advertise_addr,
indexing_tasks,
indexing_cpu_capacity,
ingester_status,
availability_zone,
};
Ok(member)
Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use chitchat::{ChitchatId, NodeState};
use quickwit_config::service::QuickwitService;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeIdRef;
use tonic::transport::Channel;

Expand All @@ -46,6 +47,7 @@ impl ClusterNode {
grpc_advertise_addr: member.grpc_advertise_addr,
indexing_tasks: member.indexing_tasks,
indexing_capacity: member.indexing_cpu_capacity,
ingester_status: member.ingester_status,
is_ready: member.is_ready,
is_self_node,
};
Expand All @@ -62,7 +64,9 @@ impl ClusterNode {
is_self_node: bool,
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
ingester_status: IngesterStatus,
) -> Self {
use quickwit_common::shared_consts::INGESTER_STATUS_KEY;
use quickwit_common::tower::{ClientGrpcConfig, make_channel};

use crate::cluster::set_indexing_tasks_in_node_state;
Expand All @@ -75,6 +79,7 @@ impl ClusterNode {
let mut node_state = NodeState::for_test();
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());
node_state.set(INGESTER_STATUS_KEY, ingester_status.as_json_str_name());
set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state);
Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap()
}
Expand Down Expand Up @@ -125,6 +130,10 @@ impl ClusterNode {
self.inner.indexing_capacity
}

/// Returns the ingester status held in this node's inner state.
pub fn ingester_status(&self) -> IngesterStatus {
self.inner.ingester_status
}

/// Returns the readiness flag held in this node's inner state.
pub fn is_ready(&self) -> bool {
self.inner.is_ready
}
Expand Down Expand Up @@ -163,6 +172,7 @@ struct InnerNode {
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
indexing_capacity: CpuCapacity,
ingester_status: IngesterStatus,
is_ready: bool,
is_self_node: bool,
}
5 changes: 4 additions & 1 deletion quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ pub fn split_deletion_grace_period() -> Duration {
/// being requested.
pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
/// Key prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// Key used in chitchat to broadcast the status of an ingester.
pub const INGESTER_STATUS_KEY: &str = "ingester.status";

/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-common/src/tower/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ where
.collect()
}

/// Returns a cloned copy of every key-value pair currently held in the pool.
///
/// # Panics
///
/// Panics if the pool's lock is poisoned.
pub fn keys_values(&self) -> Vec<(K, V)> {
    // Hold the read guard only for the duration of the copy.
    let pool_guard = self.pool.read().expect("lock should not be poisoned");
    let mut pairs = Vec::new();
    pairs.extend(
        pool_guard
            .iter()
            .map(|(key, value)| (key.clone(), value.clone())),
    );
    pairs
}

/// Returns all the values in the pool.
pub fn values(&self) -> Vec<V> {
self.pool
Expand Down
Loading