diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 2edb0565307..0f3ed19d3f4 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -289,4 +289,59 @@ Size in characters of the maximum text message to be received by websocket. Defaults to 1024000 + diff --git a/docs/install/install.md b/docs/install/install.md index 2edecc6caed..ac399f60c3c 100644 --- a/docs/install/install.md +++ b/docs/install/install.md @@ -336,6 +336,60 @@ You can configure Apache Zeppelin with either **environment variables** in `conf 1024000 Size (in characters) of the maximum text message that can be received by websocket. + + ZEPPELIN_SMTP_USER + zeppelin.mail.smtp.user + + SMTP user. + + + ZEPPELIN_SMTP_PASS + zeppelin.mail.smtp.password + + Password for SMTP user. + + + ZEPPELIN_SMTP_HOST + zeppelin.mail.smtp.host + smtp.googlemail.com + The SMTP server to connect to. + + + ZEPPELIN_SMTP_PROTOCOL + zeppelin.mail.smtp.protocol + smtp + SMTP protocol. + + + ZEPPELIN_SMTP_PORT + zeppelin.mail.smtp.port + 465 + The SMTP server port to connect. + + + ZEPPELIN_SMTP_STARTTLSZEPPELIN_SMTP_AUTH + zeppelin.mail.smtp.starttls.enable + true + If true, enables the use of the STARTTLS command (if supported by the server) to switch the connection to a TLS-protected connection before issuing any login commands. + + + ZEPPELIN_SMTP_AUTH + zeppelin.mail.smtp.auth + true + If true, attempt to authenticate the user using the AUTH command. + + + ZEPPELIN_SMTP_SOCKETFACTORY + zeppelin.mail.smtp.socketFactory.port + 465 + Specifies the port to connect to when using the specified socket factory. + + + ZEPPELIN_SMTP_SOCKETFACTORY_CLASS + zeppelin.mail.smtp.socketFactory.class + javax.net.ssl.SSLSocketFactory + Specifies the name of a class that implements the javax.net.SocketFactory interface. This class will be used to create SMTP sockets. + diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 2dc1719ebac..eef947da9f0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -68,6 +68,10 @@ public boolean isRunning() { public boolean isPending() { return this == PENDING; } + + public boolean isError() { + return this == ERROR; + } } private String jobName; diff --git a/zeppelin-web/src/app/notebook/notebook-actionBar.html b/zeppelin-web/src/app/notebook/notebook-actionBar.html index 16f0e103cb8..cfe6d982b39 100644 --- a/zeppelin-web/src/app/notebook/notebook-actionBar.html +++ b/zeppelin-web/src/app/notebook/notebook-actionBar.html @@ -11,6 +11,7 @@ See the License for the specific language governing permissions and limitations under the License. --> +

- +

+ + +
+ + +
+ + + \ No newline at end of file diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index ba84f24c027..f02faa552e6 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -97,6 +97,18 @@ commons-vfs2 2.0 + + + org.apache.commons + commons-email + 1.4 + + + javax.activation + activation + + + org.apache.jackrabbit diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 2e48a1f24d9..0fe25171cf4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -591,7 +591,17 @@ public static enum ConfVars { ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true), - ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); + ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"), + ZEPPELIN_SMTP_USER("zeppelin.mail.smtp.user", "mail"), + ZEPPELIN_SMTP_PASS("zeppelin.mail.smtp.password", "passwod"), + ZEPPELIN_SMTP_HOST("zeppelin.mail.smtp.host", "smtp.googlemail.com"), + ZEPPELIN_SMTP_PROTOCOL("zeppelin.mail.smtp.protocol", "smtp"), + ZEPPELIN_SMTP_PORT("zeppelin.mail.smtp.port", "465"), + ZEPPELIN_SMTP_STARTTLS("zeppelin.mail.smtp.starttls.enable", "true"), + ZEPPELIN_SMTP_AUTH("zeppelin.mail.smtp.auth", "true"), + ZEPPELIN_SMTP_SOCKETFACTORY("zeppelin.mail.smtp.socketFactory.port", "465"), + ZEPPELIN_SMTP_SOCKETFACTORY_CLASS("zeppelin.mail.smtp.socketFactory.class", + "javax.net.ssl.SSLSocketFactory"); private String varName; @SuppressWarnings("rawtypes") diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EmailNotification.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EmailNotification.java new file mode 100644 index 00000000000..397c5bc3e44 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EmailNotification.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import java.util.Map; +import org.apache.commons.mail.DefaultAuthenticator; +import org.apache.commons.mail.Email; +import org.apache.commons.mail.EmailException; +import org.apache.commons.mail.SimpleEmail; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.gson.internal.StringMap; + +/** + * + * Send email notifications + * + */ + +public class EmailNotification implements Notifications { + static Logger logger = LoggerFactory.getLogger(EmailNotification.class); + private ZeppelinConfiguration conf; + + public EmailNotification(ZeppelinConfiguration conf) { + this.conf = conf; + } + + @Override + public void start(Map config, String msg) { + Map notification = (Map) config.get("notification"); + Map emailMap = (Map) notification.get("email"); + String onStart = (String) emailMap.get("start"); + for (String email : onStart.split(",")) { + logger.info("Send email to: " + email); + sendEmail(onStart, msg); + } + } + + @Override + public void finish(Map config, String msg) { + Map notification = (Map) config.get("notification"); + Map emailMap = (Map) notification.get("email"); + String onSuccess = (String) emailMap.get("finish"); + for (String email : onSuccess.split(",")) { + logger.info("Send email to: " + email); + sendEmail(email, msg); + } + } + + @Override + public void error(Map config, String msg) { + Map notification = (Map) config.get("notification"); + Map emailMap = (Map) notification.get("email"); + String onError = (String) emailMap.get("error"); + for (String email : onError.split(",")) { + logger.info("Send email to: " + email); + sendEmail(onError, msg); + } + } + + public void sendEmail(String email, String msg) { + Email sessionEmail = new SimpleEmail(); + + try { + sessionEmail.setSmtpPort(Integer.parseInt(conf.getString(ConfVars.ZEPPELIN_SMTP_PORT))); + sessionEmail.setAuthenticator(new DefaultAuthenticator( + conf.getString(ConfVars.ZEPPELIN_SMTP_USER), + conf.getString(ConfVars.ZEPPELIN_SMTP_PASS))); + sessionEmail.setHostName(conf.getString(ConfVars.ZEPPELIN_SMTP_HOST)); + + sessionEmail.getMailSession().getProperties().put("mail.smtp.host", + conf.getString(ConfVars.ZEPPELIN_SMTP_HOST)); + sessionEmail.getMailSession().getProperties().put("mail.smtp.protocol", + conf.getString(ConfVars.ZEPPELIN_SMTP_PROTOCOL)); + sessionEmail.getMailSession().getProperties().put("mail.smtp.port", + conf.getString(ConfVars.ZEPPELIN_SMTP_PORT)); + sessionEmail.getMailSession().getProperties().put("mail.smtp.starttls.enable", + conf.getString(ConfVars.ZEPPELIN_SMTP_STARTTLS)); + sessionEmail.getMailSession().getProperties().put("mail.smtp.auth", + conf.getString(ConfVars.ZEPPELIN_SMTP_AUTH)); + sessionEmail.getMailSession().getProperties().put("mail.smtp.socketFactory.port", + conf.getString(ConfVars.ZEPPELIN_SMTP_SOCKETFACTORY)); + sessionEmail.getMailSession().getProperties().put("mail.smtp.socketFactory.class", + conf.getString(ConfVars.ZEPPELIN_SMTP_SOCKETFACTORY_CLASS)); + + sessionEmail.setFrom(conf.getString(ConfVars.ZEPPELIN_SMTP_USER)); + sessionEmail.addTo(email); + sessionEmail.setSubject("Note scheduler in Zeppelin"); + sessionEmail.setMsg(msg); + sessionEmail.send(); + + } catch (EmailException e) { + logger.error("Error: ", e); + } + } + + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EventNotification.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EventNotification.java new file mode 100644 index 00000000000..03017327662 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/EventNotification.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import java.util.Map; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Paragraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Event notifications + */ +public class EventNotification { + static Logger logger = LoggerFactory.getLogger(EventNotification.class); + Notifications emailNotification; + + public EventNotification(ZeppelinConfiguration conf) { + emailNotification = new EmailNotification(conf); + } + + public void enabledNotification(Note note) { + Map config = note.getConfig(); + + emailNotification.start(config, "Start execute note " + note.getName()); + while (!note.isTerminated()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.error(e.toString(), e); + } + } + for (Paragraph para : note.paragraphs) { + logger.info(para.getStatus() + ""); + if (para.getStatus().isError()) { + //improve mail messages + String msg = "Error in paragraphs "; + if (para.getTitle() != null) { + msg = msg + para.getTitle() + "\n" + + para.getResult().message(); + } else { + msg = msg + para.getId() + "\n" + + para.getResult().message(); + } + emailNotification.error(config, msg); + } + } + emailNotification.finish(config, "Note " + note.getName() + " has finish."); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/Notifications.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/Notifications.java new file mode 100644 index 00000000000..914f5480118 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/event/Notifications.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.event; + +import java.util.Map; + +/** + * + * Notification interface + * + */ +public interface Notifications { + void start(Map config, String msg); + void finish(Map config, String msg); + void error(Map config, String msg); +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index aa08adf0f83..4177ddcce97 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -73,7 +73,7 @@ public class Note implements Serializable, ParagraphJobListener { delayedPersistThreadPool.setRemoveOnCancelPolicy(true); } - final List paragraphs = new LinkedList<>(); + public final List paragraphs = new LinkedList<>(); private String name = ""; private String id; @@ -520,7 +520,7 @@ public void run(String paragraphId) { /** * Check whether all paragraphs belongs to this note has terminated */ - boolean isTerminated() { + public boolean isTerminated() { synchronized (paragraphs) { for (Paragraph p : paragraphs) { if (!p.isTerminated()) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index c0932569af5..4a399246e83 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; @@ -51,11 +50,15 @@ import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.commons.mail.DefaultAuthenticator; +import org.apache.commons.mail.Email; +import org.apache.commons.mail.EmailException; +import org.apache.commons.mail.SimpleEmail; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.event.EventNotification; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -69,6 +72,7 @@ import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; +import com.google.gson.internal.StringMap; /** * Collection of Notes. @@ -119,6 +123,7 @@ public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo, quartzSched = quertzSchedFact.getScheduler(); quartzSched.start(); CronJob.notebook = this; + CronJob.notification = new EventNotification(conf); AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS; loadAllNotes(anonymous); @@ -775,6 +780,7 @@ public List> getJobListByUnixTime(boolean needsReload, */ public static class CronJob implements org.quartz.Job { public static Notebook notebook; + public static EventNotification notification; @Override public void execute(JobExecutionContext context) throws JobExecutionException { @@ -783,12 +789,19 @@ public void execute(JobExecutionContext context) throws JobExecutionException { Note note = notebook.getNote(noteId); note.runAll(); - while (!note.isTerminated()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.error(e.toString(), e); - } + Map notificationConf = (Map) note.getConfig() + .get("notification"); + + if (notificationConf != null) { + notification.enabledNotification(note); + } else { + while (!note.isTerminated()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.error(e.toString(), e); + } + } } boolean releaseResource = false;