diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 9423ed7605..6a5ad29af8 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -287,7 +287,7 @@ storm.daemon.metrics.reporter.plugins: - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter" storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager" -# Also determines whether the unit tests for cgroup runs. +# Also determines whether the unit tests for cgroup runs. # If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run storm.resource.isolation.plugin.enable: false @@ -304,3 +304,6 @@ storm.cgroup.hierarchy.name: "storm" storm.supervisor.cgroup.rootdir: "storm" storm.cgroup.cgexec.cmd: "/bin/cgexec" storm.cgroup.memory.limit.tolerance.margin.mb: 128.0 + +# Default factory to use for transactional state +storm.transaction.state.store.factory: org.apache.storm.trident.topology.state.TransactionalStateStorageZkFactory diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 28bf6bd179..71d31f6fc8 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -24,6 +24,7 @@ import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; import org.apache.storm.serialization.IKryoDecorator; import org.apache.storm.serialization.IKryoFactory; +import org.apache.storm.trident.topology.state.ITransactionalStateStorageFactory; import org.apache.storm.validation.ConfigValidationAnnotations.*; import org.apache.storm.validation.ConfigValidation.*; import com.esotericsoftware.kryo.Serializer; @@ -2280,9 +2281,13 @@ public class Config extends HashMap { @isString public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd"; + @isImplementationOfClass(implementsClass = ITransactionalStateStorageFactory.class) + public static final String STORM_TRANSATION_STATE_STORE_FACTORY = "storm.transaction.state.store.factory"; + /** * The amount of memory a worker can exceed its allocation before cgroup will kill it */ + @isPositiveNumber public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB = "storm.cgroup.memory.limit.tolerance.margin.mb"; diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/ITransactionalStateStorage.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/ITransactionalStateStorage.java new file mode 100644 index 0000000000..b292031672 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/ITransactionalStateStorage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.topology.state; + +import java.util.List; + +public interface ITransactionalStateStorage { + + void setData(String path, Object obj); + void delete(String path); + List list(String path); + void mkdir(String path); + Object getData(String path); + void close(); +} \ No newline at end of file diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/ITransactionalStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/ITransactionalStateStorageFactory.java new file mode 100644 index 0000000000..cd6e540e3f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/ITransactionalStateStorageFactory.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.topology.state; + +import java.util.Map; + +public interface ITransactionalStateStorageFactory { + ITransactionalStateStorage mkTransactionalState(Map conf, String id, String subroot); +} \ No newline at end of file diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TestTransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TestTransactionalState.java index 98b587ab9b..c9f893aad1 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TestTransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TestTransactionalState.java @@ -42,6 +42,6 @@ protected TestTransactionalState(Map conf, String id, String subroot) { public static void createNode(CuratorFramework curator, String rootDir, byte[] data, List acls, CreateMode mode) throws Exception { - TransactionalState.createNode(curator, rootDir, data, acls, mode); + TransactionalStateZkStorage.createNode(curator, rootDir, data, acls, mode); } } diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java index 3bf279448e..0c1f3f1bef 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java @@ -39,133 +39,56 @@ import org.json.simple.JSONValue; public class TransactionalState { - CuratorFramework _curator; - List _zkAcls = null; - + + public static final String USER = "user"; + public static final String COORDINATOR = "coordinator"; + private ITransactionalStateStorage transactionalStateStorage; + public static TransactionalState newUserState(Map conf, String id) { - return new TransactionalState(conf, id, "user"); + return new TransactionalState(conf, id, USER); } - + public static TransactionalState newCoordinatorState(Map conf, String id) { - return new TransactionalState(conf, id, "coordinator"); + return new TransactionalState(conf, id, COORDINATOR); } - + protected TransactionalState(Map conf, String id, String subroot) { try { - conf = new HashMap(conf); - String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT); - String rootDir = transactionalRoot + "/" + id + "/" + subroot; - List servers = (List) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS); - Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT); - ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf); - CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth); - _zkAcls = Utils.getWorkerACL(conf); - try { - TransactionalState.createNode(initter, transactionalRoot, null, null, null); - } catch (KeeperException.NodeExistsException e) { - } - try { - TransactionalState.createNode(initter, rootDir, null, _zkAcls, null); - } catch (KeeperException.NodeExistsException e) { - } - initter.close(); - - _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - protected static String forPath(PathAndBytesable builder, - String path, byte[] data) throws Exception { - return (data == null) - ? builder.forPath(path) - : builder.forPath(path, data); - } - - protected static void createNode(CuratorFramework curator, String path, - byte[] data, List acls, CreateMode mode) throws Exception { - ProtectACLCreateModePathAndBytesable builder = - curator.create().creatingParentsIfNeeded(); - - if (acls == null) { - if (mode == null ) { - TransactionalState.forPath(builder, path, data); + String className = null; + if (conf.get(Config.STORM_TRANSATION_STATE_STORE_FACTORY) != null) { + className = (String) conf.get(Config.STORM_TRANSATION_STATE_STORE_FACTORY); } else { - TransactionalState.forPath(builder.withMode(mode), path, data); + className = "org.apache.storm.trident.topology.state.TransactionalStateStorageZkFactory"; } - return; + Class clazz = Class.forName(className); + ITransactionalStateStorageFactory storageFactory = (ITransactionalStateStorageFactory) clazz.newInstance(); + transactionalStateStorage = storageFactory.mkTransactionalState(conf, id, subroot); + } catch (Exception e) { + throw new RuntimeException(e); } - - TransactionalState.forPath(builder.withACL(acls), path, data); } public void setData(String path, Object obj) { - path = "/" + path; - byte[] ser; - try { - ser = JSONValue.toJSONString(obj).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - try { - if(_curator.checkExists().forPath(path)!=null) { - _curator.setData().forPath(path, ser); - } else { - TransactionalState.createNode(_curator, path, ser, _zkAcls, - CreateMode.PERSISTENT); - } - } catch(Exception e) { - throw new RuntimeException(e); - } + transactionalStateStorage.setData(path, obj); } - + public void delete(String path) { - path = "/" + path; - try { - _curator.delete().forPath(path); - } catch (Exception e) { - throw new RuntimeException(e); - } + transactionalStateStorage.delete(path); } - + public List list(String path) { - path = "/" + path; - try { - if(_curator.checkExists().forPath(path)==null) { - return new ArrayList(); - } else { - return _curator.getChildren().forPath(path); - } - } catch(Exception e) { - throw new RuntimeException(e); - } + return transactionalStateStorage.list(path); } - + public void mkdir(String path) { - setData(path, 7); + transactionalStateStorage.mkdir(path); } - + public Object getData(String path) { - path = "/" + path; - try { - if(_curator.checkExists().forPath(path)!=null) { - return JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8")); - } else { - return null; - } - } catch(Exception e) { - throw new RuntimeException(e); - } + return transactionalStateStorage.getData(path); } - + public void close() { - _curator.close(); - } - - private Object getWithBackup(Map amap, Object primary, Object backup) { - Object ret = amap.get(primary); - if(ret==null) return amap.get(backup); - return ret; + transactionalStateStorage.close(); } } diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalStateStorageZkFactory.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalStateStorageZkFactory.java new file mode 100644 index 0000000000..26e691407e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalStateStorageZkFactory.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.topology.state; + +import java.util.Map; + +public class TransactionalStateStorageZkFactory implements ITransactionalStateStorageFactory { + @Override + public ITransactionalStateStorage mkTransactionalState(Map conf, String id, String subroot) { + return new TransactionalStateZkStorage(conf, id, subroot); + } +} + diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalStateZkStorage.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalStateZkStorage.java new file mode 100644 index 0000000000..aff7ec40dd --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalStateZkStorage.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.topology.state; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.ZookeeperAuthInfo; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.ACL; +import org.json.simple.JSONValue; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransactionalStateZkStorage implements ITransactionalStateStorage { + + CuratorFramework _curator; + List _zkAcls = null; + + public TransactionalStateZkStorage(Map conf, String id, String subroot) { + try { + conf = new HashMap(conf); + String transactionalRoot = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT); + String rootDir = transactionalRoot + "/" + id + "/" + subroot; + List servers = (List) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS); + Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT); + ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf); + CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth); + _zkAcls = Utils.getWorkerACL(conf); + try { + createNode(initter, transactionalRoot, null, null, null); + } catch (KeeperException.NodeExistsException e) { + } + try { + createNode(initter, rootDir, null, _zkAcls, null); + } catch (KeeperException.NodeExistsException e) { + } + initter.close(); + + _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static String forPath(PathAndBytesable builder, + String path, byte[] data) throws Exception { + return (data == null) + ? builder.forPath(path) + : builder.forPath(path, data); + } + + public static void createNode(CuratorFramework curator, String path, + byte[] data, List acls, CreateMode mode) throws Exception { + ProtectACLCreateModePathAndBytesable builder = + curator.create().creatingParentsIfNeeded(); + + if (acls == null) { + if (mode == null) { + forPath(builder, path, data); + } else { + forPath(builder.withMode(mode), path, data); + } + return; + } + + forPath(builder.withACL(acls), path, data); + } + + public void setData(String path, Object obj) { + path = "/" + path; + byte[] ser; + try { + ser = JSONValue.toJSONString(obj).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + try { + if (_curator.checkExists().forPath(path) != null) { + _curator.setData().forPath(path, ser); + } else { + createNode(_curator, path, ser, _zkAcls, + CreateMode.PERSISTENT); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void delete(String path) { + path = "/" + path; + try { + _curator.delete().forPath(path); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List list(String path) { + path = "/" + path; + try { + if (_curator.checkExists().forPath(path) == null) { + return new ArrayList(); + } else { + return _curator.getChildren().forPath(path); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void mkdir(String path) { + setData(path, 7); + } + + public Object getData(String path) { + path = "/" + path; + try { + if (_curator.checkExists().forPath(path) != null) { + return JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8")); + } else { + return null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void close() { + _curator.close(); + } + + private Object getWithBackup(Map amap, Object primary, Object backup) { + Object ret = amap.get(primary); + if (ret == null) return amap.get(backup); + return ret; + } +}