Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ storm.daemon.metrics.reporter.plugins:
- "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"

storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager"
# Also determines whether the unit tests for cgroup runs.
# Also determines whether the unit tests for cgroup runs.
# If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run
storm.resource.isolation.plugin.enable: false

Expand All @@ -304,3 +304,6 @@ storm.cgroup.hierarchy.name: "storm"
storm.supervisor.cgroup.rootdir: "storm"
storm.cgroup.cgexec.cmd: "/bin/cgexec"
storm.cgroup.memory.limit.tolerance.margin.mb: 128.0

# Default factory to use for transactional state
storm.transaction.state.store.factory: org.apache.storm.trident.topology.state.TransactionalStateStorageZkFactory
5 changes: 5 additions & 0 deletions storm-core/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.serialization.IKryoDecorator;
import org.apache.storm.serialization.IKryoFactory;
import org.apache.storm.trident.topology.state.ITransactionalStateStorageFactory;
import org.apache.storm.validation.ConfigValidationAnnotations.*;
import org.apache.storm.validation.ConfigValidation.*;
import com.esotericsoftware.kryo.Serializer;
Expand Down Expand Up @@ -2280,9 +2281,13 @@ public class Config extends HashMap<String, Object> {
@isString
public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";

@isImplementationOfClass(implementsClass = ITransactionalStateStorageFactory.class)
public static final String STORM_TRANSATION_STATE_STORE_FACTORY = "storm.transaction.state.store.factory";

/**
* The amount of memory a worker can exceed its allocation before cgroup will kill it
*/

@isPositiveNumber
public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB = "storm.cgroup.memory.limit.tolerance.margin.mb";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.trident.topology.state;

import java.util.List;

public interface ITransactionalStateStorage {

void setData(String path, Object obj);
void delete(String path);
List<String> list(String path);
void mkdir(String path);
Object getData(String path);
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.trident.topology.state;

import java.util.Map;

public interface ITransactionalStateStorageFactory {
ITransactionalStateStorage mkTransactionalState(Map conf, String id, String subroot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ protected TestTransactionalState(Map conf, String id, String subroot) {
public static void createNode(CuratorFramework curator,
String rootDir, byte[] data, List<ACL> acls, CreateMode mode)
throws Exception {
TransactionalState.createNode(curator, rootDir, data, acls, mode);
TransactionalStateZkStorage.createNode(curator, rootDir, data, acls, mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,133 +39,56 @@
import org.json.simple.JSONValue;

public class TransactionalState {
CuratorFramework _curator;
List<ACL> _zkAcls = null;


public static final String USER = "user";
public static final String COORDINATOR = "coordinator";
private ITransactionalStateStorage transactionalStateStorage;

public static TransactionalState newUserState(Map conf, String id) {
return new TransactionalState(conf, id, "user");
return new TransactionalState(conf, id, USER);
}

public static TransactionalState newCoordinatorState(Map conf, String id) {
return new TransactionalState(conf, id, "coordinator");
return new TransactionalState(conf, id, COORDINATOR);
}

protected TransactionalState(Map conf, String id, String subroot) {
try {
conf = new HashMap(conf);
String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
String rootDir = transactionalRoot + "/" + id + "/" + subroot;
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
_zkAcls = Utils.getWorkerACL(conf);
try {
TransactionalState.createNode(initter, transactionalRoot, null, null, null);
} catch (KeeperException.NodeExistsException e) {
}
try {
TransactionalState.createNode(initter, rootDir, null, _zkAcls, null);
} catch (KeeperException.NodeExistsException e) {
}
initter.close();

_curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected static String forPath(PathAndBytesable<String> builder,
String path, byte[] data) throws Exception {
return (data == null)
? builder.forPath(path)
: builder.forPath(path, data);
}

protected static void createNode(CuratorFramework curator, String path,
byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
ProtectACLCreateModePathAndBytesable<String> builder =
curator.create().creatingParentsIfNeeded();

if (acls == null) {
if (mode == null ) {
TransactionalState.forPath(builder, path, data);
String className = null;
if (conf.get(Config.STORM_TRANSATION_STATE_STORE_FACTORY) != null) {
className = (String) conf.get(Config.STORM_TRANSATION_STATE_STORE_FACTORY);
} else {
TransactionalState.forPath(builder.withMode(mode), path, data);
className = "org.apache.storm.trident.topology.state.TransactionalStateStorageZkFactory";
}
return;
Class clazz = Class.forName(className);
ITransactionalStateStorageFactory storageFactory = (ITransactionalStateStorageFactory) clazz.newInstance();
transactionalStateStorage = storageFactory.mkTransactionalState(conf, id, subroot);
} catch (Exception e) {
throw new RuntimeException(e);
}

TransactionalState.forPath(builder.withACL(acls), path, data);
}

public void setData(String path, Object obj) {
path = "/" + path;
byte[] ser;
try {
ser = JSONValue.toJSONString(obj).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
try {
if(_curator.checkExists().forPath(path)!=null) {
_curator.setData().forPath(path, ser);
} else {
TransactionalState.createNode(_curator, path, ser, _zkAcls,
CreateMode.PERSISTENT);
}
} catch(Exception e) {
throw new RuntimeException(e);
}
transactionalStateStorage.setData(path, obj);
}

public void delete(String path) {
path = "/" + path;
try {
_curator.delete().forPath(path);
} catch (Exception e) {
throw new RuntimeException(e);
}
transactionalStateStorage.delete(path);
}

public List<String> list(String path) {
path = "/" + path;
try {
if(_curator.checkExists().forPath(path)==null) {
return new ArrayList<String>();
} else {
return _curator.getChildren().forPath(path);
}
} catch(Exception e) {
throw new RuntimeException(e);
}
return transactionalStateStorage.list(path);
}

public void mkdir(String path) {
setData(path, 7);
transactionalStateStorage.mkdir(path);
}

public Object getData(String path) {
path = "/" + path;
try {
if(_curator.checkExists().forPath(path)!=null) {
return JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8"));
} else {
return null;
}
} catch(Exception e) {
throw new RuntimeException(e);
}
return transactionalStateStorage.getData(path);
}

public void close() {
_curator.close();
}

private Object getWithBackup(Map amap, Object primary, Object backup) {
Object ret = amap.get(primary);
if(ret==null) return amap.get(backup);
return ret;
transactionalStateStorage.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.trident.topology.state;

import java.util.Map;

public class TransactionalStateStorageZkFactory implements ITransactionalStateStorageFactory {
@Override
public ITransactionalStateStorage mkTransactionalState(Map conf, String id, String subroot) {
return new TransactionalStateZkStorage(conf, id, subroot);
}
}

Loading