feat(storage): implement App-centric Observability (ACO) OpenTelemetry tracing#17149
feat(storage): implement App-centric Observability (ACO) OpenTelemetry tracing#17149chandra-siri wants to merge 5 commits into
Conversation
56e225f to
cab26e6
Compare
There was a problem hiding this comment.
Code Review
This pull request introduces an in-memory LRU cache for GCS bucket metadata to support App-centric Observability (ACO) within OpenTelemetry tracing. Key changes include the implementation of a thread-safe BucketMetadataCache with background fetching, updates to the Client class to support context management and cache lifecycle, and the re-enabling of system tests in Cloud Build. Feedback identifies a potential StopIteration error in the cache logic if max_size is zero and suggests pre-compiling regular expressions in the tracing module to improve performance.
| """Thread-safely update or insert a cache entry with bounded size.""" | ||
| with self._lock: | ||
| if bucket_name in self._cache: | ||
| self._cache.pop(bucket_name) | ||
| elif len(self._cache) >= self._max_size: | ||
| # Pop the first item (oldest inserted) | ||
| first_key = next(iter(self._cache)) | ||
| self._cache.pop(first_key) | ||
| self._cache[bucket_name] = (destination_id, location) |
There was a problem hiding this comment.
The current implementation of update_cache will raise a StopIteration exception if max_size is set to 0, as next(iter(self._cache)) will be called on an empty dictionary. While the default is 10000, it is safer to validate that max_size is at least 1 in the constructor or handle the zero-size case here.
| """Thread-safely update or insert a cache entry with bounded size.""" | |
| with self._lock: | |
| if bucket_name in self._cache: | |
| self._cache.pop(bucket_name) | |
| elif len(self._cache) >= self._max_size: | |
| # Pop the first item (oldest inserted) | |
| first_key = next(iter(self._cache)) | |
| self._cache.pop(first_key) | |
| self._cache[bucket_name] = (destination_id, location) | |
| def update_cache(self, bucket_name, destination_id, location): | |
| """Thread-safely update or insert a cache entry with bounded size.""" | |
| if self._max_size <= 0: | |
| return | |
| with self._lock: | |
| if bucket_name in self._cache: | |
| self._cache.pop(bucket_name) | |
| elif len(self._cache) >= self._max_size: | |
| # Pop the first item (oldest inserted) | |
| first_key = next(iter(self._cache)) | |
| self._cache.pop(first_key) | |
| self._cache[bucket_name] = (destination_id, location) |
5544ee4 to
d037656
Compare
…y tracing and lockless bucket metadata caching
d037656 to
d0f4980
Compare
…and debug logging on exceptions
…n behind OpenTelemetry enablement Ensure that BucketMetadataCache lookups and asynchronous cache eviction threads only execute when OpenTelemetry tracing is installed and enabled. Also safely access _extra_headers on Client objects. TAG=agy CONV=c671fa00-7189-45b9-a5af-12f4c7a7c486
TAG=agy CONV=c671fa00-7189-45b9-a5af-12f4c7a7c486
Inject mandatory OpenTelemetry span attributes (
gcp.resource.destination.idandgcp.resource.destination.location) into Cloud Storage bucket and object spans to support App-centric Observability (ACO). Implements a highly optimized, lockless in-memory LRU cache on the GCSClient, backed by asynchronous singleflight background fetching to prevent cache miss stampedes and eliminate latency overhead.