Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -714,13 +714,40 @@ private String getQueueClassPrefix() {
return CommonConfigurationKeys.IPC_NAMESPACE + "." + port;
}

@Deprecated
Comment thread
jojochuang marked this conversation as resolved.
Outdated
static Class<? extends BlockingQueue<Call>> getQueueClass(
String prefix, Configuration conf) {
String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
return CallQueueManager.convertQueueClass(queueClass, Call.class);
}

/**
* Return class configured by property 'ipc.<port>.callqueue.impl' if it is
* present. If the config is not present, default config (without port) is
* used to derive class i.e 'ipc.callqueue.impl', and derived class is
* returned if class value is present and valid. If default config is also
* not present, default class {@link LinkedBlockingQueue} is returned.
*
* @param namespace Namespace "ipc".
* @param port Server's listener port.
* @param conf Configuration properties.
* @return Class returned based on configuration.
*/
static Class<? extends BlockingQueue<Call>> getQueueClass(
String namespace, int port, Configuration conf) {
String nameWithPort = namespace + "." + port + "."
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
String nameWithoutPort = namespace + "."
Comment thread
jojochuang marked this conversation as resolved.
Outdated
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(nameWithPort, null);
if(queueClass == null) {
queueClass = conf.getClass(nameWithoutPort, LinkedBlockingQueue.class);
}
return CallQueueManager.convertQueueClass(queueClass, Call.class);
}

@Deprecated
static Class<? extends RpcScheduler> getSchedulerClass(
String prefix, Configuration conf) {
String schedulerKeyname = prefix + "." + CommonConfigurationKeys
Expand All @@ -746,6 +773,51 @@ static Class<? extends RpcScheduler> getSchedulerClass(
return CallQueueManager.convertSchedulerClass(schedulerClass);
}

/**
* Return class configured by property 'ipc.<port>.scheduler.impl' if it is
* present. If the config is not present, and if property
* 'ipc.<port>.callqueue.impl' represents FairCallQueue class,
* return DecayRpcScheduler. If config 'ipc.<port>.callqueue.impl'
* does not have value FairCallQueue, default config (without port) is used
* to derive class i.e 'ipc.scheduler.impl'. If default config is also not
* present, default class {@link DefaultRpcScheduler} is returned.
*
* @param namespace Namespace "ipc".
* @param port Server's listener port.
* @param conf Configuration properties.
* @return Class returned based on configuration.
*/
static Class<? extends RpcScheduler> getSchedulerClass(
String namespace, int port, Configuration conf) {
String schedulerKeyNameWithPort = namespace + "." + port + "."
+ CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY;
String schedulerKeyNameWithoutPort = namespace + "."
Comment thread
jojochuang marked this conversation as resolved.
Outdated
+ CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY;

Class<?> schedulerClass = conf.getClass(schedulerKeyNameWithPort, null);
// Patch the configuration for legacy fcq configuration that does not have
// a separate scheduler setting
if (schedulerClass == null) {
String queueKeyNameWithPort = namespace + "." + port + "."
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(queueKeyNameWithPort, null);
if (queueClass != null) {
if (queueClass.getCanonicalName().equals(
FairCallQueue.class.getCanonicalName())) {
conf.setClass(schedulerKeyNameWithPort, DecayRpcScheduler.class,
RpcScheduler.class);
}
}
}

schedulerClass = conf.getClass(schedulerKeyNameWithPort, null);
if (schedulerClass == null) {
schedulerClass = conf.getClass(schedulerKeyNameWithoutPort,
DefaultRpcScheduler.class);
}
return CallQueueManager.convertSchedulerClass(schedulerClass);
}

/*
* Refresh the call queue
*/
Expand All @@ -755,8 +827,10 @@ public synchronized void refreshCallQueue(Configuration conf) {
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
callQueue.swapQueue(getSchedulerClass(prefix, conf),
getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
callQueue.swapQueue(
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
maxQueueSize, prefix, conf);
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
}

Expand Down Expand Up @@ -3107,8 +3181,9 @@ protected Server(String bindAddress, int port,

// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
getSchedulerClass(prefix, conf),
this.callQueue = new CallQueueManager<>(
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);

this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2493,6 +2493,21 @@
</description>
</property>

<property>
<name>ipc.callqueue.impl</name>
<value>java.util.concurrent.LinkedBlockingQueue</value>
<description>
The fully qualified name of a class to use as the implementation
of a call queue. The default implementation is
java.util.concurrent.LinkedBlockingQueue (FIFO queue).
Use org.apache.hadoop.ipc.FairCallQueue for the Fair Call Queue.
This config is fallback config for ipc.[port_number].callqueue.impl.
If call queue is not defined at port level, this default
config is used and hence, this is fallback config to
config with port.
</description>
</property>

<property>
<name>ipc.[port_number].scheduler.impl</name>
<value>org.apache.hadoop.ipc.DefaultRpcScheduler</value>
Expand All @@ -2506,6 +2521,24 @@
</description>
</property>

<property>
<name>ipc.scheduler.impl</name>
<value>org.apache.hadoop.ipc.DefaultRpcScheduler</value>
<description>
The fully qualified name of a class to use as the
implementation of the scheduler. The default implementation is
org.apache.hadoop.ipc.DefaultRpcScheduler (no-op scheduler) when
not using FairCallQueue. If using FairCallQueue, defaults to
org.apache.hadoop.ipc.DecayRpcScheduler. Use
org.apache.hadoop.ipc.DecayRpcScheduler in conjunction
with the Fair Call Queue.
This config is fallback config for ipc.[port_number].scheduler.impl.
If scheduler queue is not defined at port level, this default
config is used and hence, this is fallback config to
config with port.
</description>
</property>

<property>
<name>ipc.[port_number].scheduler.priority.levels</name>
<value>4</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ public void initializeMemberVariables() {
// FairCallQueue configs that includes dynamic ports in its keys
xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable");
xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl");
xmlPropsToSkipCompare.add("ipc.callqueue.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl");
xmlPropsToSkipCompare.add("ipc.scheduler.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.priority.levels");
xmlPropsToSkipCompare.add(
"ipc.[port_number].faircallqueue.multiplexer.weights");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public void testFcqBackwardCompatibility() throws InterruptedException {

// Specify only Fair Call Queue without a scheduler
// Ensure the DecayScheduler will be added to avoid breaking.
Class<? extends RpcScheduler> scheduler = Server.getSchedulerClass(ns,
Class<? extends RpcScheduler> scheduler =
Server.getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, 0,
conf);
assertTrue(scheduler.getCanonicalName().
equals("org.apache.hadoop.ipc.DecayRpcScheduler"));
Expand Down Expand Up @@ -250,8 +251,8 @@ public void testSchedulerWithoutFCQ() throws InterruptedException {
"LinkedBlockingQueue"));

manager = new CallQueueManager<FakeCall>(queue,
Server.getSchedulerClass(ns, conf), false,
3, "", conf);
Server.getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, 0,
conf), false, 3, "", conf);

// LinkedBlockingQueue with a capacity of 3 can put 3 calls
assertCanPut(manager, 3, 3);
Expand Down