diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index 88baa8dd8561f..eb22764d30e70 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -46,6 +46,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { private final TransactionStrategy template; private QueryFactory queryFactory; private DeleteHandler deleteHandler; + private DeleteHandler preConsumedMethod = null; private String query; private String namedQuery; private String nativeQuery; @@ -122,7 +123,7 @@ public Object doInJpa(EntityManager entityManager) throws PersistenceException { return endpoint.getCamelContext().getTypeConverter().convertTo(int.class, messagePolled); } - + public int processBatch(Queue exchanges) throws Exception { int total = exchanges.size(); @@ -146,8 +147,12 @@ public int processBatch(Queue exchanges) throws Exception { // update pending number of exchanges pendingExchanges = total - index - 1; - + if (lockEntity(result, entityManager)) { + // Run the @PreConsumed callback - User supplied preprocessing + DeleteHandler preConsumed = getPreConsumedMethod(); + preConsumed.deleteObject(entityManager, result); + // process the current exchange LOG.debug("Processing exchange: {}", exchange); getProcessor().process(exchange); @@ -170,6 +175,45 @@ public JpaEndpoint getEndpoint() { return endpoint; } + public DeleteHandler getPreConsumedMethod() { + if (preConsumedMethod == null) { + // Look for @PreConsumed to allow custom callback before the Entity has been consumed + Class entityType = getEndpoint().getEntityType(); + if (entityType != null) { + // Inspect the method(s) annotated with @PreConsumed + List methods = ObjectHelper.findMethodsWithAnnotation(entityType, PreConsumed.class); + if (methods.size() > 1) { + throw new IllegalStateException("Only one method can be annotated with the @PreConsumed annotation but found: " + methods); + } else if (methods.size() == 1) { + // Inspect the parameters of the @PreConsumed method + Class[] parameters = methods.get(0).getParameterTypes(); + if (parameters.length != 0) { + throw new IllegalStateException("@PreConsumed annotated method cannot have parameters!"); + } + + final Method method = methods.get(0); + preConsumedMethod = new DeleteHandler() { + @Override + public void deleteObject(EntityManager entityManager, Object entityBean) { + ObjectHelper.invokeMethod(method, entityBean); + } + }; + } else { + preConsumedMethod = new DeleteHandler() { + @Override + public void deleteObject(EntityManager entityManager, Object entityBean) { + // Do nothing + } + }; + } + } else { + throw new IllegalStateException("Jpa endpoint did not contain a defined entity type!"); + } + } + + return preConsumedMethod; + } + public QueryFactory getQueryFactory() { if (queryFactory == null) { queryFactory = createQueryFactory(); diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java new file mode 100644 index 0000000000000..97cc0562ca1e5 --- /dev/null +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java @@ -0,0 +1,33 @@ +/* + * Copyright 2013 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jpa; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An annotation to mark a method to be invoked before an entity bean is processed and routed; so + * that it can be updated in such a way that the results are available to later nodes in the route. + * + * @author R. Matt McCann [mccann.matt AT gmail.com] + */ +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Target({ ElementType.METHOD }) +public @interface PreConsumed { }