Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions nvflare/fuel/f3/streaming/cacheable.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,35 @@ def clear_cache(self):
def _get_item(self, index: int, requester: str) -> bytes:
with self.lock:
if not self.cache:
# the cache has been cleared
cache_available = False
data = None
else:
cache_available = True
data, _ = self.cache[index]

if data is None:
data = self.produce_item(index)
if self.cache:
self.cache[index] = (data, 0)
self.logger.debug(f"created and cached item {index} for {requester}: {len(data)} bytes")
else:
self.logger.debug(f"got item {index} from cache for {requester}")
if not cache_available:
return self.produce_item(index)

if data is not None:
self.logger.debug(f"got item {index} from cache for {requester}")
return data

# Produce outside the lock so concurrent receivers aren't blocked.
# If two receivers produce the same item simultaneously, the first
# to re-acquire the lock stores its result; the second uses it.
data = self.produce_item(index)

with self.lock:
if self.cache:
existing, count = self.cache[index]
if existing is None:
self.cache[index] = (data, count)
self.logger.debug(f"created and cached item {index} for {requester}: {len(data)} bytes")
else:
data = existing
self.logger.debug(f"got item {index} from cache for {requester} (produced concurrently)")
return data

def _adjust_cache(self, start: int, count: int):
with self.lock:
if not self.cache:
Expand Down