xds: migrate internal xdsclient to use generic client and dedicated LRS client#8310
Conversation
4a25c97 to aa38b33
```go
	releaseChannelRef()
}
return load, sync.OnceFunc(func() {
	ctx, cancel := context.WithTimeout(context.Background(), loadStoreStopTimeout)
```
It looks like this operation did not block before this change. We should plumb the context back as far as we can if we are going to use a context. I.e. the function returned would be func(context.Context).
Also the implementation of load.Stop seems to block indefinitely on a channel closing, even if the context was cancelled. That looks wrong, because if there is a context, it should respect it. I think this needs to be looked at a bit more closely. Maybe it's as simple as changing the context into a timeout (time.Duration) to apply to the final stream write?
> It looks like this operation did not block before this change. We should plumb the context back as far as we can if we are going to use a context. I.e. the function returned would be `func(context.Context)`.

Modified to accept `context.Context`. I checked google3 and didn't find any usage of `ReportLoad` outside of grpc, so changing the function signature is fine here.
> Also the implementation of load.Stop seems to block indefinitely on a channel closing, even if the context was cancelled.

Are you referring to `<-lrs.doneCh` here https://github.com/grpc/grpc-go/blob/master/xds/internal/clients/lrsclient/lrsclient.go#L172? Because the final send above it has a select block that respects the provided ctx.
I'll have to look and see how this plumbs back, but I'm skeptical we already have a context where we need it. Remember that only Main/server handlers should be creating contexts from Background, and there are checkers that could get in our way of doing otherwise.
We are not creating the stop context in the lrs client; it's provided by the user.

For the grpc case, LoadStore.Stop() is called by clusterimpl in the updateLoadStore method in xds/internal/balancer/clusterimpl/clusterimpl.go. updateLoadStore is called from the UpdateClientConnState method of clusterImplBalancer, which runs when a new cluster update is received from the xds management server. UpdateClientConnState itself does not receive a context.Context, and there isn't a natural parent context available that is specifically tied to the lifecycle of this configuration update.

The stopCtx created with context.WithTimeout(context.Background(), loadStoreStopTimeout) is specifically used to provide a deadline for the b.cancelLoadReport function, which in turn wraps the Stop method of lrsclient.LoadStore; LoadStore.Stop takes a context. b.cancelLoadReport is called in the Close() method of clusterImplBalancer and within updateLoadStore if stopOldLoadReport is true. I think it's fine for Close() to block on stopping the load store? Maybe for the other case we can do it in a new goroutine if we don't want to block.
> The UpdateClientConnState method itself does not receive a context.Context. There isn't a natural parent context available that is specifically tied to the lifecycle of this configuration update.
Right, which is why I'm saying we probably don't want to use a context here, because we can't create one without triggering some checkers that I'd rather just avoid. https://google.github.io/styleguide/go/decisions#contexts
Yeah, so we can create a timer from the passed timeout and put it in a select block like below in the stop function. Is that reasonable?

```go
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-lrs.finalSendDone:
	if err != nil {
		c.logger.Warningf("Final send attempt failed: %v", err)
	}
case <-timer.C:
	// The operation to send the final load report and get confirmation
	// via finalSendDone did not complete within the timeout.
	c.logger.Warningf("Final send attempt timed out after %v", timeout)
}
```
Unfortunately, I don't think there's any other choice if we can't create a context and don't already have one.
I have updated to use time.Duration for LoadStore.Stop()
```go
}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
```
Why are these tests removed? Are we testing all the stats (sufficiently) elsewhere?
The external client has tests specifically covering all stats scenarios. We haven't exported the stats function, so we can't compare stats exactly like this here anymore.

The other e2e test, tests/balancer_test, does verify the stats through a fake server which allows intercepting the received report request.
Are we covering all the same metrics? Can we add an LRS implementation that can aggregate the reports and provide them in a similar way?
If we are deleting this, then loadStore is now unused.
I think implementing an entire LRS implementation is not possible, because lrsclient's PerClusterReporter is not an interface that we can implement for grpc tests.

One thing is possible though with what we have currently: the picker in clusterImpl has a wrapper for loadStore of type loadReporter, which is an interface https://github.com/purnesh42H/grpc-go/blob/generic-xds-client-migrate-internal-e2e/xds/internal/balancer/clusterimpl/picker.go#L86. ClusterImpl sets this field to a wrapper that uses lrsclient.LoadStore to report stats.

