diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index bccb5b46915..d4d3c3a1bda 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -62,6 +62,7 @@
+
+There are the following items which you can input or set:
+
+### Preset
+
+You can set a cron schedule easily by clicking each option such as `1m` and `5m`. The login user is set as a cron executing user automatically. You can also clear the cron schedule settings by clicking `None`.
+
+### Cron expression
+
+You can set the cron schedule by filling in this form. Please see [Cron Trigger Tutorial](http://www.quartz-scheduler.org/documentation/quartz-2.2.x/tutorials/crontrigger) for the available cron syntax.
+
+### Cron executing user
+
+You can set the cron executing user by filling in this form and press the enter key.
+
+### auto-restart interpreter on cron execution
+
+When this checkbox is set to "on", the interpreters which are binded to the notebook are stopped automatically after the cron execution. This feature is useful if you want to release the interpreter resources after the cron execution.
+
+> **Note**: A cron execution is skipped if one of the paragraphs is in a state of `RUNNING` or `PENDING` no matter whether it is executed automatically (i.e. by the cron scheduler) or manually by a user opening this notebook.
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 6e667327172..19f396ecd05 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -652,6 +652,22 @@ boolean isTerminated() {
return true;
}
+ /**
+ * Return true if there is a running or pending paragraph
+ */
+ boolean isRunningOrPending() {
+ synchronized (paragraphs) {
+ for (Paragraph p : paragraphs) {
+ Status status = p.getStatus();
+ if (status.isRunning() || status.isPending()) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
public boolean isTrash() {
String path = getName();
if (path.charAt(0) == '/') {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 8de981e6e32..d68cd4b75fd 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -890,16 +890,15 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
String noteId = context.getJobDetail().getJobDataMap().getString("noteId");
Note note = notebook.getNote(noteId);
- note.runAll();
- while (!note.isTerminated()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.error(e.toString(), e);
- }
+ if (note.isRunningOrPending()) {
+ logger.warn("execution of the cron job is skipped because there is a running or pending " +
+ "paragraph (note id: {})", noteId);
+ return;
}
+ note.runAll();
+
boolean releaseResource = false;
String cronExecutingUser = null;
try {
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index ba9e177690e..83c09327c35 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -361,6 +361,46 @@ public void testSchedule() throws InterruptedException, IOException {
notebook.removeNote(note.getId(), anonymous);
}
+ @Test
+ public void testScheduleAgainstRunningAndPendingParagraph() throws InterruptedException, IOException {
+ // create a note
+ Note note = notebook.createNote(anonymous);
+ interpreterSettingManager.setInterpreterBinding("user", note.getId(),
+ interpreterSettingManager.getInterpreterSettingIds());
+
+ // append running and pending paragraphs to the note
+ for (Status status: new Status[]{Status.RUNNING, Status.PENDING}) {
+ Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+ Map config = new HashMap<>();
+ p.setConfig(config);
+ p.setText("p");
+ p.setStatus(status);
+ assertNull(p.getDateFinished());
+ }
+
+ // set cron scheduler, once a second
+ Map config = note.getConfig();
+ config.put("enabled", true);
+ config.put("cron", "* * * * * ?");
+ note.setConfig(config);
+ notebook.refreshCron(note.getId());
+ Thread.sleep(2 * 1000);
+
+ // remove cron scheduler.
+ config.put("cron", null);
+ note.setConfig(config);
+ notebook.refreshCron(note.getId());
+ Thread.sleep(2 * 1000);
+
+ // check if the executions of the running and pending paragraphs were skipped
+ for (Paragraph p : note.paragraphs) {
+ assertNull(p.getDateFinished());
+ }
+
+ // remove the note
+ notebook.removeNote(note.getId(), anonymous);
+ }
+
@Test
public void testSchedulePoolUsage() throws InterruptedException, IOException {
final int timeout = 30;