Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.PersistenceException;
import javax.persistence.Query;

import org.apache.camel.Exchange;
Expand Down Expand Up @@ -88,25 +89,38 @@ public boolean add(String messageId) {
@Override
public boolean add(final Exchange exchange, final String messageId) {
final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager, true);

// Run this in single transaction.
Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
public Boolean doInTransaction(TransactionStatus status) {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}

List<?> list = query(entityManager, messageId);
if (list.isEmpty()) {
MessageProcessed processed = new MessageProcessed();
processed.setProcessorName(processorName);
processed.setMessageId(messageId);
processed.setCreatedAt(new Date());
entityManager.persist(processed);
entityManager.flush();
return Boolean.TRUE;
} else {
return Boolean.FALSE;
try {
List<?> list = query(entityManager, messageId);
if (list.isEmpty()) {
MessageProcessed processed = new MessageProcessed();
processed.setProcessorName(processorName);
processed.setMessageId(messageId);
processed.setCreatedAt(new Date());
entityManager.persist(processed);
entityManager.flush();
entityManager.close();
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} catch(Exception ex) {
LOG.error("Something went wrong trying to add message to repository {}", ex);
throw new PersistenceException(ex);
} finally {
try {
if (entityManager.isOpen()) {
entityManager.close();
}
} catch (Exception e) {
// ignore
}
}
}
});
Expand Down Expand Up @@ -158,15 +172,28 @@ public Boolean doInTransaction(TransactionStatus status) {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}

List<?> list = query(entityManager, messageId);
if (list.isEmpty()) {
return Boolean.FALSE;
} else {
MessageProcessed processed = (MessageProcessed) list.get(0);
entityManager.remove(processed);
entityManager.flush();
return Boolean.TRUE;
try{
List<?> list = query(entityManager, messageId);
if (list.isEmpty()) {
return Boolean.FALSE;
} else {
MessageProcessed processed = (MessageProcessed) list.get(0);
entityManager.remove(processed);
entityManager.flush();
entityManager.close();
return Boolean.TRUE;
}
} catch(Exception ex){
LOG.error("Something went wrong trying to remove message to repository {}", ex);
throw new PersistenceException(ex);
} finally {
try {
if (entityManager.isOpen()) {
entityManager.close();
}
} catch (Exception e) {
// ignore
}
}
}
});
Expand Down Expand Up @@ -195,17 +222,30 @@ public Boolean doInTransaction(TransactionStatus status) {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}

List<?> list = queryClear(entityManager);
if (!list.isEmpty()) {
Iterator it = list.iterator();
while (it.hasNext()) {
Object item = it.next();
entityManager.remove(item);
try {
List<?> list = queryClear(entityManager);
if (!list.isEmpty()) {
Iterator it = list.iterator();
while (it.hasNext()) {
Object item = it.next();
entityManager.remove(item);
}
entityManager.flush();
entityManager.close();
}
return Boolean.TRUE;
} catch(Exception ex) {
LOG.error("Something went wrong trying to clear the repository {}", ex);
throw new PersistenceException(ex);
} finally {
try {
if (entityManager.isOpen()) {
entityManager.close();
}
} catch (Exception e) {
// ignore
}
entityManager.flush();
}
return Boolean.TRUE;
}
});

Expand Down