We can override the picker's loadStore in our tests with a test loadReporter that allows us to count and fetch stats. The underlying lrsclient.LoadStore is then unused, which might be fine because this test only needs to verify the picker's logic. Let me know if that sounds good to you.
Let me know if something like this is okay 7da08b1
Ok. So, I made a couple of comments about this above, but after reading this thread, I have some more context.

The issue is that the ReportLoad method on the xDS client returns a struct, and the load store only exposes methods to report load data, not to verify the reported load data. So this approach of using a test double for the load store seems OK for the time being. But maybe we should have a TODO here to find other means to test this and avoid so much test-util code.
xds/internal/xdsclient/clientimpl.go
Outdated
```go
func init() {
	DefaultPool = &Pool{clients: make(map[string]*clientRefCounted)}
}
```
You shouldn't need an init for this - just initialize DefaultPool inline.
xds/internal/xdsclient/clientimpl.go
Outdated
```go
}
bundle, _, err := c.Build(cc.Config)
if err != nil {
	continue
```
This doesn't feel like a continue to me. If we failed to build, that seems like a pretty big error that we should at least log, if not return.
Yes, my mistake. I checked the internal xds/bootstrap.go logic and we do return an error if we fail to build creds. Changed to return an error.
xds/internal/xdsclient/clientimpl.go
Outdated
```go
	continue
}
bundle, _, err := c.Build(cc.Config)
if err != nil {
```
As above. This looks like duplicate code. Can you refactor?
Refactored into a helper.
7302202 to a1f489d
3127e3f to 038ea2b
```diff
 	xdsClient        xdsclient.XDSClient // Sent down in ResolverState attributes.
-	cancelLoadReport func()              // To stop reporting load through the above xDS client.
+	cancelLoadReport func(time.Duration) // To stop reporting load through the above xDS client.
 	edsServiceName   string              // EDS service name to report load for.
```
It accepts a timeout value now to cancel the load report. cancelLoadReport calls Stop on the underlying LoadStore with the same timeout. There is no context anymore here.
Let's follow along with the question to the Go team. It could be that using a context.TODO at our callsite is acceptable. Ideally the API would accept a context, because that matches http.Server.Shutdown, and it feels like a very similar usage pattern. :(
So, based on their last reply, we can create the context from context.TODO in clusterimpl. We need to file an exception, though it might not be very difficult to get one since no callsites are exported to the user. Did I get it right?
Let's go back to the context, I suppose, since I recently discovered http.Server.Shutdown, which also uses context for essentially the same thing. We will deal with an exception if we need one. I suspect we won't, because those things don't end up using xds anyway. For now.
Switched back to using context. One more thing: from the other bug related to the LRS reporting interval, b/416260484, it looks like the max report interval can be up to 10 seconds. Do you think we should use 10s in clusterimpl for the timeout in loadStore.Stop()?
I'm not sure of the semantics here. Is it illegal to report before the interval expires, or can we do one early report at any time before closing the stream? If we can't send it early, then is it important to capture the partial load report at the end of connections?
I followed the c-core implementation. They don't have any notion of an early report. Final statistics are aggregated, queued, and sent with the next regular report, so essentially they wait until the next reporting cycle. They do have a minimum load report interval of 1s, which is used if the actual load reporting interval is lower than 1s, but that applies overall, not just to the final report.

In our case as well, after calling Stop(), if refs drop to 0 we notify the lrs stream runner about the final report, so when it comes back the next time it reports the final stats and then cancels the stream. The only difference here is that we take the deadline from the user so we don't block indefinitely (in case of some internal error). For grpc, it's probably good to set this value to the max possible from TD, because at the grpc level we don't have access to the reporting interval from the LRS server.

One more thing I realized is that we don't cancel/nullify the PerClusterReporter, so technically more loads can be added to it even after calling stop. Should we document that after calling Stop on LoadStore, any operation on PerClusterReporter is undefined?
```go
	return rawAny.Value
}

// genericResourceWatcher embed ResourceWatcher and implements
```
Same re: embedded.
But.. can we delete ResourceWatcher instead of implementing it? The idea is that we moved the definition over to the clients/xdsclient package.
Changed to wrap.

