Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.camel.CamelContext;
Expand Down Expand Up @@ -71,7 +70,7 @@
* The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages.
* And force routes and its services to shutdown now. There is a risk when shutting down now,
* that some resources is not properly shutdown, which can cause side effects. The timeout value
* is by default 300 seconds, but can be customized.
* is by default 300 seconds, but can be customized.
* <p/>
* As this strategy will politely wait until all exchanges has been completed it can potential wait
* for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
Expand Down Expand Up @@ -99,7 +98,7 @@
* to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these
* logs, so they are logged at TRACE level instead.
*
* @version
* @version
*/
public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);
Expand All @@ -113,6 +112,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
private boolean suppressLoggingOnTimeout;
private volatile boolean forceShutdown;
private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
private volatile Future<?> currentShutdownTaskFuture;

public DefaultShutdownStrategy() {
}
Expand Down Expand Up @@ -173,15 +173,18 @@ public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {

// use another thread to perform the shutdowns so we can support timeout
timeoutOccurred.set(false);
Future<?> future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
try {
future.get(timeout, timeUnit);
} catch (TimeoutException e) {
currentShutdownTaskFuture.get(timeout, timeUnit);
} catch (ExecutionException e) {
// unwrap execution exception
throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
} catch (Exception e) {
// we hit a timeout, so set the flag
timeoutOccurred.set(true);

// timeout then cancel the task
future.cancel(true);
currentShutdownTaskFuture.cancel(true);

// signal we are forcing shutdown now, since timeout occurred
this.forceShutdown = forceShutdown;
Expand All @@ -206,9 +209,9 @@ public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes.");
}
}
} catch (ExecutionException e) {
// unwrap execution exception
throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
} finally {
// Clears the current shutdown task since it's completed
currentShutdownTaskFuture = null;
}

// convert to seconds as its easier to read than a big milli seconds number
Expand Down Expand Up @@ -279,6 +282,10 @@ public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}

public Future<?> getCurrentShutdownTaskFuture() {
return currentShutdownTaskFuture;
}

/**
* Shutdown all the consumers immediately.
*
Expand Down Expand Up @@ -386,7 +393,7 @@ protected void doShutdown() throws Exception {
/**
* Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
* on the service if it implement this interface.
*
*
* @param service the service
* @param forced whether to force shutdown
* @param includeChildren whether to prepare the child of the service as well
Expand Down