Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PyMySQL,PyPI,MIT,"Copyright (c) 2010, 2013 PyMySQL contributors"
PySocks,PyPI,BSD-3-Clause,Copyright 2006 Dan-Haim. All rights reserved.
PyYAML,PyPI,MIT,Copyright (c) 2017-2021 Ingy döt Net
aerospike,PyPI,Apache-2.0,"Copyright Aerospike, Inc."
aws-msk-iam-sasl-signer-python,PyPI,Apache-2.0,Copyright 2023 Amazon Managed Streaming for Apache Kafka
aws-requests-auth,PyPI,BSD-3-Clause,Copyright (c) David Muller.
azure-identity,PyPI,MIT,Copyright (c) Microsoft Corporation.
beautifulsoup4,PyPI,MIT,Copyright (c) Leonard Richardson
Expand Down
1 change: 1 addition & 0 deletions agent_requirements.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
aerospike==7.1.1; sys_platform != 'win32' and sys_platform != 'darwin'
aws-msk-iam-sasl-signer-python==1.0.2
aws-requests-auth==0.4.3
azure-identity==1.24.0
beautifulsoup4==4.13.5
Expand Down
20 changes: 11 additions & 9 deletions kafka_consumer/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,28 @@ files:
description: |
Settings for when `sasl_mechanism` is set to `OAUTHBEARER`.
options:
- name: method
description: |
The OAuth method to use. Either 'aws_msk_iam' for AWS MSK IAM authentication
or 'oidc' for standard OIDC authentication. Defaults to 'oidc' for backwards compatibility.
value:
type: string
example: aws_msk_iam
display_default: oidc
- name: url
required: true
enabled: false
description: |
The token endpoint.
The token endpoint URL. Required when method is 'oidc'.
value:
type: string
- name: client_id
required: true
enabled: false
description: |
The client identifier.
The client identifier. Required when method is 'oidc'.
value:
type: string
- name: client_secret
secret: true
required: true
enabled: false
description: |
The client secret.
The client secret. Required when method is 'oidc'.
value:
type: string
- template: instances/tls
Expand Down
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/22058.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for AWS MSK IAM authentication using SASL/OAUTHBEARER mechanism.
54 changes: 48 additions & 6 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition
from confluent_kafka.admin import AdminClient

# AWS MSK IAM authentication support
try:
import boto3
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

AWS_MSK_IAM_AVAILABLE = True
except ImportError:
AWS_MSK_IAM_AVAILABLE = False


class KafkaClient:
def __init__(self, config, log) -> None:
Expand Down Expand Up @@ -69,12 +78,45 @@ def __get_authentication_config(self):
}

if self.config._sasl_mechanism == "OAUTHBEARER":
extras_parameters['sasl.oauthbearer.method'] = "oidc"
extras_parameters["sasl.oauthbearer.client.id"] = self.config._sasl_oauth_token_provider.get("client_id")
extras_parameters["sasl.oauthbearer.token.endpoint.url"] = self.config._sasl_oauth_token_provider.get("url")
extras_parameters["sasl.oauthbearer.client.secret"] = self.config._sasl_oauth_token_provider.get(
"client_secret"
)
# Default to 'oidc' for backwards compatibility with existing configs
method = self.config._sasl_oauth_token_provider.get("method", "oidc")

if method == "aws_msk_iam":
# AWS MSK IAM authentication requires OAuth callback
if not AWS_MSK_IAM_AVAILABLE:
raise Exception(
"AWS MSK IAM authentication requires 'aws-msk-iam-sasl-signer-python' library. "
"Install it with: pip install aws-msk-iam-sasl-signer-python"
)

# Set up OAuth callback for AWS MSK IAM
# The callback generates AWS IAM authentication tokens
def _aws_msk_iam_oauth_cb(oauth_config):
"""OAuth callback that generates AWS MSK IAM authentication tokens."""
try:
# Get AWS region from config or detect from environment
region = boto3.session.Session().region_name or 'us-east-1'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be validated in config.py somehow instead? Defaulting to us-east-1 seems like it could cause subtle failures that are a headache to debug

auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(region)
self.log.debug("Generated AWS MSK IAM token, expires in %s ms", expiry_ms)
return auth_token, expiry_ms / 1000 # Convert to seconds
except Exception as e:
self.log.error("Failed to generate AWS MSK IAM token: %s", e)
raise

extras_parameters['oauth_cb'] = _aws_msk_iam_oauth_cb

elif method == "oidc":
# OIDC authentication
extras_parameters['sasl.oauthbearer.method'] = "oidc"
extras_parameters["sasl.oauthbearer.client.id"] = self.config._sasl_oauth_token_provider.get(
"client_id"
)
extras_parameters["sasl.oauthbearer.token.endpoint.url"] = self.config._sasl_oauth_token_provider.get(
"url"
)
extras_parameters["sasl.oauthbearer.client.secret"] = self.config._sasl_oauth_token_provider.get(
"client_secret"
)

for key, value in extras_parameters.items():
# Do not add the value if it's not specified
Expand Down
29 changes: 23 additions & 6 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,31 @@ def validate_config(self):
if self._sasl_oauth_token_provider is None:
raise ConfigurationError("sasl_oauth_token_provider required for OAUTHBEARER sasl")

if self._sasl_oauth_token_provider.get("url") is None:
raise ConfigurationError("The `url` setting of `auth_token` reader is required")
if not isinstance(self._sasl_oauth_token_provider, dict):
raise ConfigurationError(
f"sasl_oauth_token_provider must be a dictionary. Got: {type(self._sasl_oauth_token_provider)}"
)