> can we delete ResourceWatcher instead of implementing it? The idea is that we moved the definition over to the clients/xdsclient package.

I looked at the google3 usages and the WatchResource API is being used in different resolvers. So if we remove the wrapper, the change won't be transparent due to ResourceType being different, and after import we will need manual changes.

On that note, what is the eventual goal? Will we move the other resolvers to use the generic xdsclient directly? If yes, then we can just remove some of these wrappers after switching them.
xds/internal/xdsclient/clientimpl.go
Outdated
```go
func newClientImplGeneric(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, resourceTypes map[string]gxdsclient.ResourceType, target string) (*clientImpl, error) {
	grpcTransportConfigs := make(map[string]grpctransport.Config)
	gServerCfgMap := make(map[gxdsclient.ServerConfig]*bootstrap.ServerConfig)
```
Why are we making this map? What data are we needing to get from our grpc server config that isn't in the generic server config, or that can't be put into the generic ServerIdentifier as an extension?
Technically, we need a single `ServerConfig` while decoding, but because we need to provide all resource types at the time of creating the client, we need to initialize it with all `ServerConfig`s and then fetch the required `bootstrap.ServerConfig` based on the generic `xdsclient.ServerConfig` from `DecodeOptions` within the decoder. The internal xdsclient lazily adds resource types when it starts watching, so it has the needed `ServerConfig` at the time of registering the type; hence, it doesn't need the map.
I explained the reason for the map in the above comment.

> What data are we needing to get from our grpc server config that isn't in the generic server config

CDS resource parsing needs the server config for the LRS server https://github.com/grpc/grpc-go/blob/master/xds/internal/xdsclient/xdsresource/unmarshal_cds.go#L199.
dfawley
left a comment
Have you tested this internally by doing an import from your branch? If you haven't already, can you please coordinate with @danielzhaotongliu so you can run their tests as well, as we discussed previously.
```diff
 addr := connectedAddress(state)
 lID := xdsinternal.GetLocalityID(addr)
-if lID.Empty() {
+if xdsinternal.IsLocalityEmpty(lID) {
```
Maybe delete the function and directly use `lID == clients.LocalityID{}`?
00d6aa4 to c0a8c38
I have imported it in google3 cl/763291147 and it was successful without any manual changes. I ran all the prod resolver tests as well and they ran fine. I have asked @danielzhaotongliu to run their local tests as well. I was waiting to import for a few review iterations until we are good correctness-wise and won't be doing major changes. Is it looking close enough?
```diff
 // testDiscoverResponse from the server and verifies that the received
 // discovery response is same as sent from the server.
-func (s) TestStream_SendAndRecv(t *testing.T) {
+func TestStream_SendAndRecv(t *testing.T) {
```
By mistake. Reverted
0e0fb60 to 5fa58f0
@purnesh42H can you please remove the dependency on the codes proto? We use grpc/codes instead. It looks like on master,
Shouldn't we not be using grpc packages for this? For now, I have made it into a local constant. Let me know if that's ok.
Hmm, yes you are correct. This is a little unfortunate. It looks like C++ uses ABSL to create this invalid argument error, and Java uses the codes proto. So I think you can revert this commit. The proto in question is very small, and we already use the same module for the status proto, so I don't think this should be significant.
This reverts commit 8b6d364.
3ce4064 to
6a472fb
Compare
Reverted
…edicated LRS client This reverts commit 996aabe.
…client and dedicated LRS client" This reverts commit d2e8366.
…nd dedicated LRS client This reverts commit 996aabe.
This PR refactors the internal `xdsclient` package to leverage a shared, generic xDS client implementation and a dedicated LRS client for load reporting.

- The existing public API surface (`xdsresource.Type`, `xdsresource.ResourceWatcher`, etc.) is retained to ensure transparency and minimal impact on current users of the internal `xdsclient` package.
- Load reporting moves to the dedicated LRS client (`lrsclient`). The `ReportLoad` method in the internal `xdsclient` now delegates to this new client, separating load reporting concerns from the main ADS client logic.
- Several tests in the internal `xdsclient` package are removed, as this functionality is now covered by tests for the generic client.
- The `LoadStore` implementation for aggregating load data continues to use the `clients.Locality` struct as keys for tracking locality-specific statistics. (Note: Dropped call counts are reported per category, which is represented by a string.)

RELEASE NOTES: None