From 15564ee10825ffa9d910a4b423aa0ca47db3ba87 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 4 Dec 2025 21:57:23 -0700 Subject: [PATCH 1/4] Add support for AWS IAM login --- kafka_consumer/assets/configuration/spec.yaml | 20 +++--- kafka_consumer/changelog.d/22040.added | 1 + .../datadog_checks/kafka_consumer/client.py | 54 +++++++++++++-- .../datadog_checks/kafka_consumer/config.py | 29 +++++++-- .../kafka_consumer/config_models/instance.py | 1 + .../kafka_consumer/data/conf.yaml.example | 29 +++++++-- kafka_consumer/pyproject.toml | 2 + kafka_consumer/tests/test_unit.py | 65 +++++++++++++++++++ 8 files changed, 174 insertions(+), 27 deletions(-) create mode 100644 kafka_consumer/changelog.d/22040.added diff --git a/kafka_consumer/assets/configuration/spec.yaml b/kafka_consumer/assets/configuration/spec.yaml index c3affd99bca07..d1aa156ab5ad8 100644 --- a/kafka_consumer/assets/configuration/spec.yaml +++ b/kafka_consumer/assets/configuration/spec.yaml @@ -199,26 +199,28 @@ files: description: | Settings for when `sasl_mechanism` is set to `OAUTHBEARER`. options: + - name: method + description: | + The OAuth method to use. Either 'aws_msk_iam' for AWS MSK IAM authentication + or 'oidc' for standard OIDC authentication. Defaults to 'oidc' for backwards compatibility. + value: + type: string + example: aws_msk_iam + display_default: oidc - name: url - required: true - enabled: false description: | - The token endpoint. + The token endpoint URL. Required when method is 'oidc'. value: type: string - name: client_id - required: true - enabled: false description: | - The client identifier. + The client identifier. Required when method is 'oidc'. value: type: string - name: client_secret secret: true - required: true - enabled: false description: | - The client secret. + The client secret. Required when method is 'oidc'. value: type: string - template: instances/tls diff --git a/kafka_consumer/changelog.d/22040.added b/kafka_consumer/changelog.d/22040.added new file mode 100644 index 0000000000000..19a8cb5829ef5 --- /dev/null +++ b/kafka_consumer/changelog.d/22040.added @@ -0,0 +1 @@ +Add support for AWS MSK IAM authentication using SASL/OAUTHBEARER mechanism. \ No newline at end of file diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index 46f3d0ccc5159..cbb406f99be5b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -6,6 +6,15 @@ from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition from confluent_kafka.admin import AdminClient +# AWS MSK IAM authentication support +try: + import boto3 + from aws_msk_iam_sasl_signer import MSKAuthTokenProvider + + AWS_MSK_IAM_AVAILABLE = True +except ImportError: + AWS_MSK_IAM_AVAILABLE = False + class KafkaClient: def __init__(self, config, log) -> None: @@ -69,12 +78,45 @@ def __get_authentication_config(self): } if self.config._sasl_mechanism == "OAUTHBEARER": - extras_parameters['sasl.oauthbearer.method'] = "oidc" - extras_parameters["sasl.oauthbearer.client.id"] = self.config._sasl_oauth_token_provider.get("client_id") - extras_parameters["sasl.oauthbearer.token.endpoint.url"] = self.config._sasl_oauth_token_provider.get("url") - extras_parameters["sasl.oauthbearer.client.secret"] = self.config._sasl_oauth_token_provider.get( - "client_secret" - ) + # Default to 'oidc' for backwards compatibility with existing configs + method = self.config._sasl_oauth_token_provider.get("method", "oidc") + + if method == "aws_msk_iam": + # AWS MSK IAM authentication requires OAuth callback + if not AWS_MSK_IAM_AVAILABLE: + raise Exception( + "AWS MSK IAM authentication requires 'aws-msk-iam-sasl-signer-python' library. " + "Install it with: pip install aws-msk-iam-sasl-signer-python" + ) + + # Set up OAuth callback for AWS MSK IAM + # The callback generates AWS IAM authentication tokens + def _aws_msk_iam_oauth_cb(oauth_config): + """OAuth callback that generates AWS MSK IAM authentication tokens.""" + try: + # Get AWS region from config or detect from environment + region = boto3.session.Session().region_name or 'us-east-1' + auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(region) + self.log.debug("Generated AWS MSK IAM token, expires in %s ms", expiry_ms) + return auth_token, expiry_ms / 1000 # Convert to seconds + except Exception as e: + self.log.error("Failed to generate AWS MSK IAM token: %s", e) + raise + + extras_parameters['oauth_cb'] = _aws_msk_iam_oauth_cb + + elif method == "oidc": + # OIDC authentication + extras_parameters['sasl.oauthbearer.method'] = "oidc" + extras_parameters["sasl.oauthbearer.client.id"] = self.config._sasl_oauth_token_provider.get( + "client_id" + ) + extras_parameters["sasl.oauthbearer.token.endpoint.url"] = self.config._sasl_oauth_token_provider.get( + "url" + ) + extras_parameters["sasl.oauthbearer.client.secret"] = self.config._sasl_oauth_token_provider.get( + "client_secret" + ) for key, value in extras_parameters.items(): # Do not add the value if it's not specified diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config.py b/kafka_consumer/datadog_checks/kafka_consumer/config.py index 3f64e8b79ba61..a1c0482b5fcd3 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/config.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/config.py @@ -129,14 +129,31 @@ def validate_config(self): if self._sasl_oauth_token_provider is None: raise ConfigurationError("sasl_oauth_token_provider required for OAUTHBEARER sasl") - if self._sasl_oauth_token_provider.get("url") is None: - raise ConfigurationError("The `url` setting of `auth_token` reader is required") + if not isinstance(self._sasl_oauth_token_provider, dict): + raise ConfigurationError( + f"sasl_oauth_token_provider must be a dictionary. Got: {type(self._sasl_oauth_token_provider)}" + ) + + # Default to 'oidc' for backwards compatibility with existing configs + method = self._sasl_oauth_token_provider.get("method", "oidc") + + if method == "aws_msk_iam": + # AWS MSK IAM doesn't require additional fields + pass + elif method == "oidc": + # OIDC requires url, client_id, and client_secret + if self._sasl_oauth_token_provider.get("url") is None: + raise ConfigurationError("The `url` setting of `auth_token` reader is required") - elif self._sasl_oauth_token_provider.get("client_id") is None: - raise ConfigurationError("The `client_id` setting of `auth_token` reader is required") + if self._sasl_oauth_token_provider.get("client_id") is None: + raise ConfigurationError("The `client_id` setting of `auth_token` reader is required") - elif self._sasl_oauth_token_provider.get("client_secret") is None: - raise ConfigurationError("The `client_secret` setting of `auth_token` reader is required") + if self._sasl_oauth_token_provider.get("client_secret") is None: + raise ConfigurationError("The `client_secret` setting of `auth_token` reader is required") + else: + raise ConfigurationError( + f"Invalid method '{method}' for sasl_oauth_token_provider. Must be 'aws_msk_iam' or 'oidc'" + ) # If `monitor_unlisted_consumer_groups` is set to true and # using `consumer_groups`, we prioritize `monitor_unlisted_consumer_groups` diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py index 572fe4fcd3caa..156ca19fa70c2 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py @@ -36,6 +36,7 @@ class SaslOauthTokenProvider(BaseModel): ) client_id: Optional[str] = None client_secret: Optional[str] = None + method: Optional[str] = None url: Optional[str] = None diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example index e3b203fed4050..e29e85027bd41 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example +++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example @@ -185,21 +185,38 @@ instances: # sasl_kerberos_domain_name: localhost ## Settings for when `sasl_mechanism` is set to `OAUTHBEARER`. + ## + ## For AWS MSK IAM authentication: + ## sasl_oauth_token_provider: + ## method: aws_msk_iam + ## + ## For OIDC authentication (or omit 'method' for backwards compatibility): + ## sasl_oauth_token_provider: + ## method: oidc + ## url: + ## client_id: + ## client_secret: # # sasl_oauth_token_provider: - ## @param url - string - required - ## The token endpoint. + ## @param method - string - optional - default: oidc + ## The OAuth method to use. Either 'aws_msk_iam' for AWS MSK IAM authentication + ## or 'oidc' for standard OIDC authentication. Defaults to 'oidc' for backwards compatibility. + # + # method: aws_msk_iam + + ## @param url - string - optional + ## The token endpoint URL. Required when method is 'oidc'. # # url: - ## @param client_id - string - required - ## The client identifier. + ## @param client_id - string - optional + ## The client identifier. Required when method is 'oidc'. # # client_id: - ## @param client_secret - string - required - ## The client secret. + ## @param client_secret - string - optional + ## The client secret. Required when method is 'oidc'. # # client_secret: diff --git a/kafka_consumer/pyproject.toml b/kafka_consumer/pyproject.toml index 4f2a8642066cc..f59ddf0431837 100644 --- a/kafka_consumer/pyproject.toml +++ b/kafka_consumer/pyproject.toml @@ -39,6 +39,8 @@ deps = [ "confluent-kafka==2.11.1", "fastavro==1.12.0", "protobuf==6.32.0", + "aws-msk-iam-sasl-signer-python>=1.0.0", + "boto3>=1.28.0", ] [project.urls] diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index f1e4f0954c6dd..1668f57ff48e9 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -182,6 +182,21 @@ def test_tls_verify_is_string(tls_verify, expected, check, kafka_instance): mock_client, id="valid config", ), + pytest.param( + {'sasl_oauth_token_provider': {'method': 'aws_msk_iam'}}, + does_not_raise(), + mock_client, + id="valid AWS MSK IAM config", + ), + pytest.param( + {'sasl_oauth_token_provider': {'method': 'invalid_method'}}, + pytest.raises( + Exception, + match="Invalid method 'invalid_method' for sasl_oauth_token_provider. Must be 'aws_msk_iam' or 'oidc'", + ), + None, + id="invalid method", + ), ], ) def test_oauth_config( @@ -1263,6 +1278,56 @@ def test_count_consumer_contexts(check, kafka_instance): assert kafka_consumer_check.count_consumer_contexts(consumer_offsets) == 3 +@pytest.mark.parametrize( + 'oauth_config, expected_auth_keys', + [ + pytest.param( + {'method': 'aws_msk_iam'}, + ['oauth_cb'], # AWS MSK IAM uses oauth_cb callback, not sasl.oauthbearer.method + id="AWS MSK IAM authentication", + ), + pytest.param( + {'method': 'oidc', 'url': 'http://fake.url', 'client_id': 'test_id', 'client_secret': 'test_secret'}, + { + 'sasl.oauthbearer.method': 'oidc', + 'sasl.oauthbearer.client.id': 'test_id', + 'sasl.oauthbearer.token.endpoint.url': 'http://fake.url', + 'sasl.oauthbearer.client.secret': 'test_secret', + }, + id="OIDC authentication", + ), + ], +) +def test_oauth_authentication_config(oauth_config, expected_auth_keys, kafka_instance, check): + """Test that OAuth authentication configuration is correctly set for both AWS MSK IAM and OIDC.""" + kafka_instance.update( + { + 'monitor_unlisted_consumer_groups': True, + 'security_protocol': 'SASL_SSL', + 'sasl_mechanism': 'OAUTHBEARER', + 'sasl_oauth_token_provider': oauth_config, + } + ) + kafka_consumer_check = check(kafka_instance) + auth_config = kafka_consumer_check.client._KafkaClient__get_authentication_config() + + # Verify security protocol is set + assert auth_config['security.protocol'] == 'sasl_ssl' + assert auth_config['sasl.mechanism'] == 'OAUTHBEARER' + + # Verify OAuth-specific configuration + if isinstance(expected_auth_keys, dict): + # OIDC: verify exact key-value pairs + for key, value in expected_auth_keys.items(): + assert auth_config[key] == value + else: + # AWS MSK IAM: verify oauth_cb callback is present and callable + for key in expected_auth_keys: + assert key in auth_config + if key == 'oauth_cb': + assert callable(auth_config[key]) + + def test_consumer_group_state_fetched_once_per_group(check, kafka_instance, dd_run_check, aggregator): mock_client = seed_mock_client() # Set up two partitions for same topic to check multiple contexts in same consumer group From 00df396a3ecda43aec80367d8091f587c10c05e1 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 5 Dec 2025 12:41:28 -0700 Subject: [PATCH 2/4] fix tests --- .../changelog.d/{22040.added => 22058.added} | 0 .../kafka_consumer/data/conf.yaml.example | 11 ----------- 2 files changed, 11 deletions(-) rename kafka_consumer/changelog.d/{22040.added => 22058.added} (100%) diff --git a/kafka_consumer/changelog.d/22040.added b/kafka_consumer/changelog.d/22058.added similarity index 100% rename from kafka_consumer/changelog.d/22040.added rename to kafka_consumer/changelog.d/22058.added diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example index e29e85027bd41..4d4b65863f5de 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example +++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example @@ -185,17 +185,6 @@ instances: # sasl_kerberos_domain_name: localhost ## Settings for when `sasl_mechanism` is set to `OAUTHBEARER`. - ## - ## For AWS MSK IAM authentication: - ## sasl_oauth_token_provider: - ## method: aws_msk_iam - ## - ## For OIDC authentication (or omit 'method' for backwards compatibility): - ## sasl_oauth_token_provider: - ## method: oidc - ## url: - ## client_id: - ## client_secret: # # sasl_oauth_token_provider: From 0bd4c11de70697221cb43c71160451e0a30fdf2e Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 5 Dec 2025 12:46:10 -0700 Subject: [PATCH 3/4] update dep versions --- agent_requirements.in | 1 + kafka_consumer/pyproject.toml | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/agent_requirements.in b/agent_requirements.in index 85e944b3e8db6..6e59c76ad7942 100644 --- a/agent_requirements.in +++ b/agent_requirements.in @@ -1,4 +1,5 @@ aerospike==7.1.1; sys_platform != 'win32' and sys_platform != 'darwin' +aws-msk-iam-sasl-signer-python==1.0.2 aws-requests-auth==0.4.3 azure-identity==1.24.0 beautifulsoup4==4.13.5 diff --git a/kafka_consumer/pyproject.toml b/kafka_consumer/pyproject.toml index f59ddf0431837..6920eefe629ad 100644 --- a/kafka_consumer/pyproject.toml +++ b/kafka_consumer/pyproject.toml @@ -39,8 +39,8 @@ deps = [ "confluent-kafka==2.11.1", "fastavro==1.12.0", "protobuf==6.32.0", - "aws-msk-iam-sasl-signer-python>=1.0.0", - "boto3>=1.28.0", + "aws-msk-iam-sasl-signer-python==1.0.2", + "boto3==1.40.21", ] [project.urls] From d51d7a51207f2aa6fe38d3041cbbc69fcb754ba7 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 5 Dec 2025 12:51:20 -0700 Subject: [PATCH 4/4] license sync --- LICENSE-3rdparty.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 52ca46d09d1f8..dcc319f5da40e 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -4,6 +4,7 @@ PyMySQL,PyPI,MIT,"Copyright (c) 2010, 2013 PyMySQL contributors" PySocks,PyPI,BSD-3-Clause,Copyright 2006 Dan-Haim. All rights reserved. PyYAML,PyPI,MIT,Copyright (c) 2017-2021 Ingy döt Net aerospike,PyPI,Apache-2.0,"Copyright Aerospike, Inc." +aws-msk-iam-sasl-signer-python,PyPI,Apache-2.0,Copyright 2023 Amazon Managed Streaming for Apache Kafka aws-requests-auth,PyPI,BSD-3-Clause,Copyright (c) David Muller. azure-identity,PyPI,MIT,Copyright (c) Microsoft Corporation. beautifulsoup4,PyPI,MIT,Copyright (c) Leonard Richardson