From c32a4e8220cecaf54d6bd191b1b38403c773b3ac Mon Sep 17 00:00:00 2001 From: Samrat Dhillon Date: Sun, 28 Mar 2021 14:44:12 -0400 Subject: [PATCH] Explicilty calling doneSynchronizations on copy exchange objects as these objects have onCompletion added by NettyHttpProducer. Otherwise original exchange objects will not have these SynchronizationAdapters set and will result in memory leak from NettyHttpProducer --- .../faulttolerance/FaultToleranceProcessor.java | 10 ++++++++++ .../component/resilience4j/ResilienceProcessor.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index e8eb374dbc7e2..9d5754c4764b6 100644 --- a/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -44,8 +44,10 @@ import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.util.ObjectHelper; import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; @@ -317,6 +319,14 @@ public Exchange call() throws Exception { exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); } + if (copy.getUnitOfWork() == null) { + // handover completions and done them manually to ensure they are being executed + List synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG); + } else { + // done the unit of work + copy.getUnitOfWork().done(exchange); + } } catch (Throwable e) { exchange.setException(e); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index c9bd5ca66d65a..e8cb471160029 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -47,8 +47,10 @@ import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -423,6 +425,15 @@ private Exchange processInCopy(Exchange exchange) { exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); } + if (copy.getUnitOfWork() == null) { + // handover completions and done them manually to ensure they are being executed + List synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG); + } else { + // done the unit of work + copy.getUnitOfWork().done(exchange); + } + } catch (Throwable e) { exchange.setException(e); }