BaseControllerStarter.java
@@ -112,6 +112,8 @@
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceChecker;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.replication.RealtimeSegmentCopier;
import org.apache.pinot.controller.helix.core.replication.TableReplicator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
@@ -218,6 +220,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected TaskMetricsEmitter _taskMetricsEmitter;
protected PoolingHttpClientConnectionManager _connectionManager;
protected TenantRebalancer _tenantRebalancer;
protected TableReplicator _tableReplicator;
// This executor should be used by all code paths for user initiated rebalances, so that the controller config
// CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS is honored.
protected ExecutorService _rebalancerExecutorService;
@@ -617,6 +620,7 @@ private void setUpPinotController() {
_rebalancerExecutorService);
_tenantRebalancer =
new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, _rebalancerExecutorService);
_tableReplicator = new TableReplicator(_helixResourceManager, _executorService, new RealtimeSegmentCopier(_config));

// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
@@ -678,6 +682,7 @@ protected void configure() {
bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
bind(_tenantRebalancer).to(TenantRebalancer.class);
bind(_tableReplicator).to(TableReplicator.class);
bind(_tableSizeReader).to(TableSizeReader.class);
bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
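For orientation, here is a minimal sketch of the TableReplicator surface implied by the construction above and by the replicateTable(...) call later in this diff. The class internals are not shown in the PR, so the import paths, the SegmentCopier-typed third argument (consistent with NoOpSegmentCopier implementing SegmentCopier), and the method body are assumptions, not the actual implementation:

package org.apache.pinot.controller.helix.core.replication;

import java.util.concurrent.ExecutorService;
import org.apache.pinot.controller.api.resources.CopyTablePayload;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;

// Sketch only: signatures inferred from the call sites in BaseControllerStarter and
// PinotTableRestletResource; everything beyond the signatures is assumed.
public class TableReplicator {
  private final PinotHelixResourceManager _helixResourceManager;
  private final ExecutorService _executorService;
  private final SegmentCopier _segmentCopier;

  public TableReplicator(PinotHelixResourceManager helixResourceManager, ExecutorService executorService,
      SegmentCopier segmentCopier) {
    _helixResourceManager = helixResourceManager;
    _executorService = executorService;
    _segmentCopier = segmentCopier;
  }

  public void replicateTable(String jobId, String tableNameWithType, CopyTablePayload copyTablePayload,
      WatermarkInductionResult watermarkInductionResult) {
    // Assumed flow: persist job metadata via addNewTableReplicationJob(...), then copy the
    // historical segments asynchronously on _executorService using _segmentCopier.
  }
}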
ControllerConf.java
@@ -405,6 +405,7 @@ public static long getRandomInitialDelayInSeconds() {
public static final String CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK = "controller.tenant.rebalance.maxJobsInZK";
public static final String CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK = "controller.reload.segment.maxJobsInZK";
public static final String CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK = "controller.force.commit.maxJobsInZK";
public static final String CONFIG_OF_MAX_TABLE_REPLICATION_JOBS_IN_ZK = "controller.table.replication.maxJobsInZK";

private final Map<String, String> _invalidConfigs = new ConcurrentHashMap<>();

@@ -1401,4 +1402,8 @@ public List<String> getTimeseriesLanguages() {
public boolean getSegmentCompletionGroupCommitEnabled() {
return getProperty(CONTROLLER_SEGMENT_COMPLETION_GROUP_COMMIT_ENABLED, true);
}

public int getMaxTableReplicationZkJobs() {
return getProperty(CONFIG_OF_MAX_TABLE_REPLICATION_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
}
}
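The new property follows the existing maxJobsInZK family. An illustrative controller.conf entry (the value is an example; ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK applies when it is unset):

controller.table.replication.maxJobsInZK=100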
CopyTablePayload.java
@@ -25,11 +25,17 @@
import java.util.Map;
import javax.annotation.Nullable;

/**
* Payload for the copy table request.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CopyTablePayload {

private String _sourceClusterUri;
private Map<String, String> _headers;

private String _destinationClusterUri;
private Map<String, String> _destinationClusterHeaders;
/**
* Broker tenant for the new table.
* MUST NOT contain the tenant type suffix, i.e. _BROKER.
@@ -40,6 +46,7 @@ public class CopyTablePayload {
* MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
*/
private String _serverTenant;
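/**
 * Number of segments to copy in parallel during backfill (inferred from the field name;
 * how the replicator interprets a null value is not shown in this diff).
 */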
private Integer _backfillParallelism;

/**
* The instanceAssignmentConfig's tagPoolConfig contains full tenant name. We will use this field to let user specify
@@ -51,14 +58,20 @@
public CopyTablePayload(
@JsonProperty(value = "sourceClusterUri", required = true) String sourceClusterUri,
@JsonProperty("sourceClusterHeaders") Map<String, String> headers,
@JsonProperty(value = "destinationClusterUri", required = true) String destinationClusterUri,
@JsonProperty(value = "destinationClusterHeaders") Map<String, String> destinationClusterHeaders,
@JsonProperty(value = "brokerTenant", required = true) String brokerTenant,
@JsonProperty(value = "serverTenant", required = true) String serverTenant,
@JsonProperty("tagPoolReplacementMap") @Nullable Map<String, String> tagPoolReplacementMap) {
@JsonProperty("tagPoolReplacementMap") @Nullable Map<String, String> tagPoolReplacementMap,
@JsonProperty("backfillParallism") @Nullable Integer backfillParallism) {
_sourceClusterUri = sourceClusterUri;
_headers = headers;
_destinationClusterUri = destinationClusterUri;
_destinationClusterHeaders = destinationClusterHeaders;
_brokerTenant = brokerTenant;
_serverTenant = serverTenant;
_tagPoolReplacementMap = tagPoolReplacementMap;
_backfillParallelism = backfillParallelism;
}

@JsonGetter("sourceClusterUri")
@@ -71,6 +84,16 @@ public Map<String, String> getHeaders() {
return _headers;
}

@JsonGetter("destinationClusterUri")
public String getDestinationClusterUri() {
return _destinationClusterUri;
}

@JsonGetter("destinationClusterHeaders")
public Map<String, String> getDestinationClusterHeaders() {
return _destinationClusterHeaders;
}

@JsonGetter("brokerTenant")
public String getBrokerTenant() {
return _brokerTenant;
@@ -81,6 +104,11 @@ public String getServerTenant() {
return _serverTenant;
}

@JsonGetter("backfillParallism")
public Integer getBackfillParallism() {
return _backfillParallism;
}

@JsonGetter("tagPoolReplacementMap")
public Map<String, String> getTagPoolReplacementMap() {
return _tagPoolReplacementMap;
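Putting the new constructor together, a hedged sketch of building the payload in Java; every URI, header, tenant, and the parallelism value below is hypothetical:

CopyTablePayload payload = new CopyTablePayload(
    "http://source-controller:9000",           // sourceClusterUri
    Map.of("Authorization", "Basic <token>"),  // sourceClusterHeaders
    "http://dest-controller:9000",             // destinationClusterUri
    Map.of(),                                  // destinationClusterHeaders
    "DefaultTenant",                           // brokerTenant, without the _BROKER suffix
    "DefaultTenant",                           // serverTenant, without the _REALTIME/_OFFLINE suffix
    null,                                      // tagPoolReplacementMap (optional)
    4);                                        // backfillParallelism (optional)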
PinotTableRestletResource.java
@@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@@ -108,6 +109,7 @@
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.replication.TableReplicator;
import org.apache.pinot.controller.recommender.RecommenderDriver;
import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
import org.apache.pinot.controller.util.CompletionServiceHelper;
@@ -205,6 +207,9 @@ public class PinotTableRestletResource {
@Inject
HttpClientConnectionManager _connectionManager;

@Inject
TableReplicator _tableReplicator;

/**
* API to create a table. Before adding, validations will be done (min number of replicas, checking offline and
* realtime table configs match, checking for tenants existing).
@@ -382,6 +387,9 @@ public CopyTableResponse copyTable(
response.setTableConfig(realtimeTableConfig);
response.setWatermarkInductionResult(watermarkInductionResult);
}
String jobId = UUID.randomUUID().toString();
_tableReplicator.replicateTable(jobId, realtimeTableConfig.getTableName(), copyTablePayload,
watermarkInductionResult);
return response;
} catch (Exception e) {
LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
@@ -425,6 +433,18 @@ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, CopyTab
}
}

@GET
@Path("/tables/copyStatus/{jobId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TABLE_COPY_STATUS)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get status for a submitted table replication job",
notes = "Get status for a submitted table replication job")
public JsonNode getTableReplicationJobStatus(
@ApiParam(value = "job id", required = true) @PathParam("jobId") String id) {
return JsonUtils.objectToJsonNode(
_pinotHelixResourceManager.getControllerJobZKMetadata(id, ControllerJobTypes.TABLE_REPLICATION));
}
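A client holding the job id can poll this endpoint for progress; a minimal sketch using java.net.http (controller host, port, and job id are hypothetical; exception handling omitted):

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("http://localhost:9000/tables/copyStatus/" + jobId))
    .GET()
    .build();
// The body is the job's ZK metadata as JSON: submission time, segment count,
// consumer watermarks, and the status fields written by commonTableReplicationJobMetadata.
String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();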

@PUT
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/recommender")
PinotHelixResourceManager.java
@@ -2519,6 +2519,29 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.FORCE_COMMIT);
}

public boolean addNewTableReplicationJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs,
WatermarkInductionResult res)
throws JsonProcessingException {
Map<String, String> jobMetadata =
commonTableReplicationJobMetadata(tableNameWithType, jobId, jobSubmissionTimeMs, res);
return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.TABLE_REPLICATION);
}

public Map<String, String> commonTableReplicationJobMetadata(String tableNameWithType, String jobId,
long jobSubmissionTimeMs, WatermarkInductionResult res)
throws JsonProcessingException {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REPLICATION.name());
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.SEGMENTS_TO_BE_COPIED,
Integer.toString(res.getHistoricalSegments().size()));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMER_WATERMARKS, JsonUtils.objectToString(res.getWatermarks()));
jobMetadata.put(CommonConstants.ControllerJob.REPLICATION_JOB_STATUS, "IN_PROGRESS");
return jobMetadata;
}

/**
* Adds a new job metadata for controller job like table rebalance or reload into ZK
* @param jobId job's UUID
@@ -4931,7 +4954,23 @@ public WatermarkInductionResult getConsumerWatermarks(String tableName) throws T
}
return new WatermarkInductionResult.Watermark(status.getPartitionGroupId(), seq, startOffset);
}).collect(Collectors.toList());
return new WatermarkInductionResult(watermarks);

Map<Integer, Integer> partGroupToLatestSeq = watermarks.stream().collect(
Collectors.toMap(WatermarkInductionResult.Watermark::getPartitionGroupId,
WatermarkInductionResult.Watermark::getSequenceNumber));
List<String> historicalSegments = new ArrayList<>();
for (String segment : idealState.getRecord().getMapFields().keySet()) {
LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
if (llcSegmentName != null) {
int partitionGroupId = llcSegmentName.getPartitionGroupId();
int seq = llcSegmentName.getSequenceNumber();
if (partGroupToLatestSeq.containsKey(partitionGroupId) && partGroupToLatestSeq.get(partitionGroupId) == seq) {
continue;
}
}
historicalSegments.add(segment);
}
return new WatermarkInductionResult(watermarks, historicalSegments);
}
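For illustration (hypothetical segment names): if the ideal state holds mytable__0__0__20240101T0000Z, mytable__0__1__20240102T0000Z and mytable__1__0__20240101T0000Z, and the watermarks point at sequence 1 for partition group 0 and sequence 0 for partition group 1, then only mytable__0__0__20240101T0000Z lands in historicalSegments; the two segments sitting at the watermarks are still consuming and are skipped.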

/*
WatermarkInductionResult.java
@@ -31,15 +31,19 @@ public class WatermarkInductionResult {

private List<Watermark> _watermarks;

private List<String> _historicalSegments;

/**
* The @JsonCreator annotation marks this constructor to be used for deserializing
* a JSON payload back into a WatermarkInductionResult object.
*
* @param watermarks The list of watermarks.
* @param historicalSegments The completed (non-consuming) segments that need to be copied.
*/
@JsonCreator
public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks,
@JsonProperty("historicalSegments") List<String> historicalSegments) {
_watermarks = watermarks;
_historicalSegments = historicalSegments;
}

/**
@@ -52,6 +56,11 @@ public List<Watermark> getWatermarks() {
return _watermarks;
}

@JsonGetter("historicalSegments")
public List<String> getHistoricalSegments() {
return _historicalSegments;
}

/**
* Represents a single watermark with its partitionGroupId, sequence, and offset.
*/
ControllerJobTypes.java
@@ -40,7 +40,8 @@ public enum ControllerJobTypes implements ControllerJobType {
RELOAD_SEGMENT,
FORCE_COMMIT,
TABLE_REBALANCE,
TENANT_REBALANCE;
TENANT_REBALANCE,
TABLE_REPLICATION;

private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJobTypes.class);
private static final EnumMap<ControllerJobTypes, Integer> ZK_NUM_JOBS_LIMIT = new EnumMap<>(ControllerJobTypes.class);
@@ -55,6 +56,7 @@ public static void init(ControllerConf controllerConf) {
ZK_NUM_JOBS_LIMIT.put(FORCE_COMMIT, controllerConf.getMaxForceCommitZkJobs());
ZK_NUM_JOBS_LIMIT.put(TABLE_REBALANCE, controllerConf.getMaxTableRebalanceZkJobs());
ZK_NUM_JOBS_LIMIT.put(TENANT_REBALANCE, controllerConf.getMaxTenantRebalanceZkJobs());
ZK_NUM_JOBS_LIMIT.put(TABLE_REPLICATION, controllerConf.getMaxTableReplicationZkJobs());
}

@Override
NoOpSegmentCopier.java
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.replication;

import java.util.Map;
import org.apache.pinot.controller.api.resources.CopyTablePayload;

/**
* A no-op segment copier for testing purposes.
*/
public class NoOpSegmentCopier implements SegmentCopier {

@Override
public void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload,
Map<String, String> segmentZKMetadata) {
// No-op
}
}
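The SegmentCopier interface itself is not part of this diff; its shape, inferred from the @Override above, would be roughly the following (the actual interface may carry javadoc or additional methods):

package org.apache.pinot.controller.helix.core.replication;

import java.util.Map;
import org.apache.pinot.controller.api.resources.CopyTablePayload;

// Inferred from the NoOpSegmentCopier implementation above.
public interface SegmentCopier {
  void copy(String tableNameWithType, String segmentName, CopyTablePayload copyTablePayload,
      Map<String, String> segmentZKMetadata);
}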