From 7020f0cd053a39a60bb8f58ca5f701baadfae78d Mon Sep 17 00:00:00 2001 From: Beria Date: Thu, 4 Aug 2016 17:45:36 +0530 Subject: [PATCH 01/15] Buffer append output results + fix extra incorrect results --- .../remote/AppendOutputBuffer.java | 31 ++++ .../remote/AppendOutputRunner.java | 127 +++++++++++++++ .../remote/RemoteInterpreterEventPoller.java | 3 +- .../remote/AppendOutputRunnerTest.java | 151 ++++++++++++++++++ .../paragraph/paragraph.controller.js | 9 +- 5 files changed, 319 insertions(+), 2 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java create mode 100644 zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java new file mode 100644 index 00000000000..6646a11f75b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java @@ -0,0 +1,31 @@ +package org.apache.zeppelin.interpreter.remote; + +/** + * This element stores the buffered + * append-data of paragraph's output. + */ +public class AppendOutputBuffer { + + private String noteId; + private String paragraphId; + private String data; + + public AppendOutputBuffer(String noteId, String paragraphId, String data) { + this.noteId = noteId; + this.paragraphId = paragraphId; + this.data = data; + } + + public String getNoteId() { + return noteId; + } + + public String getParagraphId() { + return paragraphId; + } + + public String getData() { + return data; + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java new file mode 100644 index 00000000000..ad2eea219a7 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -0,0 +1,127 @@ +package org.apache.zeppelin.interpreter.remote; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This thread sends paragraph's append-data + * periodically, rather than continously, with + * a period of BUFFER_TIME_MS. It handles append-data + * for all paragraphs across all notebooks. + */ +public class AppendOutputRunner implements Runnable { + + private static final Logger logger = + LoggerFactory.getLogger(AppendOutputRunner.class); + private static RemoteInterpreterProcessListener listener; + + private static final Long BUFFER_TIME_MS = new Long(100); + private static final Long SAFE_PROCESSING_TIME = new Long(10); + private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); + + private static Thread thread = null; + private static final BlockingQueue QUEUE = + new LinkedBlockingQueue(); + + @Override + public void run() { + + while (true) { + Map > noteMap = + new HashMap >(); + List list = new LinkedList(); + + /* "drainTo" method does not wait for any element + * to be present in the queue, and thus this loop would + * continuosly run (with period of BUFFER_TIME_MS). "take()" method + * waits for the queue to become non-empty and then removes + * one element from it. Rest elements from queue (if present) are + * removed using "drainTo" method. Thus we save on some un-necessary + * cpu-cycles. + */ + try { + list.add(QUEUE.take()); + } catch (InterruptedException e) { + logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); + } + Long processingStartTime = System.currentTimeMillis(); + QUEUE.drainTo(list); + + for (AppendOutputBuffer buffer: list) { + String noteId = buffer.getNoteId(); + String paragraphId = buffer.getParagraphId(); + + Map paragraphMap = (noteMap.containsKey(noteId)) ? + noteMap.get(noteId) : new HashMap(); + StringBuilder builder = paragraphMap.containsKey(paragraphId) ? + paragraphMap.get(paragraphId) : new StringBuilder(); + + builder.append(buffer.getData()); + paragraphMap.put(paragraphId, builder); + noteMap.put(noteId, paragraphMap); + } + Long processingTime = System.currentTimeMillis() - processingStartTime; + + if (processingTime > SAFE_PROCESSING_TIME) { + logger.warn("Processing time for buffered append-output is high: " + + processingTime + " milliseconds."); + } else { + logger.debug("Processing time for append-output took " + + processingTime + " milliseconds"); + } + + Long sizeProcessed = new Long(0); + for (String noteId: noteMap.keySet()) { + for (String paragraphId: noteMap.get(noteId).keySet()) { + String data = noteMap.get(noteId).get(paragraphId).toString(); + sizeProcessed += data.length(); + listener.onOutputAppend(noteId, paragraphId, data); + } + } + + if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { + logger.warn("Processing size for buffered append-output is high: " + + sizeProcessed + " characters."); + } else { + logger.debug("Processing size for append-output is " + + sizeProcessed + " characters"); + } + try { + Thread.sleep(BUFFER_TIME_MS); + } catch (InterruptedException e) { + logger.error("Append output thread interrupted: " + e.getMessage()); + } + } + } + + /* Initialize the thread if non-existent, + * or killed due to errors. + */ + public static void startRunner(RemoteInterpreterProcessListener listener) { + AppendOutputRunner.listener = listener; + if (thread == null || !thread.isAlive()) { + logger.info("Starting a AppendOutputRunner thread to buffer" + + " and send paragraph append data"); + thread = new Thread(new AppendOutputRunner()); + thread.start(); + } + } + + public static void appendBuffer(String noteId, String paragraphId, String outputToAppend) { + QUEUE.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend)); + } + + /* The function is only used by + * unit-tests. + */ + public static void stopRunnerForUnitTests() { + thread.interrupt(); + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 48c14d50bde..2f87f747123 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -83,6 +83,7 @@ public void run() { } continue; } + AppendOutputRunner.startRunner(listener); try { client = interpreterProcess.getClient(); @@ -157,7 +158,7 @@ public void run() { String appId = outputAppend.get("appId"); if (appId == null) { - listener.onOutputAppend(noteId, paragraphId, outputToAppend); + AppendOutputRunner.appendBuffer(noteId, paragraphId, outputToAppend); } else { appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java new file mode 100644 index 00000000000..ed89c8bad0d --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -0,0 +1,151 @@ +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Test; + +public class AppendOutputRunnerTest { + + private static final int NUM_EVENTS = 10000; + private static final int NUM_CLUBBED_EVENTS = 100; + + @Test + public void testSingleEvent() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + AppendOutputRunner.startRunner(listener); + AppendOutputRunner.appendBuffer("note", "para", "data\n"); + Thread.sleep(2000); + verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + verify(listener, times(1)).onOutputAppend("note", "para", "data\n"); + AppendOutputRunner.stopRunnerForUnitTests(); + } + + @Test + public void testMultipleEventsOfSameParagraph() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String note1 = "note1"; + String para1 = "para1"; + AppendOutputRunner.appendBuffer(note1, para1, "data1\n"); + AppendOutputRunner.appendBuffer(note1, para1, "data2\n"); + AppendOutputRunner.appendBuffer(note1, para1, "data3\n"); + AppendOutputRunner.startRunner(listener); + Thread.sleep(1000); + verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n"); + AppendOutputRunner.stopRunnerForUnitTests(); + } + + @Test + public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String note1 = "note1"; + String note2 = "note2"; + String para1 = "para1"; + String para2 = "para2"; + AppendOutputRunner.appendBuffer(note1, para1, "data1\n"); + AppendOutputRunner.appendBuffer(note1, para2, "data2\n"); + AppendOutputRunner.appendBuffer(note2, para1, "data3\n"); + AppendOutputRunner.appendBuffer(note2, para2, "data4\n"); + AppendOutputRunner.startRunner(listener); + Thread.sleep(1000); + verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n"); + verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n"); + verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n"); + verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n"); + AppendOutputRunner.stopRunnerForUnitTests(); + } + + @Test + public void testClubbedData() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + + AppendOutputRunner.startRunner(listener); + Thread thread = new Thread(new BombardEvents()); + thread.start(); + thread.join(); + Thread.sleep(1000); + /* NUM_CLUBBED_EVENTS is a heuristic number. + * It has been observed that for 10,000 continuos event + * calls, 30-40 Web-socket calls are made. Keeping + * the unit-test to a pessimistic 100 web-socket calls. + */ + verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class)); + AppendOutputRunner.stopRunnerForUnitTests(); + } + + @Test + public void testWarnLoggerForLargeData() throws InterruptedException { + String data = "data\n"; + int numEvents = 100000; + for (int i=0; i log = appender.getLog(); + LoggingEvent sizeWarnLogEntry = null; + for (LoggingEvent logEntry: log) { + if (Level.WARN.equals(logEntry.getLevel())) { + sizeWarnLogEntry = logEntry; + } + } + String loggerString = "Processing size for buffered append-output is high: " + + (data.length() * numEvents) + " characters."; + assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); + } + + private class BombardEvents implements Runnable { + + @Override + public void run() { + String noteId = "noteId"; + String paraId = "paraId"; + for (int i=0; i log = new ArrayList(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + protected void append(final LoggingEvent loggingEvent) { + log.add(loggingEvent); + } + + @Override + public void close() { + } + + public List getLog() { + return new ArrayList(log); + } + } +} \ No newline at end of file diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 1d672e75cbb..8ff712b85ed 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -2271,7 +2271,14 @@ angular.module('zeppelinWebApp').controller('ParagraphCtrl', function($scope, $r }); $scope.$on('appendParagraphOutput', function(event, data) { - if ($scope.paragraph.id === data.paragraphId) { + /* It has been observed that append events + * can be errorneously called even if paragraph + * execution has ended, and in that case, no append + * should be made. Also, it has been observed that this + * could be called in PENDING state as well + */ + if ($scope.paragraph.id === data.paragraphId && + ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) { if ($scope.flushStreamingOutput) { $scope.clearTextOutput(); $scope.flushStreamingOutput = false; From 82e9c4a357755d2b06e2d509dc5a2b3cf199ae0b Mon Sep 17 00:00:00 2001 From: Beria Date: Fri, 5 Aug 2016 10:06:25 +0530 Subject: [PATCH 02/15] Fix comment --- .../src/app/notebook/paragraph/paragraph.controller.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 8ff712b85ed..7398deb29a1 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -2274,8 +2274,10 @@ angular.module('zeppelinWebApp').controller('ParagraphCtrl', function($scope, $r /* It has been observed that append events * can be errorneously called even if paragraph * execution has ended, and in that case, no append - * should be made. Also, it has been observed that this - * could be called in PENDING state as well + * should be made. Also, it was observed that between PENDING + * and RUNNING states, append-events can be called and we can't + * miss those, else during the length of paragraph run, few + * initial output line/s will be missing. */ if ($scope.paragraph.id === data.paragraphId && ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) { From 5057bb3f897305f2a81ebb4e28a66b26e154d043 Mon Sep 17 00:00:00 2001 From: Beria Date: Sat, 6 Aug 2016 20:59:29 +0530 Subject: [PATCH 03/15] Incorporate feedback 1. Synchronize on AppendOutputRunner creation 2. Use ScheduledExecutorService instead of while loop 3. Remove Thread.sleep() from tests --- .../remote/AppendOutputRunner.java | 23 ++----- .../remote/CheckAppendOutputRunner.java | 57 +++++++++++++++++ .../remote/RemoteInterpreterEventPoller.java | 3 +- .../remote/AppendOutputRunnerTest.java | 62 ++++++++++++++----- 4 files changed, 109 insertions(+), 36 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index ad2eea219a7..75967863e01 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -26,7 +26,6 @@ public class AppendOutputRunner implements Runnable { private static final Long SAFE_PROCESSING_TIME = new Long(10); private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); - private static Thread thread = null; private static final BlockingQueue QUEUE = new LinkedBlockingQueue(); @@ -50,6 +49,7 @@ public void run() { list.add(QUEUE.take()); } catch (InterruptedException e) { logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); + break; } Long processingStartTime = System.currentTimeMillis(); QUEUE.drainTo(list); @@ -97,31 +97,16 @@ public void run() { Thread.sleep(BUFFER_TIME_MS); } catch (InterruptedException e) { logger.error("Append output thread interrupted: " + e.getMessage()); + break; } } } - /* Initialize the thread if non-existent, - * or killed due to errors. - */ - public static void startRunner(RemoteInterpreterProcessListener listener) { - AppendOutputRunner.listener = listener; - if (thread == null || !thread.isAlive()) { - logger.info("Starting a AppendOutputRunner thread to buffer" - + " and send paragraph append data"); - thread = new Thread(new AppendOutputRunner()); - thread.start(); - } - } - public static void appendBuffer(String noteId, String paragraphId, String outputToAppend) { QUEUE.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend)); } - /* The function is only used by - * unit-tests. - */ - public static void stopRunnerForUnitTests() { - thread.interrupt(); + public static void setListener(RemoteInterpreterProcessListener listener) { + AppendOutputRunner.listener = listener; } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java new file mode 100644 index 00000000000..13938517e0e --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -0,0 +1,57 @@ +package org.apache.zeppelin.interpreter.remote; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* This class is responsible for initializing + * and ensuring that AppendOutputRunner is up + * and running. + */ +public class CheckAppendOutputRunner implements Runnable { + + private static final Logger logger = + LoggerFactory.getLogger(CheckAppendOutputRunner.class); + private static Thread thread = null; + private static final Boolean SYNCHRONIZER = false; + private static ScheduledExecutorService SCHEDULED_SERVICE = null; + + /* Can only be initialized locally.*/ + private CheckAppendOutputRunner() + {} + + @Override + public void run() { + synchronized (SYNCHRONIZER) { + if (thread == null || !thread.isAlive()) { + logger.info("Starting a AppendOutputRunner thread to buffer" + + " and send paragraph append data."); + thread = new Thread(new AppendOutputRunner()); + thread.start(); + } + } + } + + public static void startScheduler() { + synchronized (SYNCHRONIZER) { + if (SCHEDULED_SERVICE == null) { + SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor(); + SCHEDULED_SERVICE.scheduleWithFixedDelay( + new CheckAppendOutputRunner(), 0, 1, TimeUnit.SECONDS); + } + } + } + + /* These functions are only used by unit-tests. */ + public static void stopRunnerForUnitTests() { + thread.interrupt(); + } + + public static void startRunnerForUnitTests() { + thread = new Thread(new AppendOutputRunner()); + thread.start(); + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 2f87f747123..8e555531506 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -72,6 +72,8 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { @Override public void run() { Client client = null; + AppendOutputRunner.setListener(listener); + CheckAppendOutputRunner.startScheduler(); while (!shutdown) { // wait and retry @@ -83,7 +85,6 @@ public void run() { } continue; } - AppendOutputRunner.startRunner(listener); try { client = interpreterProcess.getClient(); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index ed89c8bad0d..dc85d0a6eb9 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -3,6 +3,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -15,21 +16,28 @@ import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class AppendOutputRunnerTest { private static final int NUM_EVENTS = 10000; private static final int NUM_CLUBBED_EVENTS = 100; + /* It is being accessed by multiple threads. + * While loop for 'loopForBufferCompletion' could + * run for-ever. + */ + private volatile static int numInvocations = 0; @Test public void testSingleEvent() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - AppendOutputRunner.startRunner(listener); AppendOutputRunner.appendBuffer("note", "para", "data\n"); - Thread.sleep(2000); + + loopForCompletingEvents(listener, 1); verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend("note", "para", "data\n"); - AppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopRunnerForUnitTests(); } @Test @@ -40,11 +48,11 @@ public void testMultipleEventsOfSameParagraph() throws InterruptedException { AppendOutputRunner.appendBuffer(note1, para1, "data1\n"); AppendOutputRunner.appendBuffer(note1, para1, "data2\n"); AppendOutputRunner.appendBuffer(note1, para1, "data3\n"); - AppendOutputRunner.startRunner(listener); - Thread.sleep(1000); + + loopForCompletingEvents(listener, 1); verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n"); - AppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopRunnerForUnitTests(); } @Test @@ -58,32 +66,33 @@ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedExceptio AppendOutputRunner.appendBuffer(note1, para2, "data2\n"); AppendOutputRunner.appendBuffer(note2, para1, "data3\n"); AppendOutputRunner.appendBuffer(note2, para2, "data4\n"); - AppendOutputRunner.startRunner(listener); - Thread.sleep(1000); + loopForCompletingEvents(listener, 4); + verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n"); verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n"); verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n"); verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n"); - AppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopRunnerForUnitTests(); } @Test public void testClubbedData() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - - AppendOutputRunner.startRunner(listener); + AppendOutputRunner.setListener(listener); + CheckAppendOutputRunner.startRunnerForUnitTests(); Thread thread = new Thread(new BombardEvents()); thread.start(); thread.join(); Thread.sleep(1000); + /* NUM_CLUBBED_EVENTS is a heuristic number. * It has been observed that for 10,000 continuos event * calls, 30-40 Web-socket calls are made. Keeping * the unit-test to a pessimistic 100 web-socket calls. */ verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class)); - AppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopRunnerForUnitTests(); } @Test @@ -100,11 +109,13 @@ public void testWarnLoggerForLargeData() throws InterruptedException { Logger.getLogger(RemoteInterpreterEventPoller.class); RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - AppendOutputRunner.startRunner(listener); - Thread.sleep(1000); - - List log = appender.getLog(); + loopForCompletingEvents(listener, 1); + List log; + do { + log = appender.getLog(); + } while(log.size() != 2); LoggingEvent sizeWarnLogEntry = null; + for (LoggingEvent logEntry: log) { if (Level.WARN.equals(logEntry.getLevel())) { sizeWarnLogEntry = logEntry; @@ -113,6 +124,7 @@ public void testWarnLoggerForLargeData() throws InterruptedException { String loggerString = "Processing size for buffered append-output is high: " + (data.length() * numEvents) + " characters."; assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); + CheckAppendOutputRunner.stopRunnerForUnitTests(); } private class BombardEvents implements Runnable { @@ -148,4 +160,22 @@ public List getLog() { return new ArrayList(log); } } + + private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) { + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + numInvocations += 1; + return null; + } + }).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class)); + } + + private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, int numTimes) { + numInvocations = 0; + prepareInvocationCounts(listener); + AppendOutputRunner.setListener(listener); + CheckAppendOutputRunner.startRunnerForUnitTests(); + while(numInvocations != numTimes); + } } \ No newline at end of file From 27790e4a3fd7c825d85c9b93b9b2c7bc0396df73 Mon Sep 17 00:00:00 2001 From: Beria Date: Mon, 8 Aug 2016 16:14:36 +0530 Subject: [PATCH 04/15] Avoid infinite loop in tests --- .../interpreter/remote/AppendOutputRunnerTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index dc85d0a6eb9..152d35535ee 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -1,6 +1,7 @@ package org.apache.zeppelin.interpreter.remote; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; @@ -176,6 +177,11 @@ private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, prepareInvocationCounts(listener); AppendOutputRunner.setListener(listener); CheckAppendOutputRunner.startRunnerForUnitTests(); - while(numInvocations != numTimes); + long startTimeMs = System.currentTimeMillis(); + while(numInvocations != numTimes) { + if (System.currentTimeMillis() - startTimeMs > 2000) { + fail("Buffered events were not sent for 2 seconds"); + } + } } } \ No newline at end of file From 1bdd6693a212ba88ba90efa96f9744493685fcf1 Mon Sep 17 00:00:00 2001 From: Beria Date: Mon, 8 Aug 2016 18:10:22 +0530 Subject: [PATCH 05/15] fix javadoc comment --- .../zeppelin/interpreter/remote/CheckAppendOutputRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java index 13938517e0e..cf805f30315 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -7,7 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/* This class is responsible for initializing +/** This class is responsible for initializing * and ensuring that AppendOutputRunner is up * and running. */ From 1c893c038902097af370afb7ac3c474193d03cc1 Mon Sep 17 00:00:00 2001 From: Beria Date: Mon, 8 Aug 2016 20:59:20 +0530 Subject: [PATCH 06/15] Add licensing --- .../interpreter/remote/AppendOutputBuffer.java | 17 +++++++++++++++++ .../interpreter/remote/AppendOutputRunner.java | 17 +++++++++++++++++ .../remote/CheckAppendOutputRunner.java | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java index 6646a11f75b..e1484dabaaa 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.interpreter.remote; /** diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index 75967863e01..7665abc186a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.interpreter.remote; import java.util.HashMap; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java index cf805f30315..b0231aa030f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.interpreter.remote; import java.util.concurrent.Executors; From 3984ef85c52032dde1fe91209f8d84097867b11f Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 9 Aug 2016 12:34:16 +0530 Subject: [PATCH 07/15] fix tests when ran with other tests --- .../remote/AppendOutputRunner.java | 6 +++++ .../remote/CheckAppendOutputRunner.java | 25 ++++++++++++++++--- .../remote/AppendOutputRunnerTest.java | 11 ++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index 7665abc186a..c969be782f2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -126,4 +126,10 @@ public static void appendBuffer(String noteId, String paragraphId, String output public static void setListener(RemoteInterpreterProcessListener listener) { AppendOutputRunner.listener = listener; } + + /* This function is only used by unit-tests*/ + public static void emptyQueueForUnitTests() { + List list = new LinkedList(); + QUEUE.drainTo(list); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java index b0231aa030f..c982d748b12 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -35,6 +36,7 @@ public class CheckAppendOutputRunner implements Runnable { private static Thread thread = null; private static final Boolean SYNCHRONIZER = false; private static ScheduledExecutorService SCHEDULED_SERVICE = null; + private static ScheduledFuture futureObject = null; /* Can only be initialized locally.*/ private CheckAppendOutputRunner() @@ -56,7 +58,7 @@ public static void startScheduler() { synchronized (SYNCHRONIZER) { if (SCHEDULED_SERVICE == null) { SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor(); - SCHEDULED_SERVICE.scheduleWithFixedDelay( + futureObject = SCHEDULED_SERVICE.scheduleWithFixedDelay( new CheckAppendOutputRunner(), 0, 1, TimeUnit.SECONDS); } } @@ -64,11 +66,26 @@ public static void startScheduler() { /* These functions are only used by unit-tests. */ public static void stopRunnerForUnitTests() { - thread.interrupt(); + synchronized (SYNCHRONIZER) { + thread.interrupt(); + } } public static void startRunnerForUnitTests() { - thread = new Thread(new AppendOutputRunner()); - thread.start(); + synchronized (SYNCHRONIZER) { + thread = new Thread(new AppendOutputRunner()); + thread.start(); + } + } + + public static void stopSchedulerAndRunnerForUnitTests() { + synchronized (SYNCHRONIZER) { + if (futureObject != null) { + futureObject.cancel(false); + } + if (thread != null && thread.isAlive()) { + thread.interrupt(); + } + } } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index 152d35535ee..e35f8cd0cbb 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -16,6 +16,7 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -30,6 +31,16 @@ public class AppendOutputRunnerTest { */ private volatile static int numInvocations = 0; + @BeforeClass + public static void beforeClass() { + CheckAppendOutputRunner.stopSchedulerAndRunnerForUnitTests(); + AppendOutputRunner.emptyQueueForUnitTests(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) + {} + } + @Test public void testSingleEvent() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); From dd24816e8a90703797ac0df68a7b6dae821ee98c Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 9 Aug 2016 14:17:05 +0530 Subject: [PATCH 08/15] Add license in test file --- .../remote/AppendOutputRunnerTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index e35f8cd0cbb..339f6e0b98a 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.interpreter.remote; import static org.junit.Assert.assertTrue; From 599281ffc90e01e7f2efe1a41e5d8dd865f0033f Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 9 Aug 2016 15:11:00 +0530 Subject: [PATCH 09/15] fix unit tests that run after --- .../zeppelin/interpreter/remote/CheckAppendOutputRunner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java index c982d748b12..e3cf8747552 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -86,6 +86,7 @@ public static void stopSchedulerAndRunnerForUnitTests() { if (thread != null && thread.isAlive()) { thread.interrupt(); } + SCHEDULED_SERVICE = null; } } } From 72c316d749a6baf5bec701d3caeb2d5c071e7de9 Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 16 Aug 2016 12:55:18 +0530 Subject: [PATCH 10/15] Scheduler service to replace while loop in AppendOutputRunner --- .../remote/AppendOutputRunner.java | 115 ++++++++---------- .../remote/CheckAppendOutputRunner.java | 47 ++----- .../remote/AppendOutputRunnerTest.java | 38 +++--- 3 files changed, 81 insertions(+), 119 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index c969be782f2..06de99ff156 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -39,7 +39,7 @@ public class AppendOutputRunner implements Runnable { LoggerFactory.getLogger(AppendOutputRunner.class); private static RemoteInterpreterProcessListener listener; - private static final Long BUFFER_TIME_MS = new Long(100); + public static final Long BUFFER_TIME_MS = new Long(100); private static final Long SAFE_PROCESSING_TIME = new Long(10); private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); @@ -49,74 +49,65 @@ public class AppendOutputRunner implements Runnable { @Override public void run() { - while (true) { - Map > noteMap = - new HashMap >(); - List list = new LinkedList(); - - /* "drainTo" method does not wait for any element - * to be present in the queue, and thus this loop would - * continuosly run (with period of BUFFER_TIME_MS). "take()" method - * waits for the queue to become non-empty and then removes - * one element from it. Rest elements from queue (if present) are - * removed using "drainTo" method. Thus we save on some un-necessary - * cpu-cycles. - */ - try { - list.add(QUEUE.take()); - } catch (InterruptedException e) { - logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); - break; - } - Long processingStartTime = System.currentTimeMillis(); - QUEUE.drainTo(list); + Map > noteMap = + new HashMap >(); + List list = new LinkedList(); - for (AppendOutputBuffer buffer: list) { - String noteId = buffer.getNoteId(); - String paragraphId = buffer.getParagraphId(); + /* "drainTo" method does not wait for any element + * to be present in the queue, and thus this loop would + * continuosly run (with period of BUFFER_TIME_MS). "take()" method + * waits for the queue to become non-empty and then removes + * one element from it. Rest elements from queue (if present) are + * removed using "drainTo" method. Thus we save on some un-necessary + * cpu-cycles. + */ + try { + list.add(QUEUE.take()); + } catch (InterruptedException e) { + logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); + } + Long processingStartTime = System.currentTimeMillis(); + QUEUE.drainTo(list); - Map paragraphMap = (noteMap.containsKey(noteId)) ? - noteMap.get(noteId) : new HashMap(); - StringBuilder builder = paragraphMap.containsKey(paragraphId) ? - paragraphMap.get(paragraphId) : new StringBuilder(); + for (AppendOutputBuffer buffer: list) { + String noteId = buffer.getNoteId(); + String paragraphId = buffer.getParagraphId(); - builder.append(buffer.getData()); - paragraphMap.put(paragraphId, builder); - noteMap.put(noteId, paragraphMap); - } - Long processingTime = System.currentTimeMillis() - processingStartTime; - - if (processingTime > SAFE_PROCESSING_TIME) { - logger.warn("Processing time for buffered append-output is high: " + - processingTime + " milliseconds."); - } else { - logger.debug("Processing time for append-output took " - + processingTime + " milliseconds"); - } + Map paragraphMap = (noteMap.containsKey(noteId)) ? + noteMap.get(noteId) : new HashMap(); + StringBuilder builder = paragraphMap.containsKey(paragraphId) ? + paragraphMap.get(paragraphId) : new StringBuilder(); - Long sizeProcessed = new Long(0); - for (String noteId: noteMap.keySet()) { - for (String paragraphId: noteMap.get(noteId).keySet()) { - String data = noteMap.get(noteId).get(paragraphId).toString(); - sizeProcessed += data.length(); - listener.onOutputAppend(noteId, paragraphId, data); - } - } + builder.append(buffer.getData()); + paragraphMap.put(paragraphId, builder); + noteMap.put(noteId, paragraphMap); + } + Long processingTime = System.currentTimeMillis() - processingStartTime; + + if (processingTime > SAFE_PROCESSING_TIME) { + logger.warn("Processing time for buffered append-output is high: " + + processingTime + " milliseconds."); + } else { + logger.debug("Processing time for append-output took " + + processingTime + " milliseconds"); + } - if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { - logger.warn("Processing size for buffered append-output is high: " + - sizeProcessed + " characters."); - } else { - logger.debug("Processing size for append-output is " + - sizeProcessed + " characters"); - } - try { - Thread.sleep(BUFFER_TIME_MS); - } catch (InterruptedException e) { - logger.error("Append output thread interrupted: " + e.getMessage()); - break; + Long sizeProcessed = new Long(0); + for (String noteId: noteMap.keySet()) { + for (String paragraphId: noteMap.get(noteId).keySet()) { + String data = noteMap.get(noteId).get(paragraphId).toString(); + sizeProcessed += data.length(); + listener.onOutputAppend(noteId, paragraphId, data); } } + + if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { + logger.warn("Processing size for buffered append-output is high: " + + sizeProcessed + " characters."); + } else { + logger.debug("Processing size for append-output is " + + sizeProcessed + " characters"); + } } public static void appendBuffer(String noteId, String paragraphId, String outputToAppend) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java index e3cf8747552..9fbed906db5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -29,62 +29,33 @@ * and ensuring that AppendOutputRunner is up * and running. */ -public class CheckAppendOutputRunner implements Runnable { +public class CheckAppendOutputRunner { private static final Logger logger = LoggerFactory.getLogger(CheckAppendOutputRunner.class); - private static Thread thread = null; private static final Boolean SYNCHRONIZER = false; private static ScheduledExecutorService SCHEDULED_SERVICE = null; private static ScheduledFuture futureObject = null; - - /* Can only be initialized locally.*/ - private CheckAppendOutputRunner() - {} - - @Override - public void run() { - synchronized (SYNCHRONIZER) { - if (thread == null || !thread.isAlive()) { - logger.info("Starting a AppendOutputRunner thread to buffer" - + " and send paragraph append data."); - thread = new Thread(new AppendOutputRunner()); - thread.start(); - } - } - } + private static AppendOutputRunner runner = null; public static void startScheduler() { synchronized (SYNCHRONIZER) { if (SCHEDULED_SERVICE == null) { + runner = new AppendOutputRunner(); + logger.info("Starting a AppendOutputRunner thread to buffer" + + " and send paragraph append data."); SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor(); futureObject = SCHEDULED_SERVICE.scheduleWithFixedDelay( - new CheckAppendOutputRunner(), 0, 1, TimeUnit.SECONDS); + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); } } } - /* These functions are only used by unit-tests. */ - public static void stopRunnerForUnitTests() { - synchronized (SYNCHRONIZER) { - thread.interrupt(); - } - } - - public static void startRunnerForUnitTests() { - synchronized (SYNCHRONIZER) { - thread = new Thread(new AppendOutputRunner()); - thread.start(); - } - } - - public static void stopSchedulerAndRunnerForUnitTests() { + /* This function is only used by unit-tests. */ + public static void stopSchedulerForUnitTests() { synchronized (SYNCHRONIZER) { if (futureObject != null) { - futureObject.cancel(false); - } - if (thread != null && thread.isAlive()) { - thread.interrupt(); + futureObject.cancel(true); } SCHEDULED_SERVICE = null; } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index 339f6e0b98a..de105e3ce91 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -50,12 +50,8 @@ public class AppendOutputRunnerTest { @BeforeClass public static void beforeClass() { - CheckAppendOutputRunner.stopSchedulerAndRunnerForUnitTests(); + CheckAppendOutputRunner.stopSchedulerForUnitTests(); AppendOutputRunner.emptyQueueForUnitTests(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) - {} } @Test @@ -66,7 +62,7 @@ public void testSingleEvent() throws InterruptedException { loopForCompletingEvents(listener, 1); verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend("note", "para", "data\n"); - CheckAppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test @@ -81,7 +77,7 @@ public void testMultipleEventsOfSameParagraph() throws InterruptedException { loopForCompletingEvents(listener, 1); verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n"); - CheckAppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test @@ -102,14 +98,14 @@ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedExceptio verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n"); verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n"); verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n"); - CheckAppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test public void testClubbedData() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); AppendOutputRunner.setListener(listener); - CheckAppendOutputRunner.startRunnerForUnitTests(); + CheckAppendOutputRunner.startScheduler(); Thread thread = new Thread(new BombardEvents()); thread.start(); thread.join(); @@ -121,7 +117,7 @@ public void testClubbedData() throws InterruptedException { * the unit-test to a pessimistic 100 web-socket calls. */ verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class)); - CheckAppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test @@ -140,20 +136,24 @@ public void testWarnLoggerForLargeData() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); loopForCompletingEvents(listener, 1); List log; + + int warnLogCounter; + LoggingEvent sizeWarnLogEntry = null; do { + warnLogCounter = 0; log = appender.getLog(); - } while(log.size() != 2); - LoggingEvent sizeWarnLogEntry = null; - - for (LoggingEvent logEntry: log) { - if (Level.WARN.equals(logEntry.getLevel())) { - sizeWarnLogEntry = logEntry; + for (LoggingEvent logEntry: log) { + if (Level.WARN.equals(logEntry.getLevel())) { + sizeWarnLogEntry = logEntry; + warnLogCounter += 1; + } } - } + } while(warnLogCounter != 2); + String loggerString = "Processing size for buffered append-output is high: " + (data.length() * numEvents) + " characters."; assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); - CheckAppendOutputRunner.stopRunnerForUnitTests(); + CheckAppendOutputRunner.stopSchedulerForUnitTests(); } private class BombardEvents implements Runnable { @@ -204,7 +204,7 @@ private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, numInvocations = 0; prepareInvocationCounts(listener); AppendOutputRunner.setListener(listener); - CheckAppendOutputRunner.startRunnerForUnitTests(); + CheckAppendOutputRunner.startScheduler(); long startTimeMs = System.currentTimeMillis(); while(numInvocations != numTimes) { if (System.currentTimeMillis() - startTimeMs > 2000) { From 2eae38eae8efef0df68fcd075db38b9fac8d6658 Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 16 Aug 2016 22:05:04 +0530 Subject: [PATCH 11/15] Make AppendOutputRunner non-static --- .../remote/AppendOutputRunner.java | 26 +++---- .../remote/CheckAppendOutputRunner.java | 37 +++------- .../remote/RemoteInterpreterEventPoller.java | 6 +- .../remote/AppendOutputRunnerTest.java | 70 ++++++++++--------- 4 files changed, 61 insertions(+), 78 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index 06de99ff156..e5098324c5b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -37,14 +37,17 @@ public class AppendOutputRunner implements Runnable { private static final Logger logger = LoggerFactory.getLogger(AppendOutputRunner.class); - private static RemoteInterpreterProcessListener listener; - public static final Long BUFFER_TIME_MS = new Long(100); private static final Long SAFE_PROCESSING_TIME = new Long(10); private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); - private static final BlockingQueue QUEUE = + private final BlockingQueue queue = new LinkedBlockingQueue(); + private final RemoteInterpreterProcessListener listener; + + public AppendOutputRunner(RemoteInterpreterProcessListener listener) { + this.listener = listener; + } @Override public void run() { @@ -62,12 +65,12 @@ public void run() { * cpu-cycles. */ try { - list.add(QUEUE.take()); + list.add(queue.take()); } catch (InterruptedException e) { logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); } Long processingStartTime = System.currentTimeMillis(); - QUEUE.drainTo(list); + queue.drainTo(list); for (AppendOutputBuffer buffer: list) { String noteId = buffer.getNoteId(); @@ -110,17 +113,8 @@ public void run() { } } - public static void appendBuffer(String noteId, String paragraphId, String outputToAppend) { - QUEUE.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend)); + public void appendBuffer(String noteId, String paragraphId, String outputToAppend) { + queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend)); } - public static void setListener(RemoteInterpreterProcessListener listener) { - AppendOutputRunner.listener = listener; - } - - /* This function is only used by unit-tests*/ - public static void emptyQueueForUnitTests() { - List list = new LinkedList(); - QUEUE.drainTo(list); - } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java index 9fbed906db5..306e0a35c35 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java @@ -25,39 +25,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** This class is responsible for initializing - * and ensuring that AppendOutputRunner is up - * and running. +/** This class periodically calls AppendOutputRunner + * to send append-output events. */ public class CheckAppendOutputRunner { private static final Logger logger = LoggerFactory.getLogger(CheckAppendOutputRunner.class); - private static final Boolean SYNCHRONIZER = false; - private static ScheduledExecutorService SCHEDULED_SERVICE = null; + private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); private static ScheduledFuture futureObject = null; - private static AppendOutputRunner runner = null; - public static void startScheduler() { - synchronized (SYNCHRONIZER) { - if (SCHEDULED_SERVICE == null) { - runner = new AppendOutputRunner(); - logger.info("Starting a AppendOutputRunner thread to buffer" - + " and send paragraph append data."); - SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor(); - futureObject = SCHEDULED_SERVICE.scheduleWithFixedDelay( - runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); - } + public synchronized static void startScheduler( + RemoteInterpreterProcessListener listener, AppendOutputRunner runner) { + if (futureObject != null) { + futureObject.cancel(true); } - } - - /* This function is only used by unit-tests. */ - public static void stopSchedulerForUnitTests() { - synchronized (SYNCHRONIZER) { - if (futureObject != null) { - futureObject.cancel(true); - } - SCHEDULED_SERVICE = null; + logger.info("Starting a AppendOutputRunner thread to buffer" + + " and send paragraph append data."); + futureObject = service.scheduleWithFixedDelay( + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); } - } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 8e555531506..6374f71763b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -72,8 +72,8 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { @Override public void run() { Client client = null; - AppendOutputRunner.setListener(listener); - CheckAppendOutputRunner.startScheduler(); + AppendOutputRunner runner = new AppendOutputRunner(listener); + CheckAppendOutputRunner.startScheduler(listener, runner); while (!shutdown) { // wait and retry @@ -159,7 +159,7 @@ public void run() { String appId = outputAppend.get("appId"); if (appId == null) { - AppendOutputRunner.appendBuffer(noteId, paragraphId, outputToAppend); + runner.appendBuffer(noteId, paragraphId, outputToAppend); } else { appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index de105e3ce91..57e01bec278 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -33,7 +33,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -48,21 +47,14 @@ public class AppendOutputRunnerTest { */ private volatile static int numInvocations = 0; - @BeforeClass - public static void beforeClass() { - CheckAppendOutputRunner.stopSchedulerForUnitTests(); - AppendOutputRunner.emptyQueueForUnitTests(); - } - @Test public void testSingleEvent() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - AppendOutputRunner.appendBuffer("note", "para", "data\n"); + String[][] buffer = {{"note", "para", "data\n"}}; - loopForCompletingEvents(listener, 1); + loopForCompletingEvents(listener, 1, buffer); verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend("note", "para", "data\n"); - CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test @@ -70,14 +62,15 @@ public void testMultipleEventsOfSameParagraph() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); String note1 = "note1"; String para1 = "para1"; - AppendOutputRunner.appendBuffer(note1, para1, "data1\n"); - AppendOutputRunner.appendBuffer(note1, para1, "data2\n"); - AppendOutputRunner.appendBuffer(note1, para1, "data3\n"); + String[][] buffer = { + {note1, para1, "data1\n"}, + {note1, para1, "data2\n"}, + {note1, para1, "data3\n"} + }; - loopForCompletingEvents(listener, 1); + loopForCompletingEvents(listener, 1, buffer); verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n"); - CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test @@ -87,26 +80,27 @@ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedExceptio String note2 = "note2"; String para1 = "para1"; String para2 = "para2"; - AppendOutputRunner.appendBuffer(note1, para1, "data1\n"); - AppendOutputRunner.appendBuffer(note1, para2, "data2\n"); - AppendOutputRunner.appendBuffer(note2, para1, "data3\n"); - AppendOutputRunner.appendBuffer(note2, para2, "data4\n"); - loopForCompletingEvents(listener, 4); + String[][] buffer = { + {note1, para1, "data1\n"}, + {note1, para2, "data2\n"}, + {note2, para1, "data3\n"}, + {note2, para2, "data4\n"} + }; + loopForCompletingEvents(listener, 4, buffer); verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class)); verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n"); verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n"); verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n"); verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n"); - CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test public void testClubbedData() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - AppendOutputRunner.setListener(listener); - CheckAppendOutputRunner.startScheduler(); - Thread thread = new Thread(new BombardEvents()); + AppendOutputRunner runner = new AppendOutputRunner(listener); + CheckAppendOutputRunner.startScheduler(listener, runner); + Thread thread = new Thread(new BombardEvents(runner)); thread.start(); thread.join(); Thread.sleep(1000); @@ -117,15 +111,17 @@ public void testClubbedData() throws InterruptedException { * the unit-test to a pessimistic 100 web-socket calls. */ verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class)); - CheckAppendOutputRunner.stopSchedulerForUnitTests(); } @Test public void testWarnLoggerForLargeData() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + AppendOutputRunner runner = new AppendOutputRunner(listener); String data = "data\n"; int numEvents = 100000; + for (int i=0; i log; int warnLogCounter; @@ -153,17 +148,22 @@ public void testWarnLoggerForLargeData() throws InterruptedException { String loggerString = "Processing size for buffered append-output is high: " + (data.length() * numEvents) + " characters."; assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); - CheckAppendOutputRunner.stopSchedulerForUnitTests(); } private class BombardEvents implements Runnable { + private final AppendOutputRunner runner; + + private BombardEvents(AppendOutputRunner runner) { + this.runner = runner; + } + @Override public void run() { String noteId = "noteId"; String paraId = "paraId"; for (int i=0; i 2000) { From d1686149e0acd6c2e78e9443c85ce66726700d14 Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 16 Aug 2016 22:40:30 +0530 Subject: [PATCH 12/15] Remove un-necessary class CheckAppendOutputRunner --- .../remote/CheckAppendOutputRunner.java | 48 ------------------- .../remote/RemoteInterpreterEventPoller.java | 11 ++++- .../remote/AppendOutputRunnerTest.java | 20 +++++++- 3 files changed, 28 insertions(+), 51 deletions(-) delete mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java deleted file mode 100644 index 306e0a35c35..00000000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** This class periodically calls AppendOutputRunner - * to send append-output events. - */ -public class CheckAppendOutputRunner { - - private static final Logger logger = - LoggerFactory.getLogger(CheckAppendOutputRunner.class); - private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - private static ScheduledFuture futureObject = null; - - public synchronized static void startScheduler( - RemoteInterpreterProcessListener listener, AppendOutputRunner runner) { - if (futureObject != null) { - futureObject.cancel(true); - } - logger.info("Starting a AppendOutputRunner thread to buffer" - + " and send paragraph append data."); - futureObject = service.scheduleWithFixedDelay( - runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); - } -} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 6374f71763b..7ecd0b5833d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -39,12 +39,17 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Processes message from RemoteInterpreter process */ public class RemoteInterpreterEventPoller extends Thread { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); + private static final ScheduledExecutorService appendService = Executors.newSingleThreadScheduledExecutor(); private final RemoteInterpreterProcessListener listener; private final ApplicationEventListener appListener; @@ -73,7 +78,8 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { public void run() { Client client = null; AppendOutputRunner runner = new AppendOutputRunner(listener); - CheckAppendOutputRunner.startScheduler(listener, runner); + ScheduledFuture appendFuture = appendService.scheduleWithFixedDelay( + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); while (!shutdown) { // wait and retry @@ -194,6 +200,9 @@ public void run() { logger.error("Can't handle event " + event, e); } } + if (appendFuture != null) { + appendFuture.cancel(true); + } } private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index 57e01bec278..d7e0906d357 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -28,11 +28,16 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -41,12 +46,21 @@ public class AppendOutputRunnerTest { private static final int NUM_EVENTS = 10000; private static final int NUM_CLUBBED_EVENTS = 100; + private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + private static ScheduledFuture future = null; /* It is being accessed by multiple threads. * While loop for 'loopForBufferCompletion' could * run for-ever. */ private volatile static int numInvocations = 0; + @After + public void afterEach() { + if (future != null) { + future.cancel(true); + } + } + @Test public void testSingleEvent() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); @@ -99,7 +113,8 @@ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedExceptio public void testClubbedData() throws InterruptedException { RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); AppendOutputRunner runner = new AppendOutputRunner(listener); - CheckAppendOutputRunner.startScheduler(listener, runner); + future = service.scheduleWithFixedDelay(runner, 0, + AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); Thread thread = new Thread(new BombardEvents(runner)); thread.start(); thread.join(); @@ -208,7 +223,8 @@ private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, for (String[] bufferElement: buffer) { runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]); } - CheckAppendOutputRunner.startScheduler(listener, runner); + future = service.scheduleWithFixedDelay(runner, 0, + AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); long startTimeMs = System.currentTimeMillis(); while(numInvocations != numTimes) { if (System.currentTimeMillis() - startTimeMs > 2000) { From 4b68c86c32bbff00dc477ff5ffdb714e6d31ebe4 Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 16 Aug 2016 22:55:14 +0530 Subject: [PATCH 13/15] fix checkstyle --- .../interpreter/remote/RemoteInterpreterEventPoller.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 7ecd0b5833d..090aeeaaee3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -49,7 +49,8 @@ */ public class RemoteInterpreterEventPoller extends Thread { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); - private static final ScheduledExecutorService appendService = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService appendService = + Executors.newSingleThreadScheduledExecutor(); private final RemoteInterpreterProcessListener listener; private final ApplicationEventListener appListener; From 78523687db24f6902d7459573a54e1e8e2ec7eae Mon Sep 17 00:00:00 2001 From: Beria Date: Sat, 20 Aug 2016 12:48:15 +0530 Subject: [PATCH 14/15] nit --- .../apache/zeppelin/interpreter/remote/AppendOutputRunner.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index e5098324c5b..e4bcc60eec2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -52,8 +52,7 @@ public AppendOutputRunner(RemoteInterpreterProcessListener listener) { @Override public void run() { - Map > noteMap = - new HashMap >(); + Map > noteMap = new HashMap<>(); List list = new LinkedList(); /* "drainTo" method does not wait for any element From 17f05249e6889e0faa3d7cbd7a4da5140382a8c1 Mon Sep 17 00:00:00 2001 From: Beria Date: Tue, 23 Aug 2016 13:19:44 +0530 Subject: [PATCH 15/15] Use diamond operator --- .../zeppelin/interpreter/remote/AppendOutputRunner.java | 5 ++--- .../zeppelin/interpreter/remote/AppendOutputRunnerTest.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index e4bcc60eec2..86ea11a8bda 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -41,8 +41,7 @@ public class AppendOutputRunner implements Runnable { private static final Long SAFE_PROCESSING_TIME = new Long(10); private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); - private final BlockingQueue queue = - new LinkedBlockingQueue(); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final RemoteInterpreterProcessListener listener; public AppendOutputRunner(RemoteInterpreterProcessListener listener) { @@ -53,7 +52,7 @@ public AppendOutputRunner(RemoteInterpreterProcessListener listener) { public void run() { Map > noteMap = new HashMap<>(); - List list = new LinkedList(); + List list = new LinkedList<>(); /* "drainTo" method does not wait for any element * to be present in the queue, and thus this loop would diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java index d7e0906d357..8e9f5b361e6 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -184,7 +184,7 @@ public void run() { } private class TestAppender extends AppenderSkeleton { - private final List log = new ArrayList(); + private final List log = new ArrayList<>(); @Override public boolean requiresLayout() { @@ -201,7 +201,7 @@ public void close() { } public List getLog() { - return new ArrayList(log); + return new ArrayList<>(log); } }