diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c94c59df2bfaf..4820a61eebe36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -612,8 +612,7 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund } return future .thenCompose(__ -> - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, - bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, true)) .thenCompose(bundle -> { return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) @@ -1528,9 +1527,8 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR } }) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> - isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange) + .thenCompose(__ -> + isBundleOwnedByAnyBroker(namespaceName, bundleRange) .thenCompose(flag -> { if (!flag) { log.info() @@ -1540,7 +1538,7 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR return CompletableFuture.completedFuture(null); } Optional destinationBrokerOpt = Optional.ofNullable(destinationBroker); - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, true) .thenCompose(nsBundle -> pulsar().getNamespaceService() .unloadNamespaceBundle(nsBundle, destinationBrokerOpt)); @@ -1583,10 +1581,8 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> getBundleRangeAsync(bundleName)) .thenCompose(bundleRange -> { - return getNamespacePoliciesAsync(namespaceName) - .thenCompose(policies -> - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, false)) + return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, + authoritative, false) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, pulsar().getNamespaceService() .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), @@ -1602,9 +1598,8 @@ protected CompletableFuture internalGetTopicHashPositionsAsy .log("Getting hash position for topic list , bundle"); return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> { - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + .thenCompose(__ -> { + return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, false, true) .thenCompose(nsBundle -> pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle)) @@ -1972,11 +1967,10 @@ protected CompletableFuture internalClearNamespaceBundleBacklogAsync(Strin // check cluster ownership for a given global namespace: redirect if peer-cluster owns it return validateGlobalNamespaceOwnershipAsync(namespaceName); }) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> + .thenCompose(__ -> // Allow acquiring ownership for an unassigned bundle so backlog can be cleared // even if not loaded. - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, false)) .thenCompose(bundle -> clearBacklogAsync(bundle, null)) .thenRun(() -> log.info() @@ -2028,11 +2022,10 @@ protected CompletableFuture internalClearNamespaceBundleBacklogForSubscrip // check cluster ownership for a given global namespace: redirect if peer-cluster owns it return validateGlobalNamespaceOwnershipAsync(namespaceName); }) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> + .thenCompose(__ -> // Allow acquiring ownership for an unassigned bundle so backlog can be cleared // even if not loaded. - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, false)) .thenCompose(bundle -> clearBacklogAsync(bundle, subscription)) .thenRun(() -> log.info() @@ -2080,10 +2073,8 @@ protected CompletableFuture internalUnsubscribeNamespaceBundleAsync(String return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, false)) + .thenCompose(__ -> validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, + authoritative, false)) .thenCompose(bundle -> unsubscribeAsync(bundle, subscription)) .thenRun(() -> log.info() .attr("subscription", subscription) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java index 2b9c13596c1dc..2c448a5e9d5da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java @@ -69,7 +69,6 @@ private CompletableFuture getNamespaceBundleRangeAsync(String b } }); return ret - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenApply(policies -> validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange)); + .thenCompose(__ -> validateNamespaceBundleRangeAsync(namespaceName, bundleRange)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 2f33c15d77f0a..52c4addd9914e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -478,12 +478,11 @@ public void getListFromBundle( .attr("bundleRange", bundleRange) .log("list of topics on namespace bundle"); validateNamespaceOperation(namespaceName, NamespaceOperation.GET_BUNDLE); - Policies policies = getNamespacePolicies(namespaceName); // check cluster ownership for a given global namespace: redirect if peer-cluster owns it validateGlobalNamespaceOwnership(namespaceName); - isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> { + isBundleOwnedByAnyBroker(namespaceName, bundleRange).thenAccept(flag -> { if (!flag) { log.info() .attr("namespace", namespaceName) @@ -491,7 +490,7 @@ public void getListFromBundle( .log("Namespace bundle is not owned by any broker"); asyncResponse.resume(Response.noContent().build()); } else { - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true) + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, true, true) .thenAccept(nsBundle -> { final var bundleTopics = pulsar().getBrokerService().getMultiLayerTopicsMap() .get(namespaceName.toString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 109d44ad8513e..fbb65de9fe71e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -585,7 +584,7 @@ static boolean isValidCluster(PulsarService pulsarService, String cluster) { return !pulsarService.getConfiguration().isAuthorizationEnabled(); } - protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, BundlesData bundles, + protected CompletableFuture validateNamespaceBundleRangeAsync(NamespaceName fqnn, String bundleRange) { try { checkArgument(bundleRange.contains("_"), "Invalid bundle range: " + bundleRange); @@ -596,77 +595,70 @@ protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, Bundl (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); NamespaceBundle nsBundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn, hashRange); - NamespaceBundles nsBundles = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn, - bundles); - nsBundles.validateBundle(nsBundle); - return nsBundle; + return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(fqnn) + .thenApply(nsBundles -> { + try { + nsBundles.validateBundle(nsBundle); + return nsBundle; + } catch (IllegalArgumentException e) { + log.error() + .attr("namespace", fqnn.toString()) + .attr("bundleRange", bundleRange) + .exceptionMessage(e) + .log("Invalid bundle range"); + throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()); + } catch (Exception e) { + log.error() + .attr("namespace", fqnn.toString()) + .attr("bundleRange", bundleRange) + .exception(e) + .log("Failed to validate namespace bundle"); + throw new RestException(e); + } + }); } catch (IllegalArgumentException e) { log.error() .attr("namespace", fqnn.toString()) .attr("bundleRange", bundleRange) .exceptionMessage(e) .log("Invalid bundle range"); - throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()); + return CompletableFuture.failedFuture( + new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage())); } catch (Exception e) { log.error() - .attr("bundle", fqnn.toString()) + .attr("namespace", fqnn.toString()) .attr("bundleRange", bundleRange) .exception(e) .log("Failed to validate namespace bundle"); - throw new RestException(e); + return CompletableFuture.failedFuture(new RestException(e)); } } /** * Checks whether a given bundle is currently loaded by any broker. */ - protected CompletableFuture isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles, - String bundleRange) { - NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); - NamespaceService nsService = pulsar().getNamespaceService(); - - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return nsService.checkOwnershipPresentAsync(nsBundle); - } - - LookupOptions options = LookupOptions.builder() - .authoritative(false) - .requestHttps(isRequestHttps()) - .readOnly(true) - .loadTopicsInBundle(false).build(); - - return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent); - } - - protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles, - String bundleRange, boolean authoritative, boolean readOnly) { - try { - NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); - validateBundleOwnership(nsBundle, authoritative, readOnly); - return nsBundle; - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - log.error() - .attr("bundle", fqnn.toString()) - .attr("bundleRange", bundleRange) - .exception(e) - .log("Failed to validate namespace bundle"); - throw new RestException(e); - } + protected CompletableFuture isBundleOwnedByAnyBroker(NamespaceName fqnn, String bundleRange) { + return validateNamespaceBundleRangeAsync(fqnn, bundleRange) + .thenCompose(nsBundle -> { + NamespaceService nsService = pulsar().getNamespaceService(); + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + return nsService.checkOwnershipPresentAsync(nsBundle); + } + LookupOptions options = LookupOptions.builder() + .authoritative(false) + .requestHttps(isRequestHttps()) + .readOnly(true) + .loadTopicsInBundle(false).build(); + return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent); + }); } protected CompletableFuture validateNamespaceBundleOwnershipAsync( - NamespaceName fqnn, BundlesData bundles, String bundleRange, + NamespaceName fqnn, String bundleRange, boolean authoritative, boolean readOnly) { - NamespaceBundle nsBundle; - try { - nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); - } catch (WebApplicationException wae) { - return CompletableFuture.failedFuture(wae); - } - return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly) - .thenApply(__ -> nsBundle); + return validateNamespaceBundleRangeAsync(fqnn, bundleRange) + .thenCompose(nsBundle -> validateBundleOwnershipAsync(nsBundle, authoritative, readOnly) + .thenApply(__ -> nsBundle)); } public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritative, boolean readOnly) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 4ac64405c607d..e3f5dba1e6b6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2571,4 +2571,124 @@ public void testGetClusterAntiAffinityNamespaces() throws Exception { assertEquals(namespacesResp, namespacesWithFullPath); } + @Test + public void testBundleValidationWithNonExistentNamespace() throws Exception { + String nonExistentNs = "non-existent-namespace"; + String bundleRange = "0x00000000_0x80000000"; + + // Test unload on non-existent namespace - should return 404 + // The error is thrown by validateGlobalNamespaceOwnershipAsync before reaching bundle validation + AsyncResponse unloadResponse = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadResponse, testTenant, nonExistentNs, + bundleRange, false, null); + ArgumentCaptor unloadCaptor = ArgumentCaptor.forClass(RestException.class); + verify(unloadResponse, timeout(5000).times(1)).resume(unloadCaptor.capture()); + assertEquals(unloadCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + + // Test split on non-existent namespace - should return 404 + AsyncResponse splitResponse = mock(AsyncResponse.class); + namespaces.splitNamespaceBundle(splitResponse, testTenant, nonExistentNs, + bundleRange, false, true, null, null); + ArgumentCaptor splitCaptor = ArgumentCaptor.forClass(RestException.class); + verify(splitResponse, timeout(5000).times(1)).resume(splitCaptor.capture()); + assertEquals(splitCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + + // Test clear backlog on non-existent namespace - should return 404 + AsyncResponse clearResponse = mock(AsyncResponse.class); + namespaces.clearNamespaceBundleBacklog(clearResponse, testTenant, nonExistentNs, + bundleRange, false); + ArgumentCaptor clearCaptor = ArgumentCaptor.forClass(RestException.class); + verify(clearResponse, timeout(5000).times(1)).resume(clearCaptor.capture()); + assertEquals(clearCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + + // Test clear backlog for subscription on non-existent namespace - should return 404 + AsyncResponse clearSubResponse = mock(AsyncResponse.class); + namespaces.clearNamespaceBundleBacklogForSubscription(clearSubResponse, testTenant, nonExistentNs, + bundleRange, "test-sub", false); + ArgumentCaptor clearSubCaptor = ArgumentCaptor.forClass(RestException.class); + verify(clearSubResponse, timeout(5000).times(1)).resume(clearSubCaptor.capture()); + assertEquals(clearSubCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + } + + @Test + public void testBundleValidationAfterSplit() throws Exception { + URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + String bundledNsLocal = "test-bundle-validation-after-split"; + List boundaries = List.of("0x00000000", "0xffffffff"); + BundlesData bundleData = BundlesData.builder() + .boundaries(boundaries) + .numBundles(boundaries.size() - 1) + .build(); + createBundledTestNamespaces(this.testTenant, bundledNsLocal, bundleData); + final NamespaceName testNs = NamespaceName.get(this.testTenant, bundledNsLocal); + + OwnershipCache mockOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache()); + doReturn(CompletableFuture.completedFuture(null)).when(mockOwnershipCache) + .disableOwnership(any(NamespaceBundle.class)); + Field ownership = NamespaceService.class.getDeclaredField("ownershipCache"); + ownership.setAccessible(true); + ownership.set(pulsar.getNamespaceService(), mockOwnershipCache); + mockWebUrl(localWebServiceUrl, testNs); + + // Split the bundle + AsyncResponse splitResponse = mock(AsyncResponse.class); + namespaces.splitNamespaceBundle(splitResponse, testTenant, bundledNsLocal, + "0x00000000_0xffffffff", + false, true, null, null); + ArgumentCaptor splitCaptor = ArgumentCaptor.forClass(Response.class); + verify(splitResponse, timeout(5000).times(1)).resume(splitCaptor.capture()); + + // Verify split was successful + BundlesData bundlesDataAfterSplit = (BundlesData) asyncRequests(ctx -> namespaces.getBundlesData(ctx, + testTenant, bundledNsLocal)); + assertNotNull(bundlesDataAfterSplit); + assertEquals(bundlesDataAfterSplit.getBoundaries().size(), 3); + assertEquals(bundlesDataAfterSplit.getBoundaries().get(0), "0x00000000"); + assertEquals(bundlesDataAfterSplit.getBoundaries().get(1), "0x7fffffff"); + assertEquals(bundlesDataAfterSplit.getBoundaries().get(2), "0xffffffff"); + + // Now test bundle validation with the old (invalid) bundle range - should return 412 + AsyncResponse unloadOldBundleResponse = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadOldBundleResponse, testTenant, bundledNsLocal, + "0x00000000_0xffffffff", false, null); + ArgumentCaptor unloadOldCaptor = ArgumentCaptor.forClass(RestException.class); + verify(unloadOldBundleResponse, timeout(5000).times(1)).resume(unloadOldCaptor.capture()); + assertEquals(unloadOldCaptor.getValue().getResponse().getStatus(), + Response.Status.PRECONDITION_FAILED.getStatusCode(), + "Old bundle range after split should return 412"); + + // Test bundle validation with new valid bundle ranges - should succeed + doReturn(true).when(nsSvc) + .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs))); + doReturn(CompletableFuture.completedFuture(null)).when(nsSvc) + .unloadNamespaceBundle(any(NamespaceBundle.class)); + + AsyncResponse unloadNewBundle1Response = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadNewBundle1Response, testTenant, bundledNsLocal, + "0x00000000_0x7fffffff", false, null); + ArgumentCaptor newBundle1Captor = ArgumentCaptor.forClass(Response.class); + verify(unloadNewBundle1Response, timeout(5000).times(1)).resume(newBundle1Captor.capture()); + assertEquals(newBundle1Captor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode(), + "New bundle range should be valid"); + + AsyncResponse unloadNewBundle2Response = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadNewBundle2Response, testTenant, bundledNsLocal, + "0x7fffffff_0xffffffff", false, null); + ArgumentCaptor newBundle2Captor = ArgumentCaptor.forClass(Response.class); + verify(unloadNewBundle2Response, timeout(5000).times(1)).resume(newBundle2Captor.capture()); + assertEquals(newBundle2Captor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode(), + "New bundle range should be valid"); + + // cleanup + resetBroker(); + } + }