Skip to content

Commit 88cdc87

Browse files
authored
[2.7] smaller lock in produce item (#4174)
### Description Do not hold the lock around produce_item. It is not needed and this operation can be slow. We do not want/need to hold up everying during this time. ### Types of changes <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated.
1 parent 1d2f20c commit 88cdc87

File tree

1 file changed

+23
-8
lines changed

1 file changed

+23
-8
lines changed

nvflare/fuel/f3/streaming/cacheable.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,35 @@ def clear_cache(self):
9090
def _get_item(self, index: int, requester: str) -> bytes:
9191
with self.lock:
9292
if not self.cache:
93-
# the cache has been cleared
93+
cache_available = False
9494
data = None
9595
else:
96+
cache_available = True
9697
data, _ = self.cache[index]
9798

98-
if data is None:
99-
data = self.produce_item(index)
100-
if self.cache:
101-
self.cache[index] = (data, 0)
102-
self.logger.debug(f"created and cached item {index} for {requester}: {len(data)} bytes")
103-
else:
104-
self.logger.debug(f"got item {index} from cache for {requester}")
99+
if not cache_available:
100+
return self.produce_item(index)
101+
102+
if data is not None:
103+
self.logger.debug(f"got item {index} from cache for {requester}")
105104
return data
106105

106+
# Produce outside the lock so concurrent receivers aren't blocked.
107+
# If two receivers produce the same item simultaneously, the first
108+
# to re-acquire the lock stores its result; the second uses it.
109+
data = self.produce_item(index)
110+
111+
with self.lock:
112+
if self.cache:
113+
existing, count = self.cache[index]
114+
if existing is None:
115+
self.cache[index] = (data, count)
116+
self.logger.debug(f"created and cached item {index} for {requester}: {len(data)} bytes")
117+
else:
118+
data = existing
119+
self.logger.debug(f"got item {index} from cache for {requester} (produced concurrently)")
120+
return data
121+
107122
def _adjust_cache(self, start: int, count: int):
108123
with self.lock:
109124
if not self.cache:

0 commit comments

Comments
 (0)