Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.metrics.Util.extractStatus;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this import anymore? You can run mvn com.spotify.fmt:fmt-maven-plugin:format and that should clean up the unused imports.


import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.ResourceExhaustedException;
Expand All @@ -37,6 +39,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {
Expand Down Expand Up @@ -69,6 +72,8 @@ class RateLimitingServerStreamingCallable

private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

private BigtableTracer bigtableTracer;

RateLimitingServerStreamingCallable(
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
Expand All @@ -84,8 +89,8 @@ public void call(
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
bigtableTracer = (BigtableTracer) context.getTracer();
bigtableTracer.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
Expand All @@ -104,7 +109,10 @@ static class ConditionalRateLimiter {

public ConditionalRateLimiter(long defaultQps) {
limiter = RateLimiter.create(defaultQps);
logger.info("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS.");
logger.info(
"Batch write flow control: rate limiter is initiated (but disabled) with rate of "
+ defaultQps
+ " QPS.");
}

/**
Expand All @@ -128,7 +136,7 @@ public void tryDisable() {
if (now.isAfter(nextTime)) {
boolean wasEnabled = this.enabled.getAndSet(false);
if (wasEnabled) {
logger.info("Rate limiter is disabled.");
logger.info("Batch write flow control: rate limiter is disabled.");
}
// No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
// update the rate again.
Expand All @@ -139,7 +147,7 @@ public void tryDisable() {
public void enable() {
boolean wasEnabled = this.enabled.getAndSet(true);
if (!wasEnabled) {
logger.info("Rate limiter is enabled.");
logger.info("Batch write flow control: rate limiter is enabled.");
}
}

Expand All @@ -158,31 +166,52 @@ public double getRate() {
* @param rate The new rate of the rate limiter.
* @param period The period during which rate should not be updated again and the rate limiter
* should not be disabled.
* @param bigtableTracer The tracer for exporting client-side metrics.
* @param factor The capped factor that we're trying to apply.
* @param status The status of the response from which the factor is retrieved or derived.
*/
public void trySetRate(double rate, Duration period) {
public void trySetRate(
double rate,
Duration period,
@Nullable BigtableTracer bigtableTracer,
double factor,
@Nullable Throwable status) {
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();

if (now.isBefore(nextTime)) {
if (bigtableTracer != null) {
bigtableTracer.addBatchWriteFlowControlFactor(factor, status, false);
}
return;
}

Instant newNextTime = now.plusSeconds(period.getSeconds());

if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
// Someone else updated it already.
if (bigtableTracer != null) {
bigtableTracer.addBatchWriteFlowControlFactor(factor, status, false);
}
return;
}
final double oldRate = limiter.getRate();
limiter.setRate(rate);
logger.info(
"Updated max rate from "
"Batch write flow control: updated max rate from "
+ oldRate
+ " to "
+ rate
+ " applied factor "
+ factor
+ " with period "
+ period.getSeconds()
+ " seconds.");
+ " seconds. Status="
+ extractStatus(status));
if (bigtableTracer != null) {
bigtableTracer.setBatchWriteFlowControlTargetQps(rate);
bigtableTracer.addBatchWriteFlowControlFactor(factor, status, true);
}
}

@VisibleForTesting
Expand Down Expand Up @@ -215,17 +244,21 @@ private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (!response.hasRateLimitInfo()) {
logger.finest("Response carries no RateLimitInfo");
logger.finest("Batch write flow control: response carries no RateLimitInfo");
return false;
}

if (response.getRateLimitInfo().getFactor() <= 0
|| response.getRateLimitInfo().getPeriod().getSeconds() <= 0) {
logger.finest("Response carries invalid RateLimitInfo=" + response.getRateLimitInfo());
logger.finest(
"Batch write flow control: response carries invalid RateLimitInfo="
+ response.getRateLimitInfo());
return false;
}

logger.finest("Response carries valid RateLimitInfo=" + response.getRateLimitInfo());
logger.finest(
"Batch write flow control: response carries valid RateLimitInfo="
+ response.getRateLimitInfo());
return true;
}

Expand All @@ -236,7 +269,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
RateLimitInfo info = response.getRateLimitInfo();
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())),
null);
} else {
limiter.tryDisable();
}
Expand All @@ -250,7 +284,7 @@ protected void onErrorImpl(Throwable t) {
if (t instanceof DeadlineExceededException
|| t instanceof UnavailableException
|| t instanceof ResourceExhaustedException) {
updateQps(MIN_FACTOR, DEFAULT_PERIOD);
updateQps(MIN_FACTOR, DEFAULT_PERIOD, t);
}
outerObserver.onError(t);
}
Expand All @@ -260,11 +294,11 @@ protected void onCompleteImpl() {
outerObserver.onComplete();
}

