diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index c835026a2248..7a61329b8e88 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -639,9 +639,11 @@ private SegmentCacheEntry assignLocationAndMount( if (cacheEntry.checkExists(location.getPath())) { if (location.isReserved(cacheEntry.id) || location.reserve(cacheEntry)) { final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id); - entry.lazyLoadCallback = segmentLoadFailCallback; - entry.mount(location); - return entry; + if (entry != null) { + entry.lazyLoadCallback = segmentLoadFailCallback; + entry.mount(location); + return entry; + } } else { // entry is not reserved, clean it up deleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath())); @@ -658,9 +660,11 @@ private SegmentCacheEntry assignLocationAndMount( if (location.reserve(cacheEntry)) { try { final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id); - entry.lazyLoadCallback = segmentLoadFailCallback; - entry.mount(location); - return entry; + if (entry != null) { + entry.lazyLoadCallback = segmentLoadFailCallback; + entry.mount(location); + return entry; + } } catch (SegmentLoadingException e) { log.warn(e, "Failed to load segment[%s] in location[%s], trying next location", cacheEntry.id, location.getPath()); @@ -831,7 +835,9 @@ public void mount(StorageLocation mountLocation) throws SegmentLoadingException } final SegmentizerFactory factory = getSegmentFactory(storageDir); - final Segment segment = factory.factorize(dataSegment, storageDir, false, lazyLoadCallback); + @SuppressWarnings("ObjectEquality") + final boolean lazy = config.isLazyLoadOnStart() && lazyLoadCallback != SegmentLazyLoadFailCallback.NOOP; + final Segment segment = factory.factorize(dataSegment, storageDir, lazy, lazyLoadCallback); // wipe load callback after calling lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP; referenceProvider = ReferenceCountedSegmentProvider.of(segment); diff --git a/server/src/main/java/org/apache/druid/server/ServerManager.java b/server/src/main/java/org/apache/druid/server/ServerManager.java index 732645b42ae9..b7652090af46 100644 --- a/server/src/main/java/org/apache/druid/server/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/ServerManager.java @@ -308,21 +308,27 @@ private ArrayList getSegmentReferences( final ListenableFuture> future = futures.get(i); final ReferenceCountedObjectProvider referenceProvider = future.get(timeoutAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - final Optional segment = referenceProvider.acquireReference(); - try { - final Optional mappedSegment = segmentMapFunction.apply(segment).map(safetyNet::register); + if (referenceProvider == null) { segmentReferences.add( - new SegmentReference( - segmentAndDescriptor.getDescriptor(), - mappedSegment, - action - ) + new SegmentReference(segmentAndDescriptor.getDescriptor(), Optional.empty(), action) ); - } - catch (Throwable t) { - // if applying the mapFn failed, attach the base segment to the closer and rethrow - segment.ifPresent(safetyNet::register); - throw t; + } else { + final Optional segment = referenceProvider.acquireReference(); + try { + final Optional mappedSegment = segmentMapFunction.apply(segment).map(safetyNet::register); + segmentReferences.add( + new SegmentReference( + segmentAndDescriptor.getDescriptor(), + mappedSegment, + action + ) + ); + } + catch (Throwable t) { + // if applying the mapFn failed, attach the base segment to the closer and rethrow + segment.ifPresent(safetyNet::register); + throw t; + } } } catch (Throwable t) { diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java index cf64244b86c9..0ac4796cd3f9 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java @@ -839,6 +839,43 @@ public void testGetBootstrapSegment() throws SegmentLoadingException Assert.assertEquals(dataSegment.getInterval(), actualBootstrapSegment.getDataInterval()); } + + @Test + public void testGetBootstrapSegmentLazy() throws SegmentLoadingException + { + final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheDir, 10000L, null); + final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig() + { + @Override + public boolean isLazyLoadOnStart() + { + return true; + } + + @Override + public List getLocations() + { + return List.of(locationConfig); + } + }; + final List storageLocations = loaderConfig.toStorageLocations(); + SegmentLocalCacheManager manager = new SegmentLocalCacheManager( + storageLocations, + loaderConfig, + new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations), + TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT), + jsonMapper + ); + + final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1", Intervals.of("2020/2021")); + + manager.bootstrap(dataSegment, () -> {}); + Segment actualBootstrapSegment = manager.acquireCachedSegment(dataSegment).orElse(null); + Assert.assertNotNull(actualBootstrapSegment); + Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId()); + Assert.assertEquals(dataSegment.getInterval(), actualBootstrapSegment.getDataInterval()); + } + @Test public void testGetSegmentVirtualStorage() throws Exception {