diff --git a/README.md b/README.md index 37baa8a3..f663ecc4 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,84 @@ Choose the authentication method that best suits your needs based on your environment and security requirements. For more details, please refer to the [Zitadel documentation on authenticating service users](https://zitadel.com/docs/guides/integrate/service-users/authenticate-service-users). +## Advanced Configuration + +The SDK provides a `TransportOptions` object that allows you to customise +the underlying HTTP transport used for both OpenID discovery and API calls. + +### Disabling TLS Verification + +In development or testing environments with self-signed certificates, you can +disable TLS verification entirely: + +```python +from zitadel_client import Zitadel, TransportOptions + +options = TransportOptions(insecure=True) + +zitadel = Zitadel.with_client_credentials( + "https://your-instance.zitadel.cloud", + "client-id", + "client-secret", + transport_options=options, +) +``` + +### Using a Custom CA Certificate + +If your Zitadel instance uses a certificate signed by a private CA, you can +provide the path to the CA certificate in PEM format: + +```python +from zitadel_client import Zitadel, TransportOptions + +options = TransportOptions(ca_cert_path="/path/to/ca.pem") + +zitadel = Zitadel.with_client_credentials( + "https://your-instance.zitadel.cloud", + "client-id", + "client-secret", + transport_options=options, +) +``` + +### Custom Default Headers + +You can attach default headers to every outgoing request. This is useful for +custom routing or tracing headers: + +```python +from zitadel_client import Zitadel, TransportOptions + +options = TransportOptions(default_headers={"X-Custom-Header": "my-value"}) + +zitadel = Zitadel.with_client_credentials( + "https://your-instance.zitadel.cloud", + "client-id", + "client-secret", + transport_options=options, +) +``` + +### Proxy Configuration + +If your environment requires routing traffic through an HTTP proxy, you can +specify the proxy URL. To authenticate with the proxy, embed the credentials +directly in the URL: + +```python +from zitadel_client import Zitadel, TransportOptions + +options = TransportOptions(proxy_url="http://user:pass@proxy:8080") + +zitadel = Zitadel.with_client_credentials( + "https://your-instance.zitadel.cloud", + "client-id", + "client-secret", + transport_options=options, +) +``` + ## Design and Dependencies This SDK is designed to be lean and efficient, focusing on providing a diff --git a/pyproject.toml b/pyproject.toml index d8c6eb82..75c00ee9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dev = [ "pytest-cov>=2.8.1", "tox>=3.9.0", "types-python-dateutil>=2.8.19.14", - "testcontainers==3.7.1", + "testcontainers>=4.14.0,<5.0.0", "python-dotenv==1.1.1", "ruff>=0.12.4", "sphinx==7.4.7", diff --git a/test/fixtures/ca.pem b/test/fixtures/ca.pem new file mode 100644 index 00000000..7ef3bf71 --- /dev/null +++ b/test/fixtures/ca.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDOjCCAiKgAwIBAgIUYtCHt3J95fUpagYaFNw8M1/oV7kwDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MCAXDTI2MDMwNDA1MTcwNVoYDzIxMjYw +MjA4MDUxNzA1WjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQCVY1jORnqyVB9tUgYYo9U3uYCVtCzWt3lGCoxDpxAb +LlpNnqOxG33ugRbNTY/QBht37Q37PjBahMJxkRE7EPsqi2Bz2fsZMyB7pJgP5iTA +0cILFyFzGpgUkXjmtsozKy0jAHpnzHGALjtzoKgp4SxCrWSp/MYtfMkBP9xbEpf1 +IYYQyyiISgic0/vO+nUEjyR/ULFP+nd48KjOHwWIHqwMY3nuzqScshAsyIZzSRT0 +ND2TLK1rxGoITqsOg2yTxRWwP0khvE08Y/59BGfWZq0svBCp2E3sIXg2Z3hlie7o +n+3P0F00kQfrEvkTi/cHv2vuhJpnlHxmTgJBRwhWE2+xAgMBAAGjgYEwfzAdBgNV +HQ4EFgQUPOzmGXHMRu3zIZqKLad8EkHkZvowHwYDVR0jBBgwFoAUPOzmGXHMRu3z +IZqKLad8EkHkZvowDwYDVR0TAQH/BAUwAwEB/zAsBgNVHREEJTAjgglsb2NhbGhv +c3SHBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwDQYJKoZIhvcNAQELBQADggEBAD/z +IRzYSBp6qPrvVgIX5/mEwN6ylp1J1pTC8nPQRozg0X2SEnRxz1DGBa1l046QVew2 +3+LuGYWtVkTzEtiX7BN2jSshX8d8Ss73+psZOye6t8VcAmEeVVdnqU+EzVAhM1DP +mUiNxJPHgK2cZkpV2BHB0Ccu7qVfaIFvTk2OdbGOsQ7+r2l562kUDzCFvBo/mskO +xiIt3YMZrpyLJJzvgi+fIo351oqLvTKOHw30FelAPIHo/A2OgngsM31HvwxROYlr +C5mET6wnOtjTQbKORADTGQ8D3sJCjQJ/AI34Q4C2q/PBljVL8JKoAPzwviYAuqdd +NIIKpaYUzng24gw7+50= +-----END CERTIFICATE----- diff --git a/test/fixtures/keystore.p12 b/test/fixtures/keystore.p12 new file mode 100644 index 00000000..9f1c66b1 Binary files /dev/null and b/test/fixtures/keystore.p12 differ diff --git a/test/fixtures/mappings/oidc-discovery.json b/test/fixtures/mappings/oidc-discovery.json new file mode 100644 index 00000000..3e2930a3 --- /dev/null +++ b/test/fixtures/mappings/oidc-discovery.json @@ -0,0 +1,19 @@ +{ + "request": { + "method": "GET", + "url": "/.well-known/openid-configuration" + }, + "response": { + "status": 200, + "headers": { + "Content-Type": "application/json" + }, + "jsonBody": { + "issuer": "{{request.baseUrl}}", + "token_endpoint": "{{request.baseUrl}}/oauth/v2/token", + "authorization_endpoint": "{{request.baseUrl}}/oauth/v2/authorize", + "userinfo_endpoint": "{{request.baseUrl}}/oidc/v1/userinfo", + "jwks_uri": "{{request.baseUrl}}/oauth/v2/keys" + } + } +} diff --git a/test/fixtures/mappings/settings.json b/test/fixtures/mappings/settings.json new file mode 100644 index 00000000..524a367a --- /dev/null +++ b/test/fixtures/mappings/settings.json @@ -0,0 +1,16 @@ +{ + "request": { + "method": "POST", + "url": "/zitadel.settings.v2.SettingsService/GetGeneralSettings" + }, + "response": { + "status": 200, + "headers": { + "Content-Type": "application/json" + }, + "jsonBody": { + "defaultLanguage": "{{request.scheme}}", + "defaultOrgId": "{{request.headers.X-Custom-Header}}" + } + } +} diff --git a/test/fixtures/mappings/token.json b/test/fixtures/mappings/token.json new file mode 100644 index 00000000..b4227d3f --- /dev/null +++ b/test/fixtures/mappings/token.json @@ -0,0 +1,17 @@ +{ + "request": { + "method": "POST", + "url": "/oauth/v2/token" + }, + "response": { + "status": 200, + "headers": { + "Content-Type": "application/json" + }, + "jsonBody": { + "access_token": "test-token-12345", + "token_type": "Bearer", + "expires_in": 3600 + } + } +} diff --git a/test/fixtures/squid.conf b/test/fixtures/squid.conf new file mode 100644 index 00000000..f3e60d4b --- /dev/null +++ b/test/fixtures/squid.conf @@ -0,0 +1,3 @@ +http_port 3128 +acl all src all +http_access allow all diff --git a/test/test_transport_options.py b/test/test_transport_options.py new file mode 100644 index 00000000..92601208 --- /dev/null +++ b/test/test_transport_options.py @@ -0,0 +1,39 @@ +import unittest + +from zitadel_client.transport_options import TransportOptions + + +class TransportOptionsTest(unittest.TestCase): + def test_defaults_returns_empty(self) -> None: + self.assertEqual({}, TransportOptions.defaults().to_session_kwargs()) + + def test_insecure_sets_verify_false(self) -> None: + opts = TransportOptions(insecure=True) + self.assertEqual({"verify": False}, opts.to_session_kwargs()) + + def test_ca_cert_path_sets_verify(self) -> None: + opts = TransportOptions(ca_cert_path="/path/to/ca.pem") + self.assertEqual({"verify": "/path/to/ca.pem"}, opts.to_session_kwargs()) + + def test_proxy_url_sets_proxies(self) -> None: + opts = TransportOptions(proxy_url="http://proxy:3128") + self.assertEqual( + {"proxies": {"http": "http://proxy:3128", "https": "http://proxy:3128"}}, + opts.to_session_kwargs(), + ) + + def test_insecure_takes_precedence_over_ca_cert(self) -> None: + opts = TransportOptions(insecure=True, ca_cert_path="/nonexistent/ca.pem") + self.assertEqual({"verify": False}, opts.to_session_kwargs()) + + def test_immutability(self) -> None: + opts = TransportOptions.defaults() + with self.assertRaises(AttributeError): + opts.insecure = True # type: ignore[misc] + + def test_defaults_factory(self) -> None: + opts = TransportOptions.defaults() + self.assertEqual({}, dict(opts.default_headers)) + self.assertIsNone(opts.ca_cert_path) + self.assertFalse(opts.insecure) + self.assertIsNone(opts.proxy_url) diff --git a/test/test_zitadel.py b/test/test_zitadel.py index faecaafa..39094f46 100644 --- a/test/test_zitadel.py +++ b/test/test_zitadel.py @@ -1,17 +1,32 @@ import importlib import inspect +import os import pkgutil import unittest +import urllib.request +from typing import Optional + +from testcontainers.core.container import DockerContainer +from testcontainers.core.network import Network +from testcontainers.core.wait_strategies import PortWaitStrategy +from testcontainers.core.waiting_utils import wait_container_is_ready from zitadel_client.auth.no_auth_authenticator import NoAuthAuthenticator +from zitadel_client.transport_options import TransportOptions from zitadel_client.zitadel import Zitadel +FIXTURES_DIR = os.path.join(os.path.dirname(__file__), "fixtures") + + +@wait_container_is_ready() +def _wait_for_wiremock(host: str, port: str) -> None: + url = f"http://{host}:{port}/__admin/mappings" + with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310 + if resp.status != 200: + raise ConnectionError(f"WireMock not ready: {resp.status}") + class ZitadelServicesTest(unittest.TestCase): - """ - Test to verify that all API service classes defined in the "zitadel_client.api" namespace - are registered as attributes in the Zitadel class. - """ def test_services_dynamic(self) -> None: expected = set() @@ -30,3 +45,114 @@ def test_services_dynamic(self) -> None: and getattr(zitadel, attr).__class__.__module__.startswith("zitadel_client.api") } self.assertEqual(expected, actual) + + +class ZitadelTransportTest(unittest.TestCase): + host: Optional[str] = None + http_port: Optional[str] = None + https_port: Optional[str] = None + proxy_port: Optional[str] = None + ca_cert_path: Optional[str] = None + wiremock: DockerContainer = None + proxy: DockerContainer = None + network: Network = None + + @classmethod + def setup_class(cls) -> None: + cls.ca_cert_path = os.path.join(FIXTURES_DIR, "ca.pem") + keystore_path = os.path.join(FIXTURES_DIR, "keystore.p12") + squid_conf = os.path.join(FIXTURES_DIR, "squid.conf") + + cls.network = Network().create() + + cls.wiremock = ( + DockerContainer("wiremock/wiremock:3.12.1") + .with_network(cls.network) + .with_network_aliases("wiremock") + .with_exposed_ports(8080, 8443) + .with_volume_mapping(keystore_path, "/home/wiremock/keystore.p12", mode="ro") + .with_volume_mapping( + os.path.join(FIXTURES_DIR, "mappings"), "/home/wiremock/mappings", mode="ro" + ) + .with_command( + "--https-port 8443" + " --https-keystore /home/wiremock/keystore.p12" + " --keystore-password password" + " --keystore-type PKCS12" + " --global-response-templating" + ) + ) + cls.wiremock.start() + + cls.proxy = ( + DockerContainer("ubuntu/squid:6.10-24.10_beta") + .with_network(cls.network) + .with_exposed_ports(3128) + .with_volume_mapping(squid_conf, "/etc/squid/squid.conf", mode="ro") + .waiting_for(PortWaitStrategy(3128)) + ) + cls.proxy.start() + + cls.host = cls.wiremock.get_container_host_ip() + cls.http_port = cls.wiremock.get_exposed_port(8080) + cls.https_port = cls.wiremock.get_exposed_port(8443) + cls.proxy_port = cls.proxy.get_exposed_port(3128) + + _wait_for_wiremock(cls.host, cls.http_port) + + @classmethod + def teardown_class(cls) -> None: + if cls.proxy is not None: + cls.proxy.stop() + if cls.wiremock is not None: + cls.wiremock.stop() + if cls.network is not None: + cls.network.remove() + + def test_custom_ca_cert(self) -> None: + zitadel = Zitadel.with_client_credentials( + f"https://{self.host}:{self.https_port}", + "dummy-client", + "dummy-secret", + transport_options=TransportOptions(ca_cert_path=self.ca_cert_path), + ) + response = zitadel.settings.get_general_settings({}) + self.assertEqual("https", response.default_language) + + def test_insecure_mode(self) -> None: + zitadel = Zitadel.with_client_credentials( + f"https://{self.host}:{self.https_port}", + "dummy-client", + "dummy-secret", + transport_options=TransportOptions(insecure=True), + ) + response = zitadel.settings.get_general_settings({}) + self.assertEqual("https", response.default_language) + + def test_default_headers(self) -> None: + zitadel = Zitadel.with_client_credentials( + f"http://{self.host}:{self.http_port}", + "dummy-client", + "dummy-secret", + transport_options=TransportOptions(default_headers={"X-Custom-Header": "test-value"}), + ) + response = zitadel.settings.get_general_settings({}) + self.assertEqual("http", response.default_language) + self.assertEqual("test-value", response.default_org_id) + + def test_proxy_url(self) -> None: + zitadel = Zitadel.with_access_token( + "http://wiremock:8080", + "test-token", + transport_options=TransportOptions(proxy_url=f"http://{self.host}:{self.proxy_port}"), + ) + response = zitadel.settings.get_general_settings({}) + self.assertEqual("http", response.default_language) + + def test_no_ca_cert_fails(self) -> None: + with self.assertRaises(Exception): # noqa: B017 + Zitadel.with_client_credentials( + f"https://{self.host}:{self.https_port}", + "dummy-client", + "dummy-secret", + ) diff --git a/uv.lock b/uv.lock index dfeb60a6..b79bef59 100644 --- a/uv.lock +++ b/uv.lock @@ -412,18 +412,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bc/58/6b3d24e6b9bc474a2dcdee65dfd1f008867015408a271562e4b690561a4d/cryptography-46.0.5-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:8456928655f856c6e1533ff59d5be76578a7157224dbd9ce6872f25055ab9ab7", size = 3407605, upload-time = "2026-02-10T19:18:29.233Z" }, ] -[[package]] -name = "deprecation" -version = "2.1.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "packaging" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/5a/d3/8ae2869247df154b64c1884d7346d412fed0c49df84db635aab2d1c40e62/deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff", size = 173788, upload-time = "2020-04-20T14:23:38.738Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/02/c3/253a89ee03fc9b9682f1541728eb66db7db22148cd94f89ab22528cd1e1b/deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a", size = 11178, upload-time = "2020-04-20T14:23:36.581Z" }, -] - [[package]] name = "distlib" version = "0.4.0" @@ -1135,15 +1123,18 @@ wheels = [ [[package]] name = "testcontainers" -version = "3.7.1" +version = "4.14.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "deprecation" }, { name = "docker" }, + { name = "python-dotenv" }, + { name = "typing-extensions" }, + { name = "urllib3" }, { name = "wrapt" }, ] +sdist = { url = "https://files.pythonhosted.org/packages/8b/02/ef62dec9e4f804189c44df23f0b86897c738d38e9c48282fcd410308632f/testcontainers-4.14.1.tar.gz", hash = "sha256:316f1bb178d829c003acd650233e3ff3c59a833a08d8661c074f58a4fbd42a64", size = 80148, upload-time = "2026-01-31T23:13:46.915Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b3/37/38c595414d764cb1d9f3a0c907878c4146a21505ab974c63bcf3d8145807/testcontainers-3.7.1-py2.py3-none-any.whl", hash = "sha256:7f48cef4bf0ccd78f1a4534d4b701a003a3bace851f24eae58a32f9e3f0aeba0", size = 45321, upload-time = "2022-12-06T17:55:37.701Z" }, + { url = "https://files.pythonhosted.org/packages/c8/31/5e7b23f9e43ff7fd46d243808d70c5e8daf3bc08ecf5a7fb84d5e38f7603/testcontainers-4.14.1-py3-none-any.whl", hash = "sha256:03dfef4797b31c82e7b762a454b6afec61a2a512ad54af47ab41e4fa5415f891", size = 125640, upload-time = "2026-01-31T23:13:45.464Z" }, ] [[package]] @@ -1445,7 +1436,7 @@ dev = [ { name = "python-dotenv", specifier = "==1.1.1" }, { name = "ruff", specifier = ">=0.12.4" }, { name = "sphinx", specifier = "==7.4.7" }, - { name = "testcontainers", specifier = "==3.7.1" }, + { name = "testcontainers", specifier = ">=4.14.0,<5.0.0" }, { name = "tox", specifier = ">=3.9.0" }, { name = "ty", specifier = ">=0.0.4" }, { name = "types-python-dateutil", specifier = ">=2.8.19.14" }, diff --git a/zitadel_client/__init__.py b/zitadel_client/__init__.py index 4b1be525..2a35cec6 100644 --- a/zitadel_client/__init__.py +++ b/zitadel_client/__init__.py @@ -8,4 +8,5 @@ ZitadelError, # noqa F401 ) from .models import * # noqa: F403, F401 +from .transport_options import TransportOptions # noqa F401 from .zitadel import Zitadel # noqa F401 diff --git a/zitadel_client/api_client.py b/zitadel_client/api_client.py index 94a6ce34..334baf05 100644 --- a/zitadel_client/api_client.py +++ b/zitadel_client/api_client.py @@ -65,6 +65,8 @@ def __init__( self.rest_client = rest.RESTClientObject(configuration) self.default_headers = {"User-Agent": configuration.user_agent} + if configuration.default_headers: + self.default_headers.update(configuration.default_headers) if header_name is not None and header_value is not None: self.default_headers[header_name] = header_value self.client_side_validation = configuration.client_side_validation diff --git a/zitadel_client/auth/client_credentials_authenticator.py b/zitadel_client/auth/client_credentials_authenticator.py index 58e19f38..a95cd8b3 100644 --- a/zitadel_client/auth/client_credentials_authenticator.py +++ b/zitadel_client/auth/client_credentials_authenticator.py @@ -1,5 +1,5 @@ import sys -from typing import Dict, Set +from typing import Dict, Optional, Set if sys.version_info >= (3, 12): from typing import override @@ -13,6 +13,7 @@ OAuthAuthenticatorBuilder, ) from zitadel_client.auth.open_id import OpenId +from zitadel_client.transport_options import TransportOptions class ClientCredentialsAuthenticator(OAuthAuthenticator): @@ -22,7 +23,14 @@ class ClientCredentialsAuthenticator(OAuthAuthenticator): Uses client_id and client_secret to obtain an access token from the OAuth2 token endpoint. """ - def __init__(self, open_id: OpenId, client_id: str, client_secret: str, auth_scopes: Set[str]): + def __init__( + self, + open_id: OpenId, + client_id: str, + client_secret: str, + auth_scopes: Set[str], + transport_options: Optional[TransportOptions] = None, + ): """ Constructs a ClientCredentialsAuthenticator. @@ -30,14 +38,24 @@ def __init__(self, open_id: OpenId, client_id: str, client_secret: str, auth_sco :param client_id: The OAuth client identifier. :param client_secret: The OAuth client secret. :param auth_scopes: The scope(s) for the token request. + :param transport_options: Optional transport options for TLS, proxy, and headers. """ + opts = transport_options or TransportOptions.defaults() + + session = OAuth2Session( + client_id=client_id, + client_secret=client_secret, + scope=" ".join(auth_scopes), + **opts.to_session_kwargs(), + ) + + if opts.default_headers: + session.headers.update(opts.default_headers) + super().__init__( open_id, - OAuth2Session( - client_id=client_id, - client_secret=client_secret, - scope=" ".join(auth_scopes), - ), + session, + transport_options=opts, ) @override @@ -50,16 +68,19 @@ def get_grant(self) -> Dict[str, str]: return {"grant_type": "client_credentials"} @staticmethod - def builder(host: str, client_id: str, client_secret: str) -> "ClientCredentialsAuthenticatorBuilder": + def builder( + host: str, client_id: str, client_secret: str, transport_options: Optional[TransportOptions] = None + ) -> "ClientCredentialsAuthenticatorBuilder": """ Returns a builder for constructing a ClientCredentialsAuthenticator. :param host: The base URL for the OAuth provider. :param client_id: The OAuth client identifier. :param client_secret: The OAuth client secret. + :param transport_options: Optional transport options for TLS, proxy, and headers. :return: A ClientCredentialsAuthenticatorBuilder instance. """ - return ClientCredentialsAuthenticatorBuilder(host, client_id, client_secret) + return ClientCredentialsAuthenticatorBuilder(host, client_id, client_secret, transport_options=transport_options) class ClientCredentialsAuthenticatorBuilder(OAuthAuthenticatorBuilder["ClientCredentialsAuthenticatorBuilder"]): @@ -70,15 +91,16 @@ class ClientCredentialsAuthenticatorBuilder(OAuthAuthenticatorBuilder["ClientCre required for the client credentials flow. """ - def __init__(self, host: str, client_id: str, client_secret: str): + def __init__(self, host: str, client_id: str, client_secret: str, transport_options: Optional[TransportOptions] = None): """ Initializes the ClientCredentialsAuthenticatorBuilder with host, client ID, and client secret. :param host: The base URL for the OAuth provider. :param client_id: The OAuth client identifier. :param client_secret: The OAuth client secret. + :param transport_options: Optional transport options for TLS, proxy, and headers. """ - super().__init__(host) + super().__init__(host, transport_options=transport_options) self.client_id = client_id self.client_secret = client_secret @@ -88,4 +110,10 @@ def build(self) -> ClientCredentialsAuthenticator: :return: A configured ClientCredentialsAuthenticator. """ - return ClientCredentialsAuthenticator(self.open_id, self.client_id, self.client_secret, self.auth_scopes) + return ClientCredentialsAuthenticator( + self.open_id, + self.client_id, + self.client_secret, + self.auth_scopes, + transport_options=self.transport_options, + ) diff --git a/zitadel_client/auth/oauth_authenticator.py b/zitadel_client/auth/oauth_authenticator.py index 5ff5dd6e..195e6aa6 100644 --- a/zitadel_client/auth/oauth_authenticator.py +++ b/zitadel_client/auth/oauth_authenticator.py @@ -8,6 +8,7 @@ from zitadel_client import ZitadelError from zitadel_client.auth.authenticator import Authenticator, Token from zitadel_client.auth.open_id import OpenId +from zitadel_client.transport_options import TransportOptions class OAuthAuthenticator(Authenticator, ABC): @@ -19,16 +20,23 @@ class OAuthAuthenticator(Authenticator, ABC): oauth_session: An OAuth2Session instance used for fetching tokens. """ - def __init__(self, open_id: OpenId, oauth_session: OAuth2Session): + def __init__( + self, + open_id: OpenId, + oauth_session: OAuth2Session, + transport_options: Optional[TransportOptions] = None, + ): """ Constructs an OAuthAuthenticator. :param open_id: An object that must implement get_host_endpoint() and get_token_endpoint(). :param oauth_session: The scope for the token request. + :param transport_options: Optional transport options for TLS, proxy, and headers. """ super().__init__(open_id.get_host_endpoint()) self.open_id = open_id self.token: Optional[Token] = None + self.transport_options = transport_options or TransportOptions.defaults() self.oauth_session = oauth_session self._lock = Lock() @@ -92,14 +100,20 @@ class OAuthAuthenticatorBuilder(ABC, Generic[T]): This builder provides common configuration options such as the OpenId instance and authentication scopes. """ - def __init__(self, host: str): + def __init__( + self, + host: str, + transport_options: Optional[TransportOptions] = None, + ): """ Initializes the OAuthAuthenticatorBuilder with a given host. :param host: The base URL for the OAuth provider. + :param transport_options: Optional transport options for TLS, proxy, and headers. """ super().__init__() - self.open_id = OpenId(host) + self.transport_options = transport_options or TransportOptions.defaults() + self.open_id = OpenId(host, transport_options=self.transport_options) self.auth_scopes = {"openid", "urn:zitadel:iam:org:project:id:zitadel:aud"} def scopes(self: T, *auth_scopes: str) -> T: diff --git a/zitadel_client/auth/open_id.py b/zitadel_client/auth/open_id.py index a352d953..e2be4505 100644 --- a/zitadel_client/auth/open_id.py +++ b/zitadel_client/auth/open_id.py @@ -1,9 +1,12 @@ import json -import urllib +import ssl import urllib.error import urllib.request +from typing import Optional from urllib.parse import urljoin +from zitadel_client.transport_options import TransportOptions + class OpenId: """ @@ -16,7 +19,20 @@ class OpenId: host_endpoint: str token_endpoint: str - def __init__(self, hostname: str): + def __init__( # noqa: C901 + self, + hostname: str, + transport_options: Optional[TransportOptions] = None, + ): + """ + Initialize the OpenId configuration fetcher. + + :param hostname: The Zitadel instance hostname or URL. + :param transport_options: Optional transport options for TLS, proxy, and headers. + """ + if transport_options is None: + transport_options = TransportOptions.defaults() + # noinspection HttpUrlsUsage if not (hostname.startswith("http://") or hostname.startswith("https://")): hostname = "https://" + hostname @@ -29,7 +45,37 @@ def __init__(self, hostname: str): if not well_known_url.lower().startswith(("http://", "https://")): raise ValueError("Invalid URL scheme. Only 'http' and 'https' are allowed.") - with urllib.request.urlopen(well_known_url) as response: # noqa S310 + request = urllib.request.Request(well_known_url) # noqa: S310 + if transport_options.default_headers: + for header_name, header_value in transport_options.default_headers.items(): + request.add_header(header_name, header_value) + + if transport_options.insecure: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + elif transport_options.ca_cert_path: + ctx = ssl.create_default_context() + ctx.check_hostname = True + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(transport_options.ca_cert_path) + else: + ctx = None + + if transport_options.proxy_url: + proxy_handler = urllib.request.ProxyHandler( + {"http": transport_options.proxy_url, "https": transport_options.proxy_url} + ) + if ctx: + https_handler = urllib.request.HTTPSHandler(context=ctx) + opener = urllib.request.build_opener(proxy_handler, https_handler) + else: + opener = urllib.request.build_opener(proxy_handler) + response_ctx = opener.open(request) + else: + response_ctx = urllib.request.urlopen(request, context=ctx) # noqa: S310 + + with response_ctx as response: if response.status != 200: raise Exception(f"Failed to fetch OpenID configuration: HTTP {response.status}") config = json.loads(response.read().decode("utf-8")) diff --git a/zitadel_client/auth/web_token_authenticator.py b/zitadel_client/auth/web_token_authenticator.py index da2a6eac..8121f5ee 100644 --- a/zitadel_client/auth/web_token_authenticator.py +++ b/zitadel_client/auth/web_token_authenticator.py @@ -10,6 +10,7 @@ OAuthAuthenticatorBuilder, ) from zitadel_client.auth.open_id import OpenId +from zitadel_client.transport_options import TransportOptions class WebTokenAuthenticator(OAuthAuthenticator): @@ -30,9 +31,10 @@ def __init__( jwt_lifetime: timedelta = timedelta(hours=1), jwt_algorithm: str = "RS256", key_id: Optional[str] = None, + transport_options: Optional[TransportOptions] = None, ): """ - Constructs a JWTAuthenticator. + Constructs a WebTokenAuthenticator. :param open_id: The base URL for the OAuth provider. :param auth_scopes: The scope(s) for the token request. @@ -42,8 +44,19 @@ def __init__( :param private_key: The private key used to sign the JWT. :param jwt_lifetime: Lifetime of the JWT in seconds. :param jwt_algorithm: The JWT signing algorithm (default "RS256"). + :param transport_options: Optional transport options for TLS, proxy, and headers. """ - super().__init__(open_id, OAuth2Session(scope=" ".join(auth_scopes))) + opts = transport_options or TransportOptions.defaults() + + session = OAuth2Session( + scope=" ".join(auth_scopes), + **opts.to_session_kwargs(), + ) + + if opts.default_headers: + session.headers.update(opts.default_headers) + + super().__init__(open_id, session, transport_options=opts) self.jwt_issuer = jwt_issuer self.jwt_subject = jwt_subject self.jwt_audience = jwt_audience @@ -87,7 +100,9 @@ def get_grant(self) -> Dict[str, str]: raise Exception("Failed to generate JWT assertion: " + str(e)) from e @classmethod - def from_json(cls, host: str, json_path: str) -> "WebTokenAuthenticator": + def from_json( + cls, host: str, json_path: str, transport_options: Optional[TransportOptions] = None + ) -> "WebTokenAuthenticator": """ Create a WebTokenAuthenticatorBuilder instance from a JSON configuration file. @@ -101,6 +116,7 @@ def from_json(cls, host: str, json_path: str) -> "WebTokenAuthenticator": :param host: Base URL for the API endpoints. :param json_path: File path to the JSON configuration file. + :param transport_options: Optional transport options for TLS, proxy, and headers. :return: A new instance of WebTokenAuthenticator. :raises Exception: If the file cannot be read, the JSON is invalid, or required keys are missing. @@ -115,28 +131,35 @@ def from_json(cls, host: str, json_path: str) -> "WebTokenAuthenticator": private_key = config.get("key") key_id = config.get("keyId") if not user_id or not key_id or not private_key: - raise Exception("Missing required keys 'userId', 'key_id' or 'key' in JSON file.") + raise Exception("Missing required keys 'userId', 'keyId' or 'key' in JSON file.") - return (WebTokenAuthenticator.builder(host, user_id, private_key)).key_identifier(key_id).build() + return ( + (WebTokenAuthenticator.builder(host, user_id, private_key, transport_options=transport_options)) + .key_identifier(key_id) + .build() + ) @staticmethod - def builder(host: str, user_id: str, private_key: str) -> "WebTokenAuthenticatorBuilder": + def builder( + host: str, user_id: str, private_key: str, transport_options: Optional[TransportOptions] = None + ) -> "WebTokenAuthenticatorBuilder": """ - Returns a builder for constructing a JWTAuthenticator. + Returns a builder for constructing a WebTokenAuthenticator. :param host: The base URL for the OAuth provider. :param user_id: The user identifier, used as both the issuer and subject. :param private_key: The private key used to sign the JWT. - :return: A JWTAuthenticatorBuilder instance. + :param transport_options: Optional transport options for TLS, proxy, and headers. + :return: A WebTokenAuthenticatorBuilder instance. """ - return WebTokenAuthenticatorBuilder(host, user_id, user_id, host, private_key) + return WebTokenAuthenticatorBuilder(host, user_id, user_id, host, private_key, transport_options=transport_options) class WebTokenAuthenticatorBuilder(OAuthAuthenticatorBuilder["WebTokenAuthenticatorBuilder"]): """ - Builder for JWTAuthenticator. + Builder for WebTokenAuthenticator. - Provides a fluent API for configuring and constructing a JWTAuthenticator instance. + Provides a fluent API for configuring and constructing a WebTokenAuthenticator instance. """ def __init__( @@ -147,17 +170,19 @@ def __init__( jwt_audience: str, private_key: str, key_id: Optional[str] = None, + transport_options: Optional[TransportOptions] = None, ): """ - Initializes the JWTAuthenticatorBuilder with required parameters. + Initializes the WebTokenAuthenticatorBuilder with required parameters. :param host: The base URL for API endpoints. :param jwt_issuer: The issuer claim for the JWT. :param jwt_subject: The subject claim for the JWT. :param jwt_audience: The audience claim for the JWT. :param private_key: The PEM-formatted private key used for signing the JWT. + :param transport_options: Optional transport options for TLS, proxy, and headers. """ - super().__init__(host) + super().__init__(host, transport_options=transport_options) self.jwt_issuer = jwt_issuer self.jwt_subject = jwt_subject self.jwt_audience = jwt_audience @@ -177,12 +202,12 @@ def token_lifetime_seconds(self, seconds: int) -> "WebTokenAuthenticatorBuilder" def build(self) -> WebTokenAuthenticator: """ - Builds and returns a new JWTAuthenticator instance using the configured parameters. + Builds and returns a new WebTokenAuthenticator instance using the configured parameters. This method inlines the JWT assertion generation logic and passes all required - configuration to the JWTAuthenticator constructor. + configuration to the WebTokenAuthenticator constructor. - :return: A new JWTAuthenticator instance. + :return: A new WebTokenAuthenticator instance. """ return WebTokenAuthenticator( open_id=self.open_id, @@ -193,6 +218,7 @@ def build(self) -> WebTokenAuthenticator: private_key=self.private_key, jwt_lifetime=self.jwt_lifetime, key_id=self.key_id, + transport_options=self.transport_options, ) def key_identifier(self, key_id: Optional[str]) -> "WebTokenAuthenticatorBuilder": diff --git a/zitadel_client/configuration.py b/zitadel_client/configuration.py index 577078f6..964cff7d 100644 --- a/zitadel_client/configuration.py +++ b/zitadel_client/configuration.py @@ -81,6 +81,8 @@ def __init__( self.socket_options = None self.datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z" self.date_format = "%Y-%m-%d" + self.default_headers: Dict[str, str] = {} + self.proxy_url: Optional[str] = None def __deepcopy__(self, memo: Dict[int, Any]) -> Self: cls = self.__class__ diff --git a/zitadel_client/rest.py b/zitadel_client/rest.py index c3d569e8..0db82255 100644 --- a/zitadel_client/rest.py +++ b/zitadel_client/rest.py @@ -49,8 +49,12 @@ def __init__(self, configuration) -> None: # type: ignore # https pool manager self.pool_manager: urllib3.PoolManager - # noinspection PyArgumentList - self.pool_manager = urllib3.PoolManager(**pool_args) # ty: ignore[invalid-argument-type] + if configuration.proxy_url: + # noinspection PyArgumentList + self.pool_manager = urllib3.ProxyManager(configuration.proxy_url, **pool_args) # ty: ignore[invalid-argument-type] + else: + # noinspection PyArgumentList + self.pool_manager = urllib3.PoolManager(**pool_args) # ty: ignore[invalid-argument-type] def request( # noqa C901 too complex self, diff --git a/zitadel_client/transport_options.py b/zitadel_client/transport_options.py new file mode 100644 index 00000000..623cfb01 --- /dev/null +++ b/zitadel_client/transport_options.py @@ -0,0 +1,38 @@ +from dataclasses import dataclass, field +from types import MappingProxyType +from typing import Mapping, Optional + + +@dataclass(frozen=True) +class TransportOptions: + """Immutable transport options for configuring HTTP connections. + + :param default_headers: Default HTTP headers sent to the origin server with every request. + :param ca_cert_path: Path to a custom CA certificate file for TLS verification. + :param insecure: Whether to disable TLS certificate verification. + :param proxy_url: Proxy URL for HTTP connections. + """ + + default_headers: Mapping[str, str] = field(default_factory=dict) + ca_cert_path: Optional[str] = None + insecure: bool = False + proxy_url: Optional[str] = None + + def __post_init__(self) -> None: + object.__setattr__(self, "default_headers", MappingProxyType(dict(self.default_headers))) + + @staticmethod + def defaults() -> "TransportOptions": + """Returns a TransportOptions instance with all default values.""" + return TransportOptions() + + def to_session_kwargs(self) -> dict: + """Builds keyword arguments for an authlib OAuth2Session.""" + kwargs: dict = {} + if self.insecure: + kwargs["verify"] = False + elif self.ca_cert_path: + kwargs["verify"] = self.ca_cert_path + if self.proxy_url: + kwargs["proxies"] = {"http": self.proxy_url, "https": self.proxy_url} + return kwargs diff --git a/zitadel_client/zitadel.py b/zitadel_client/zitadel.py index e6935171..f6d56ffc 100644 --- a/zitadel_client/zitadel.py +++ b/zitadel_client/zitadel.py @@ -36,6 +36,7 @@ from zitadel_client.auth.personal_access_token_authenticator import PersonalAccessTokenAuthenticator from zitadel_client.auth.web_token_authenticator import WebTokenAuthenticator from zitadel_client.configuration import Configuration +from zitadel_client.transport_options import TransportOptions class Zitadel: @@ -182,38 +183,104 @@ def __exit__( pass @staticmethod - def with_access_token(host: str, access_token: str) -> "Zitadel": + def _apply_transport_options( + config: Configuration, + transport_options: TransportOptions, + ) -> None: + """ + Apply transport options to the SDK configuration. + + :param config: The Configuration instance to modify. + :param transport_options: Transport options for TLS, proxy, and headers. + """ + config.default_headers = dict(transport_options.default_headers) + if transport_options.ca_cert_path: + config.ssl_ca_cert = transport_options.ca_cert_path + if transport_options.insecure: + config.verify_ssl = False + if transport_options.proxy_url: + config.proxy_url = transport_options.proxy_url + + @staticmethod + def with_access_token( + host: str, + access_token: str, + *, + transport_options: Optional[TransportOptions] = None, + ) -> "Zitadel": """ Initialize the SDK with a Personal Access Token (PAT). :param host: API URL (e.g., "https://api.zitadel.example.com"). :param access_token: Personal Access Token for Bearer authentication. + :param transport_options: Optional transport options for TLS, proxy, and headers. :return: Configured Zitadel client instance. :see: https://zitadel.com/docs/guides/integrate/service-users/personal-access-token """ - return Zitadel(PersonalAccessTokenAuthenticator(host, access_token)) + resolved = transport_options or TransportOptions.defaults() + + def mutate_config(config: Configuration) -> None: + Zitadel._apply_transport_options(config, resolved) + + return Zitadel(PersonalAccessTokenAuthenticator(host, access_token), mutate_config=mutate_config) @staticmethod - def with_client_credentials(host: str, client_id: str, client_secret: str) -> "Zitadel": + def with_client_credentials( + host: str, + client_id: str, + client_secret: str, + *, + transport_options: Optional[TransportOptions] = None, + ) -> "Zitadel": """ Initialize the SDK using OAuth2 Client Credentials flow. :param host: API URL. :param client_id: OAuth2 client identifier. :param client_secret: OAuth2 client secret. + :param transport_options: Optional transport options for TLS, proxy, and headers. :return: Configured Zitadel client instance with token auto-refresh. :see: https://zitadel.com/docs/guides/integrate/service-users/client-credentials """ - return Zitadel(ClientCredentialsAuthenticator.builder(host, client_id, client_secret).build()) + resolved = transport_options or TransportOptions.defaults() + + authenticator = ClientCredentialsAuthenticator.builder( + host, + client_id, + client_secret, + transport_options=resolved, + ).build() + + def mutate_config(config: Configuration) -> None: + Zitadel._apply_transport_options(config, resolved) + + return Zitadel(authenticator, mutate_config=mutate_config) @staticmethod - def with_private_key(host: str, key_file: str) -> "Zitadel": + def with_private_key( + host: str, + key_file: str, + *, + transport_options: Optional[TransportOptions] = None, + ) -> "Zitadel": """ Initialize the SDK via Private Key JWT assertion. :param host: API URL. :param key_file: Path to service account JSON or PEM key file. + :param transport_options: Optional transport options for TLS, proxy, and headers. :return: Configured Zitadel client instance using JWT assertion. :see: https://zitadel.com/docs/guides/integrate/service-users/private-key-jwt """ - return Zitadel(WebTokenAuthenticator.from_json(host, key_file)) + resolved = transport_options or TransportOptions.defaults() + + authenticator = WebTokenAuthenticator.from_json( + host, + key_file, + transport_options=resolved, + ) + + def mutate_config(config: Configuration) -> None: + Zitadel._apply_transport_options(config, resolved) + + return Zitadel(authenticator, mutate_config=mutate_config)