diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java index e7972fb8d5307..eb51b5fbb6c61 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java @@ -58,6 +58,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { protected String passphrase; protected BossPool bossPool; protected WorkerPool workerPool; + protected String networkInterface; public String getAddress() { return host + ":" + port; @@ -319,6 +320,16 @@ public void setWorkerPool(WorkerPool workerPool) { this.workerPool = workerPool; } + public String getNetworkInterface() + { + return networkInterface; + } + + public void setNetworkInterface(String networkInterface) + { + this.networkInterface = networkInterface; + } + /** * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible * with this, as a Netty listener bound on port X shares the same common @@ -396,6 +407,8 @@ public boolean compatible(NettyServerBootstrapConfiguration other) { isCompatible = false; } else if (workerPool != other.workerPool) { isCompatible = false; + } else if (networkInterface != null && !networkInterface.equals(other.networkInterface)) { + isCompatible = false; } return isCompatible; @@ -433,6 +446,7 @@ public String toStringBootstrapConfiguration() { + ", passphrase='" + passphrase + '\'' + ", bossPool=" + bossPool + ", workerPool=" + workerPool + + ", networkInterface='" + networkInterface + '\'' + '}'; } } diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index 5c59166d19373..301da606e0c01 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -17,6 +17,9 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -30,10 +33,12 @@ import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool; import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.jboss.netty.handler.ipfilter.IpV4Subnet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +48,8 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); + private static final String LOOPBACK_INTERFACE="lo"; + private static final String MULTICAST_SUBNET = "224.0.0.0/4"; private final ChannelGroup allChannels; private CamelContext camelContext; private ThreadFactory threadFactory; @@ -50,7 +57,6 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme private ChannelPipelineFactory pipelineFactory; private DatagramChannelFactory datagramChannelFactory; private ConnectionlessBootstrap connectionlessBootstrap; - private Channel channel; private WorkerPool workerPool; public SingleUDPNettyServerBootstrapFactory() { @@ -98,7 +104,8 @@ protected void doStop() throws Exception { stopServerBootstrap(); } - protected void startServerBootstrap() { + protected void startServerBootstrap() throws UnknownHostException, SocketException + { // create non-shared worker pool int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS; workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count); @@ -135,10 +142,22 @@ protected void startServerBootstrap() { // set the pipeline factory, which creates the pipeline for each newly created channels connectionlessBootstrap.setPipelineFactory(pipelineFactory); - LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); - channel = connectionlessBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - // to keep track of all channels in use - allChannels.add(channel); + InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET); + + if (multicastSubnet.contains(configuration.getHost())) + { + LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); + DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress); + String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface(); + NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface); + channel.joinGroup(hostAddress, multicastNetworkInterface); + allChannels.add(channel); + } else { + LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); + Channel channel = connectionlessBootstrap.bind(hostAddress); + allChannels.add(channel); + } } protected void stopServerBootstrap() {