Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.apache.camel.impl.DefaultComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import quickfix.LogFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.SessionSettings;

public class QuickfixjComponent extends DefaultComponent implements StartupListener {
private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
private static final String PARAMETER_LAZY_CREATE_ENGINE = "lazyCreateEngine";

private final Object engineInstancesLock = new Object();
private final Map<String, QuickfixjEngine> engines = new HashMap<String, QuickfixjEngine>();
Expand All @@ -43,6 +45,7 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
private LogFactory logFactory;
private MessageFactory messageFactory;
private Map<String, QuickfixjConfiguration> configurations = new HashMap<String, QuickfixjConfiguration>();
private boolean lazyCreateEngines;

@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
Expand All @@ -58,12 +61,18 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
}
if (engine == null) {
QuickfixjConfiguration configuration = configurations.get(remaining);
SessionSettings settings = null;
if (configuration != null) {
SessionSettings settings = configuration.createSessionSettings();
engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory);
settings = configuration.createSessionSettings();
} else {
engine = new QuickfixjEngine(uri, remaining, messageStoreFactory, logFactory, messageFactory);
settings = QuickfixjEngine.loadSettings(remaining);
}
Boolean lazyCreateEngineForEndpoint = super.getAndRemoveParameter(parameters, PARAMETER_LAZY_CREATE_ENGINE, Boolean.TYPE);
if (lazyCreateEngineForEndpoint == null) {
lazyCreateEngineForEndpoint = isLazyCreateEngines();
}
engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory,
lazyCreateEngineForEndpoint);

// only start engine if CamelContext is already started, otherwise the engines gets started
// automatic later when CamelContext has been started using the StartupListener
Expand Down Expand Up @@ -112,8 +121,12 @@ protected void doShutdown() throws Exception {
}

private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception {
LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
engine.start();
if (!engine.isLazy()) {
LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
engine.start();
} else {
LOG.info("QuickFIX/J engine: {} will start lazily", engine.getUri());
}
}

// Test Support
Expand Down Expand Up @@ -153,6 +166,20 @@ public void setConfigurations(Map<String, QuickfixjConfiguration> configurations
this.configurations = configurations;
}

public boolean isLazyCreateEngines() {
return this.lazyCreateEngines;
}

/**
* If set to <code>true</code>, the engines will be created and started when needed (when first message
* is send)
*
* @param lazyCreateEngines
*/
public void setLazyCreateEngines(boolean lazyCreateEngines) {
this.lazyCreateEngines = lazyCreateEngines;
}

@Override
public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
// only start quickfix engines when CamelContext have finished starting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ public boolean isMultipleConsumersSupported() {
return true;
}

/**
* Initializing and starts the engine if it wasn't initialized so far.
*/
public void ensureInitialized() throws Exception {
if (!engine.isInitialized()) {
synchronized (engine) {
if (!engine.isInitialized()) {
engine.initializeEngine();
engine.start();
}
}
}
}

