diff --git a/WeatherStreamingApp/src/main/java/com/dt/weather/app/WeatherApp.java b/WeatherStreamingApp/src/main/java/com/dt/weather/app/WeatherApp.java index 974747d..6523106 100644 --- a/WeatherStreamingApp/src/main/java/com/dt/weather/app/WeatherApp.java +++ b/WeatherStreamingApp/src/main/java/com/dt/weather/app/WeatherApp.java @@ -19,8 +19,10 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator; -import com.datatorrent.lib.math.SumKeyVal; +import com.datatorrent.stram.engine.PortContext; import com.dt.weather.constants.WeatherConstants; +import com.dt.weather.converter.OutputConverter; +import com.dt.weather.counter.KeyValChangeAggregator; import com.dt.weather.counter.KeyValChangeAlert; import com.dt.weather.event.convertor.SinglePortWeatherEventConvertor; import com.dt.weather.input.SimpleFileReader; @@ -40,9 +42,9 @@ public void populateDAG(DAG dag, Configuration conf) fileReader.setDirectory(conf.get(WeatherConstants.INPUT_DIRECTORY_PATH, "/Users/dev/workspace/mydtapp/src/test/resources/data/")); - + //Uncomment the rename logic in simple file reader when the regex works - // fileReader.getScanner().setFilePatternRegexp("\\*.json"); + // fileReader.getScanner().setFilePatternRegexp("\\*.json"); fileReader.setScanIntervalMillis(0); fileReader.setEmitBatchSize(1); @@ -51,22 +53,16 @@ public void populateDAG(DAG dag, Configuration conf) SinglePortWeatherEventConvertor eventConvertor = dag.addOperator("WeatherEventConv", new SinglePortWeatherEventConvertor()); - - // DefaultConverter defaultConv = dag.addOperator("DefaultConverter", new DefaultConverter()); - - KeyValChangeAlert changeNotifier = dag.addOperator("ChangeNotifier", new KeyValChangeAlert()); - - changeNotifier.setAbsoluteThreshold(1); - - //TODO- add the partitioner code snippet here - //TODO - change the locality of the converter with the uniqCounter //Add 
the overall counter - SumKeyVal counter = dag.addOperator("GlobalCounter", new SumKeyVal()); + KeyValChangeAggregator counter = dag.addOperator("GlobalCounter", + new KeyValChangeAggregator()); counter.setType(Integer.class); counter.setCumulative(true); - - dag.setAttribute(counter, Context.OperatorContext.PARTITIONER, new StatelessPartitioner>(3)); + dag.setAttribute(counter, Context.OperatorContext.PARTITIONER, + new StatelessPartitioner>(3)); + + OutputConverter opConv = dag.addOperator("Converter", new OutputConverter()); //Kafka output's KafkaSinglePortOutputOperator kafkaOutputOperator = dag.addOperator("KafkaOutputUniques", @@ -75,21 +71,15 @@ public void populateDAG(DAG dag, Configuration conf) kafkaOutputOperator.setTopic(conf.get(WeatherConstants.TOPIC, "counter")); - System.out.println("heremoe"); - /*Assemble the DAG*/ dag.addStream("InputRecords", fileReader.output, eventConvertor.data).setLocality(Locality.THREAD_LOCAL); dag.addStream("Global Counter", eventConvertor.output, counter.data); - dag.addStream("Change Notifier", counter.sum, changeNotifier.data); - - dag.addStream("Convert Output", changeNotifier.alert, kafkaOutputOperator.inputPort).setLocality( - Locality.CONTAINER_LOCAL); + dag.addStream("Unifier Output", counter.alert, opConv.data); -// dag.addStream("KafkaGLobalCountsWriter", defaultConv.output, kafkaOutputOperator.inputPort).setLocality( -// Locality.CONTAINER_LOCAL); + dag.addStream("Convert Output", opConv.output, kafkaOutputOperator.inputPort); } diff --git a/WeatherStreamingApp/src/main/java/com/dt/weather/converter/OutputConverter.java b/WeatherStreamingApp/src/main/java/com/dt/weather/converter/OutputConverter.java new file mode 100644 index 0000000..ce25017 --- /dev/null +++ b/WeatherStreamingApp/src/main/java/com/dt/weather/converter/OutputConverter.java @@ -0,0 +1,64 @@ +package com.dt.weather.converter; + +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.commons.lang.mutable.MutableDouble; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.dt.weather.constants.WeatherConstants; + +public class OutputConverter extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(OutputConverter.class); + + public final transient DefaultInputPort> data = new DefaultInputPort>() + { + @Override + public void process(HashMap tuple) + { + + emitChangedAggregates(tuple); + + } + }; + + public void emitChangedAggregates(HashMap tuple) + { + StringBuilder outTuple = new StringBuilder(); + + outTuple.append(" entry : tuple.entrySet()) { + String key = (String)entry.getKey(); + MutableDouble value = new MutableDouble(entry.getValue()); + + if (keyToSkip.equalsIgnoreCase(key)) { + uniqueVal.setValue(value); + continue; + } + + outTuple.append(WeatherConstants.RECORD_SEPARATOR); + outTuple.append(key + WeatherConstants.TUPLE_SEPARATOR + value); + + } + + outTuple.append(WeatherConstants.RECORD_SEPARATOR); + outTuple.append(keyToSkip + WeatherConstants.TUPLE_SEPARATOR + uniqueVal.doubleValue()); + + outTuple.append(" >"); + + output.emit(outTuple.toString()); + + outTuple = null; + } + + public final transient DefaultOutputPort output = new DefaultOutputPort(); +} diff --git a/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAggregator.java b/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAggregator.java new file mode 100644 index 0000000..7bc22e0 --- /dev/null +++ b/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAggregator.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.dt.weather.counter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang.mutable.MutableDouble; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.BaseNumberKeyValueOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.UnifierHashMapSumKeys; +import com.dt.weather.constants.WeatherConstants; + + +@OperatorAnnotation(partitionable = true) +public class KeyValChangeAggregator extends BaseNumberKeyValueOperator implements Operator +{ + + private volatile HashMap emitMap = new HashMap(); + + protected static class SumEntry + { + public MutableDouble sum; + public boolean changed = true; + + SumEntry() + { + } + + SumEntry(MutableDouble sum, boolean changed) + { + this.sum = sum; + this.changed = changed; + } + + } + + /** + * Sums key map. + */ + protected HashMap sums = new HashMap(); + + /** + * Cumulative sum flag. + */ + protected boolean cumulative = false; + + /** + * Input port that takes key value pairs and adds the values for each key. 
+ */ + public final transient DefaultInputPort> data = new DefaultInputPort>() + { + /** + * For each tuple (a key value pair) Adds the values for each key. + */ + @Override + public void process(KeyValPair tuple) + { + K key = tuple.getKey(); + if (!doprocessKey(key)) { + return; + } + SumEntry val = sums.get(key); + if (val == null) { + val = new SumEntry(new MutableDouble(tuple.getValue().doubleValue()), true); + emitMap.put(cloneKey(key), val.sum); + + } else { + val.sum.add(tuple.getValue().doubleValue()); + val.changed = true; + emitMap.put(key, val.sum); + + } + sums.put(cloneKey(key), val); + } + + /** + * Stream codec used for partitioning. + */ + @Override + public StreamCodec> getStreamCodec() + { + return getKeyValPairStreamCodec(); + } + + }; + + /* + * Output port to emit the aggregates if they have changed + * */ + + public final transient DefaultOutputPort> alert = new DefaultOutputPort>(){ + @Override + public Unifier> getUnifier() + { + UnifierHashMapSumKeys unifierHashMapSumKeys = new UnifierHashMapSumKeys(); + unifierHashMapSumKeys.setType(MutableDouble.class); + return unifierHashMapSumKeys; + } + }; + + /** + * Output sum port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort> sum = new DefaultOutputPort>(); + + /** + * Output double sum port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort> sumDouble = new DefaultOutputPort>(); + + /** + * Output integer sum port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort> sumInteger = new DefaultOutputPort>(); + + /** + * Output long sum port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort> sumLong = new DefaultOutputPort>(); + + /** + * Output short sum port. 
+ */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort> sumShort = new DefaultOutputPort>(); + + /** + * Output float sum port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort> sumFloat = new DefaultOutputPort>(); + + /** + * Get cumulative flag. + * + * @return cumulative flag. + */ + public boolean isCumulative() + { + return cumulative; + } + + /** + * Set cumulative flag. + * @param cumulative when true, sums are kept across windows; when false, they are cleared at endWindow + */ + public void setCumulative(boolean cumulative) + { + this.cumulative = cumulative; + } + + /** + * Emits on all ports that are connected. Data is pre-computed during process + * on input port and endWindow just emits it for each key. Clears the internal + * data. + */ + @Override + public void endWindow() + { + + for (Map.Entry e : sums.entrySet()) { + K key = e.getKey(); + SumEntry val = e.getValue(); + if (val.changed) { + sum.emit(new KeyValPair(key, getValue(val.sum.doubleValue()))); + sumDouble.emit(new KeyValPair(key, val.sum.doubleValue())); + sumInteger.emit(new KeyValPair(key, val.sum.intValue())); + sumFloat.emit(new KeyValPair(key, val.sum.floatValue())); + sumShort.emit(new KeyValPair(key, val.sum.shortValue())); + sumLong.emit(new KeyValPair(key, val.sum.longValue())); + } + } + emitChangedAggregates(); + clearCache(); + } + + /* + * Emits the changed aggregates + * */ + + public void emitChangedAggregates() + { + emitMap.put((K)"uniques", new MutableDouble(sums.size())); + + alert.emit(emitMap); + + emitMap.clear(); + } + + /** + * Clears the cache making this operator stateless on window boundary + */ + public void clearCache() + { + if (cumulative) { + for (Map.Entry e : sums.entrySet()) { + SumEntry val = e.getValue(); + val.changed = false; + } + } else { + sums.clear(); + } + } + +} diff --git a/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAlert.java b/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAlert.java index 0e8e5e8..4498c25 100644 
--- a/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAlert.java +++ b/WeatherStreamingApp/src/main/java/com/dt/weather/counter/KeyValChangeAlert.java @@ -48,15 +48,11 @@ public void process(KeyValPair tuple) val = new MutableInt(tval); basemap.put(cloneKey(key), val); emitMap.put(cloneKey(key), val); - - // alert.emit(new KeyValPair(key, tval)); return; } else if (tval >= getAbsoluteThreshold()) { val.setValue(val.intValue() + tval); basemap.put(cloneKey(key), val); emitMap.put(key, val); - - //alert.emit(new KeyValPair(key, val.intValue())); return; } diff --git a/WeatherStreamingApp/src/test/resources/localmode.properties b/WeatherStreamingApp/src/test/resources/localmode.properties index 1145585..e3aab26 100644 --- a/WeatherStreamingApp/src/test/resources/localmode.properties +++ b/WeatherStreamingApp/src/test/resources/localmode.properties @@ -1,6 +1,6 @@ #Operator specific properties - input.dir.path=/Users/dev/checkout/personalGit/devel/DTStreaming/WeatherStreamingApp/src/test/resources/data + input.dir.path=/Users/dev/checkout/personalGit/devel2/DTStreaming/WeatherStreamingApp/src/test/resources/data #input.dir.path=/Users/dev/Desktop/test dt.brokerSet=localhost:9092 metadataRefresh=30000