diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 3e9efbecdecb..926bf3fdb342 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -112,6 +112,8 @@ import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceChecker; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer; import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator; +import org.apache.pinot.controller.helix.core.replication.RealtimeSegmentCopier; +import org.apache.pinot.controller.helix.core.replication.TableReplicator; import org.apache.pinot.controller.helix.core.retention.RetentionManager; import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory; import org.apache.pinot.controller.helix.core.util.HelixSetupUtils; @@ -218,6 +220,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected TaskMetricsEmitter _taskMetricsEmitter; protected PoolingHttpClientConnectionManager _connectionManager; protected TenantRebalancer _tenantRebalancer; + protected TableReplicator _tableReplicator; // This executor should be used by all code paths for user initiated rebalances, so that the controller config // CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS is honored. 
protected ExecutorService _rebalancerExecutorService; @@ -617,6 +620,7 @@ private void setUpPinotController() { _rebalancerExecutorService); _tenantRebalancer = new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, _rebalancerExecutorService); + _tableReplicator = new TableReplicator(_helixResourceManager, _executorService, new RealtimeSegmentCopier(_config)); // Setting up periodic tasks List controllerPeriodicTasks = setupControllerPeriodicTasks(); @@ -678,6 +682,7 @@ protected void configure() { bind(_sqlQueryExecutor).to(SqlQueryExecutor.class); bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class); bind(_tenantRebalancer).to(TenantRebalancer.class); + bind(_tableReplicator).to(TableReplicator.class); bind(_tableSizeReader).to(TableSizeReader.class); bind(_storageQuotaChecker).to(StorageQuotaChecker.class); bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index a0decdabdded..f3603da96a59 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -405,6 +405,7 @@ public static long getRandomInitialDelayInSeconds() { public static final String CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK = "controller.tenant.rebalance.maxJobsInZK"; public static final String CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK = "controller.reload.segment.maxJobsInZK"; public static final String CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK = "controller.force.commit.maxJobsInZK"; + public static final String CONFIG_OF_MAX_TABLE_REPLICATION_JOBS_IN_ZK = "controller.table.replication.maxJobsInZK"; private final Map _invalidConfigs = new ConcurrentHashMap<>(); @@ -1401,4 +1402,8 @@ public List getTimeseriesLanguages() { public boolean 
getSegmentCompletionGroupCommitEnabled() { return getProperty(CONTROLLER_SEGMENT_COMPLETION_GROUP_COMMIT_ENABLED, true); } + + public int getMaxTableReplicationZkJobs() { // max TABLE_REPLICATION job records kept in ZK + return getProperty(CONFIG_OF_MAX_TABLE_REPLICATION_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java index 8583ff36a3cb..e46eeed8b42e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java @@ -25,11 +25,17 @@ import java.util.Map; import javax.annotation.Nullable; +/** + * Payload for the copy table request. + */ @JsonIgnoreProperties(ignoreUnknown = true) public class CopyTablePayload { private String _sourceClusterUri; private Map _headers; + + private String _destinationClusterUri; + private Map _destinationClusterHeaders; /** * Broker tenant for the new table. * MUST NOT contain the tenant type suffix, i.e. _BROKER. @@ -40,6 +46,7 @@ public class CopyTablePayload { * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE. */ private String _serverTenant; + private Integer _backfillParallism; /** * The instanceAssignmentConfig's tagPoolConfig contains full tenant name. 
We will use this field to let user specify @@ -51,14 +58,20 @@ public class CopyTablePayload { public CopyTablePayload( @JsonProperty(value = "sourceClusterUri", required = true) String sourceClusterUri, @JsonProperty("sourceClusterHeaders") Map headers, + @JsonProperty(value = "destinationClusterUri", required = true) String destinationClusterUri, + @JsonProperty(value = "destinationClusterHeaders") Map destinationClusterHeaders, @JsonProperty(value = "brokerTenant", required = true) String brokerTenant, @JsonProperty(value = "serverTenant", required = true) String serverTenant, - @JsonProperty("tagPoolReplacementMap") @Nullable Map tagPoolReplacementMap) { + @JsonProperty("tagPoolReplacementMap") @Nullable Map tagPoolReplacementMap, + @JsonProperty("backfillParallism") @Nullable Integer backfillParallism) { _sourceClusterUri = sourceClusterUri; _headers = headers; + _destinationClusterUri = destinationClusterUri; + _destinationClusterHeaders = destinationClusterHeaders; _brokerTenant = brokerTenant; _serverTenant = serverTenant; _tagPoolReplacementMap = tagPoolReplacementMap; + _backfillParallism = backfillParallism; } @JsonGetter("sourceClusterUri") @@ -71,6 +84,16 @@ public Map getHeaders() { return _headers; } + @JsonGetter("destinationClusterUri") + public String getDestinationClusterUri() { + return _destinationClusterUri; + } + + @JsonGetter("destinationClusterHeaders") + public Map getDestinationClusterHeaders() { + return _destinationClusterHeaders; + } + @JsonGetter("brokerTenant") public String getBrokerTenant() { return _brokerTenant; @@ -81,6 +104,11 @@ public String getServerTenant() { return _serverTenant; } + @JsonGetter("backfillParallism") + public Integer getBackfillParallism() { + return _backfillParallism; + } + @JsonGetter("tagPoolReplacementMap") public Map getTagPoolReplacementMap() { return _tagPoolReplacementMap; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index b749fddc4b2a..10bc758b7021 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -108,6 +109,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; +import org.apache.pinot.controller.helix.core.replication.TableReplicator; import org.apache.pinot.controller.recommender.RecommenderDriver; import org.apache.pinot.controller.tuner.TableConfigTunerUtils; import org.apache.pinot.controller.util.CompletionServiceHelper; @@ -205,6 +207,9 @@ public class PinotTableRestletResource { @Inject HttpClientConnectionManager _connectionManager; + @Inject + TableReplicator _tableReplicator; + /** * API to create a table. Before adding, validations will be done (min number of replicas, checking offline and * realtime table configs match, checking for tenants existing). 
@@ -382,6 +387,9 @@ public CopyTableResponse copyTable( response.setTableConfig(realtimeTableConfig); response.setWatermarkInductionResult(watermarkInductionResult); } + String jobID = UUID.randomUUID().toString(); // NOTE(review): jobID is not set on the response here - confirm + _tableReplicator.replicateTable(jobID, realtimeTableConfig.getTableName(), copyTablePayload, + watermarkInductionResult); return response; } catch (Exception e) { LOGGER.error("[copyTable] Error copying table: {}", tableName, e); @@ -425,6 +433,18 @@ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, CopyTab } } + @GET + @Path("/tables/copyStatus/{jobId}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TABLE_COPY_STATUS) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted table replication job", + notes = "Get status for a submitted table replication job") + public JsonNode getForceCommitJobStatus( // NOTE(review): name says force-commit but reads TABLE_REPLICATION jobs + @ApiParam(value = "job id", required = true) @PathParam("jobId") String id) { + return JsonUtils.objectToJsonNode( + _pinotHelixResourceManager.getControllerJobZKMetadata(id, ControllerJobTypes.TABLE_REPLICATION)); + } + @PUT @Produces(MediaType.APPLICATION_JSON) @Path("/tables/recommender") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 7caa81d9d790..9407643ba25a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2519,6 +2519,29 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.FORCE_COMMIT); } + public boolean addNewTableReplicationJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, + 
WatermarkInductionResult res) + throws JsonProcessingException { + Map jobMetadata = + commonTableReplicationJobMetadata(tableNameWithType, jobId, jobSubmissionTimeMs, res); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION); + } + + public Map commonTableReplicationJobMetadata(String tableNameWithType, String jobId, + long jobSubmissionTimeMs, WatermarkInductionResult res) + throws JsonProcessingException { + Map jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REPLICATION.name()); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); + jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.SEGMENTS_TO_BE_COPIED, + Integer.toString(res.getHistoricalSegments().size())); + jobMetadata.put(CommonConstants.ControllerJob.CONSUMER_WATERMARKS, JsonUtils.objectToString(res.getWatermarks())); + jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "IN_PROGRESS"); + return jobMetadata; + } + /** * Adds a new job metadata for controller job like table rebalance or reload into ZK * @param jobId job's UUID @@ -4931,7 +4954,23 @@ public WatermarkInductionResult getConsumerWatermarks(String tableName) throws T } return new WatermarkInductionResult.Watermark(status.getPartitionGroupId(), seq, startOffset); }).collect(Collectors.toList()); - return new WatermarkInductionResult(watermarks); + + Map partGroupToLatestSeq = watermarks.stream().collect( + Collectors.toMap(WatermarkInductionResult.Watermark::getPartitionGroupId, + WatermarkInductionResult.Watermark::getSequenceNumber)); + List historicalSegments = new ArrayList<>(); + for (String segment : idealState.getRecord().getMapFields().keySet()) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segment); + if 
(llcSegmentName != null) { + int partitionGroupId = llcSegmentName.getPartitionGroupId(); + int seq = llcSegmentName.getSequenceNumber(); + if (partGroupToLatestSeq.containsKey(partitionGroupId) && partGroupToLatestSeq.get(partitionGroupId) == seq) { + continue; + } + } + historicalSegments.add(segment); + } + return new WatermarkInductionResult(watermarks, historicalSegments); } /* diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java index 4d6a4aa8da0c..2c7dadbf5b38 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java @@ -31,6 +31,8 @@ public class WatermarkInductionResult { private List _watermarks; + private List _historicalSegments; + /** * The @JsonCreator annotation marks this constructor to be used for deserializing * a JSON array back into a WaterMarks object. @@ -38,8 +40,11 @@ public class WatermarkInductionResult { * @param watermarks The list of watermarks. + * @param historicalSegments The historical segment names; null is stored as an empty list. */ @JsonCreator - public WatermarkInductionResult(@JsonProperty("watermarks") List watermarks) { + public WatermarkInductionResult(@JsonProperty("watermarks") List watermarks, + @JsonProperty("historicalSegments") List historicalSegments) { _watermarks = watermarks; + _historicalSegments = historicalSegments != null ? historicalSegments : java.util.Collections.emptyList(); } /** @@ -52,6 +57,11 @@ public List getWatermarks() { return _watermarks; } + @JsonGetter("historicalSegments") + public List getHistoricalSegments() { + return _historicalSegments; + } + /** * Represents a single watermark with its partitionGroupId, sequence, and offset. 
*/ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java index 18e67c69a10a..5eedf78079db 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java @@ -40,7 +40,8 @@ public enum ControllerJobTypes implements ControllerJobType { RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, - TENANT_REBALANCE; + TENANT_REBALANCE, + TABLE_REPLICATION; private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJobTypes.class); private static final EnumMap ZK_NUM_JOBS_LIMIT = new EnumMap<>(ControllerJobTypes.class); @@ -55,6 +56,7 @@ public static void init(ControllerConf controllerConf) { ZK_NUM_JOBS_LIMIT.put(FORCE_COMMIT, controllerConf.getMaxForceCommitZkJobs()); ZK_NUM_JOBS_LIMIT.put(TABLE_REBALANCE, controllerConf.getMaxTableRebalanceZkJobs()); ZK_NUM_JOBS_LIMIT.put(TENANT_REBALANCE, controllerConf.getMaxTenantRebalanceZkJobs()); + ZK_NUM_JOBS_LIMIT.put(TABLE_REPLICATION, controllerConf.getMaxTableReplicationZkJobs()); } @Override diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java new file mode 100644 index 000000000000..d82f156cae97 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/NoOpSegmentCopier.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.util.Map; +import org.apache.pinot.controller.api.resources.CopyTablePayload; + +/** + * A no-op segment copier for testing purposes. + */ +public class NoOpSegmentCopier implements SegmentCopier { + + @Override + public void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload, + Map segmentZKMetadata) { + // No-op + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java new file mode 100644 index 000000000000..52c99c550fe0 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopier.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.resources.CopyTablePayload; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Copies a realtime segment from source to destination. 
+ */ +public class RealtimeSegmentCopier implements SegmentCopier { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentCopier.class); + private static final String SEGMENT_UPLOAD_ENDPOINT_TEMPLATE = "/segments?tableName=%s"; + + private final String _destinationDeepStoreUri; + private final HttpClient _httpClient; + + public RealtimeSegmentCopier(ControllerConf controllerConf) { + this(controllerConf, HttpClient.getInstance()); + } + + public RealtimeSegmentCopier(ControllerConf controllerConf, HttpClient httpClient) { + _destinationDeepStoreUri = controllerConf.getDataDir(); + _httpClient = httpClient; + } + + + /** + * Copies a segment to the destination cluster. + * + * This method performs the following steps: + * 1. Get the source segment URI from ZK metadata. + * 2. Copy the segment from the source deep store to the destination deep store. + * 3. Upload the segment to the destination controller. + * + * @param tableNameWithType Table name with type suffix; must end with _REALTIME + * @param segmentName Segment name + * @param copyTablePayload Payload for copying a table + * @param segmentZKMetadata ZK metadata for the segment; must contain the key segment.download.url + */ + @Override + public void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload, + Map segmentZKMetadata) { + if (!tableNameWithType.endsWith("_REALTIME")) { + throw new IllegalArgumentException("Table name must end with _REALTIME"); + } + String tableName = tableNameWithType.substring(0, tableNameWithType.lastIndexOf("_REALTIME")); + try { + // 1. Get the source segment URI + String downloadUrl = segmentZKMetadata.get("segment.download.url"); + if (downloadUrl == null) { + throw new RuntimeException("Download URL not found in segment ZK metadata for segment: " + segmentName); + } + + // 2. 
Copy the segment from the source deep store to the destination deep store + String destSegmentUriStr = _destinationDeepStoreUri + "/" + tableName + "/" + segmentName; + LOGGER.info("[copyTable] Copying segment: {} from url: {} to destination: {}", segmentName, downloadUrl, + destSegmentUriStr); + URI sourceSegmentUri = new URI(downloadUrl); + URI destSegmentUri = new URI(destSegmentUriStr); + PinotFS sourcePinotFS = getPinotFS(sourceSegmentUri); + PinotFS destPinotFS = getPinotFS(destSegmentUri); + + // TODO: use local file system as an intermediate store to support different file system + if (sourcePinotFS != destPinotFS) { + throw new IllegalArgumentException("Copy files across different file system is not supported"); + } + + if (!destPinotFS.exists(destSegmentUri)) { + if (!destPinotFS.copy(sourceSegmentUri, destSegmentUri)) { + throw new RuntimeException("Failed to copy segment " + segmentName + " from " + downloadUrl + " to " + + destSegmentUriStr); + } + LOGGER.info("[copyTable] Copied segment {} from {} to {}", segmentName, sourceSegmentUri, destSegmentUri); + } else { + LOGGER.info("[copyTable] Segment {} already exists at destination {}", segmentName, destSegmentUri); + } + + // 3. 
Upload the segment to the destination controller + LOGGER.info("[copyTable] Uploading segment {} to destination controller", segmentName); + String dstControllerURIStr = copyTablePayload.getDestinationClusterUri(); + + // TODO: Refactor SegmentPushUtils.java and FileUploadDownloadClient to dedup code + RetryPolicies.exponentialBackoffRetryPolicy(3, 5000, 5).attempt(() -> { // up to 3 attempts so 5xx is retried + try { + SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException( + _httpClient.sendRequest( + getSendSegmentUriRequest(dstControllerURIStr, destSegmentUriStr, + copyTablePayload.getDestinationClusterHeaders(), tableName, "REALTIME"), + HttpClient.DEFAULT_SOCKET_TIMEOUT_MS)); + LOGGER.info("[copyTable] Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, + destSegmentUriStr, dstControllerURIStr, response.getStatusCode(), + response.getResponse()); + return true; + } catch (HttpErrorStatusException e) { + int statusCode = e.getStatusCode(); + if (statusCode >= 500) { + // Temporary exception + LOGGER.warn("[copyTable] Caught temporary error when pushing table: {} segment uri: {} to {}, will retry", + tableName, destSegmentUriStr, dstControllerURIStr, e); + return false; + } else { + // Permanent exception + LOGGER.error("[copyTable] Caught permanent error when pushing table: {} segment uri: {} to {}, won't retry", + tableName, destSegmentUriStr, dstControllerURIStr, e); + throw e; + } + } + }); + } catch (Exception e) { + LOGGER.error("[copyTable] Caught exception while copying segment {}", segmentName, e); + throw new RuntimeException(e); + } + } + + static ClassicHttpRequest getSendSegmentUriRequest(String controllerUriStr, String downloadUri, + Map headers, String tableNameWithoutType, String tableType) throws URISyntaxException { + URI segmentPushURI = new URI(controllerUriStr + "/v2/segments?tableName=" + tableNameWithoutType + "&tableType=" + + tableType); + ClassicRequestBuilder requestBuilder = 
ClassicRequestBuilder.post(segmentPushURI).setVersion(HttpVersion.HTTP_1_1) + .setHeader( + FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()) + .setHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, downloadUri) + .setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE); + if (headers != null) { // destinationClusterHeaders is optional in CopyTablePayload + headers.forEach(requestBuilder::setHeader); + } + return requestBuilder.build(); + } + + static String getScheme(URI uri) { + if (uri.getScheme() != null) { + return uri.getScheme(); + } + return PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + + PinotFS getPinotFS(URI uri) { + String scheme = getScheme(uri); + if (!PinotFSFactory.isSchemeSupported(scheme)) { + throw new IllegalArgumentException("File scheme " + scheme + " is not supported."); + } + return PinotFSFactory.create(scheme); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java new file mode 100644 index 000000000000..a656be2524e7 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/SegmentCopier.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import java.util.Map; +import org.apache.pinot.controller.api.resources.CopyTablePayload; + + +/** + * An interface to copy a segment to the destination cluster. + */ +public interface SegmentCopier { + + /** + * Copies a segment to the destination cluster. + * @param tableNameWithType Table name with type suffix + * @param segmentName Segment name + * @param copyTablePayload Payload for copying a table + * @param segmentZKMetadata ZK metadata for the segment + */ + void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload, + Map segmentZKMetadata); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java new file mode 100644 index 000000000000..88b22e250e51 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationObserver.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +public interface TableReplicationObserver { + + enum Trigger { + START_TRIGGER, + SEGMENT_REPLICATE_COMPLETED_TRIGGER, + SEGMENT_REPLICATE_ERRORED_TRIGGER, + } + + void onTrigger(Trigger trigger, String segmentName); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java new file mode 100644 index 000000000000..c2a223df3625 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStats.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.annotation.JsonGetter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Tracks how many segments remain and which segments failed during table replication. + */ +public class TableReplicationProgressStats { + + public enum SegmentStatus { + COMPLETED, + ERROR, + } + + private final AtomicInteger _remainingSegments; + private final BlockingQueue _segmentsFailToCopy = new LinkedBlockingQueue<>(); + + public TableReplicationProgressStats(int segmentSize) { + _remainingSegments = new AtomicInteger(segmentSize); + } + + /** + * Records the outcome for one segment and decrements the remaining-segment counter. + * @param segment Name of the segment; remembered as failed when the status is ERROR. + * @param status Outcome of replicating the segment. + * @return The remaining-segment count after the decrement. + */ + public int updateSegmentStatus(String segment, SegmentStatus status) { + if (SegmentStatus.ERROR.equals(status)) { + _segmentsFailToCopy.add(segment); + } + return _remainingSegments.decrementAndGet(); + } + + @JsonGetter("remainingSegments") + public int getRemainingSegments() { + return _remainingSegments.get(); + } + + @JsonGetter("segmentsFailToCopy") + public List getSegmentsFailToCopy() { + return new ArrayList<>(_segmentsFailToCopy); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java new file mode 100644 index 000000000000..7df558ee61bd --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/TableReplicator.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.api.resources.CopyTablePayload; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.WatermarkInductionResult; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Replicates a table from a source cluster to a destination cluster. 
+ */
+public class TableReplicator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableReplicator.class);
+  // One shared mapper instead of a new ObjectMapper for every replication request.
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final ExecutorService _executorService;
+  private final SegmentCopier _segmentCopier;
+  private final HttpClient _httpClient;
+
+  /**
+   * Creates a replicator that talks to the source cluster through {@link HttpClient#getInstance()}.
+   *
+   * @param pinotHelixResourceManager Used to register the replication job and persist its progress.
+   * @param executorService Executor on which the segment copy workers are submitted.
+   * @param segmentCopier Copies a single segment to the destination.
+   */
+  public TableReplicator(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService,
+      SegmentCopier segmentCopier) {
+    this(pinotHelixResourceManager, executorService, segmentCopier, HttpClient.getInstance());
+  }
+
+  /**
+   * Creates a replicator with an explicit HTTP client.
+   *
+   * @param pinotHelixResourceManager Used to register the replication job and persist its progress.
+   * @param executorService Executor on which the segment copy workers are submitted.
+   * @param segmentCopier Copies a single segment to the destination.
+   * @param httpClient Client used to fetch segment ZK metadata from the source cluster.
+   */
+  public TableReplicator(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService,
+      SegmentCopier segmentCopier, HttpClient httpClient) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+    _segmentCopier = segmentCopier;
+    _httpClient = httpClient;
+  }
+
+  /**
+   * Replicates the table by copying segments from source to destination.
+   *
+   * This method performs the following steps:
+   * 1. Fetch ZK metadata for all segments of the table from the source cluster.
+   * 2. Register a new controller job in Zookeeper to track the replication progress.
+   * 3. Initialize a {@link ZkBasedTableReplicationObserver} to update the job status in Zookeeper.
+   * 4. Submit tasks to the executor service to copy segments in parallel.
+   * 5. Each task copies a segment and triggers the observer to update the progress.
+   *
+   * The method returns once the workers are submitted; it does not wait for the copy to finish.
+   * The number of workers is the payload's backfill parallelism when set, otherwise the number of watermarks,
+   * and never less than one so that queued segments are always drained.
+   *
+   * @param jobId The job ID.
+   * @param tableNameWithType The table name with type.
+   * @param copyTablePayload The payload containing the source and destination cluster information.
+   * @param res The watermark induction result.
+   * @throws Exception If the metadata fetch fails or the replication job cannot be registered.
+   */
+  public void replicateTable(String jobId, String tableNameWithType, CopyTablePayload copyTablePayload,
+      WatermarkInductionResult res)
+      throws Exception {
+    // TODO: throw IllegalStateException if any previous job has not expired yet.
+    // TODO: replication job canceling mechanism
+    LOGGER.info("[copyTable] Start replicating table: {} with jobId: {}", tableNameWithType, jobId);
+    Map<String, Map<String, String>> zkMetadataMap = fetchSegmentZkMetadata(tableNameWithType, copyTablePayload);
+    LOGGER.info("[copyTable] Fetched ZK metadata for {} segments", zkMetadataMap.size());
+
+    List<String> segments = new ArrayList<>(res.getHistoricalSegments());
+    long submitTS = System.currentTimeMillis();
+
+    if (!_pinotHelixResourceManager.addNewTableReplicationJob(tableNameWithType, jobId, submitTS, res)) {
+      throw new Exception("Failed to add segments to replicated table");
+    }
+    ZkBasedTableReplicationObserver observer = new ZkBasedTableReplicationObserver(jobId, tableNameWithType, res,
+        _pinotHelixResourceManager);
+    observer.onTrigger(TableReplicationObserver.Trigger.START_TRIGGER, null);
+    ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>(segments);
+    Integer requestedParallelism = copyTablePayload.getBackfillParallism();
+    // A parallelism of zero (e.g. no watermarks) would submit no worker and leave the job IN_PROGRESS forever.
+    int parallelism = Math.max(1,
+        requestedParallelism != null ? requestedParallelism : res.getWatermarks().size());
+    for (int i = 0; i < parallelism; i++) {
+      _executorService.submit(
+          () -> copyQueuedSegments(q, tableNameWithType, copyTablePayload, zkMetadataMap, observer));
+    }
+    LOGGER.info("[copyTable] Submitted replication tasks to executor service for job: {}", jobId);
+  }
+
+  /**
+   * Fetches the ZK metadata of all segments of the table from the source cluster, keyed by segment name.
+   */
+  private Map<String, Map<String, String>> fetchSegmentZkMetadata(String tableNameWithType,
+      CopyTablePayload copyTablePayload)
+      throws Exception {
+    ControllerRequestURLBuilder urlBuilder =
+        ControllerRequestURLBuilder.baseUrl(copyTablePayload.getSourceClusterUri());
+    URI zkMetadataUri = new URI(urlBuilder.forSegmentZkMetadata(tableNameWithType));
+    SimpleHttpResponse zkMetadataResponse = HttpClient.wrapAndThrowHttpException(
+        _httpClient.sendGetRequest(zkMetadataUri, copyTablePayload.getHeaders()));
+    String zkMetadataJson = zkMetadataResponse.getResponse();
+    return OBJECT_MAPPER.readValue(zkMetadataJson, new TypeReference<Map<String, Map<String, String>>>() {
+    });
+  }
+
+  /**
+   * Worker loop: copies segments taken from the shared queue until it is empty, reporting each outcome once.
+   */
+  private void copyQueuedSegments(ConcurrentLinkedQueue<String> pendingSegments, String tableNameWithType,
+      CopyTablePayload copyTablePayload, Map<String, Map<String, String>> zkMetadataMap,
+      TableReplicationObserver observer) {
+    for (String segment = pendingSegments.poll(); segment != null; segment = pendingSegments.poll()) {
+      TableReplicationObserver.Trigger outcome;
+      try {
+        LOGGER.info("[copyTable] Starting to copy segment: {} for table: {}", segment, tableNameWithType);
+        Map<String, String> segmentZKMetadata = zkMetadataMap.get(segment);
+        if (segmentZKMetadata == null) {
+          throw new RuntimeException("Segment ZK metadata not found for segment: " + segment);
+        }
+        _segmentCopier.copy(tableNameWithType, segment, copyTablePayload, segmentZKMetadata);
+        outcome = TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER;
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while replicating segment: {} of table: {}", segment,
+            tableNameWithType, e);
+        outcome = TableReplicationObserver.Trigger.SEGMENT_REPLICATE_ERRORED_TRIGGER;
+      }
+      // Report outside the copy try-block: a failure while reporting a completed segment must not be turned
+      // into a second (error) report for the same segment, which would decrement the countdown twice.
+      try {
+        observer.onTrigger(outcome, segment);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while reporting replication progress for segment: {} of table: {}",
+            segment, tableNameWithType, e);
+      }
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java
new file mode 100644
index 000000000000..ccdb714ce9c0
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserver.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.helix.core.replication; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Map; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.WatermarkInductionResult; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Observes the table replication progress and updates the status in Zookeeper. 
+ */
+public class ZkBasedTableReplicationObserver implements TableReplicationObserver {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTableReplicationObserver.class);
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final String _jobId;
+  private final String _tableNameWithType;
+  private final TableReplicationProgressStats _progressStats;
+  private final WatermarkInductionResult _res;
+
+  /**
+   * @param jobId The ID of the replication job whose ZK record is updated.
+   * @param tableNameWithType The table name with type.
+   * @param res The watermark induction result; its historical segment count seeds the progress countdown.
+   * @param pinotHelixResourceManager Used to build the job metadata and write it to ZK.
+   */
+  public ZkBasedTableReplicationObserver(String jobId, String tableNameWithType, WatermarkInductionResult res,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    _jobId = jobId;
+    _tableNameWithType = tableNameWithType;
+    _res = res;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _progressStats = new TableReplicationProgressStats(res.getHistoricalSegments().size());
+  }
+
+  /**
+   * Records the event in the progress stats and, depending on the trigger, persists the stats to ZK.
+   *
+   * @param trigger The replication event.
+   * @param segmentName The segment the event refers to; not used for {@code START_TRIGGER}.
+   */
+  @Override
+  public void onTrigger(Trigger trigger, String segmentName) {
+    switch (trigger) {
+      // Table
+      case START_TRIGGER:
+        // in case of zero segments to be copied, track stats in ZK
+        trackStatsInZk();
+        break;
+      case SEGMENT_REPLICATE_COMPLETED_TRIGGER:
+        // Update progress stats and track in ZK every 100 segments (this includes the last one, remaining == 0)
+        int remaining = _progressStats.updateSegmentStatus(segmentName,
+            TableReplicationProgressStats.SegmentStatus.COMPLETED);
+        if (remaining % 100 == 0) {
+          trackStatsInZk();
+        }
+        break;
+      case SEGMENT_REPLICATE_ERRORED_TRIGGER:
+        // Update progress stats and track in ZK immediately on error
+        _progressStats.updateSegmentStatus(segmentName, TableReplicationProgressStats.SegmentStatus.ERROR);
+        trackStatsInZk();
+        break;
+      default:
+    }
+  }
+
+  /**
+   * Serializes the current progress and writes it, together with the derived job status, to ZK.
+   *
+   * Synchronized because several copy workers share this observer: taking the snapshot and writing it under one
+   * lock guarantees that a later write never carries an older snapshot, so a stale IN_PROGRESS record cannot
+   * overwrite the final COMPLETED one.
+   */
+  private synchronized void trackStatsInZk() {
+    LOGGER.info("[copyTable] Tracking replication stats in ZK for job: {}", _jobId);
+    try {
+      Map<String, String> jobMetadata = _pinotHelixResourceManager
+          .commonTableReplicationJobMetadata(_tableNameWithType, _jobId, System.currentTimeMillis(), _res);
+      String progress = JsonUtils.objectToString(_progressStats);
+      jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_PROGRESS, progress);
+      // Read the count back from the serialized snapshot (not from the live counter, which other workers keep
+      // decrementing) so that the status always agrees with the progress stored next to it.
+      int remaining = JsonUtils.stringToObject(progress, JsonNode.class).get("remainingSegments").asInt();
+      // NOTE(review): the job is marked COMPLETED once nothing remains, even if some segments failed to copy;
+      // failures are only visible through the progress payload - confirm this is intended.
+      if (remaining == 0) {
+        jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "COMPLETED");
+      } else {
+        jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "IN_PROGRESS");
+      }
+      _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION);
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising replication stats to JSON for persisting to ZK {}", _jobId, e);
+    }
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
index abbfa71e3ace..29995023afb1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -36,8 +36,9 @@ public void testTweakRealtimeTableConfig() throws Exception {
     String brokerTenant = "testBroker";
     String serverTenant = "testServer";
 
-    CopyTablePayload copyTablePayload = new CopyTablePayload("http://localhost:9000", null, brokerTenant,
-        serverTenant, Map.of("server1_REALTIME", "testServer_REALTIME"));
+    CopyTablePayload copyTablePayload = new CopyTablePayload("http://localhost:9000", null,
+        "http://localhost:9000", null, brokerTenant, serverTenant,
+        Map.of("server1_REALTIME", "testServer_REALTIME"), null);
     PinotTableRestletResource.tweakRealtimeTableConfig(tableConfig, copyTablePayload);
 
     assertEquals(tableConfig.get("tenants").get("broker").asText(), brokerTenant);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index f4f876e4a553..101632df7843 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -1642,6 +1642,15 @@ public void testGetConsumerWatermarks() helixAdminField.set(_helixResourceManager, spyHelixAdmin); IdealState idealState = new IdealState(realtimeTableName); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 0, 100, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "ONLINE"); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 0, 101, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "CONSUMING"); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 1, 199, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "ONLINE"); + idealState.setPartitionState(new LLCSegmentName(rawTableName, 1, 200, + System.currentTimeMillis()).getSegmentName(), SERVER_NAME_TAGGED, "CONSUMING"); + doReturn(idealState).when(spyHelixAdmin).getResourceIdealState(any(), eq(realtimeTableName)); // Test happy path @@ -1663,6 +1672,17 @@ public void testGetConsumerWatermarks() assertEquals(inProgressWatermark.getSequenceNumber(), 200L); assertEquals(inProgressWatermark.getOffset(), 789L); + List historicalSegments = waterMarkInductionResult.getHistoricalSegments(); + assertEquals(historicalSegments.size(), 2); + for (String segment : historicalSegments) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segment); + if (llcSegmentName.getPartitionGroupId() == 0) { + assertEquals(llcSegmentName.getSequenceNumber(), 100); + } else { + assertEquals(llcSegmentName.getSequenceNumber(), 199); + } + } + // recover the original values helixAdminField.set(_helixResourceManager, originalHelixAdmin); 
