1515 */
1616package com .google .cloud .bigtable .data .v2 .stub ;
1717
18+ import static com .google .cloud .bigtable .data .v2 .stub .metrics .Util .extractStatus ;
19+
1820import com .google .api .gax .rpc .ApiCallContext ;
1921import com .google .api .gax .rpc .DeadlineExceededException ;
2022import com .google .api .gax .rpc .ResourceExhaustedException ;
@@ -69,6 +71,8 @@ class RateLimitingServerStreamingCallable
6971
7072 private final ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ;
7173
74+ private BigtableTracer bigtableTracer ;
75+
7276 RateLimitingServerStreamingCallable (
7377 @ Nonnull ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ) {
7478 this .limiter = new ConditionalRateLimiter (DEFAULT_QPS );
@@ -84,8 +88,8 @@ public void call(
8488 limiter .acquire ();
8589 stopwatch .stop ();
8690 if (context .getTracer () instanceof BigtableTracer ) {
87- (( BigtableTracer ) context .getTracer ())
88- .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
91+ bigtableTracer = ( BigtableTracer ) context .getTracer ();
92+ bigtableTracer .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
8993 }
9094 RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver (responseObserver );
9195 innerCallable .call (request , innerObserver , context );
@@ -104,7 +108,10 @@ static class ConditionalRateLimiter {
104108
105109 public ConditionalRateLimiter (long defaultQps ) {
106110 limiter = RateLimiter .create (defaultQps );
107- logger .info ("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS." );
111+ logger .info (
112+ "Batch write flow control: rate limiter is initiated (but disabled) with rate of "
113+ + defaultQps
114+ + " QPS." );
108115 }
109116
110117 /**
@@ -128,7 +135,7 @@ public void tryDisable() {
128135 if (now .isAfter (nextTime )) {
129136 boolean wasEnabled = this .enabled .getAndSet (false );
130137 if (wasEnabled ) {
131- logger .info ("Rate limiter is disabled." );
138+ logger .info ("Batch write flow control: rate limiter is disabled." );
132139 }
133140 // No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
134141 // update the rate again.
@@ -139,7 +146,7 @@ public void tryDisable() {
139146 public void enable () {
140147 boolean wasEnabled = this .enabled .getAndSet (true );
141148 if (!wasEnabled ) {
142- logger .info ("Rate limiter is enabled." );
149+ logger .info ("Batch write flow control: rate limiter is enabled." );
143150 }
144151 }
145152
@@ -158,31 +165,49 @@ public double getRate() {
158165 * @param rate The new rate of the rate limiter.
159166 * @param period The period during which rate should not be updated again and the rate limiter
160167 * should not be disabled.
168+ * @param bigtableTracer The tracer for exporting client-side metrics.
161169 */
162- public void trySetRate (double rate , Duration period ) {
170+ public void trySetRate (
171+ double rate ,
172+ Duration period ,
173+ BigtableTracer bigtableTracer ,
174+ double factor ,
175+ String statusString ) {
163176 Instant nextTime = nextRateUpdateTime .get ();
164177 Instant now = Instant .now ();
165178
166179 if (now .isBefore (nextTime )) {
180+ if (bigtableTracer != null ) {
181+ bigtableTracer .addBatchWriteFlowControlFactor (factor , statusString , false );
182+ }
167183 return ;
168184 }
169185
170186 Instant newNextTime = now .plusSeconds (period .getSeconds ());
171187
172188 if (!nextRateUpdateTime .compareAndSet (nextTime , newNextTime )) {
173189 // Someone else updated it already.
190+ if (bigtableTracer != null ) {
191+ bigtableTracer .addBatchWriteFlowControlFactor (factor , statusString , false );
192+ }
174193 return ;
175194 }
176195 final double oldRate = limiter .getRate ();
177196 limiter .setRate (rate );
178197 logger .info (
179- "Updated max rate from "
198+ "Batch write flow control: updated max rate from "
180199 + oldRate
181200 + " to "
182201 + rate
202+ + " applied factor "
203+ + factor
183204 + " with period "
184205 + period .getSeconds ()
185206 + " seconds." );
207+ if (bigtableTracer != null ) {
208+ bigtableTracer .setBatchWriteFlowControlTargetQps (rate );
209+ bigtableTracer .addBatchWriteFlowControlFactor (factor , statusString , true );
210+ }
186211 }
187212
188213 @ VisibleForTesting
@@ -215,17 +240,21 @@ private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
215240 // have presence even thought it's marked as "optional". Check the factor and
216241 // period to make sure they're not 0.
217242 if (!response .hasRateLimitInfo ()) {
218- logger .finest ("Response carries no RateLimitInfo" );
243+ logger .finest ("Batch write flow control: response carries no RateLimitInfo" );
219244 return false ;
220245 }
221246
222247 if (response .getRateLimitInfo ().getFactor () <= 0
223248 || response .getRateLimitInfo ().getPeriod ().getSeconds () <= 0 ) {
224- logger .finest ("Response carries invalid RateLimitInfo=" + response .getRateLimitInfo ());
249+ logger .finest (
250+ "Batch write flow control: response carries invalid RateLimitInfo="
251+ + response .getRateLimitInfo ());
225252 return false ;
226253 }
227254
228- logger .finest ("Response carries valid RateLimitInfo=" + response .getRateLimitInfo ());
255+ logger .finest (
256+ "Batch write flow control: response carries valid RateLimitInfo="
257+ + response .getRateLimitInfo ());
229258 return true ;
230259 }
231260
@@ -236,7 +265,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
236265 RateLimitInfo info = response .getRateLimitInfo ();
237266 updateQps (
238267 info .getFactor (),
239- Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
268+ Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())),
269+ extractStatus (null ));
240270 } else {
241271 limiter .tryDisable ();
242272 }
@@ -250,7 +280,15 @@ protected void onErrorImpl(Throwable t) {
250280 if (t instanceof DeadlineExceededException
251281 || t instanceof UnavailableException
252282 || t instanceof ResourceExhaustedException ) {
253- updateQps (MIN_FACTOR , DEFAULT_PERIOD );
283+ logger .info (
284+ "Batch write flow control: received error "
285+ + extractStatus (t )
286+ + " applying min factor "
287+ + MIN_FACTOR
288+ + " with period "
289+ + DEFAULT_PERIOD
290+ + " seconds." );
291+ updateQps (MIN_FACTOR , DEFAULT_PERIOD , extractStatus (t ));
254292 }
255293 outerObserver .onError (t );
256294 }
@@ -260,11 +298,11 @@ protected void onCompleteImpl() {
260298 outerObserver .onComplete ();
261299 }
262300
263- private void updateQps (double factor , Duration period ) {
301+ private void updateQps (double factor , Duration period , String statusString ) {
264302 double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
265303 double currentRate = limiter .getRate ();
266304 double cappedRate = Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS );
267- limiter .trySetRate (cappedRate , period );
305+ limiter .trySetRate (cappedRate , period , bigtableTracer , cappedFactor , statusString );
268306 }
269307 }
270308
0 commit comments