public QuickfixjEngine getEngine() {
return engine;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.management.JMException;
import javax.management.ObjectName;
Expand Down Expand Up @@ -87,17 +88,20 @@ public class QuickfixjEngine extends ServiceSupport {

private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class);

private final Acceptor acceptor;
private final Initiator initiator;
private final JmxExporter jmxExporter;
private final MessageStoreFactory messageStoreFactory;
private final LogFactory sessionLogFactory;
private final MessageFactory messageFactory;
private Acceptor acceptor;
private Initiator initiator;
private JmxExporter jmxExporter;
private MessageStoreFactory messageStoreFactory;
private LogFactory sessionLogFactory;
private MessageFactory messageFactory;
private final MessageCorrelator messageCorrelator = new MessageCorrelator();
private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
private final String uri;
private ObjectName acceptorObjectName;
private ObjectName initiatorObjectName;
private final SessionSettings settings;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private boolean lazy;

public enum ThreadModel {
ThreadPerConnector, ThreadPerSession;
Expand Down Expand Up @@ -148,13 +152,48 @@ public QuickfixjEngine(String uri, SessionSettings settings, boolean forcedShutd

public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
this(uri, settings, messageStoreFactoryOverride, sessionLogFactoryOverride, messageFactoryOverride, false);
}

public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
MessageFactory messageFactoryOverride, boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException {
addEventListener(messageCorrelator);

this.uri = uri;

messageFactory = messageFactoryOverride != null ? messageFactoryOverride : new DefaultMessageFactory();
sessionLogFactory = sessionLogFactoryOverride != null ? sessionLogFactoryOverride : inferLogFactory(settings);
messageStoreFactory = messageStoreFactoryOverride != null ? messageStoreFactoryOverride : inferMessageStoreFactory(settings);
this.lazy = lazy;
this.settings = settings;

// overrides
if (messageFactoryOverride != null) {
messageFactory = messageFactoryOverride;
}
if (sessionLogFactoryOverride != null) {
sessionLogFactory = sessionLogFactoryOverride;
}
if (messageStoreFactoryOverride != null) {
messageStoreFactory = messageStoreFactoryOverride;
}

if (!lazy) {
initializeEngine();
}
}

/**
* Initializes the engine on demand. May be called immediately in constructor or when needed.
* If initializing later, it should be started afterwards.
*/
void initializeEngine() throws ConfigError,
FieldConvertError, JMException {
if (messageFactory == null) {
messageFactory = new DefaultMessageFactory();
}
if (sessionLogFactory == null) {
sessionLogFactory = inferLogFactory(settings);
}
if (messageStoreFactory == null) {
messageStoreFactory = inferMessageStoreFactory(settings);
}

// Set default session schedule if not specified in configuration
if (!settings.isSetting(Session.SETTING_START_TIME)) {
Expand Down Expand Up @@ -208,9 +247,10 @@ public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory
} finally {
Thread.currentThread().setContextClassLoader(ccl);
}
initialized.set(true);
}

private static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName);
if (inputStream == null) {
throw new IllegalArgumentException("Could not load " + settingsResourceName);
Expand Down Expand Up @@ -507,6 +547,14 @@ public MessageCorrelator getMessageCorrelator() {
return messageCorrelator;
}

public boolean isInitialized() {
return this.initialized.get();
}

public boolean isLazy() {
return this.lazy;
}

// For Testing
Initiator getInitiator() {
return initiator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public QuickfixjEndpoint getEndpoint() {
@Override
public void process(Exchange exchange) throws Exception {
try {
getEndpoint().ensureInitialized();
sendMessage(exchange, exchange.getIn());
} catch (Exception e) {
exchange.setException(e);
Expand Down Expand Up @@ -76,6 +77,7 @@ void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage) throw

if (callable != null) {
Message reply = callable.call();
exchange.getOut().getHeaders().putAll(camelMessage.getHeaders());
exchange.getOut().setBody(reply);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public void createEndpointBeforeComponentStart() throws Exception {
Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
assertThat(component.getProvisionalEngines().size(), is(1));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue()));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false));
assertThat(component.getEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
Expand All @@ -178,6 +179,7 @@ public void createEndpointBeforeComponentStart() throws Exception {
Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile2.getName(), null));
assertThat(component.getProvisionalEngines().size(), is(2));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue()));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false));
assertThat(component.getEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(nullValue()));
Expand All @@ -187,15 +189,16 @@ public void createEndpointBeforeComponentStart() throws Exception {

assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(component.getEngines().size(), is(2));
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));

// Move these too an endpoint testcase if one exists
assertThat(e1.isSingleton(), is(true));
assertThat(((MultipleConsumersSupport)e1).isMultipleConsumersSupported(), is(true));
assertThat(e2.isSingleton(), is(true));
assertThat(((MultipleConsumersSupport)e2).isMultipleConsumersSupported(), is(true));
}

@Test
public void createEndpointAfterComponentStart() throws Exception {
setUpComponent();
Expand All @@ -211,18 +214,109 @@ public void createEndpointAfterComponentStart() throws Exception {
Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
assertThat(component.getEngines().size(), is(1));
assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));

Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile.getName(), sessionID));
assertThat(component.getEngines().size(), is(1));
assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID));
}

@Test
public void createEnginesLazily() throws Exception {
setUpComponent();
component.setLazyCreateEngines(true);

settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);

writeSettings();

// start the component
camelContext.start();

QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
assertThat(component.getEngines().size(), is(1));
assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));

e1.ensureInitialized();
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
}

@Test
public void createEndpointsInNonLazyComponent() throws Exception {
setUpComponent();
// configuration will be done per endpoint
component.setLazyCreateEngines(false);

settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);

writeSettings();

// will start the component
camelContext.start();

QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null) + "?lazyCreateEngine=true");
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true));

e1.ensureInitialized();
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));

writeSettings(settings, false);

// will use connector's lazyCreateEngines setting
component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID));
assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true));
assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false));
}

@Test
public void createEndpointsInLazyComponent() throws Exception {
setUpComponent();
component.setLazyCreateEngines(true);

settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);

writeSettings();

// will start the component
camelContext.start();

// will use connector's lazyCreateEngines setting
QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true));

e1.ensureInitialized();
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));

writeSettings(settings, false);

// will override connector's lazyCreateEngines setting
component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID) + "&lazyCreateEngine=false");
assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true));
assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false));
}

@Test
public void componentStop() throws Exception {
setUpComponent();
Expand Down Expand Up @@ -259,6 +353,8 @@ public void process(Exchange exchange) throws Exception {
component.stop();

assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
// it should still be initialized (ready to start again)
assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
}

@Test
Expand Down
Loading