Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,7 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
}
return future
.thenCompose(__ ->
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, true))
.thenCompose(bundle -> {
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
Expand Down Expand Up @@ -1528,9 +1527,8 @@ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleR
}
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange)
.thenCompose(__ ->
isBundleOwnedByAnyBroker(namespaceName, bundleRange)
.thenCompose(flag -> {
if (!flag) {
log.info()
Expand All @@ -1540,7 +1538,7 @@ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleR
return CompletableFuture.completedFuture(null);
}
Optional<String> destinationBrokerOpt = Optional.ofNullable(destinationBroker);
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, true)
.thenCompose(nsBundle -> pulsar().getNamespaceService()
.unloadNamespaceBundle(nsBundle, destinationBrokerOpt));
Expand Down Expand Up @@ -1583,10 +1581,8 @@ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundl
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getBundleRangeAsync(bundleName))
.thenCompose(bundleRange -> {
return getNamespacePoliciesAsync(namespaceName)
.thenCompose(policies ->
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, false)
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
pulsar().getNamespaceService()
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
Expand All @@ -1602,9 +1598,8 @@ protected CompletableFuture<TopicHashPositions> internalGetTopicHashPositionsAsy
.log("Getting hash position for topic list , bundle");
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> {
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
.thenCompose(__ -> {
return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
false, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
Expand Down Expand Up @@ -1972,11 +1967,10 @@ protected CompletableFuture<Void> internalClearNamespaceBundleBacklogAsync(Strin
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
})
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
.thenCompose(__ ->
// Allow acquiring ownership for an unassigned bundle so backlog can be cleared
// even if not loaded.
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, false))
.thenCompose(bundle -> clearBacklogAsync(bundle, null))
.thenRun(() -> log.info()
Expand Down Expand Up @@ -2028,11 +2022,10 @@ protected CompletableFuture<Void> internalClearNamespaceBundleBacklogForSubscrip
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
})
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
.thenCompose(__ ->
// Allow acquiring ownership for an unassigned bundle so backlog can be cleared
// even if not loaded.
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, false))
.thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
.thenRun(() -> log.info()
Expand Down Expand Up @@ -2080,10 +2073,8 @@ protected CompletableFuture<Void> internalUnsubscribeNamespaceBundleAsync(String

return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE)
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName))
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(__ -> validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, false))
.thenCompose(bundle -> unsubscribeAsync(bundle, subscription))
.thenRun(() -> log.info()
.attr("subscription", subscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private CompletableFuture<NamespaceBundle> getNamespaceBundleRangeAsync(String b
}
});
return ret
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange));
.thenCompose(__ -> validateNamespaceBundleRangeAsync(namespaceName, bundleRange));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,20 +478,19 @@ public void getListFromBundle(
.attr("bundleRange", bundleRange)
.log("list of topics on namespace bundle");
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(namespaceName);

// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);

isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
isBundleOwnedByAnyBroker(namespaceName, bundleRange).thenAccept(flag -> {
if (!flag) {
log.info()
.attr("namespace", namespaceName)
.attr("bundleRange", bundleRange)
.log("Namespace bundle is not owned by any broker");
asyncResponse.resume(Response.noContent().build());
} else {
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true)
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, true, true)
.thenAccept(nsBundle -> {
final var bundleTopics = pulsar().getBrokerService().getMultiLayerTopicsMap()
.get(namespaceName.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -585,7 +584,7 @@ static boolean isValidCluster(PulsarService pulsarService, String cluster) {
return !pulsarService.getConfiguration().isAuthorizationEnabled();
}

protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, BundlesData bundles,
protected CompletableFuture<NamespaceBundle> validateNamespaceBundleRangeAsync(NamespaceName fqnn,
String bundleRange) {
try {
checkArgument(bundleRange.contains("_"), "Invalid bundle range: " + bundleRange);
Expand All @@ -596,77 +595,70 @@ protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, Bundl
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
NamespaceBundle nsBundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn,
hashRange);
NamespaceBundles nsBundles = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn,
bundles);
nsBundles.validateBundle(nsBundle);
return nsBundle;
return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(fqnn)
.thenApply(nsBundles -> {
try {
nsBundles.validateBundle(nsBundle);
return nsBundle;
} catch (IllegalArgumentException e) {
log.error()
.attr("namespace", fqnn.toString())
.attr("bundleRange", bundleRange)
.exceptionMessage(e)
.log("Invalid bundle range");
throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
} catch (Exception e) {
log.error()
.attr("namespace", fqnn.toString())
.attr("bundleRange", bundleRange)
.exception(e)
.log("Failed to validate namespace bundle");
throw new RestException(e);
}
});
} catch (IllegalArgumentException e) {
log.error()
.attr("namespace", fqnn.toString())
.attr("bundleRange", bundleRange)
.exceptionMessage(e)
.log("Invalid bundle range");
throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
return CompletableFuture.failedFuture(
new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()));
} catch (Exception e) {
log.error()
.attr("bundle", fqnn.toString())
.attr("namespace", fqnn.toString())
.attr("bundleRange", bundleRange)
.exception(e)
.log("Failed to validate namespace bundle");
throw new RestException(e);
return CompletableFuture.failedFuture(new RestException(e));
}
}

/**
* Checks whether a given bundle is currently loaded by any broker.
*/
protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
String bundleRange) {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();

if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return nsService.checkOwnershipPresentAsync(nsBundle);
}

LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();

return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
}

protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles,
String bundleRange, boolean authoritative, boolean readOnly) {
try {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
validateBundleOwnership(nsBundle, authoritative, readOnly);
return nsBundle;
} catch (WebApplicationException wae) {
throw wae;
} catch (Exception e) {
log.error()
.attr("bundle", fqnn.toString())
.attr("bundleRange", bundleRange)
.exception(e)
.log("Failed to validate namespace bundle");
throw new RestException(e);
}
protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn, String bundleRange) {
return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
.thenCompose(nsBundle -> {
NamespaceService nsService = pulsar().getNamespaceService();
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return nsService.checkOwnershipPresentAsync(nsBundle);
}
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();
return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
});
}

protected CompletableFuture<NamespaceBundle> validateNamespaceBundleOwnershipAsync(
NamespaceName fqnn, BundlesData bundles, String bundleRange,
NamespaceName fqnn, String bundleRange,
boolean authoritative, boolean readOnly) {
NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
} catch (WebApplicationException wae) {
return CompletableFuture.failedFuture(wae);
}
return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
.thenApply(__ -> nsBundle);
return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
.thenCompose(nsBundle -> validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
.thenApply(__ -> nsBundle));
}

public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritative, boolean readOnly)
Expand Down
Loading