1515 */
1616package com .google .cloud .bigtable .data .v2 .stub ;
1717
18+ import static com .google .cloud .bigtable .data .v2 .stub .metrics .Util .extractStatus ;
19+
1820import com .google .api .gax .rpc .ApiCallContext ;
1921import com .google .api .gax .rpc .DeadlineExceededException ;
2022import com .google .api .gax .rpc .ResourceExhaustedException ;
@@ -69,6 +71,8 @@ class RateLimitingServerStreamingCallable
6971
7072 private final ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ;
7173
74+ private BigtableTracer bigtableTracer ;
75+
7276 RateLimitingServerStreamingCallable (
7377 @ Nonnull ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ) {
7478 this .limiter = new ConditionalRateLimiter (DEFAULT_QPS );
@@ -84,8 +88,8 @@ public void call(
8488 limiter .acquire ();
8589 stopwatch .stop ();
8690 if (context .getTracer () instanceof BigtableTracer ) {
87- (( BigtableTracer ) context .getTracer ())
88- .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
91+ bigtableTracer = ( BigtableTracer ) context .getTracer ();
92+ bigtableTracer .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
8993 }
9094 RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver (responseObserver );
9195 innerCallable .call (request , innerObserver , context );
@@ -104,7 +108,10 @@ static class ConditionalRateLimiter {
104108
105109 public ConditionalRateLimiter (long defaultQps ) {
106110 limiter = RateLimiter .create (defaultQps );
107- logger .info ("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS." );
111+ logger .info (
112+ "Batch write flow control: rate limiter is initiated (but disabled) with rate of "
113+ + defaultQps
114+ + " QPS." );
108115 }
109116
110117 /**
@@ -128,7 +135,7 @@ public void tryDisable() {
128135 if (now .isAfter (nextTime )) {
129136 boolean wasEnabled = this .enabled .getAndSet (false );
130137 if (wasEnabled ) {
131- logger .info ("Rate limiter is disabled." );
138+ logger .info ("Batch write flow control: rate limiter is disabled." );
132139 }
133140 // No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
134141 // update the rate again.
@@ -139,7 +146,7 @@ public void tryDisable() {
139146 public void enable () {
140147 boolean wasEnabled = this .enabled .getAndSet (true );
141148 if (!wasEnabled ) {
142- logger .info ("Rate limiter is enabled." );
149+ logger .info ("Batch write flow control: rate limiter is enabled." );
143150 }
144151 }
145152
@@ -158,31 +165,50 @@ public double getRate() {
158165 * @param rate The new rate of the rate limiter.
159166 * @param period The period during which rate should not be updated again and the rate limiter
160167 * should not be disabled.
168+ * @param bigtableTracer The tracer for exporting client-side metrics.
161169 */
162- public void trySetRate (double rate , Duration period ) {
170+ public void trySetRate (
171+ double rate ,
172+ Duration period ,
173+ BigtableTracer bigtableTracer ,
174+ double factor ,
175+ String statusString ) {
163176 Instant nextTime = nextRateUpdateTime .get ();
164177 Instant now = Instant .now ();
165178
166179 if (now .isBefore (nextTime )) {
180+ if (bigtableTracer != null ) {
181+ bigtableTracer .addBatchWriteFlowControlFactor (factor , statusString , false );
182+ }
167183 return ;
168184 }
169185
170186 Instant newNextTime = now .plusSeconds (period .getSeconds ());
171187
172188 if (!nextRateUpdateTime .compareAndSet (nextTime , newNextTime )) {
173189 // Someone else updated it already.
190+ if (bigtableTracer != null ) {
191+ bigtableTracer .addBatchWriteFlowControlFactor (factor , statusString , false );
192+ }
174193 return ;
175194 }
176195 final double oldRate = limiter .getRate ();
177196 limiter .setRate (rate );
178197 logger .info (
179- "Updated max rate from "
198+ "Batch write flow control: updated max rate from "
180199 + oldRate
181200 + " to "
182201 + rate
202+ + " applied factor "
203+ + factor
183204 + " with period "
184205 + period .getSeconds ()
185- + " seconds." );
206+ + " seconds. Status="
207+ + statusString );
208+ if (bigtableTracer != null ) {
209+ bigtableTracer .setBatchWriteFlowControlTargetQps (rate );
210+ bigtableTracer .addBatchWriteFlowControlFactor (factor , statusString , true );
211+ }
186212 }
187213
188214 @ VisibleForTesting
@@ -215,17 +241,21 @@ private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
215241 // have presence even thought it's marked as "optional". Check the factor and
216242 // period to make sure they're not 0.
217243 if (!response .hasRateLimitInfo ()) {
218- logger .finest ("Response carries no RateLimitInfo" );
244+ logger .finest ("Batch write flow control: response carries no RateLimitInfo" );
219245 return false ;
220246 }
221247
222248 if (response .getRateLimitInfo ().getFactor () <= 0
223249 || response .getRateLimitInfo ().getPeriod ().getSeconds () <= 0 ) {
224- logger .finest ("Response carries invalid RateLimitInfo=" + response .getRateLimitInfo ());
250+ logger .finest (
251+ "Batch write flow control: response carries invalid RateLimitInfo="
252+ + response .getRateLimitInfo ());
225253 return false ;
226254 }
227255
228- logger .finest ("Response carries valid RateLimitInfo=" + response .getRateLimitInfo ());
256+ logger .finest (
257+ "Batch write flow control: response carries valid RateLimitInfo="
258+ + response .getRateLimitInfo ());
229259 return true ;
230260 }
231261
@@ -236,7 +266,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
236266 RateLimitInfo info = response .getRateLimitInfo ();
237267 updateQps (
238268 info .getFactor (),
239- Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
269+ Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())),
270+ extractStatus (null ));
240271 } else {
241272 limiter .tryDisable ();
242273 }
@@ -250,7 +281,7 @@ protected void onErrorImpl(Throwable t) {
250281 if (t instanceof DeadlineExceededException
251282 || t instanceof UnavailableException
252283 || t instanceof ResourceExhaustedException ) {
253- updateQps (MIN_FACTOR , DEFAULT_PERIOD );
284+ updateQps (MIN_FACTOR , DEFAULT_PERIOD , extractStatus ( t ) );
254285 }
255286 outerObserver .onError (t );
256287 }
@@ -260,11 +291,11 @@ protected void onCompleteImpl() {
260291 outerObserver .onComplete ();
261292 }
262293
263- private void updateQps (double factor , Duration period ) {
294+ private void updateQps (double factor , Duration period , String statusString ) {
264295 double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
265296 double currentRate = limiter .getRate ();
266297 double cappedRate = Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS );
267- limiter .trySetRate (cappedRate , period );
298+ limiter .trySetRate (cappedRate , period , bigtableTracer , cappedFactor , statusString );
268299 }
269300 }
270301
0 commit comments