Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}

@Override
protected void doStart() throws Exception {
((QuickfixjEndpoint)getEndpoint()).ensureInitialized();
super.doStart();
}

public void onExchange(Exchange exchange) throws Exception {
if (isStarted()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.component.quickfixj;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
Expand All @@ -37,7 +36,7 @@
public class QuickfixjConsumerTest {
private Exchange mockExchange;
private Processor mockProcessor;
private Endpoint mockEndpoint;
private QuickfixjEndpoint mockEndpoint;
private Message inboundFixMessage;

@Before
Expand All @@ -54,7 +53,7 @@ public void setUp() {
Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);

mockProcessor = Mockito.mock(Processor.class);
mockEndpoint = Mockito.mock(Endpoint.class);
mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
}

Expand All @@ -71,6 +70,7 @@ public void processExchangeOnlyWhenStarted() throws Exception {
Mockito.verifyZeroInteractions(mockProcessor);

consumer.start();
Mockito.verify(mockEndpoint).ensureInitialized();
Assert.assertThat(consumer.isStarted(), CoreMatchers.is(true));

consumer.onExchange(mockExchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@
</filter>
</route>
<route>
<from uri="lazyQuickfix:example"/>
<filter>
<simple>${in.header.EventCategory} == 'AppMessageReceived'</simple>
<to uri="log:test"/>
</filter>
<from uri="vm:test"/>
<to uri="lazyQuickfix:example"/>
</route>
</camelContext>

Expand All @@ -60,7 +57,7 @@
<property name="lazyCreateEngines" value="true" />
<property name="configurations">
<util:map>
<entry key="example" value-ref="quickfixjConfiguration"/>
<entry key="example" value-ref="lazyQuickfixjConfiguration"/>
</util:map>
</property>
<property name="messageFactory">
Expand Down Expand Up @@ -97,4 +94,33 @@
</bean>
<!-- END SNIPPET: e1 -->

<!-- lazy quickfix settings -->
<bean id="lazyQuickfixjConfiguration" class="org.apache.camel.component.quickfixj.QuickfixjConfiguration">
<property name="defaultSettings">
<util:map>
<entry key="SocketConnectProtocol" value="VM_PIPE"/>
<entry key="SocketAcceptProtocol" value="VM_PIPE"/>
<entry key="UseDataDictionary" value="N"/>
</util:map>
</property>
<property name="sessionSettings">
<util:map>
<entry key="FIX.4.2:INITIATOR->ACCEPTOR">
<util:map>
<entry key="ConnectionType" value="initiator"/>
<entry key="SocketConnectHost" value="localhost"/>
<entry key="SocketConnectPort" value="5001"/>
</util:map>
</entry>
<entry key="FIX.4.2:ACCEPTOR->INITIATOR">
<util:map>
<entry key="ConnectionType" value="acceptor"/>
<entry key="SocketAcceptPort" value="5001"/>
</util:map>
</entry>
</util:map>
</property>
</bean>
<!-- END SNIPPET: e2 -->

</beans>