Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
import com.datatorrent.lib.math.SumKeyVal;
import com.datatorrent.stram.engine.PortContext;
import com.dt.weather.constants.WeatherConstants;
import com.dt.weather.converter.OutputConverter;
import com.dt.weather.counter.KeyValChangeAggregator;
import com.dt.weather.counter.KeyValChangeAlert;
import com.dt.weather.event.convertor.SinglePortWeatherEventConvertor;
import com.dt.weather.input.SimpleFileReader;
Expand All @@ -40,9 +42,9 @@ public void populateDAG(DAG dag, Configuration conf)

fileReader.setDirectory(conf.get(WeatherConstants.INPUT_DIRECTORY_PATH,
"/Users/dev/workspace/mydtapp/src/test/resources/data/"));

//Uncomment the rename logic in simple file reader when the regex works
// fileReader.getScanner().setFilePatternRegexp("\\*.json");
// fileReader.getScanner().setFilePatternRegexp("\\*.json");

fileReader.setScanIntervalMillis(0);
fileReader.setEmitBatchSize(1);
Expand All @@ -51,22 +53,16 @@ public void populateDAG(DAG dag, Configuration conf)

SinglePortWeatherEventConvertor eventConvertor = dag.addOperator("WeatherEventConv",
new SinglePortWeatherEventConvertor());

// DefaultConverter defaultConv = dag.addOperator("DefaultConverter", new DefaultConverter());

KeyValChangeAlert<String, Integer> changeNotifier = dag.addOperator("ChangeNotifier", new KeyValChangeAlert<String, Integer>());

changeNotifier.setAbsoluteThreshold(1);

//TODO- add the partitioner code snippet here
//TODO - change the locality of the converter with the uniqCounter

//Add the overall counter
SumKeyVal<String, Integer> counter = dag.addOperator("GlobalCounter", new SumKeyVal<String, Integer>());
KeyValChangeAggregator<String, Integer> counter = dag.addOperator("GlobalCounter",
new KeyValChangeAggregator<String, Integer>());
counter.setType(Integer.class);
counter.setCumulative(true);

dag.setAttribute(counter, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<SumKeyVal<String,Integer>>(3));
dag.setAttribute(counter, Context.OperatorContext.PARTITIONER,
new StatelessPartitioner<KeyValChangeAggregator<String, Integer>>(3));

OutputConverter<String, Integer> opConv = dag.addOperator("Converter", new OutputConverter<String, Integer>());

