From 925c1543dceb662f263519e74593b4e0c32b7c24 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 13 Apr 2026 18:19:59 +0800 Subject: [PATCH 1/6] Add authorization operation metrics --- .../authorization/AuthorizationService.java | 103 ++++++++++++------ .../metrics/AuthorizationMetrics.java | 45 ++++++++ .../AuthorizationServiceTest.java | 45 ++++++++ 3 files changed, 161 insertions(+), 32 deletions(-) create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 255d6c0379779..da40df6b869c3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationParameters; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; @@ -101,12 +102,13 @@ public CompletableFuture isSuperUser(AuthenticationParameters authParam } public CompletableFuture isSuperUser(String user, AuthenticationDataSource authenticationData) { - return provider.isSuperUser(user, authenticationData, conf); + return recordAuthorizationDenial(provider.isSuperUser(user, authenticationData, conf), "superuser", "check"); } public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) { - return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData); + return recordAuthorizationDenial(provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData), + "tenant_admin", "check"); } /** @@ -547,7 +549,8 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTenantOperationAsync(tenantName, role, operation, authData); + return recordAuthorizationDenial(provider.allowTenantOperationAsync(tenantName, role, operation, authData), + "tenant", operation.name().toLowerCase()); } public CompletableFuture allowTenantOperationAsync(String tenantName, @@ -556,7 +559,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("tenant", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync( @@ -577,18 +580,23 @@ public CompletableFuture allowBrokerOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("broker", brokerOperation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, - brokerOperation, role, authData); - final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, - brokerOperation, originalRole, authData); + final var isRoleAuthorizedFuture = recordAuthorizationDenial( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), + "broker", brokerOperation.name().toLowerCase()); + final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, originalRole, + authData), + "broker", brokerOperation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData); + return recordAuthorizationDenial( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), + "broker", brokerOperation.name().toLowerCase()); } } @@ -598,18 +606,22 @@ public CompletableFuture allowClusterOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("cluster", clusterOperation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, - clusterOperation, role, authData); - final var isOriginalAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, - clusterOperation, originalRole, authData); + final var isRoleAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), + "cluster", clusterOperation.name().toLowerCase()); + final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData), + "cluster", clusterOperation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData); + return recordAuthorizationDenial( + provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), + "cluster", clusterOperation.name().toLowerCase()); } } @@ -620,18 +632,22 @@ public CompletableFuture allowClusterPolicyOperationAsync(String cluste String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("cluster_policy", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, role, - policy, operation, authData); - final var isOriginalAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, originalRole, - policy, operation, authData); + final var isRoleAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), + "cluster_policy", operation.name().toLowerCase()); + final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData), + "cluster_policy", operation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData); + return recordAuthorizationDenial( + provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), + "cluster_policy", operation.name().toLowerCase()); } } @@ -675,7 +691,8 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData); + return recordAuthorizationDenial(provider.allowNamespaceOperationAsync(namespaceName, role, operation, + authData), "namespace", operation.name().toLowerCase()); } public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, @@ -684,7 +701,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("namespace", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync( @@ -718,7 +735,8 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData); + return recordAuthorizationDenial(provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, + role, authData), "namespace_policy", operation.name().toLowerCase()); } public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -728,7 +746,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("namespace_policy", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( @@ -781,7 +799,8 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData); + return recordAuthorizationDenial(provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, + authData), AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operation.name().toLowerCase()); } public CompletableFuture allowTopicPolicyOperationAsync(TopicName topicName, @@ -791,7 +810,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicPolicyOperationAsync( @@ -852,8 +871,9 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, return CompletableFuture.completedFuture(true); } - CompletableFuture allowFuture = - provider.allowTopicOperationAsync(topicName, role, operation, authData); + CompletableFuture allowFuture = recordAuthorizationDenial( + provider.allowTopicOperationAsync(topicName, role, operation, authData), + "topic", operation.name().toLowerCase()); return allowFuture.whenComplete((allowed, exception) -> { if (exception == null) { if (allowed) { @@ -885,7 +905,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, AuthenticationDataSource originalAuthData, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, originalAuthData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("topic", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -905,7 +925,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("topic", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -953,4 +973,23 @@ public CompletableFuture>> getPermissionsAsync(Names public CompletableFuture>> getSubscriptionPermissionsAsync(NamespaceName namespaceName) { return provider.getSubscriptionPermissionsAsync(namespaceName); } + + private CompletableFuture deniedFuture(String resourceType, String operation) { + AuthorizationMetrics.recordFailure(resourceType, operation); + return CompletableFuture.completedFuture(false); + } + + private CompletableFuture recordAuthorizationDenial(CompletableFuture authorizationFuture, + String resourceType, + String operation) { + return authorizationFuture.whenComplete((allowed, exception) -> { + if (exception == null) { + if (Boolean.TRUE.equals(allowed)) { + AuthorizationMetrics.recordSuccess(resourceType, operation); + } else if (Boolean.FALSE.equals(allowed)) { + AuthorizationMetrics.recordFailure(resourceType, operation); + } + } + }); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java new file mode 100644 index 0000000000000..a8166f198297c --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authorization.metrics; + +import io.prometheus.client.Counter; + +public final class AuthorizationMetrics { + public static final String AUTHORIZATION_OPERATIONS_METRIC_NAME = "pulsar_authorization_operations_total"; + public static final String RESULT_SUCCESS = "success"; + public static final String RESULT_FAILURE = "failure"; + public static final String RESOURCE_TYPE_TOPIC_POLICY = "topic_policy"; + + private static final Counter authorizationOperations = Counter.build() + .name(AUTHORIZATION_OPERATIONS_METRIC_NAME) + .help("Pulsar authorization operations") + .labelNames("resource_type", "operation", "result") + .register(); + + private AuthorizationMetrics() { + } + + public static void recordSuccess(String resourceType, String operation) { + authorizationOperations.labels(resourceType, operation, RESULT_SUCCESS).inc(); + } + + public static void recordFailure(String resourceType, String operation) { + authorizationOperations.labels(resourceType, operation, RESULT_FAILURE).inc(); + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java index 6f9dffa11b948..481a633625b8a 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java @@ -20,9 +20,11 @@ import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; +import io.prometheus.client.CollectorRegistry; import java.util.HashSet; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -132,4 +134,47 @@ public void testTopicPolicyOperationAsync(String role, String originalRole, bool PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get(); checkResult(shouldPass, isAuthorized); } + + @Test + public void testAuthorizationFailureMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "failure"); + boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "fail.client", null).get(); + double after = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "failure"); + + assertFalse(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationFailureMetricForInvalidOriginalPrincipal() throws Exception { + double before = getAuthorizationOperations("namespace", NamespaceOperation.PACKAGES.name().toLowerCase(), + "failure"); + boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"), + NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", null).get(); + double after = getAuthorizationOperations("namespace", NamespaceOperation.PACKAGES.name().toLowerCase(), + "failure"); + + assertFalse(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationSuccessMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "success"); + boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "pass.client", null).get(); + double after = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "success"); + + assertTrue(isAuthorized); + assertTrue(after - before == 1.0d); + } + + private double getAuthorizationOperations(String resourceType, String operation, String result) { + Double sample = CollectorRegistry.defaultRegistry.getSampleValue( + AuthorizationMetrics.AUTHORIZATION_OPERATIONS_METRIC_NAME, + new String[] {"resource_type", "operation", "result"}, + new String[] {resourceType, operation, result}); + return sample == null ? 0.0d : sample; + } } From 5a2f1fa823898eb5b6b3d014f0d1014061048ed5 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 13 Apr 2026 18:26:16 +0800 Subject: [PATCH 2/6] Add OpenTelemetry authorization metrics --- .../authorization/AuthorizationService.java | 14 ++- .../metrics/AuthorizationMetrics.java | 30 ++++- .../pulsar/broker/service/BrokerService.java | 2 +- .../OpenTelemetryAuthorizationStatsTest.java | 107 ++++++++++++++++++ 4 files changed, 145 insertions(+), 8 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index da40df6b869c3..cae6f6b40de78 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.authorization; import static java.util.concurrent.TimeUnit.SECONDS; +import io.opentelemetry.api.OpenTelemetry; import java.net.SocketAddress; import java.util.List; import java.util.Map; @@ -62,10 +63,17 @@ public class AuthorizationService { private final PulsarResources resources; private final AuthorizationProvider provider; private final ServiceConfiguration conf; + private final AuthorizationMetrics authorizationMetrics; public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources) throws PulsarServerException { + this(conf, pulsarResources, OpenTelemetry.noop()); + } + + public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources, OpenTelemetry openTelemetry) + throws PulsarServerException { this.conf = conf; + this.authorizationMetrics = new AuthorizationMetrics(openTelemetry); try { final String providerClassname = conf.getAuthorizationProvider(); if (StringUtils.isNotBlank(providerClassname)) { @@ -975,7 +983,7 @@ public CompletableFuture>> getSubscriptionPermissionsAsy } private CompletableFuture deniedFuture(String resourceType, String operation) { - AuthorizationMetrics.recordFailure(resourceType, operation); + authorizationMetrics.recordFailure(resourceType, operation); return CompletableFuture.completedFuture(false); } @@ -985,9 +993,9 @@ private CompletableFuture recordAuthorizationDenial(CompletableFuture { if (exception == null) { if (Boolean.TRUE.equals(allowed)) { - AuthorizationMetrics.recordSuccess(resourceType, operation); + authorizationMetrics.recordSuccess(resourceType, operation); } else if (Boolean.FALSE.equals(allowed)) { - AuthorizationMetrics.recordFailure(resourceType, operation); + authorizationMetrics.recordFailure(resourceType, operation); } } }); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java index a8166f198297c..2a18fee62f81a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java @@ -18,13 +18,22 @@ */ package org.apache.pulsar.broker.authorization.metrics; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; import io.prometheus.client.Counter; -public final class AuthorizationMetrics { +public class AuthorizationMetrics { public static final String AUTHORIZATION_OPERATIONS_METRIC_NAME = "pulsar_authorization_operations_total"; + public static final String AUTHORIZATION_COUNTER_METRIC_NAME = "pulsar.authorization.operation.count"; + public static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.authorization"; public static final String RESULT_SUCCESS = "success"; public static final String RESULT_FAILURE = "failure"; public static final String RESOURCE_TYPE_TOPIC_POLICY = "topic_policy"; + public static final AttributeKey RESOURCE_TYPE_KEY = AttributeKey.stringKey("pulsar.authorization.type"); + public static final AttributeKey OPERATION_KEY = AttributeKey.stringKey("pulsar.authorization.operation"); + public static final AttributeKey RESULT_KEY = AttributeKey.stringKey("pulsar.authorization.result"); private static final Counter authorizationOperations = Counter.build() .name(AUTHORIZATION_OPERATIONS_METRIC_NAME) @@ -32,14 +41,27 @@ public final class AuthorizationMetrics { .labelNames("resource_type", "operation", "result") .register(); - private AuthorizationMetrics() { + private final LongCounter authorizationCounter; + + public AuthorizationMetrics(OpenTelemetry openTelemetry) { + var meter = openTelemetry.getMeter(INSTRUMENTATION_SCOPE_NAME); + authorizationCounter = meter.counterBuilder(AUTHORIZATION_COUNTER_METRIC_NAME) + .setDescription("The number of authorization operations") + .setUnit("{operation}") + .build(); } - public static void recordSuccess(String resourceType, String operation) { + public void recordSuccess(String resourceType, String operation) { authorizationOperations.labels(resourceType, operation, RESULT_SUCCESS).inc(); + authorizationCounter.add(1, Attributes.of(RESOURCE_TYPE_KEY, resourceType, + OPERATION_KEY, operation, + RESULT_KEY, RESULT_SUCCESS)); } - public static void recordFailure(String resourceType, String operation) { + public void recordFailure(String resourceType, String operation) { authorizationOperations.labels(resourceType, operation, RESULT_FAILURE).inc(); + authorizationCounter.add(1, Attributes.of(RESOURCE_TYPE_KEY, resourceType, + OPERATION_KEY, operation, + RESULT_KEY, RESULT_FAILURE)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7ecb6f2bdfd4b..3b144f3730aab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -374,7 +374,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws this.statsUpdater = new SingleThreadNonConcurrentFixedRateScheduler("pulsar-stats-updater"); this.authorizationService = new AuthorizationService( - pulsar.getConfiguration(), pulsar().getPulsarResources()); + pulsar.getConfiguration(), pulsar().getPulsarResources(), pulsar.getOpenTelemetry().getOpenTelemetry()); this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java new file mode 100644 index 0000000000000..1202482fa6201 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import io.opentelemetry.api.common.Attributes; +import java.util.HashSet; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.authorization.MockAuthorizationProvider; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryAuthorizationStatsTest extends BrokerTestBase { + + private AuthorizationService authorizationService; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setAuthorizationEnabled(true); + conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName()); + HashSet proxyRoles = new HashSet<>(); + proxyRoles.add("pass.proxy"); + proxyRoles.add("fail.proxy"); + conf.setProxyRoles(proxyRoles); + authorizationService = new AuthorizationService(conf, null, pulsar.getOpenTelemetry().getOpenTelemetry()); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + @Test + public void testAuthorizationSuccess() throws Exception { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "pass.client", null).get(); + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, "topic", + AuthorizationMetrics.OPERATION_KEY, "produce", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_SUCCESS), + 1); + } + + @Test + public void testAuthorizationFailure() throws Exception { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "fail.client", null).get(); + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, "topic", + AuthorizationMetrics.OPERATION_KEY, "produce", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), + 1); + } + + @Test + public void testAuthorizationFailureForInvalidOriginalPrincipal() throws Exception { + authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"), + NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", null).get(); + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, "namespace", + AuthorizationMetrics.OPERATION_KEY, "packages", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), + 1); + } +} From 2fdcb959d2ee9e02895c08b226b65f5f4be2f894 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sat, 25 Apr 2026 11:41:26 +0800 Subject: [PATCH 3/6] [improve][broker] Use authorization resource type constants --- .../authorization/AuthorizationService.java | 47 ++++++++++--------- .../metrics/AuthorizationMetrics.java | 12 ++++- .../AuthorizationServiceTest.java | 20 ++++---- .../OpenTelemetryAuthorizationStatsTest.java | 6 +-- 4 files changed, 50 insertions(+), 35 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index cae6f6b40de78..7f803e90da518 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -110,13 +110,14 @@ public CompletableFuture isSuperUser(AuthenticationParameters authParam } public CompletableFuture isSuperUser(String user, AuthenticationDataSource authenticationData) { - return recordAuthorizationDenial(provider.isSuperUser(user, authenticationData, conf), "superuser", "check"); + return recordAuthorizationDenial(provider.isSuperUser(user, authenticationData, conf), + AuthorizationMetrics.RESOURCE_TYPE_SUPERUSER, "check"); } public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) { return recordAuthorizationDenial(provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData), - "tenant_admin", "check"); + AuthorizationMetrics.RESOURCE_TYPE_TENANT_ADMIN, "check"); } /** @@ -558,7 +559,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowTenantOperationAsync(tenantName, role, operation, authData), - "tenant", operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_TENANT, operation.name().toLowerCase()); } public CompletableFuture allowTenantOperationAsync(String tenantName, @@ -567,7 +568,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("tenant", operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TENANT, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync( @@ -588,23 +589,23 @@ public CompletableFuture allowBrokerOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("broker", brokerOperation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = recordAuthorizationDenial( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), - "broker", brokerOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); final var isOriginalAuthorizedFuture = recordAuthorizationDenial( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, originalRole, authData), - "broker", brokerOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { return recordAuthorizationDenial( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), - "broker", brokerOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); } } @@ -614,22 +615,22 @@ public CompletableFuture allowClusterOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("cluster", clusterOperation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), - "cluster", clusterOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); final var isOriginalAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData), - "cluster", clusterOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { return recordAuthorizationDenial( provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), - "cluster", clusterOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); } } @@ -640,22 +641,22 @@ public CompletableFuture allowClusterPolicyOperationAsync(String cluste String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("cluster_policy", operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), - "cluster_policy", operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); final var isOriginalAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData), - "cluster_policy", operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { return recordAuthorizationDenial( provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), - "cluster_policy", operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); } } @@ -700,7 +701,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowNamespaceOperationAsync(namespaceName, role, operation, - authData), "namespace", operation.name().toLowerCase()); + authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operation.name().toLowerCase()); } public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, @@ -709,7 +710,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("namespace", operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync( @@ -744,7 +745,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, - role, authData), "namespace_policy", operation.name().toLowerCase()); + role, authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operation.name().toLowerCase()); } public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -754,7 +755,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("namespace_policy", operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( @@ -881,7 +882,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, CompletableFuture allowFuture = recordAuthorizationDenial( provider.allowTopicOperationAsync(topicName, role, operation, authData), - "topic", operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operation.name().toLowerCase()); return allowFuture.whenComplete((allowed, exception) -> { if (exception == null) { if (allowed) { @@ -913,7 +914,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, AuthenticationDataSource originalAuthData, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, originalAuthData)) { - return deniedFuture("topic", operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -933,7 +934,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture("topic", operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java index 2a18fee62f81a..08a97da470cd3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java @@ -30,8 +30,18 @@ public class AuthorizationMetrics { public static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.authorization"; public static final String RESULT_SUCCESS = "success"; public static final String RESULT_FAILURE = "failure"; + public static final String RESOURCE_TYPE_SUPERUSER = "superuser"; + public static final String RESOURCE_TYPE_TENANT_ADMIN = "tenant_admin"; + public static final String RESOURCE_TYPE_TENANT = "tenant"; + public static final String RESOURCE_TYPE_BROKER = "broker"; + public static final String RESOURCE_TYPE_CLUSTER = "cluster"; + public static final String RESOURCE_TYPE_CLUSTER_POLICY = "cluster_policy"; + public static final String RESOURCE_TYPE_NAMESPACE = "namespace"; + public static final String RESOURCE_TYPE_NAMESPACE_POLICY = "namespace_policy"; + public static final String RESOURCE_TYPE_TOPIC = "topic"; public static final String RESOURCE_TYPE_TOPIC_POLICY = "topic_policy"; - public static final AttributeKey RESOURCE_TYPE_KEY = AttributeKey.stringKey("pulsar.authorization.type"); + public static final AttributeKey RESOURCE_TYPE_KEY = + AttributeKey.stringKey("pulsar.authorization.resource.type"); public static final AttributeKey OPERATION_KEY = AttributeKey.stringKey("pulsar.authorization.operation"); public static final AttributeKey RESULT_KEY = AttributeKey.stringKey("pulsar.authorization.result"); diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java index 481a633625b8a..8479106ce714d 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java @@ -137,10 +137,12 @@ public void testTopicPolicyOperationAsync(String role, String originalRole, bool @Test public void testAuthorizationFailureMetricForTopicOperation() throws Exception { - double before = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "failure"); + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), TopicOperation.PRODUCE, null, "fail.client", null).get(); - double after = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "failure"); + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); assertFalse(isAuthorized); assertTrue(after - before == 1.0d); @@ -148,12 +150,12 @@ public void testAuthorizationFailureMetricForTopicOperation() throws Exception { @Test public void testAuthorizationFailureMetricForInvalidOriginalPrincipal() throws Exception { - double before = getAuthorizationOperations("namespace", NamespaceOperation.PACKAGES.name().toLowerCase(), - "failure"); + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, + NamespaceOperation.PACKAGES.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"), NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", null).get(); - double after = getAuthorizationOperations("namespace", NamespaceOperation.PACKAGES.name().toLowerCase(), - "failure"); + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, + NamespaceOperation.PACKAGES.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); assertFalse(isAuthorized); assertTrue(after - before == 1.0d); @@ -161,10 +163,12 @@ public void testAuthorizationFailureMetricForInvalidOriginalPrincipal() throws E @Test public void testAuthorizationSuccessMetricForTopicOperation() throws Exception { - double before = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "success"); + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_SUCCESS); boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), TopicOperation.PRODUCE, null, "pass.client", null).get(); - double after = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "success"); + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_SUCCESS); assertTrue(isAuthorized); assertTrue(after - before == 1.0d); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java index 1202482fa6201..45b44f54a22f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java @@ -73,7 +73,7 @@ public void testAuthorizationSuccess() throws Exception { assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, - Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, "topic", + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_TOPIC, AuthorizationMetrics.OPERATION_KEY, "produce", AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_SUCCESS), 1); @@ -86,7 +86,7 @@ public void testAuthorizationFailure() throws Exception { assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, - Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, "topic", + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_TOPIC, AuthorizationMetrics.OPERATION_KEY, "produce", AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), 1); @@ -99,7 +99,7 @@ public void testAuthorizationFailureForInvalidOriginalPrincipal() throws Excepti assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, - Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, "namespace", + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, AuthorizationMetrics.OPERATION_KEY, "packages", AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), 1); From 6055378334d8f6878d6b477b448a67f27c2fc242 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sat, 25 Apr 2026 12:33:23 +0800 Subject: [PATCH 4/6] [improve][broker] Add authorization metrics package info --- .../authorization/metrics/package-info.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java new file mode 100644 index 0000000000000..a0ec5d99f5815 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Authorization metrics. + */ +package org.apache.pulsar.broker.authorization.metrics; From be7f41423acf601c11381ad9c2f9a7405cf6a996 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sun, 26 Apr 2026 04:45:30 +0800 Subject: [PATCH 5/6] [fix][broker] Handle null authorization metric operations --- .../authorization/AuthorizationService.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 7f803e90da518..5b7a6ac02c9e1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -60,6 +60,8 @@ @CustomLog public class AuthorizationService { + private static final String UNKNOWN_OPERATION = "unknown"; + private final PulsarResources resources; private final AuthorizationProvider provider; private final ServiceConfiguration conf; @@ -559,7 +561,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowTenantOperationAsync(tenantName, role, operation, authData), - AuthorizationMetrics.RESOURCE_TYPE_TENANT, operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_TENANT, operationName(operation)); } public CompletableFuture allowTenantOperationAsync(String tenantName, @@ -568,7 +570,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TENANT, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TENANT, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync( @@ -589,23 +591,23 @@ public CompletableFuture allowBrokerOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = recordAuthorizationDenial( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), - AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); final var isOriginalAuthorizedFuture = recordAuthorizationDenial( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, originalRole, authData), - AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { return recordAuthorizationDenial( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), - AuthorizationMetrics.RESOURCE_TYPE_BROKER, brokerOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); } } @@ -615,22 +617,22 @@ public CompletableFuture allowClusterOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), - AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); final var isOriginalAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData), - AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { return recordAuthorizationDenial( provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), - AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, clusterOperation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); } } @@ -641,22 +643,22 @@ public CompletableFuture allowClusterPolicyOperationAsync(String cluste String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), - AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); final var isOriginalAuthorizedFuture = recordAuthorizationDenial( provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData), - AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { return recordAuthorizationDenial( provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), - AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); } } @@ -701,7 +703,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowNamespaceOperationAsync(namespaceName, role, operation, - authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operation.name().toLowerCase()); + authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operationName(operation)); } public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, @@ -710,7 +712,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync( @@ -745,7 +747,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, - role, authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operation.name().toLowerCase()); + role, authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operationName(operation)); } public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -755,7 +757,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( @@ -809,7 +811,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic return CompletableFuture.completedFuture(true); } return recordAuthorizationDenial(provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, - authData), AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operation.name().toLowerCase()); + authData), AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operationName(operation)); } public CompletableFuture allowTopicPolicyOperationAsync(TopicName topicName, @@ -819,7 +821,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicPolicyOperationAsync( @@ -882,7 +884,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, CompletableFuture allowFuture = recordAuthorizationDenial( provider.allowTopicOperationAsync(topicName, role, operation, authData), - AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operation.name().toLowerCase()); + AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); return allowFuture.whenComplete((allowed, exception) -> { if (exception == null) { if (allowed) { @@ -914,7 +916,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, AuthenticationDataSource originalAuthData, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, originalAuthData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -934,7 +936,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operation.name().toLowerCase()); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -988,6 +990,10 @@ private CompletableFuture deniedFuture(String resourceType, String oper return CompletableFuture.completedFuture(false); } + private static String operationName(Enum operation) { + return operation == null ? UNKNOWN_OPERATION : operation.name().toLowerCase(); + } + private CompletableFuture recordAuthorizationDenial(CompletableFuture authorizationFuture, String resourceType, String operation) { From 4c512ffae76f2394eec8b91206a438887b6908c7 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sun, 26 Apr 2026 12:48:16 +0800 Subject: [PATCH 6/6] [improve][broker] Add authorization error metric result --- .../authorization/AuthorizationService.java | 51 ++++++++++--------- .../metrics/AuthorizationMetrics.java | 18 ++++--- .../AuthorizationServiceTest.java | 19 +++++++ .../MockAuthorizationProvider.java | 3 ++ .../OpenTelemetryAuthorizationStatsTest.java | 20 ++++++++ 5 files changed, 81 insertions(+), 30 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 5b7a6ac02c9e1..30e3eac8054a4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -112,13 +112,13 @@ public CompletableFuture isSuperUser(AuthenticationParameters authParam } public CompletableFuture isSuperUser(String user, AuthenticationDataSource authenticationData) { - return recordAuthorizationDenial(provider.isSuperUser(user, authenticationData, conf), + return recordAuthorizationOperation(provider.isSuperUser(user, authenticationData, conf), AuthorizationMetrics.RESOURCE_TYPE_SUPERUSER, "check"); } public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) { - return recordAuthorizationDenial(provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData), + return recordAuthorizationOperation(provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData), AuthorizationMetrics.RESOURCE_TYPE_TENANT_ADMIN, "check"); } @@ -560,7 +560,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return recordAuthorizationDenial(provider.allowTenantOperationAsync(tenantName, role, operation, authData), + return recordAuthorizationOperation(provider.allowTenantOperationAsync(tenantName, role, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_TENANT, operationName(operation)); } @@ -595,17 +595,17 @@ public CompletableFuture allowBrokerOperationAsync(String clusterName, } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = recordAuthorizationDenial( + final var isRoleAuthorizedFuture = recordAuthorizationOperation( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); - final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + final var isOriginalAuthorizedFuture = recordAuthorizationOperation( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, originalRole, authData), AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return recordAuthorizationDenial( + return recordAuthorizationOperation( provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); } @@ -621,16 +621,16 @@ public CompletableFuture allowClusterOperationAsync(String clusterName, } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = recordAuthorizationDenial( + final var isRoleAuthorizedFuture = recordAuthorizationOperation( provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); - final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + final var isOriginalAuthorizedFuture = recordAuthorizationOperation( provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData), AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return recordAuthorizationDenial( + return recordAuthorizationOperation( provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); } @@ -639,24 +639,24 @@ public CompletableFuture allowClusterOperationAsync(String clusterName, public CompletableFuture allowClusterPolicyOperationAsync(String clusterName, PolicyName policy, PolicyOperation operation, - String originalRole, - String role, - AuthenticationDataSource authData) { + String originalRole, + String role, + AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = recordAuthorizationDenial( + final var isRoleAuthorizedFuture = recordAuthorizationOperation( provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); - final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + final var isOriginalAuthorizedFuture = recordAuthorizationOperation( provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return recordAuthorizationDenial( + return recordAuthorizationOperation( provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); } @@ -702,7 +702,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return recordAuthorizationDenial(provider.allowNamespaceOperationAsync(namespaceName, role, operation, + return recordAuthorizationOperation(provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operationName(operation)); } @@ -746,8 +746,9 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return recordAuthorizationDenial(provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, - role, authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operationName(operation)); + return recordAuthorizationOperation( + provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData), + AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operationName(operation)); } public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -810,7 +811,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return recordAuthorizationDenial(provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, + return recordAuthorizationOperation(provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operationName(operation)); } @@ -882,7 +883,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, return CompletableFuture.completedFuture(true); } - CompletableFuture allowFuture = recordAuthorizationDenial( + CompletableFuture allowFuture = recordAuthorizationOperation( provider.allowTopicOperationAsync(topicName, role, operation, authData), AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); return allowFuture.whenComplete((allowed, exception) -> { @@ -994,11 +995,13 @@ private static String operationName(Enum operation) { return operation == null ? UNKNOWN_OPERATION : operation.name().toLowerCase(); } - private CompletableFuture recordAuthorizationDenial(CompletableFuture authorizationFuture, - String resourceType, - String operation) { + private CompletableFuture recordAuthorizationOperation(CompletableFuture authorizationFuture, + String resourceType, + String operation) { return authorizationFuture.whenComplete((allowed, exception) -> { - if (exception == null) { + if (exception != null) { + authorizationMetrics.recordError(resourceType, operation); + } else { if (Boolean.TRUE.equals(allowed)) { authorizationMetrics.recordSuccess(resourceType, operation); } else if (Boolean.FALSE.equals(allowed)) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java index 08a97da470cd3..c227642ece7e0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java @@ -30,6 +30,7 @@ public class AuthorizationMetrics { public static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.authorization"; public static final String RESULT_SUCCESS = "success"; public static final String RESULT_FAILURE = "failure"; + public static final String RESULT_ERROR = "error"; public static final String RESOURCE_TYPE_SUPERUSER = "superuser"; public static final String RESOURCE_TYPE_TENANT_ADMIN = "tenant_admin"; public static final String RESOURCE_TYPE_TENANT = "tenant"; @@ -62,16 +63,21 @@ public AuthorizationMetrics(OpenTelemetry openTelemetry) { } public void recordSuccess(String resourceType, String operation) { - authorizationOperations.labels(resourceType, operation, RESULT_SUCCESS).inc(); - authorizationCounter.add(1, Attributes.of(RESOURCE_TYPE_KEY, resourceType, - OPERATION_KEY, operation, - RESULT_KEY, RESULT_SUCCESS)); + record(resourceType, operation, RESULT_SUCCESS); } public void recordFailure(String resourceType, String operation) { - authorizationOperations.labels(resourceType, operation, RESULT_FAILURE).inc(); + record(resourceType, operation, RESULT_FAILURE); + } + + public void recordError(String resourceType, String operation) { + record(resourceType, operation, RESULT_ERROR); + } + + private void record(String resourceType, String operation, String result) { + authorizationOperations.labels(resourceType, operation, result).inc(); authorizationCounter.add(1, Attributes.of(RESOURCE_TYPE_KEY, resourceType, OPERATION_KEY, operation, - RESULT_KEY, RESULT_FAILURE)); + RESULT_KEY, result)); } } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java index 8479106ce714d..96ef696fb47cc 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java @@ -20,8 +20,10 @@ import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; import io.prometheus.client.CollectorRegistry; import java.util.HashSet; +import java.util.concurrent.ExecutionException; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; @@ -174,6 +176,23 @@ public void testAuthorizationSuccessMetricForTopicOperation() throws Exception { assertTrue(after - before == 1.0d); } + @Test + public void testAuthorizationErrorMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_ERROR); + try { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "error.client", null).get(); + fail("Expected authorization provider error"); + } catch (ExecutionException e) { + // Expected. + } + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_ERROR); + + assertTrue(after - before == 1.0d); + } + private double getAuthorizationOperations(String resourceType, String operation, String result) { Double sample = CollectorRegistry.defaultRegistry.getSampleValue( AuthorizationMetrics.AUTHORIZATION_OPERATIONS_METRIC_NAME, diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java index 9f9cd92cd0a75..e254dfb50bed6 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java @@ -38,6 +38,9 @@ public class MockAuthorizationProvider implements AuthorizationProvider { private CompletableFuture shouldPass(String role) { + if (role != null && role.startsWith("error")) { + return CompletableFuture.failedFuture(new RuntimeException("Authorization provider error")); + } return CompletableFuture.completedFuture(role != null && role.startsWith("pass")); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java index 45b44f54a22f5..061e4785e354b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java @@ -19,8 +19,10 @@ package org.apache.pulsar.broker.stats; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.testng.AssertJUnit.fail; import io.opentelemetry.api.common.Attributes; import java.util.HashSet; +import java.util.concurrent.ExecutionException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.MockAuthorizationProvider; @@ -104,4 +106,22 @@ public void testAuthorizationFailureForInvalidOriginalPrincipal() throws Excepti AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), 1); } + + @Test + public void testAuthorizationError() throws Exception { + try { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "error.client", null).get(); + fail("Expected authorization provider error"); + } catch (ExecutionException e) { + // Expected. + } + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + AuthorizationMetrics.OPERATION_KEY, "produce", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_ERROR), + 1); + } }