# Default to 'oidc' for backwards compatibility with existing configs
method = self._sasl_oauth_token_provider.get("method", "oidc")

if method == "aws_msk_iam":
# AWS MSK IAM doesn't require additional fields
pass
elif method == "oidc":
# OIDC requires url, client_id, and client_secret
if self._sasl_oauth_token_provider.get("url") is None:
raise ConfigurationError("The `url` setting of `auth_token` reader is required")

elif self._sasl_oauth_token_provider.get("client_id") is None:
raise ConfigurationError("The `client_id` setting of `auth_token` reader is required")
if self._sasl_oauth_token_provider.get("client_id") is None:
raise ConfigurationError("The `client_id` setting of `auth_token` reader is required")

elif self._sasl_oauth_token_provider.get("client_secret") is None:
raise ConfigurationError("The `client_secret` setting of `auth_token` reader is required")
if self._sasl_oauth_token_provider.get("client_secret") is None:
raise ConfigurationError("The `client_secret` setting of `auth_token` reader is required")
else:
raise ConfigurationError(
f"Invalid method '{method}' for sasl_oauth_token_provider. Must be 'aws_msk_iam' or 'oidc'"
)

# If `monitor_unlisted_consumer_groups` is set to true and
# using `consumer_groups`, we prioritize `monitor_unlisted_consumer_groups`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SaslOauthTokenProvider(BaseModel):
)
client_id: Optional[str] = None
client_secret: Optional[str] = None
method: Optional[str] = None
url: Optional[str] = None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,24 @@ instances:
#
# sasl_oauth_token_provider:

## @param url - string - required
## The token endpoint.
## @param method - string - optional - default: oidc
## The OAuth method to use. Either 'aws_msk_iam' for AWS MSK IAM authentication
## or 'oidc' for standard OIDC authentication. Defaults to 'oidc' for backwards compatibility.
#
# method: aws_msk_iam

## @param url - string - optional
## The token endpoint URL. Required when method is 'oidc'.
#
# url: <URL>

## @param client_id - string - required
## The client identifier.
## @param client_id - string - optional
## The client identifier. Required when method is 'oidc'.
#
# client_id: <CLIENT_ID>

## @param client_secret - string - required
## The client secret.
## @param client_secret - string - optional
## The client secret. Required when method is 'oidc'.
#
# client_secret: <CLIENT_SECRET>

Expand Down
2 changes: 2 additions & 0 deletions kafka_consumer/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ deps = [
"confluent-kafka==2.11.1",
"fastavro==1.12.0",
"protobuf==6.32.0",
"aws-msk-iam-sasl-signer-python==1.0.2",
"boto3==1.40.21",
]

[project.urls]
Expand Down
65 changes: 65 additions & 0 deletions kafka_consumer/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,21 @@ def test_tls_verify_is_string(tls_verify, expected, check, kafka_instance):
mock_client,
id="valid config",
),
pytest.param(
{'sasl_oauth_token_provider': {'method': 'aws_msk_iam'}},
does_not_raise(),
mock_client,
id="valid AWS MSK IAM config",
),
pytest.param(
{'sasl_oauth_token_provider': {'method': 'invalid_method'}},
pytest.raises(
Exception,
match="Invalid method 'invalid_method' for sasl_oauth_token_provider. Must be 'aws_msk_iam' or 'oidc'",
),
None,
id="invalid method",
),
],
)
def test_oauth_config(
Expand Down Expand Up @@ -1263,6 +1278,56 @@ def test_count_consumer_contexts(check, kafka_instance):
assert kafka_consumer_check.count_consumer_contexts(consumer_offsets) == 3


@pytest.mark.parametrize(
'oauth_config, expected_auth_keys',
[
pytest.param(
{'method': 'aws_msk_iam'},
['oauth_cb'], # AWS MSK IAM uses oauth_cb callback, not sasl.oauthbearer.method
id="AWS MSK IAM authentication",
),
pytest.param(
{'method': 'oidc', 'url': 'http://fake.url', 'client_id': 'test_id', 'client_secret': 'test_secret'},
{
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.client.id': 'test_id',
'sasl.oauthbearer.token.endpoint.url': 'http://fake.url',
'sasl.oauthbearer.client.secret': 'test_secret',
},
id="OIDC authentication",
),
],
)
def test_oauth_authentication_config(oauth_config, expected_auth_keys, kafka_instance, check):
"""Test that OAuth authentication configuration is correctly set for both AWS MSK IAM and OIDC."""
kafka_instance.update(
{
'monitor_unlisted_consumer_groups': True,
'security_protocol': 'SASL_SSL',
'sasl_mechanism': 'OAUTHBEARER',
'sasl_oauth_token_provider': oauth_config,
}
)
kafka_consumer_check = check(kafka_instance)
auth_config = kafka_consumer_check.client._KafkaClient__get_authentication_config()

# Verify security protocol is set
assert auth_config['security.protocol'] == 'sasl_ssl'
assert auth_config['sasl.mechanism'] == 'OAUTHBEARER'

# Verify OAuth-specific configuration
if isinstance(expected_auth_keys, dict):
# OIDC: verify exact key-value pairs
for key, value in expected_auth_keys.items():
assert auth_config[key] == value
else:
# AWS MSK IAM: verify oauth_cb callback is present and callable
for key in expected_auth_keys:
assert key in auth_config
if key == 'oauth_cb':
assert callable(auth_config[key])


def test_consumer_group_state_fetched_once_per_group(check, kafka_instance, dd_run_check, aggregator):
mock_client = seed_mock_client()
# Set up two partitions for same topic to check multiple contexts in same consumer group
Expand Down
Loading