//Kafka output's
KafkaSinglePortOutputOperator<Object, Object> kafkaOutputOperator = dag.addOperator("KafkaOutputUniques",
Expand All @@ -75,21 +71,15 @@ public void populateDAG(DAG dag, Configuration conf)

kafkaOutputOperator.setTopic(conf.get(WeatherConstants.TOPIC, "counter"));

System.out.println("heremoe");

/*Assemble the DAG*/

dag.addStream("InputRecords", fileReader.output, eventConvertor.data).setLocality(Locality.THREAD_LOCAL);

dag.addStream("Global Counter", eventConvertor.output, counter.data);

dag.addStream("Change Notifier", counter.sum, changeNotifier.data);

dag.addStream("Convert Output", changeNotifier.alert, kafkaOutputOperator.inputPort).setLocality(
Locality.CONTAINER_LOCAL);
dag.addStream("Unifier Output", counter.alert, opConv.data);

// dag.addStream("KafkaGLobalCountsWriter", defaultConv.output, kafkaOutputOperator.inputPort).setLocality(
// Locality.CONTAINER_LOCAL);
dag.addStream("Convert Output", opConv.output, kafkaOutputOperator.inputPort);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.dt.weather.converter;

import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.commons.lang.mutable.MutableDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.dt.weather.constants.WeatherConstants;

public class OutputConverter<K, V extends Number> extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(OutputConverter.class);

public final transient DefaultInputPort<HashMap<K, MutableDouble>> data = new DefaultInputPort<HashMap<K, MutableDouble>>()
{
@Override
public void process(HashMap<K, MutableDouble> tuple)
{

emitChangedAggregates(tuple);

}
};

public void emitChangedAggregates(HashMap<K, MutableDouble> tuple)
{
StringBuilder outTuple = new StringBuilder();

outTuple.append("<time: " + System.currentTimeMillis() / 1000);

String keyToSkip = "uniques";
MutableDouble uniqueVal = new MutableDouble();

for (Entry<K, MutableDouble> entry : tuple.entrySet()) {
String key = (String)entry.getKey();
MutableDouble value = new MutableDouble(entry.getValue());

if (keyToSkip.equalsIgnoreCase(key)) {
uniqueVal.setValue(value);
continue;
}

outTuple.append(WeatherConstants.RECORD_SEPARATOR);
outTuple.append(key + WeatherConstants.TUPLE_SEPARATOR + value);

}

outTuple.append(WeatherConstants.RECORD_SEPARATOR);
outTuple.append(keyToSkip + WeatherConstants.TUPLE_SEPARATOR + uniqueVal.doubleValue());

outTuple.append(" >");

output.emit(outTuple.toString());

outTuple = null;
}

public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.dt.weather.counter;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.mutable.MutableDouble;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.UnifierHashMapSumKeys;
import com.dt.weather.constants.WeatherConstants;


@OperatorAnnotation(partitionable = true)
public class KeyValChangeAggregator<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> implements Operator
{

private volatile HashMap<K, MutableDouble> emitMap = new HashMap<K, MutableDouble>();

protected static class SumEntry
{
public MutableDouble sum;
public boolean changed = true;

SumEntry()
{
}

SumEntry(MutableDouble sum, boolean changed)
{
this.sum = sum;
this.changed = changed;
}

}

/**
* Sums key map.
*/
protected HashMap<K, SumEntry> sums = new HashMap<K, SumEntry>();

/**
* Cumulative sum flag.
*/
protected boolean cumulative = false;

/**
* Input port that takes key value pairs and adds the values for each key.
*/
public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
{
/**
* For each tuple (a key value pair) Adds the values for each key.
*/
@Override
public void process(KeyValPair<K, V> tuple)
{
K key = tuple.getKey();
if (!doprocessKey(key)) {
return;
}
SumEntry val = sums.get(key);
if (val == null) {
val = new SumEntry(new MutableDouble(tuple.getValue().doubleValue()), true);
emitMap.put(cloneKey(key), val.sum);

} else {
val.sum.add(tuple.getValue().doubleValue());
val.changed = true;
emitMap.put(key, val.sum);

}
sums.put(cloneKey(key), val);
}

/**
* Stream codec used for partitioning.
*/
@Override
public StreamCodec<KeyValPair<K, V>> getStreamCodec()
{
return getKeyValPairStreamCodec();
}

};

/*
* Output port to emit the aggregates if they have changed
* */

public final transient DefaultOutputPort<HashMap<K,MutableDouble>> alert = new DefaultOutputPort<HashMap<K,MutableDouble>>(){
@Override
public Unifier<HashMap<K, MutableDouble>> getUnifier()
{
UnifierHashMapSumKeys<K, MutableDouble> unifierHashMapSumKeys = new UnifierHashMapSumKeys<K, MutableDouble>();
unifierHashMapSumKeys.setType(MutableDouble.class);
return unifierHashMapSumKeys;
}
};

/**
* Output sum port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<KeyValPair<K, V>> sum = new DefaultOutputPort<KeyValPair<K, V>>();

/**
* Output double sum port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<KeyValPair<K, Double>> sumDouble = new DefaultOutputPort<KeyValPair<K, Double>>();

/**
* Output integer sum port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<KeyValPair<K, Integer>> sumInteger = new DefaultOutputPort<KeyValPair<K, Integer>>();

/**
* Output long sum port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<KeyValPair<K, Long>> sumLong = new DefaultOutputPort<KeyValPair<K, Long>>();

/**
* Output short sum port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<KeyValPair<K, Short>> sumShort = new DefaultOutputPort<KeyValPair<K, Short>>();

/**
* Output float sum port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<KeyValPair<K, Float>> sumFloat = new DefaultOutputPort<KeyValPair<K, Float>>();

/**
* Get cumulative flag.
*
* @return cumulative flag.
*/
public boolean isCumulative()
{
return cumulative;
}

/**
*
* @param cumulative
*/
public void setCumulative(boolean cumulative)
{
this.cumulative = cumulative;
}

/**
* Emits on all ports that are connected. Data is pre-computed during process
* on input port and endWindow just emits it for each key. Clears the internal
* data.
*/
@Override
public void endWindow()
{

for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
K key = e.getKey();
SumEntry val = e.getValue();
if (val.changed) {
sum.emit(new KeyValPair<K, V>(key, getValue(val.sum.doubleValue())));
sumDouble.emit(new KeyValPair<K, Double>(key, val.sum.doubleValue()));
sumInteger.emit(new KeyValPair<K, Integer>(key, val.sum.intValue()));
sumFloat.emit(new KeyValPair<K, Float>(key, val.sum.floatValue()));
sumShort.emit(new KeyValPair<K, Short>(key, val.sum.shortValue()));
sumLong.emit(new KeyValPair<K, Long>(key, val.sum.longValue()));
}
}
emitChangedAggregates();
clearCache();
}

/*
* Emitts the changed aggregates
* */

public void emitChangedAggregates()
{
emitMap.put((K)"uniques", new MutableDouble(sums.size()));

alert.emit(emitMap);

emitMap.clear();
}

/**
* Clears the cache making this operator stateless on window boundary
*/
public void clearCache()
{
if (cumulative) {
for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
SumEntry val = e.getValue();
val.changed = false;
}
} else {
sums.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,11 @@ public void process(KeyValPair<K, V> tuple)
val = new MutableInt(tval);
basemap.put(cloneKey(key), val);
emitMap.put(cloneKey(key), val);

// alert.emit(new KeyValPair<K, Integer>(key, tval));
return;
} else if (tval >= getAbsoluteThreshold()) {
val.setValue(val.intValue() + tval);
basemap.put(cloneKey(key), val);
emitMap.put(key, val);

//alert.emit(new KeyValPair<K, Integer>(key, val.intValue()));
return;
}

Expand Down
Loading