private void updateQps(double factor, Duration period) {
private void updateQps(double factor, Duration period, @Nullable Throwable status) {
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
limiter.trySetRate(cappedRate, period);
limiter.trySetRate(cappedRate, period, bigtableTracer, cappedFactor, status);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
Expand Down Expand Up @@ -115,4 +116,25 @@ public void grpcMessageSent() {
public void setTotalTimeoutDuration(Duration totalTimeoutDuration) {
// noop
}

/**
* Record the target QPS for batch write flow control.
*
* @param targetQps The new target QPS for the client.
*/
@InternalApi
public void setBatchWriteFlowControlTargetQps(double targetQps) {}

/**
* Record the factors received from server-side for batch write flow control. The factors are
* capped by min and max allowed factor values. Status and whether the factor was actually applied
* are also recorded.
*
* @param factor Capped factor from server-side. For non-OK response, min factor is used.
* @param status The status of the response from which the factor is retrieved or derived.
* @param applied Whether the factor was actually applied.
*/
@InternalApi
public void addBatchWriteFlowControlFactor(
double factor, @Nullable Throwable status, boolean applied) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here also, no import *

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** Defining Bigtable builit-in metrics scope, attributes, metric names and views. */
@InternalApi
Expand All @@ -49,6 +47,7 @@ public class BuiltinMetricsConstants {
static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");
static final AttributeKey<Boolean> APPLIED_KEY = AttributeKey.booleanKey("applied");

static final AttributeKey<String> TRANSPORT_TYPE = AttributeKey.stringKey("transport_type");
static final AttributeKey<String> TRANSPORT_REGION = AttributeKey.stringKey("transport_region");
Expand Down Expand Up @@ -95,6 +94,9 @@ public class BuiltinMetricsConstants {
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";
static final String OUTSTANDING_RPCS_PER_CHANNEL_NAME = "connection_pool/outstanding_rpcs";
static final String BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME =
"batch_write_flow_control_target_qps";
static final String BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME = "batch_write_flow_control_factor";

// Start allow list of metrics that will be exported as internal
public static final Map<String, Set<String>> GRPC_METRICS =
Expand Down Expand Up @@ -210,6 +212,8 @@ public class BuiltinMetricsConstants {
70.0, 75.0, 80.0, 85.0, 90.0, 95.0, 100.0, 105.0, 110.0, 115.0, 120.0, 125.0, 130.0,
135.0, 140.0, 145.0, 150.0, 155.0, 160.0, 165.0, 170.0, 175.0, 180.0, 185.0, 190.0,
195.0, 200.0));
private static final Aggregation AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM =
Aggregation.explicitBucketHistogram(ImmutableList.of(0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3));

static final Set<AttributeKey> COMMON_ATTRIBUTES =
ImmutableSet.of(
Expand All @@ -225,7 +229,7 @@ public class BuiltinMetricsConstants {
static void defineView(
ImmutableMap.Builder<InstrumentSelector, View> viewMap,
String id,
Aggregation aggregation,
@Nullable Aggregation aggregation,
InstrumentType type,
String unit,
Set<AttributeKey> attributes) {
Expand All @@ -242,14 +246,12 @@ static void defineView(
COMMON_ATTRIBUTES.stream().map(AttributeKey::getKey).collect(Collectors.toSet()))
.addAll(attributes.stream().map(AttributeKey::getKey).collect(Collectors.toSet()))
.build();
View view =
View.builder()
.setName(METER_NAME + id)
.setAggregation(aggregation)
.setAttributeFilter(attributesFilter)
.build();

viewMap.put(selector, view);
ViewBuilder viewBuilder =
View.builder().setName(METER_NAME + id).setAttributeFilter(attributesFilter);
if (aggregation != null) {
viewBuilder.setAggregation(aggregation);
}
viewMap.put(selector, viewBuilder.build());
}

// uses cloud.BigtableClient schema
Expand Down Expand Up @@ -367,7 +369,23 @@ public static Map<InstrumentSelector, View> getAllViews() {
.addAll(COMMON_ATTRIBUTES)
.add(STREAMING_KEY, STATUS_KEY)
.build());

defineView(
views,
BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME,
null,
InstrumentType.GAUGE,
"1",
ImmutableSet.<AttributeKey>builder().addAll(COMMON_ATTRIBUTES).build());
defineView(
views,
BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME,
AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM,
InstrumentType.HISTOGRAM,
"1",
ImmutableSet.<AttributeKey>builder()
.addAll(COMMON_ATTRIBUTES)
.add(STATUS_KEY, APPLIED_KEY)
.build());
return views.build();
}
}
Loading
Loading