llcManagerField.set(_helixResourceManager, originalLlcManager); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java new file mode 100644 index 000000000000..09d973a6998b --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/RealtimeSegmentCopierTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.controller.helix.core.replication;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.CopyTablePayload;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link RealtimeSegmentCopier} with the file system and the HTTP client mocked out.
+ */
+public class RealtimeSegmentCopierTest {
+
+  @Mock
+  private ControllerConf _controllerConf;
+  @Mock
+  private HttpClient _httpClient;
+  @Mock
+  private PinotFS _pinotFS;
+
+  private RealtimeSegmentCopier _copier;
+  private AutoCloseable _mocks;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    when(_controllerConf.getDataDir()).thenReturn("hdfs://data");
+    // Spy so that getPinotFS can be stubbed to hand out the mocked file system.
+    _copier = spy(new RealtimeSegmentCopier(_controllerConf, _httpClient));
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * When the file system reports the checked URI as absent, the segment is copied and one HTTP request is sent.
+   */
+  @Test
+  public void testCopy() throws Exception {
+    String tableNameWithType = "table1_REALTIME";
+    String segmentName = "seg1";
+    CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(),
+        "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null);
+
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put("segment.download.url", "hdfs://src/data/seg1");
+
+    doReturn(_pinotFS).when(_copier).getPinotFS(any(URI.class));
+    when(_pinotFS.exists(any(URI.class))).thenReturn(false);
+    when(_pinotFS.copy(any(URI.class), any(URI.class))).thenReturn(true);
+
+    SimpleHttpResponse response = mock(SimpleHttpResponse.class);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getResponse()).thenReturn("{}");
+    doReturn(response).when(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong());
+
+    _copier.copy(tableNameWithType, segmentName, payload, metadata);
+
+    verify(_pinotFS).copy(any(URI.class), any(URI.class));
+    verify(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong());
+  }
+
+  /**
+   * When the file system reports the checked URI as existing, no copy happens but the HTTP request is still sent.
+   */
+  @Test
+  public void testCopyExisting() throws Exception {
+    String tableNameWithType = "table1_REALTIME";
+    String segmentName = "seg1";
+    CopyTablePayload payload = new CopyTablePayload("http://src", Collections.emptyMap(),
+        "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null);
+
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put("segment.download.url", "hdfs://src/data/seg1");
+
+    doReturn(_pinotFS).when(_copier).getPinotFS(any(URI.class));
+    when(_pinotFS.exists(any(URI.class))).thenReturn(true);
+
+    SimpleHttpResponse response = mock(SimpleHttpResponse.class);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getResponse()).thenReturn("{}");
+    doReturn(response).when(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong());
+
+    _copier.copy(tableNameWithType, segmentName, payload, metadata);
+
+    verify(_pinotFS, never()).copy(any(URI.class), any(URI.class));
+    verify(_httpClient).sendRequest(any(ClassicHttpRequest.class), anyLong());
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java
new file mode 100644
index
000000000000..4dcd259479ae
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicationProgressStatsTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.replication;
+
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link TableReplicationProgressStats}: countdown and failed-segment bookkeeping.
+ */
+public class TableReplicationProgressStatsTest {
+
+  @Test
+  public void testStats() {
+    TableReplicationProgressStats stats = new TableReplicationProgressStats(10);
+    Assert.assertEquals(stats.getRemainingSegments(), 10);
+    Assert.assertTrue(stats.getSegmentsFailToCopy().isEmpty());
+
+    stats.updateSegmentStatus("seg1", TableReplicationProgressStats.SegmentStatus.COMPLETED);
+    Assert.assertEquals(stats.getRemainingSegments(), 9);
+    Assert.assertTrue(stats.getSegmentsFailToCopy().isEmpty());
+
+    stats.updateSegmentStatus("seg2", TableReplicationProgressStats.SegmentStatus.ERROR);
+    Assert.assertEquals(stats.getRemainingSegments(), 8);
+    List<String> failedSegments = stats.getSegmentsFailToCopy();
+    Assert.assertEquals(failedSegments.size(), 1);
+    Assert.assertEquals(failedSegments.get(0), "seg2");
+  }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java new file mode 100644 index 000000000000..947ddcf29a22 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/TableReplicatorTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.controller.helix.core.replication;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.resources.CopyTablePayload;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link TableReplicator} with all collaborators mocked; the executor is a mock, so the submitted
+ * copy tasks are counted but never run.
+ */
+public class TableReplicatorTest {
+
+  @Mock
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+  @Mock
+  private ExecutorService _executorService;
+  @Mock
+  private SegmentCopier _segmentCopier;
+  @Mock
+  private HttpClient _httpClient;
+
+  private TableReplicator _tableReplicator;
+  private AutoCloseable _mocks;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    _tableReplicator = new TableReplicator(_pinotHelixResourceManager, _executorService, _segmentCopier, _httpClient);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Verifies that the job is registered with the historical segments and that, with no explicit parallelism in the
+   * payload, one task per watermark is submitted.
+   */
+  @Test
+  public void testReplicateTable() throws Exception {
+    String jobId = "job1";
+    String tableName = "table1_REALTIME";
+    String sourceClusterUri = "http://localhost:9000";
+    CopyTablePayload copyTablePayload = new CopyTablePayload(sourceClusterUri, Collections.emptyMap(),
+        "http://dest", Collections.emptyMap(), "broker", "server", Collections.emptyMap(), null);
+
+    List<WatermarkInductionResult.Watermark> watermarks = Arrays.asList(
+        new WatermarkInductionResult.Watermark(0, 10, 100L),
+        new WatermarkInductionResult.Watermark(1, 11, 110L));
+    WatermarkInductionResult watermarkInductionResult = new WatermarkInductionResult(watermarks,
+        Arrays.asList("seg1", "seg2"));
+
+    // Mock HttpClient response for ZK metadata
+    Map<String, Map<String, String>> zkMetadataMap = new HashMap<>();
+    Map<String, String> seg1Metadata = new HashMap<>();
+    seg1Metadata.put("k1", "v1");
+    zkMetadataMap.put("seg1", seg1Metadata);
+
+    Map<String, String> seg2Metadata = new HashMap<>();
+    seg2Metadata.put("k2", "v2");
+    zkMetadataMap.put("seg2", seg2Metadata);
+
+    String zkMetadataJson = new ObjectMapper().writeValueAsString(zkMetadataMap);
+    SimpleHttpResponse response = mock(SimpleHttpResponse.class);
+    when(response.getResponse()).thenReturn(zkMetadataJson);
+    when(response.getStatusCode()).thenReturn(200);
+
+    when(_httpClient.sendGetRequest(any(URI.class), anyMap())).thenReturn(response);
+    when(_pinotHelixResourceManager.addNewTableReplicationJob(anyString(), anyString(), anyLong(),
+        any(WatermarkInductionResult.class))).thenReturn(true);
+
+    _tableReplicator.replicateTable(jobId, tableName, copyTablePayload, watermarkInductionResult);
+
+    ArgumentCaptor<WatermarkInductionResult> resultCaptor = ArgumentCaptor.forClass(WatermarkInductionResult.class);
+    verify(_pinotHelixResourceManager).addNewTableReplicationJob(eq(tableName), eq(jobId), anyLong(),
+        resultCaptor.capture());
+
+    List<String> capturedSegments = resultCaptor.getValue().getHistoricalSegments();
+    Assert.assertEquals(capturedSegments.size(), 2);
+    Assert.assertTrue(capturedSegments.contains("seg1"));
+    Assert.assertTrue(capturedSegments.contains("seg2"));
+
+    verify(_executorService, times(watermarks.size())).submit(any(Runnable.class));
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java
new file mode 100644
index 000000000000..4dec37ef0bb3
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/replication/ZkBasedTableReplicationObserverTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.replication;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ZkBasedTableReplicationObserver}: checks when progress is written to ZK.
+ */
+public class ZkBasedTableReplicationObserverTest {
+
+  @Mock
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  private AutoCloseable _mocks;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Completed segments only trigger a ZK write when the remaining count is a multiple of 100, i.e. here only for
+   * the last of the three segments.
+   */
+  @Test
+  public void testObserver() throws Exception {
+    String jobId = "job1";
+    String tableName = "table1";
+    List<String> segments = Arrays.asList("seg1", "seg2", "seg3");
+    WatermarkInductionResult res = new WatermarkInductionResult(Collections.emptyList(), segments);
+
+    Map<String, String> baseMetadata = new HashMap<>();
+    baseMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    baseMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableName);
+    when(_pinotHelixResourceManager.commonTableReplicationJobMetadata(eq(tableName), eq(jobId), anyLong(),
+        eq(res))).thenReturn(baseMetadata);
+
+    ZkBasedTableReplicationObserver observer =
+        new ZkBasedTableReplicationObserver(jobId, tableName, res, _pinotHelixResourceManager);
+
+    // Trigger completion (1st segment) - no ZK update (only every 100 or error)
+    // Total 3. remaining starts at 3.
+    // complete seg1 -> remaining 2. 2 % 100 != 0.
+    // complete seg2 -> remaining 1.
+    // complete seg3 -> remaining 0. 0 % 100 == 0 -> ZK update.
+
+    observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, "seg1");
+    verify(_pinotHelixResourceManager, never()).addControllerJobToZK(anyString(), anyMap(), any());
+
+    observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, "seg2");
+    verify(_pinotHelixResourceManager, never()).addControllerJobToZK(anyString(), anyMap(), any());
+
+    observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_COMPLETED_TRIGGER, "seg3");
+
+    ArgumentCaptor<Map<String, String>> metadataCaptor = ArgumentCaptor.forClass(Map.class);
+    verify(_pinotHelixResourceManager).addControllerJobToZK(eq(jobId), metadataCaptor.capture(),
+        eq(ControllerJobTypes.TABLE_REPLICATION));
+
+    Map<String, String> metadata = metadataCaptor.getValue();
+    Assert.assertEquals(metadata.get(CommonConstants.ControllerJob.JOB_ID), jobId);
+    Assert.assertEquals(metadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE), tableName);
+    // Nothing remains after the third segment, so the persisted status must be COMPLETED.
+    Assert.assertEquals(metadata.get(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS), "COMPLETED");
+  }
+
+  /**
+   * A failed segment is written to ZK immediately.
+   */
+  @Test
+  public void testObserverError() throws Exception {
+    String jobId = "job1";
+    String tableName = "table1";
+    List<String> segments = Collections.singletonList("seg1");
+    WatermarkInductionResult res = new WatermarkInductionResult(Collections.emptyList(), segments);
+
+    Map<String, String> baseMetadata = new HashMap<>();
+    baseMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    baseMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableName);
+    when(_pinotHelixResourceManager.commonTableReplicationJobMetadata(eq(tableName), eq(jobId), anyLong(),
+        eq(res))).thenReturn(baseMetadata);
+
+    ZkBasedTableReplicationObserver observer =
+        new ZkBasedTableReplicationObserver(jobId, tableName, res, _pinotHelixResourceManager);
+
+    observer.onTrigger(TableReplicationObserver.Trigger.SEGMENT_REPLICATE_ERRORED_TRIGGER, "seg1");
+
+    verify(_pinotHelixResourceManager).addControllerJobToZK(eq(jobId), anyMap(),
+        eq(ControllerJobTypes.TABLE_REPLICATION));
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index b8fc93363dfd..2c4108dbfd3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -105,6 +105,7 @@ public static class Cluster {
     public static final String DELETE_QUERY_WORKLOAD_CONFIG = "DeleteQueryWorkloadConfig";
     public static final String GET_GROOVY_STATIC_ANALYZER_CONFIG = "GetGroovyStaticAnalyzerConfig";
     public static final String UPDATE_GROOVY_STATIC_ANALYZER_CONFIG = "UpdateGroovyStaticAnalyzerConfig";
+    public static final String GET_TABLE_COPY_STATUS = "GetTableCopyStatus";
   }
 
   // Action names for table
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8a71c5682882..8e1497ad1712 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1635,6 +1635,11 @@ public static class ControllerJob {
     public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
     public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted";
     public static final String NUM_CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED = "numberOfSegmentsYetToBeCommitted";
+    // Table Replication job props
+    public static final String SEGMENTS_TO_BE_COPIED = "segmentsToBeCopied";
+    public static final String
CONSUMER_WATERMARKS = "consumerWatermarks"; + public static final String REPLICATION_PROGRESS = "replicationProgress"; + public static final String REPLICATION_JOB_STATUS = "replicationJobStatus"; } // prefix for scheduler related features, e.g. query accountant diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index a7fa09019824..af525911b4f5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -468,6 +468,10 @@ public String forSegmentMetadata(String tableName, TableType tableType) { return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") + "?type=" + tableType.name(); } + /** + * Builds the URL {baseUrl}/segments/{tableNameWithType}/zkmetadata. + */ + public String forSegmentZkMetadata(String tableNameWithType) { + return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, "zkmetadata"); + } + public String forListAllSegmentLineages(String tableName, String tableType) { return StringUtil.join("/", _baseUrl, "segments", tableName, "lineage?type=" + tableType); }