1515 */
1616package com .google .cloud .bigtable .data .v2 .stub ;
1717
18+ import static com .google .cloud .bigtable .data .v2 .stub .metrics .Util .extractStatus ;
19+
1820import com .google .api .gax .rpc .ApiCallContext ;
1921import com .google .api .gax .rpc .DeadlineExceededException ;
2022import com .google .api .gax .rpc .ResourceExhaustedException ;
3739import java .util .concurrent .atomic .AtomicReference ;
3840import java .util .logging .Logger ;
3941import javax .annotation .Nonnull ;
42+ import javax .annotation .Nullable ;
4043
4144class RateLimitingServerStreamingCallable
4245 extends ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > {
@@ -69,6 +72,8 @@ class RateLimitingServerStreamingCallable
6972
7073 private final ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ;
7174
75+ private BigtableTracer bigtableTracer ;
76+
7277 RateLimitingServerStreamingCallable (
7378 @ Nonnull ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ) {
7479 this .limiter = new ConditionalRateLimiter (DEFAULT_QPS );
@@ -84,8 +89,8 @@ public void call(
8489 limiter .acquire ();
8590 stopwatch .stop ();
8691 if (context .getTracer () instanceof BigtableTracer ) {
87- (( BigtableTracer ) context .getTracer ())
88- .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
92+ bigtableTracer = ( BigtableTracer ) context .getTracer ();
93+ bigtableTracer .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
8994 }
9095 RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver (responseObserver );
9196 innerCallable .call (request , innerObserver , context );
@@ -104,7 +109,10 @@ static class ConditionalRateLimiter {
104109
105110 public ConditionalRateLimiter (long defaultQps ) {
106111 limiter = RateLimiter .create (defaultQps );
107- logger .info ("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS." );
112+ logger .info (
113+ "Batch write flow control: rate limiter is initiated (but disabled) with rate of "
114+ + defaultQps
115+ + " QPS." );
108116 }
109117
110118 /**
@@ -128,7 +136,7 @@ public void tryDisable() {
128136 if (now .isAfter (nextTime )) {
129137 boolean wasEnabled = this .enabled .getAndSet (false );
130138 if (wasEnabled ) {
131- logger .info ("Rate limiter is disabled." );
139+ logger .info ("Batch write flow control: rate limiter is disabled." );
132140 }
133141 // No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
134142 // update the rate again.
@@ -139,7 +147,7 @@ public void tryDisable() {
139147 public void enable () {
140148 boolean wasEnabled = this .enabled .getAndSet (true );
141149 if (!wasEnabled ) {
142- logger .info ("Rate limiter is enabled." );
150+ logger .info ("Batch write flow control: rate limiter is enabled." );
143151 }
144152 }
145153
@@ -158,31 +166,52 @@ public double getRate() {
158166 * @param rate The new rate of the rate limiter.
159167 * @param period The period during which rate should not be updated again and the rate limiter
160168 * should not be disabled.
169+ * @param bigtableTracer The tracer for exporting client-side metrics.
170+ * @param factor The capped factor that we're trying to apply.
171+ * @param status The status of the response from which the factor is retrieved or derived.
161172 */
162- public void trySetRate (double rate , Duration period ) {
173+ public void trySetRate (
174+ double rate ,
175+ Duration period ,
176+ @ Nullable BigtableTracer bigtableTracer ,
177+ double factor ,
178+ @ Nullable Throwable status ) {
163179 Instant nextTime = nextRateUpdateTime .get ();
164180 Instant now = Instant .now ();
165181
166182 if (now .isBefore (nextTime )) {
183+ if (bigtableTracer != null ) {
184+ bigtableTracer .addBatchWriteFlowControlFactor (factor , status , false );
185+ }
167186 return ;
168187 }
169188
170189 Instant newNextTime = now .plusSeconds (period .getSeconds ());
171190
172191 if (!nextRateUpdateTime .compareAndSet (nextTime , newNextTime )) {
173192 // Someone else updated it already.
193+ if (bigtableTracer != null ) {
194+ bigtableTracer .addBatchWriteFlowControlFactor (factor , status , false );
195+ }
174196 return ;
175197 }
176198 final double oldRate = limiter .getRate ();
177199 limiter .setRate (rate );
178200 logger .info (
179- "Updated max rate from "
201+ "Batch write flow control: updated max rate from "
180202 + oldRate
181203 + " to "
182204 + rate
205+ + " applied factor "
206+ + factor
183207 + " with period "
184208 + period .getSeconds ()
185- + " seconds." );
209+ + " seconds. Status="
210+ + extractStatus (status ));
211+ if (bigtableTracer != null ) {
212+ bigtableTracer .setBatchWriteFlowControlTargetQps (rate );
213+ bigtableTracer .addBatchWriteFlowControlFactor (factor , status , true );
214+ }
186215 }
187216
188217 @ VisibleForTesting
@@ -215,17 +244,21 @@ private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
215244 // have presence even thought it's marked as "optional". Check the factor and
216245 // period to make sure they're not 0.
217246 if (!response .hasRateLimitInfo ()) {
218- logger .finest ("Response carries no RateLimitInfo" );
247+ logger .finest ("Batch write flow control: response carries no RateLimitInfo" );
219248 return false ;
220249 }
221250
222251 if (response .getRateLimitInfo ().getFactor () <= 0
223252 || response .getRateLimitInfo ().getPeriod ().getSeconds () <= 0 ) {
224- logger .finest ("Response carries invalid RateLimitInfo=" + response .getRateLimitInfo ());
253+ logger .finest (
254+ "Batch write flow control: response carries invalid RateLimitInfo="
255+ + response .getRateLimitInfo ());
225256 return false ;
226257 }
227258
228- logger .finest ("Response carries valid RateLimitInfo=" + response .getRateLimitInfo ());
259+ logger .finest (
260+ "Batch write flow control: response carries valid RateLimitInfo="
261+ + response .getRateLimitInfo ());
229262 return true ;
230263 }
231264
@@ -236,7 +269,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
236269 RateLimitInfo info = response .getRateLimitInfo ();
237270 updateQps (
238271 info .getFactor (),
239- Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
272+ Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())),
273+ null );
240274 } else {
241275 limiter .tryDisable ();
242276 }
@@ -250,7 +284,7 @@ protected void onErrorImpl(Throwable t) {
250284 if (t instanceof DeadlineExceededException
251285 || t instanceof UnavailableException
252286 || t instanceof ResourceExhaustedException ) {
253- updateQps (MIN_FACTOR , DEFAULT_PERIOD );
287+ updateQps (MIN_FACTOR , DEFAULT_PERIOD , t );
254288 }
255289 outerObserver .onError (t );
256290 }
@@ -260,11 +294,11 @@ protected void onCompleteImpl() {
260294 outerObserver .onComplete ();
261295 }
262296
263- private void updateQps (double factor , Duration period ) {
297+ private void updateQps (double factor , Duration period , @ Nullable Throwable status ) {
264298 double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
265299 double currentRate = limiter .getRate ();
266300 double cappedRate = Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS );
267- limiter .trySetRate (cappedRate , period );
301+ limiter .trySetRate (cappedRate , period , bigtableTracer , cappedFactor , status );
268302 }
269303 }
270304
0 commit comments