1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4564,6 +4564,20 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.advertise-port
[advertise_port: <int> | default = 7946]

# The cluster label is an optional string to include in outbound packets and
# gossip streams. Other members in the memberlist cluster will discard any
# message whose label doesn't match the configured one, unless the
# 'cluster-label-verification-disabled' configuration option is set to true.
# CLI flag: -memberlist.cluster-label
[cluster_label: <string> | default = ""]

# When true, memberlist doesn't verify that inbound packets and gossip streams
# have the cluster label matching the configured one. This verification should
# be disabled while rolling out the change to the configured cluster label in a
# live memberlist cluster.
# CLI flag: -memberlist.cluster-label-verification-disabled
[cluster_label_verification_disabled: <boolean> | default = false]

# Other cluster members to join. Can be specified multiple times. It can be an
# IP, hostname or an entry specified in the DNS Service Discovery format.
# CLI flag: -memberlist.join
@@ -57,6 +57,7 @@ ingester:

memberlist:
bind_port: 7946
cluster_label: gossip-demo
join_members:
- localhost:7947
abort_if_cluster_join_fails: false
@@ -56,6 +56,7 @@ ingester:

memberlist:
bind_port: 7947
cluster_label: gossip-demo
join_members:
- localhost:7946
abort_if_cluster_join_fails: false
4 changes: 3 additions & 1 deletion docs/guides/gossip-ring-getting-started.md
@@ -50,6 +50,7 @@ memberlist:
# defaults to hostname
node_name: "Ingester 1"
bind_port: 7946
cluster_label: "gossip-demo"
join_members:
- localhost:7947
abort_if_cluster_join_fails: false
@@ -127,9 +128,10 @@
We don't need to change or add to the `memberlist.join_members` list. This new instance
will discover the other peers through it. When using Kubernetes, the suggested setup is to have a headless service pointing to all pods
that want to be part of the gossip cluster, and then point `join_members` to this headless service.

In production, set `memberlist.cluster_label` to the same value on every Cortex process that should share the same gossip cluster. This helps avoid accidentally merging rings with other Cortex, Mimir, or Loki deployments that can reach the same seed addresses.
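The discard rule can be sketched as a toy model. This is illustrative Go only, with a hypothetical `acceptInbound` helper; it is not the actual memberlist implementation:

```go
package main

import "fmt"

// acceptInbound is a toy model (not real memberlist code) of the cluster
// label check: a receiver discards inbound packets and gossip streams whose
// label differs from its own, unless label verification is disabled.
func acceptInbound(localLabel, packetLabel string, skipInboundLabelCheck bool) bool {
	if skipInboundLabelCheck {
		return true
	}
	return localLabel == packetLabel
}

func main() {
	fmt.Println(acceptInbound("gossip-demo", "gossip-demo", false)) // true: labels match
	fmt.Println(acceptInbound("gossip-demo", "other-cluster", false)) // false: cross-cluster traffic discarded
	fmt.Println(acceptInbound("gossip-demo", "", true)) // true: verification disabled during rollout
}
```

Note the check is one-sided: each receiver applies it independently, which is why every process in the cluster needs the same label.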

We also don't need to change `/tmp/cortex/storage` directory in the `blocks_storage.filesystem.dir` field. This is the directory where all ingesters will
"upload" finished blocks. This can also be an S3 or GCP storage, but for simplicity, we use the local filesystem in this example.

After these changes, we can start another Cortex instance using the modified configuration file. This instance will join the ring
and will start receiving samples after it enters the ACTIVE state.

10 changes: 8 additions & 2 deletions docs/guides/migration-kv-store-to-memberlist.md
@@ -29,8 +29,10 @@ Update your configuration file and deploy the changes:
ring:
store: memberlist
memberlist:
abort_if_cluster_join_fails: false
bind_port: <gossip-ring-port>
cluster_label: <shared-cluster-label>
cluster_label_verification_disabled: true
join_members:
- gossip-ring.<namespace>.svc.cluster.local:<gossip-ring-port>
...
@@ -54,6 +56,8 @@ ingester:
> The Memberlist gossip protocol requires a bit of time to propagate the state across the cluster. Setting a 60-second delay ensures that the ingester has enough time to fully sync the existing ring topology from other peers before actively joining and receiving traffic.
>
> **Note:** Make sure to apply this multi KV store configuration to all other components that interact with the ring (e.g. distributors, store-gateways), not just the ingesters.
>
> **Note:** If multiple Cortex, Mimir, or Loki clusters can reach the same gossip seed addresses, configure a shared `memberlist.cluster_label` for your Cortex cluster. For a fresh Memberlist rollout, you can deploy the shared label with `memberlist.cluster_label_verification_disabled: true`, then set that option back to `false` once every memberlist-enabled Cortex process uses the same label. If you are adding a label to an existing unlabeled Memberlist cluster, first roll out `memberlist.cluster_label_verification_disabled: true` everywhere while leaving `memberlist.cluster_label` empty, then roll out the shared label, and finally set the option back to `false`. This isolates Memberlist traffic only; it does not isolate Consul or Etcd prefixes.
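The staged rollout in the note above can be sketched as a toy model. The `member`, `accept`, and `bidirectional` helpers are hypothetical illustrations, not Cortex or memberlist code; they show why the intermediate verification-disabled phase keeps old and new members gossiping:

```go
package main

import "fmt"

// member is a toy model of one process's label settings.
type member struct {
	label     string
	skipCheck bool // models -memberlist.cluster-label-verification-disabled
}

// accept reports whether a receiver keeps a sender's gossip: traffic is
// discarded on a label mismatch unless the receiver skips the check.
func accept(receiver, sender member) bool {
	return receiver.skipCheck || receiver.label == sender.label
}

// bidirectional reports whether two members accept each other's traffic,
// which both directions need for healthy gossip.
func bidirectional(a, b member) bool {
	return accept(a, b) && accept(b, a)
}

func main() {
	unlabeled := member{label: "", skipCheck: false}
	lenient := member{label: "", skipCheck: true}
	labeledLenient := member{label: "prod", skipCheck: true}
	strict := member{label: "prod", skipCheck: false}

	// Rolling out a strict label directly would partition old and new members:
	fmt.Println(bidirectional(unlabeled, strict)) // false

	// With verification disabled first, each intermediate mix keeps gossiping:
	fmt.Println(bidirectional(lenient, labeledLenient)) // true
	fmt.Println(bidirectional(labeledLenient, strict))  // true
	fmt.Println(bidirectional(strict, strict))          // true
}
```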

Once deployed, Cortex will begin mirroring primary (Consul) data to Memberlist.

@@ -87,4 +91,6 @@ ingester:
```
> **Note:** Again, ensure this update is applied across all components.

After the updated configuration is fully deployed across your cluster and everything is running stably, you can remove your Consul cluster.

If you enabled `memberlist.cluster_label_verification_disabled: true` during the migration, finish the rollout by setting it back to `false` once every memberlist-enabled Cortex process is using the same `memberlist.cluster_label`.
110 changes: 110 additions & 0 deletions integration/integration_memberlist_single_binary_test.go
@@ -39,6 +39,94 @@ func TestSingleBinaryWithMemberlist(t *testing.T) {
})
}

func TestSingleBinaryWithMemberlistClusterLabelIsolation(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

clusterA1 := newSingleBinary("cluster-a-1", "", "", map[string]string{
"-memberlist.cluster-label": "cluster-a",
"-memberlist.abort-if-join-fails": "false",
})
clusterA2 := newSingleBinary("cluster-a-2", "", networkName+"-cluster-a-1:8000", map[string]string{
"-memberlist.cluster-label": "cluster-a",
"-memberlist.abort-if-join-fails": "false",
})
clusterB1 := newSingleBinary("cluster-b-1", "", networkName+"-cluster-a-1:8000", map[string]string{
"-memberlist.cluster-label": "cluster-b",
"-memberlist.abort-if-join-fails": "false",
})
clusterB2 := newSingleBinary("cluster-b-2", "", networkName+"-cluster-b-1:8000", map[string]string{
"-memberlist.cluster-label": "cluster-b",
"-memberlist.abort-if-join-fails": "false",
})

require.NoError(t, s.StartAndWaitReady(clusterA1))
require.NoError(t, s.StartAndWaitReady(clusterB1))
require.NoError(t, s.StartAndWaitReady(clusterA2, clusterB2))

requireMemberlistClusterState(t, 2, 2*512, clusterA1, clusterA2)
requireMemberlistClusterState(t, 2, 2*512, clusterB1, clusterB2)

// Verify cross-cluster isolation: clusterB1 must NOT see clusterA members.
// Wait a short observation window to ensure member counts remain stable.
time.Sleep(5 * time.Second)
requireMemberlistClusterState(t, 2, 2*512, clusterA1, clusterA2)
requireMemberlistClusterState(t, 2, 2*512, clusterB1, clusterB2)
}

func TestSingleBinaryWithMemberlistClusterLabelRollingMigration(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

const clusterLabel = "migration-cluster"

configs := []struct {
name string
join string
}{
{name: "migration-cortex-1", join: networkName + "-migration-cortex-2:8000"},
{name: "migration-cortex-2", join: networkName + "-migration-cortex-1:8000"},
{name: "migration-cortex-3", join: networkName + "-migration-cortex-1:8000"},
}

cortexServices := make([]*e2ecortex.CortexService, 0, len(configs))
for _, cfg := range configs {
cortexServices = append(cortexServices, newMigrationSingleBinary(cfg.name, cfg.join, "", true))
}

require.NoError(t, s.StartAndWaitReady(cortexServices[0]))
require.NoError(t, s.StartAndWaitReady(cortexServices[1], cortexServices[2]))
requireMemberlistClusterState(t, 3, 3*512, cortexServices...)

for i, cfg := range configs {
replacement := newMigrationSingleBinary(cfg.name, cfg.join, clusterLabel, true)
require.NoError(t, s.Stop(cortexServices[i]))
require.NoError(t, s.StartAndWaitReady(replacement))
cortexServices[i] = replacement
requireMemberlistClusterState(t, 3, 3*512, cortexServices...)
}

for i, cfg := range configs {
replacement := newMigrationSingleBinary(cfg.name, cfg.join, clusterLabel, false)
require.NoError(t, s.Stop(cortexServices[i]))
require.NoError(t, s.StartAndWaitReady(replacement))
cortexServices[i] = replacement
requireMemberlistClusterState(t, 3, 3*512, cortexServices...)
}
}

func testSingleBinaryEnv(t *testing.T, tlsEnabled bool, flags map[string]string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
@@ -162,6 +250,28 @@ func newSingleBinary(name string, servername string, join string, testFlags map[
return serv
}

func newMigrationSingleBinary(name string, join string, clusterLabel string, verificationDisabled bool) *e2ecortex.CortexService {
flags := map[string]string{
"-memberlist.abort-if-join-fails": "false",
"-memberlist.cluster-label-verification-disabled": fmt.Sprintf("%t", verificationDisabled),
}

if clusterLabel != "" {
flags["-memberlist.cluster-label"] = clusterLabel
}

return newSingleBinary(name, "", join, flags)
}

func requireMemberlistClusterState(t *testing.T, expectedMembers, expectedTokens int, services ...*e2ecortex.CortexService) {
t.Helper()

for _, service := range services {
require.NoError(t, service.WaitSumMetrics(e2e.Equals(float64(expectedMembers)), "memberlist_client_cluster_members_count"))
require.NoError(t, service.WaitSumMetrics(e2e.Equals(float64(expectedTokens)), "cortex_ring_tokens_total"))
}
}

func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
11 changes: 11 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client.go
@@ -152,6 +152,9 @@ type KVConfig struct {
AdvertiseAddr string `yaml:"advertise_addr"`
AdvertisePort int `yaml:"advertise_port"`

ClusterLabel string `yaml:"cluster_label"`
ClusterLabelVerificationDisabled bool `yaml:"cluster_label_verification_disabled"`

// List of members to join
JoinMembers flagext.StringSlice `yaml:"join_members"`
MinJoinBackoff time.Duration `yaml:"min_join_backoff"`
@@ -209,6 +212,8 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")
f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.")
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")

cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
}
@@ -406,6 +411,8 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {

mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr
mlCfg.AdvertisePort = m.cfg.AdvertisePort
mlCfg.Label = m.cfg.ClusterLabel
mlCfg.SkipInboundLabelCheck = m.cfg.ClusterLabelVerificationDisabled

if m.cfg.NodeName != "" {
mlCfg.Name = m.cfg.NodeName
@@ -415,6 +422,10 @@
level.Info(m.logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name)
}

if mlCfg.Label != "" {
level.Info(m.logger).Log("msg", "Using memberlist cluster label", "cluster_label", mlCfg.Label, "skip_inbound_label_check", mlCfg.SkipInboundLabelCheck)
}

mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false)
mlCfg.Transport = tr
