Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
protected String passphrase;
protected BossPool bossPool;
protected WorkerPool workerPool;
protected String networkInterface;

public String getAddress() {
return host + ":" + port;
Expand Down Expand Up @@ -319,6 +320,16 @@ public void setWorkerPool(WorkerPool workerPool) {
this.workerPool = workerPool;
}

public String getNetworkInterface()
{
return networkInterface;
}

public void setNetworkInterface(String networkInterface)
{
this.networkInterface = networkInterface;
}

/**
* Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
* with this, as a Netty listener bound on port X shares the same common
Expand Down Expand Up @@ -396,6 +407,8 @@ public boolean compatible(NettyServerBootstrapConfiguration other) {
isCompatible = false;
} else if (workerPool != other.workerPool) {
isCompatible = false;
} else if (networkInterface != null && !networkInterface.equals(other.networkInterface)) {
isCompatible = false;
}

return isCompatible;
Expand Down Expand Up @@ -433,6 +446,7 @@ public String toStringBootstrapConfiguration() {
+ ", passphrase='" + passphrase + '\''
+ ", bossPool=" + bossPool
+ ", workerPool=" + workerPool
+ ", networkInterface='" + networkInterface + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.camel.component.netty;

import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -30,10 +33,12 @@
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
import org.jboss.netty.channel.socket.nio.WorkerPool;
import org.jboss.netty.handler.ipfilter.IpV4Subnet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,14 +48,15 @@
public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {

protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
private static final String LOOPBACK_INTERFACE="lo";
private static final String MULTICAST_SUBNET = "224.0.0.0/4";
private final ChannelGroup allChannels;
private CamelContext camelContext;
private ThreadFactory threadFactory;
private NettyServerBootstrapConfiguration configuration;
private ChannelPipelineFactory pipelineFactory;
private DatagramChannelFactory datagramChannelFactory;
private ConnectionlessBootstrap connectionlessBootstrap;
private Channel channel;
private WorkerPool workerPool;

public SingleUDPNettyServerBootstrapFactory() {
Expand Down Expand Up @@ -98,7 +104,8 @@ protected void doStop() throws Exception {
stopServerBootstrap();
}

protected void startServerBootstrap() {
protected void startServerBootstrap() throws UnknownHostException, SocketException
{
// create non-shared worker pool
int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
Expand Down Expand Up @@ -135,10 +142,22 @@ protected void startServerBootstrap() {
// set the pipeline factory, which creates the pipeline for each newly created channels
connectionlessBootstrap.setPipelineFactory(pipelineFactory);

LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
channel = connectionlessBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
// to keep track of all channels in use
allChannels.add(channel);
InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET);

if (multicastSubnet.contains(configuration.getHost()))
{
LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
channel.joinGroup(hostAddress, multicastNetworkInterface);
allChannels.add(channel);
} else {
LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
Channel channel = connectionlessBootstrap.bind(hostAddress);
allChannels.add(channel);
}
}

protected void stopServerBootstrap() {
Expand Down