@@ -35,9 +35,11 @@ import (
3535 "go.opentelemetry.io/otel/attribute"
3636 "go.opentelemetry.io/otel/metric"
3737 gtransport "google.golang.org/api/transport/grpc"
38+ "google.golang.org/grpc/codes"
3839 "google.golang.org/grpc/credentials/alts"
3940 "google.golang.org/grpc/metadata"
4041 "google.golang.org/grpc/peer"
42+ "google.golang.org/grpc/status"
4143
4244 btopt "cloud.google.com/go/bigtable/internal/option"
4345
@@ -48,6 +50,8 @@ import (
4850// We cap the max draining timeout to 30mins as there might be a long running stream (such as full table scan).
4951var maxDrainingTimeout = 30 * time .Minute
5052
53+ const artificialLoadIfError = 10
54+ const artificialLoadPenalizedTimer = 5 * time .Second
5155const requestParamsHeader = "x-goog-request-params"
5256
5357// ipProtocol represents the type of IP protocol used.
@@ -250,6 +254,7 @@ type connEntry struct {
250254 streamingLoad atomic.Int32 // In-flight streaming requests
251255 errorCount atomic.Int64 // Errors since the last metric report
252256 drainingState atomic.Bool // True if the connection is being gracefully drained.
257+ penaltyExpiry atomic.Int64 // penaltyExpiry stores the UnixNano timestamp of when the penalty ends
253258
254259}
255260
@@ -271,6 +276,25 @@ func (e *connEntry) createdAt() int64 {
271276 return e .conn .creationTime ()
272277}
273278
279+ // applyErrorPenalty checks if the error warrants a load balancing penalty,
280+ // and if so, sets an expiration time for the artificial load.
281+ func (e * connEntry ) applyErrorPenalty (err error ) {
282+ if err == nil {
283+ return
284+ }
285+
286+ code := status .Code (err )
287+
288+ // Penalize errors that typically indicate target-specific health or capacity issues.
289+ if code == codes .Unavailable ||
290+ code == codes .ResourceExhausted ||
291+ code == codes .Internal {
292+ // A simple Store is safe here; concurrent updates is fine here.
293+ newExpiry := time .Now ().Add (artificialLoadPenalizedTimer ).UnixNano ()
294+ e .penaltyExpiry .Store (newExpiry )
295+ }
296+ }
297+
274298// isDraining atomically checks if the connection is in the draining state.
275299func (e * connEntry ) isDraining () bool {
276300 return e .drainingState .Load ()
@@ -313,7 +337,18 @@ func (p *BigtableChannelPool) waitForDrainAndClose(entry *connEntry) {
313337func (e * connEntry ) calculateConnLoad () int32 {
314338 unary := e .unaryLoad .Load ()
315339 streaming := e .streamingLoad .Load ()
316- return unary + streaming
340+ load := unary + streaming
341+
342+ expiry := e .penaltyExpiry .Load ()
343+ if expiry > 0 {
344+ if time .Now ().UnixNano () < expiry {
345+ load += artificialLoadIfError // Apply the artificial penalty weight
346+ } else {
347+ // restore to zero
348+ e .penaltyExpiry .CompareAndSwap (expiry , 0 )
349+ }
350+ }
351+ return load
317352}
318353
319354// BigtableChannelPool implements ConnPool and routes requests to the connection
@@ -673,6 +708,7 @@ func (p *BigtableChannelPool) Invoke(ctx context.Context, method string, args in
673708 err = entry .conn .Invoke (ctx , method , args , reply , opts ... )
674709 if err != nil {
675710 entry .errorCount .Add (1 )
711+ entry .applyErrorPenalty (err ) // Apply penalty on error
676712 }
677713 return err
678714
@@ -823,6 +859,7 @@ func (s *refCountedStream) SendMsg(m interface{}) error {
823859 err := s .ClientStream .SendMsg (m )
824860 if err != nil {
825861 s .entry .errorCount .Add (1 )
862+ s .entry .applyErrorPenalty (err )
826863 s .decrementLoad ()
827864 }
828865 return err
@@ -835,6 +872,7 @@ func (s *refCountedStream) RecvMsg(m interface{}) error {
835872 // io.EOF is a normal stream termination, not an error to be counted.
836873 if ! errors .Is (err , io .EOF ) {
837874 s .entry .errorCount .Add (1 )
875+ s .entry .applyErrorPenalty (err )
838876 }
839877 s .decrementLoad ()
840878 }
0 commit comments