From 7abad201a63df51d04326231cbf429122540e311 Mon Sep 17 00:00:00 2001 From: Ali Bajwa Date: Mon, 5 Oct 2015 22:40:06 -0700 Subject: [PATCH 01/14] initial commit of pig interpreter --- conf/zeppelin-site.xml.template | 2 +- pig/pom.xml | 143 ++++++++++++++++++ .../apache/zeppelin/pig/PigInterpreter.java | 137 +++++++++++++++++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 2 + 5 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 pig/pom.xml create mode 100644 pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 05bd7195277..c4b369c301c 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -190,7 +190,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/pig/pom.xml b/pig/pom.xml new file mode 100644 index 00000000000..62e953adedb --- /dev/null +++ b/pig/pom.xml @@ -0,0 +1,143 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-pig + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: Apache Pig Interpreter + Zeppelin interprter for Apache Pig + http://zeppelin.incubator.apache.org + + + 0.15.0 + + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.pig + pig + ${pig.version} + + + + junit + junit + test + + + + org.mockito + mockito-all + 1.9.5 + test + + + + com.mockrunner + mockrunner-jdbc + 1.0.8 + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/pig + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/pig + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java new file mode 100644 index 00000000000..6b9a73216e9 --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.pig; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.ExecuteException; +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.exec.PumpStreamHandler; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pig interpreter for Zeppelin. + * + * @author abajwa-hw + * + */ +public class PigInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(PigInterpreter.class); + int commandTimeOut = 600000; + + static final String PIG_START_ARGS = "pig.start.args"; + static final String DEFAULT_START_ARGS = "-useHCatalog -exectype tez"; + + static { + Interpreter.register( + "pig", + "pig", + PigInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(PIG_START_ARGS, DEFAULT_START_ARGS, "Starting arguments") + .build() + ); + } + + public PigInterpreter(Properties property) { + super(property); + } + + @Override + public void open() {} + + @Override + public void close() {} + + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + + CommandLine cmdLine = CommandLine.parse("pig"); + + // add any arguments specified + String startArgs = getProperty((PIG_START_ARGS).trim()); + if (startArgs.length() > 0){ + logger.info("Start arguments passed to pig: " + startArgs); + List argList = Arrays.asList(startArgs.split("\\s+")); + for (String arg : argList) { + cmdLine.addArgument(arg, false); + } + } + + logger.info("Run pig command '" + cmd + "'"); + long start = System.currentTimeMillis(); + cmdLine.addArgument("-e", false); + cmdLine.addArgument(cmd, false); + DefaultExecutor executor = new DefaultExecutor(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + executor.setStreamHandler(new PumpStreamHandler(outputStream)); + + executor.setWatchdog(new ExecuteWatchdog(commandTimeOut)); + try { + int exitValue = executor.execute(cmdLine); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString()); + } catch (ExecuteException e) { + logger.error("Can not run " + cmd, e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } catch (IOException e) { + logger.error("Can not run " + cmd, e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + } + + @Override + public void cancel(InterpreterContext context) {} + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + PigInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + +} + diff --git a/pom.xml b/pom.xml index 03b226341e1..558ce0624a1 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ shell livy hbase + pig postgresql jdbc file diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index d819869f8fd..583a5ef6474 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -521,6 +521,8 @@ public static enum ConfVars { + "org.apache.zeppelin.alluxio.AlluxioInterpreter," + "org.apache.zeppelin.file.HDFSFileInterpreter," + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter," + + "org.apache.zeppelin.pig.PigInterpreter," + + "org.apache.zeppelin.pig.PigQueryInterpreter," + "org.apache.zeppelin.flink.FlinkInterpreter," + "org.apache.zeppelin.python.PythonInterpreter," + "org.apache.zeppelin.python.PythonInterpreterPandasSql," From 2586336e44982ac3c92156c24a288c540792d0c5 Mon Sep 17 00:00:00 2001 From: Ali Bajwa Date: Mon, 5 Oct 2015 23:25:47 -0700 Subject: [PATCH 02/14] exposed timeout and pig executable via interpreter and added comments --- .../apache/zeppelin/pig/PigInterpreter.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 6b9a73216e9..3d43c80c7c2 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.List; import java.util.Properties; -import java.util.ArrayList; import java.util.Arrays; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; @@ -40,24 +39,34 @@ /** * Pig interpreter for Zeppelin. - * + * Closely follows code for shell interpreter * @author abajwa-hw * */ public class PigInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(PigInterpreter.class); - int commandTimeOut = 600000; + //Executable name used to start grunt shell + static final String PIG_START_EXE = "pig.executable"; + static final String DEFAULT_START_EXE = "pig"; + + //Arguments to start pig with. More details available via 'pig -help' static final String PIG_START_ARGS = "pig.start.args"; static final String DEFAULT_START_ARGS = "-useHCatalog -exectype tez"; + //How long to wait before timing out (ms) + static final String PIG_TIMEOUT_MS = "pig.timeout.ms"; + static final String DEFAULT_TIMEOUT_MS = "600000"; + static { Interpreter.register( "pig", "pig", PigInterpreter.class.getName(), new InterpreterPropertyBuilder() + .add(PIG_START_EXE, DEFAULT_START_EXE, "Pig executable used to start grunt shell") .add(PIG_START_ARGS, DEFAULT_START_ARGS, "Starting arguments") + .add(PIG_TIMEOUT_MS, DEFAULT_TIMEOUT_MS, "Timeout (ms)") .build() ); } @@ -75,11 +84,12 @@ public void close() {} @Override public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + // use commandline to store string corresponding to pig shell command + // start with pig exectable name (or full path if provided)... + CommandLine cmdLine = CommandLine.parse(getProperty(PIG_START_EXE).trim()); - CommandLine cmdLine = CommandLine.parse("pig"); - - // add any arguments specified - String startArgs = getProperty((PIG_START_ARGS).trim()); + // ...add any CLI arguments specified by user in interpreter settings + String startArgs = getProperty(PIG_START_ARGS).trim(); if (startArgs.length() > 0){ logger.info("Start arguments passed to pig: " + startArgs); List argList = Arrays.asList(startArgs.split("\\s+")); @@ -87,15 +97,18 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr cmdLine.addArgument(arg, false); } } - + // ...finally add contents of pig cell after the -e flag logger.info("Run pig command '" + cmd + "'"); long start = System.currentTimeMillis(); cmdLine.addArgument("-e", false); cmdLine.addArgument(cmd, false); + + // execute command and return success/failure based on its exit value DefaultExecutor executor = new DefaultExecutor(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); executor.setStreamHandler(new PumpStreamHandler(outputStream)); + int commandTimeOut = Integer.parseInt(getProperty(PIG_TIMEOUT_MS)); executor.setWatchdog(new ExecuteWatchdog(commandTimeOut)); try { int exitValue = executor.execute(cmdLine); From c28beb588922a5edbc9d5db3336bbdba9a22c3a1 Mon Sep 17 00:00:00 2001 From: Ali Bajwa Date: Tue, 6 Oct 2015 22:02:18 -0700 Subject: [PATCH 03/14] Updated based on comments: 1. Documentation: added pig.md with interpreter documentation and added pig entry to index.md 2. Added test junit test based on passwd file parsing example here https://pig.apache.org/docs/r0.10.0/start.html#run 3. Removed author tag from comment (this was copied from shell interpreter https://github.com/apache/incubator-zeppelin/blob/master/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java#L42) 4. Implemented cancel functionality 5. Display output stream in case of error --- docs/docs/interpreter/pig.md | 49 +++++++ pig/pom.xml | 6 + .../apache/zeppelin/pig/PigInterpreter.java | 30 +++-- .../zeppelin/pig/PigInterpreterTest.java | 125 ++++++++++++++++++ 4 files changed, 198 insertions(+), 12 deletions(-) create mode 100644 docs/docs/interpreter/pig.md create mode 100644 pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java diff --git a/docs/docs/interpreter/pig.md b/docs/docs/interpreter/pig.md new file mode 100644 index 00000000000..34428be8577 --- /dev/null +++ b/docs/docs/interpreter/pig.md @@ -0,0 +1,49 @@ +--- +layout: page +title: "Pig Interpreter" +description: "" +group: manual +--- +{% include JB/setup %} + + +## Pig interpreter for Apache Zeppelin +[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets. + +- Supported operations through Zeppelin + - Play button: Run multiple lines of pig latin (delimited by semi-colon) entered into Zeppelin cell + - Pause button: Cancel the exection of pig + - Output: Output of jobs are displayed in Zeppelin (for both jobs that were sucessful and those that failed) + +- Unsupported operations + - Progress bar + +### How to setup Pig +Install Pig as you would normally do on the same node where Zeppelin is running - see [documentation](https://pig.apache.org/). If installing through Ambari, you just need to install the client on the Zeppelin node. + +### How to configure interpreter +At the "Interpreters" menu, you have to create a new Pig interpreter and provide next properties: + +property | value | Description +---------|----------|----- +executable | pig | Path to pig executable. If pig is part of PATH, then just pig will work +args | -useHCatalog -exectype local | Arguments to pass pig when starting. Launch 'pig -help' for list of supported options. For example, the options for exectype: local|mapreduce|tez +timeout | 600000 | Time (in milliseconds) to wait for pig job before timing out + +### How to test it's working + +Run below example taken from [this tutorial](http://hortonworks.com/hadoop-tutorial/how-to-use-basic-pig-commands/) + +``` +%sh +rm -f infochimps_dataset_4778_download_16677-csv.zip +wget https://s3.amazonaws.com/hw-sandbox/tutorial1/infochimps_dataset_4778_download_16677-csv.zip -O /tmp/infochimps_dataset_4778_download_16677-csv.zip +unzip /tmp/infochimps_dataset_4778_download_16677-csv.zip -d /tmp +``` +``` +%pig +DIV_A = LOAD 'file:///tmp/infochimps_dataset_4778_download_16677/NYSE/NYSE_dividends_A.csv' using PigStorage(',') AS (exchange:chararray, symbol:chararray, date:chararray, dividend:float); +B = FILTER DIV_A BY symbol=='AZZ'; +C = GROUP B BY dividend; +dump C; +``` diff --git a/pig/pom.xml b/pig/pom.xml index 62e953adedb..7cb8dcc3223 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -51,6 +51,12 @@ ${pig.version} + + org.apache.commons + commons-io + 1.3.2 + + junit junit diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 3d43c80c7c2..3576b4a8780 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -40,24 +40,25 @@ /** * Pig interpreter for Zeppelin. * Closely follows code for shell interpreter - * @author abajwa-hw - * */ public class PigInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(PigInterpreter.class); + private static final String NEWLINE = "\n"; + //Executable name used to start grunt shell - static final String PIG_START_EXE = "pig.executable"; - static final String DEFAULT_START_EXE = "pig"; + public static final String PIG_START_EXE = "pig.executable"; + public static final String DEFAULT_START_EXE = "pig"; //Arguments to start pig with. More details available via 'pig -help' - static final String PIG_START_ARGS = "pig.start.args"; - static final String DEFAULT_START_ARGS = "-useHCatalog -exectype tez"; + public static final String PIG_START_ARGS = "pig.start.args"; + public static final String DEFAULT_START_ARGS = "-useHCatalog -exectype local"; //How long to wait before timing out (ms) - static final String PIG_TIMEOUT_MS = "pig.timeout.ms"; - static final String DEFAULT_TIMEOUT_MS = "600000"; + public static final String PIG_TIMEOUT_MS = "pig.timeout.ms"; + public static final String DEFAULT_TIMEOUT_MS = "600000"; + DefaultExecutor executor = null; static { Interpreter.register( "pig", @@ -104,7 +105,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr cmdLine.addArgument(cmd, false); // execute command and return success/failure based on its exit value - DefaultExecutor executor = new DefaultExecutor(); + executor = new DefaultExecutor(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); executor.setStreamHandler(new PumpStreamHandler(outputStream)); @@ -115,15 +116,20 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString()); } catch (ExecuteException e) { logger.error("Can not run " + cmd, e); - return new InterpreterResult(Code.ERROR, e.getMessage()); + return new InterpreterResult(Code.ERROR, e.getMessage() + NEWLINE + outputStream.toString()); } catch (IOException e) { logger.error("Can not run " + cmd, e); - return new InterpreterResult(Code.ERROR, e.getMessage()); + return new InterpreterResult(Code.ERROR, e.getMessage() + NEWLINE + outputStream.toString()); } } @Override - public void cancel(InterpreterContext context) {} + public void cancel(InterpreterContext context) { + if (executor != null) { + executor.getWatchdog().destroyProcess(); + } + } + @Override public FormType getFormType() { diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java new file mode 100644 index 00000000000..03bef545226 --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.pig; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Properties; +import java.io.PrintWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.io.IOException; +import java.io.StringWriter; +import org.apache.commons.io.FileUtils; +import static org.apache.zeppelin.pig.PigInterpreter.*; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import static org.junit.Assert.*; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class PigInterpreterTest { + + private static PigInterpreter pig; + private static InterpreterContext context; + private static final String PASSWD_FILE = "/tmp/tmp_zeppelin_dummypasswd"; + private static final String USERS_FILE = "/tmp/tmp_zeppelin_dummyusers"; + + @BeforeClass + public static void setUp() { + Properties properties = new Properties(); + properties.put(PIG_START_EXE, DEFAULT_START_EXE); + properties.put(PIG_START_ARGS, DEFAULT_START_ARGS); + properties.put(PIG_TIMEOUT_MS, DEFAULT_TIMEOUT_MS); + + pig = new PigInterpreter(properties); + pig.open(); + + context = new InterpreterContext(null, null, null, null, null, null, null, null); + } + + @AfterClass + public static void tearDown() { + pig.close(); + pig.destroy(); + try { + org.apache.commons.io.FileUtils.forceDelete(new File(PASSWD_FILE)); + org.apache.commons.io.FileUtils.forceDelete(new File(USERS_FILE)); + } catch (IOException e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + fail("Unable to cleanup temp pig files:\n" + sw.toString()); + } + } + + /** + * Extract users from a dummy passwd file + * https://pig.apache.org/docs/r0.10.0/start.html#run + */ + @Test + public void testExtractUsers() { + PrintWriter out = null; + StringWriter sw = new StringWriter(); + + try { + out = new PrintWriter(new File(PASSWD_FILE)); + out.println("user1:pass1"); + out.println("user2:pass2"); + out.println("user3:pass3"); + out.println("user4:pass4"); + out.flush(); + } catch (FileNotFoundException e) { + e.printStackTrace(new PrintWriter(sw)); + fail("Unable to write to "+PASSWD_FILE+":\n" + sw.toString()); + } finally { + if (out != null){ + out.close(); + } + } + + try { + FileUtils.deleteDirectory(new File(USERS_FILE)); + } catch (IOException e) { + e.printStackTrace(new PrintWriter(sw)); + fail("Unable to delete: "+USERS_FILE+". Error: " + sw.toString()); + } + String pigScript = "A = load 'file://"+PASSWD_FILE+"' using PigStorage(':');"; + pigScript += "B = foreach A generate $0 as id;"; + pigScript += "store B into 'file://"+USERS_FILE+"';"; + + InterpreterResult result = pig.interpret(pigScript, context); + + String readusers = null; + try { + readusers = new String(Files.readAllBytes(Paths.get(USERS_FILE+"/part-m-00000"))); + } catch (IOException e) { + e.printStackTrace(new PrintWriter(sw)); + fail("Unable read output of pig job from: "+USERS_FILE+"/part-m-00000. Error:\n" + sw.toString()); + } + assertEquals(readusers, "user1\nuser2\nuser3\nuser4\n"); + + } + + +} From a09a7f78c6f72cf8bf5623a55c71cc06141671aa Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 28 Sep 2016 19:41:24 +0800 Subject: [PATCH 04/14] refactor pig Interpreter --- bin/interpreter.sh | 18 + conf/interpreter-list | 1 + docs/docs/interpreter/pig.md | 49 --- pig/pom.xml | 307 ++++++++++-------- .../zeppelin/pig/BasePigInterpreter.java | 100 ++++++ .../apache/zeppelin/pig/PigInterpreter.java | 187 +++++------ .../zeppelin/pig/PigQueryInterpreter.java | 169 ++++++++++ .../zeppelin/pig/PigScriptListener.java | 94 ++++++ .../org/apache/zeppelin/pig/PigUtils.java | 290 +++++++++++++++++ .../main/resources/interpreter-setting.json | 46 +++ .../zeppelin/pig/PigInterpreterTest.java | 210 +++++++----- .../zeppelin/pig/PigQueryInterpreterTest.java | 153 +++++++++ pig/src/test/resources/log4j.properties | 22 ++ .../zeppelin/conf/ZeppelinConfiguration.java | 2 +- 14 files changed, 1277 insertions(+), 371 deletions(-) delete mode 100644 docs/docs/interpreter/pig.md create mode 100644 pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java create mode 100644 pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java create mode 100644 pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java create mode 100644 pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java create mode 100644 pig/src/main/resources/interpreter-setting.json create mode 100644 pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java create mode 100644 pig/src/test/resources/log4j.properties diff --git a/bin/interpreter.sh b/bin/interpreter.sh index a81c8f21067..894777dc716 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -149,6 +149,24 @@ elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then else echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not be loaded" fi +elif [[ "${INTERPRETER_ID}" == "pig" ]]; then + # autodetect HADOOP_CONF_HOME by heuristic + if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then + if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then + export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop" + elif [[ -d "/etc/hadoop/conf" ]]; then + export HADOOP_CONF_DIR="/etc/hadoop/conf" + fi + fi + + if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" + fi + + # autodetect TEZ_CONF_DIR + TEZ_CONF_DIR = ${TEZ_CONF_DIR:=/etc/tez/conf} + echo "TEZ_CONF_DIR:${TEZ_CONF_DIR}" + ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}" fi addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}" diff --git a/conf/interpreter-list b/conf/interpreter-list index 098b3c6c188..72349b4b0c8 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -25,6 +25,7 @@ cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandr elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter file org.apache.zeppelin:zeppelin-file:0.6.1 HDFS file interpreter flink org.apache.zeppelin:zeppelin-flink_2.11:0.6.1 Flink interpreter built with Scala 2.11 +pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter hbase org.apache.zeppelin:zeppelin-hbase:0.6.1 Hbase interpreter ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.6.1 Ignite interpreter built with Scala 2.11 jdbc org.apache.zeppelin:zeppelin-jdbc:0.6.1 Jdbc interpreter diff --git a/docs/docs/interpreter/pig.md b/docs/docs/interpreter/pig.md deleted file mode 100644 index 34428be8577..00000000000 --- a/docs/docs/interpreter/pig.md +++ /dev/null @@ -1,49 +0,0 @@ ---- -layout: page -title: "Pig Interpreter" -description: "" -group: manual ---- -{% include JB/setup %} - - -## Pig interpreter for Apache Zeppelin -[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets. - -- Supported operations through Zeppelin - - Play button: Run multiple lines of pig latin (delimited by semi-colon) entered into Zeppelin cell - - Pause button: Cancel the exection of pig - - Output: Output of jobs are displayed in Zeppelin (for both jobs that were sucessful and those that failed) - -- Unsupported operations - - Progress bar - -### How to setup Pig -Install Pig as you would normally do on the same node where Zeppelin is running - see [documentation](https://pig.apache.org/). If installing through Ambari, you just need to install the client on the Zeppelin node. - -### How to configure interpreter -At the "Interpreters" menu, you have to create a new Pig interpreter and provide next properties: - -property | value | Description ----------|----------|----- -executable | pig | Path to pig executable. If pig is part of PATH, then just pig will work -args | -useHCatalog -exectype local | Arguments to pass pig when starting. Launch 'pig -help' for list of supported options. For example, the options for exectype: local|mapreduce|tez -timeout | 600000 | Time (in milliseconds) to wait for pig job before timing out - -### How to test it's working - -Run below example taken from [this tutorial](http://hortonworks.com/hadoop-tutorial/how-to-use-basic-pig-commands/) - -``` -%sh -rm -f infochimps_dataset_4778_download_16677-csv.zip -wget https://s3.amazonaws.com/hw-sandbox/tutorial1/infochimps_dataset_4778_download_16677-csv.zip -O /tmp/infochimps_dataset_4778_download_16677-csv.zip -unzip /tmp/infochimps_dataset_4778_download_16677-csv.zip -d /tmp -``` -``` -%pig -DIV_A = LOAD 'file:///tmp/infochimps_dataset_4778_download_16677/NYSE/NYSE_dividends_A.csv' using PigStorage(',') AS (exchange:chararray, symbol:chararray, date:chararray, dividend:float); -B = FILTER DIV_A BY symbol=='AZZ'; -C = GROUP B BY dividend; -dump C; -``` diff --git a/pig/pom.xml b/pig/pom.xml index 7cb8dcc3223..d54d1ede3ac 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -16,134 +16,185 @@ ~ limitations under the License. --> - - 4.0.0 + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + - - zeppelin org.apache.zeppelin - 0.6.0-incubating-SNAPSHOT - - - org.apache.zeppelin - zeppelin-pig - jar - 0.6.0-incubating-SNAPSHOT - Zeppelin: Apache Pig Interpreter - Zeppelin interprter for Apache Pig - http://zeppelin.incubator.apache.org - - - 0.15.0 - - - - - org.apache.zeppelin - zeppelin-interpreter - ${project.version} - provided - - - - org.apache.pig - pig - ${pig.version} - - - - org.apache.commons - commons-io - 1.3.2 - - - - junit - junit - test - - - - org.mockito - mockito-all - 1.9.5 - test - - - - com.mockrunner - mockrunner-jdbc - 1.0.8 - test - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - 2.7 - - true - - - - - maven-enforcer-plugin - 1.3.1 - - - enforce - none - - - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/../../interpreter/pig - false - false - true - runtime - - - - copy-artifact - package - - copy - - - ${project.build.directory}/../../interpreter/pig - false - false - true - runtime - - - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.packaging} - - - - - - - - - + zeppelin-pig + jar + 0.7.0-SNAPSHOT + Zeppelin: Apache Pig Interpreter + Zeppelin interpreter for Apache Pig + http://zeppelin.apache.org + + + 0.16.0 + 2.6.0 + 0.7.0 + + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.pig + pig + h2 + ${pig.version} + + + + org.apache.logging.log4j + log4j-api + 2.3 + + + + org.apache.logging.log4j + log4j-core + 2.3 + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + + org.apache.tez + tez-api + ${tez.version} + + + + org.apache.tez + tez-common + ${tez.version} + + + + org.apache.tez + tez-dag + ${tez.version} + + + + org.apache.tez + tez-runtime-library + ${tez.version} + + + + org.apache.tez + tez-runtime-internals + ${tez.version} + + + + org.apache.tez + tez-mapreduce + ${tez.version} + + + + org.apache.tez + tez-yarn-timeline-history-with-acls + ${tez.version} + + + + junit + junit + test + + + + + + + + + + + + + + + + + + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/pig + + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/pig + + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + diff --git a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java new file mode 100644 index 00000000000..0aa8a20f7ce --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.pig; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.PigServer; +import org.apache.pig.backend.BackendException; +import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + */ +public abstract class BasePigInterpreter extends Interpreter { + + private static Logger LOGGER = LoggerFactory.getLogger(BasePigInterpreter.class); + + protected ConcurrentHashMap listenerMap = new ConcurrentHashMap<>(); + + public BasePigInterpreter(Properties property) { + super(property); + } + + @Override + public void cancel(InterpreterContext context) { + LOGGER.info("Cancel paragraph:" + context.getParagraphId()); + PigScriptListener listener = listenerMap.get(context.getParagraphId()); + if (listener != null) { + Set jobIds = listener.getJobIds(); + if (jobIds.isEmpty()) { + LOGGER.info("No job is started, so can not cancel paragraph:" + context.getParagraphId()); + } + for (String jobId : jobIds) { + LOGGER.info("Kill jobId:" + jobId); + HExecutionEngine engine = + (HExecutionEngine) getPigServer().getPigContext().getExecutionEngine(); + try { + Field launcherField = HExecutionEngine.class.getDeclaredField("launcher"); + launcherField.setAccessible(true); + Launcher launcher = (Launcher) launcherField.get(engine); + // It doesn't work for Tez Engine due to PIG-5035 + launcher.killJob(jobId, new Configuration()); + } catch (NoSuchFieldException | BackendException | IllegalAccessException e) { + LOGGER.error("Fail to cancel paragraph:" + context.getParagraphId(), e); + } + } + } else { + LOGGER.warn("No PigScriptListener found, can not cancel paragraph:" + + context.getParagraphId()); + } + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + PigScriptListener listener = listenerMap.get(context.getParagraphId()); + if (listener != null) { + return listener.getProgress(); + } + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + PigInterpreter.class.getName() + this.hashCode()); + } + + public abstract PigServer getPigServer(); +} diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 3576b4a8780..1f43e23cc84 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -17,139 +17,120 @@ package org.apache.zeppelin.pig; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; -import java.util.Properties; -import java.util.Arrays; -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.DefaultExecutor; -import org.apache.commons.exec.ExecuteException; -import org.apache.commons.exec.ExecuteWatchdog; -import org.apache.commons.exec.PumpStreamHandler; -import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pig.PigServer; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.tools.pigstats.*; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.*; + /** * Pig interpreter for Zeppelin. - * Closely follows code for shell interpreter */ -public class PigInterpreter extends Interpreter { - Logger logger = LoggerFactory.getLogger(PigInterpreter.class); - - private static final String NEWLINE = "\n"; - - //Executable name used to start grunt shell - public static final String PIG_START_EXE = "pig.executable"; - public static final String DEFAULT_START_EXE = "pig"; +public class PigInterpreter extends BasePigInterpreter { + private static Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class); - //Arguments to start pig with. More details available via 'pig -help' - public static final String PIG_START_ARGS = "pig.start.args"; - public static final String DEFAULT_START_ARGS = "-useHCatalog -exectype local"; - - //How long to wait before timing out (ms) - public static final String PIG_TIMEOUT_MS = "pig.timeout.ms"; - public static final String DEFAULT_TIMEOUT_MS = "600000"; - - DefaultExecutor executor = null; - static { - Interpreter.register( - "pig", - "pig", - PigInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .add(PIG_START_EXE, DEFAULT_START_EXE, "Pig executable used to start grunt shell") - .add(PIG_START_ARGS, DEFAULT_START_ARGS, "Starting arguments") - .add(PIG_TIMEOUT_MS, DEFAULT_TIMEOUT_MS, "Timeout (ms)") - .build() - ); - } + private PigServer pigServer; + private boolean includeJobStats = false; public PigInterpreter(Properties property) { super(property); } @Override - public void open() {} - - @Override - public void close() {} - - - @Override - public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { - // use commandline to store string corresponding to pig shell command - // start with pig exectable name (or full path if provided)... - CommandLine cmdLine = CommandLine.parse(getProperty(PIG_START_EXE).trim()); - - // ...add any CLI arguments specified by user in interpreter settings - String startArgs = getProperty(PIG_START_ARGS).trim(); - if (startArgs.length() > 0){ - logger.info("Start arguments passed to pig: " + startArgs); - List argList = Arrays.asList(startArgs.split("\\s+")); - for (String arg : argList) { - cmdLine.addArgument(arg, false); - } + public void open() { + String execType = getProperty("zeppelin.pig.execType"); + if (execType == null) { + execType = "mapreduce"; + } + String includeJobStats = getProperty("zeppelin.pig.includeJobStats"); + if (includeJobStats != null) { + this.includeJobStats = Boolean.parseBoolean(includeJobStats); } - // ...finally add contents of pig cell after the -e flag - logger.info("Run pig command '" + cmd + "'"); - long start = System.currentTimeMillis(); - cmdLine.addArgument("-e", false); - cmdLine.addArgument(cmd, false); - - // execute command and return success/failure based on its exit value - executor = new DefaultExecutor(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - executor.setStreamHandler(new PumpStreamHandler(outputStream)); - - int commandTimeOut = Integer.parseInt(getProperty(PIG_TIMEOUT_MS)); - executor.setWatchdog(new ExecuteWatchdog(commandTimeOut)); try { - int exitValue = executor.execute(cmdLine); - return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString()); - } catch (ExecuteException e) { - logger.error("Can not run " + cmd, e); - return new InterpreterResult(Code.ERROR, e.getMessage() + NEWLINE + outputStream.toString()); + pigServer = new PigServer(execType); } catch (IOException e) { - logger.error("Can not run " + cmd, e); - return new InterpreterResult(Code.ERROR, e.getMessage() + NEWLINE + outputStream.toString()); + throw new RuntimeException("Fail to launch PigServer", e); } } @Override - public void cancel(InterpreterContext context) { - if (executor != null) { - executor.getWatchdog().destroyProcess(); - } + public void close() { + pigServer = null; } - - @Override - public FormType getFormType() { - return FormType.SIMPLE; - } @Override - public int getProgress(InterpreterContext context) { - return 0; + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + // remember the origial stdout, because we will redirect stdout to capture + // the pig dump output. + PrintStream originalStdOut = System.out; + ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream(); + File tmpFile = null; + try { + tmpFile = PigUtils.createTempPigScript(cmd); + System.setOut(new PrintStream(bytesOutput)); + // each thread should its own ScriptState & PigStats + ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState()); + // reset PigStats, otherwise you may get the PigStats of last job in the same thread + // because PigStats is ThreadLocal variable + PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats()); + PigScriptListener scriptListener = new PigScriptListener(); + ScriptState.get().registerListener(scriptListener); + listenerMap.put(contextInterpreter.getParagraphId(), scriptListener); + pigServer.registerScript(tmpFile.getAbsolutePath()); + } catch (IOException e) { + if (e instanceof FrontendException) { + FrontendException fe = (FrontendException) e; + if (!fe.getMessage().contains("Backend error :")) { + // If the error message contains "Backend error :", that means the exception is from + // backend. + return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); + } + } + PigStats stats = PigStats.get(); + if (stats != null) { + String errorMsg = PigUtils.extactJobStats(stats); + if (errorMsg != null) { + LOGGER.debug("Error Message:" + errorMsg); + return new InterpreterResult(Code.ERROR, errorMsg); + } + } + return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + System.setOut(originalStdOut); + listenerMap.remove(contextInterpreter.getParagraphId()); + if (tmpFile != null) { + tmpFile.delete(); + } + } + StringBuilder outputBuilder = new StringBuilder(); + PigStats stats = PigStats.get(); + if (stats != null && includeJobStats) { + String jobStats = PigUtils.extactJobStats(stats); + if (jobStats != null) { + outputBuilder.append(jobStats); + } + } + if (!outputBuilder.toString().isEmpty() || !bytesOutput.toString().isEmpty()) { + outputBuilder.append("------------- Pig Output --------------\n"); + } + outputBuilder.append(bytesOutput.toString()); + return new InterpreterResult(Code.SUCCESS, outputBuilder.toString()); } - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - PigInterpreter.class.getName() + this.hashCode()); - } - @Override - public List completion(String buf, int cursor) { - return null; + public PigServer getPigServer() { + return pigServer; } } diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java new file mode 100644 index 00000000000..164b8015c92 --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.pig; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pig.PigServer; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +/** + * + */ +public class PigQueryInterpreter extends BasePigInterpreter { + + private static Logger LOGGER = LoggerFactory.getLogger(PigQueryInterpreter.class); + private PigServer pigServer; + private int maxResult; + + public PigQueryInterpreter(Properties properties) { + super(properties); + } + + @Override + public void open() { + pigServer = getPigInterpreter().getPigServer(); + maxResult = Integer.parseInt(getProperty("zeppelin.pig.maxResult")); + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String alias = "paragraph_" + context.getParagraphId().replace("-", "_"); + String[] lines = st.split("\n"); + List queries = new ArrayList(); + for (int i = 0; i < lines.length; ++i) { + if (i == lines.length - 1) { + lines[i] = alias + " = " + lines[i]; + } + queries.add(lines[i]); + } + + StringBuilder resultBuilder = new StringBuilder("%table "); + try { + File tmpScriptFile = PigUtils.createTempPigScript(queries); + // each thread should its own ScriptState & PigStats + ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState()); + // reset PigStats, otherwise you may get the PigStats of last job in the same thread + // because PigStats is ThreadLocal variable + PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats()); + PigScriptListener scriptListener = new PigScriptListener(); + ScriptState.get().registerListener(scriptListener); + listenerMap.put(context.getParagraphId(), scriptListener); + pigServer.registerScript(tmpScriptFile.getAbsolutePath()); + Schema schema = pigServer.dumpSchema(alias); + boolean schemaKnown = (schema != null); + if (schemaKnown) { + for (int i = 0; i < schema.size(); ++i) { + Schema.FieldSchema field = schema.getField(i); + resultBuilder.append(field.alias); + if (i != schema.size() - 1) { + resultBuilder.append("\t"); + } + } + resultBuilder.append("\n"); + } + Iterator iter = pigServer.openIterator(alias); + boolean firstRow = true; + int index = 0; + while (iter.hasNext() && index <= maxResult) { + index++; + Tuple tuple = iter.next(); + if (firstRow && !schemaKnown) { + for (int i = 0; i < tuple.size(); ++i) { + resultBuilder.append("c_" + i + "\t"); + } + resultBuilder.append("\n"); + firstRow = false; + } + resultBuilder.append(StringUtils.join(tuple, "\t")); + resultBuilder.append("\n"); + } + if (index >= maxResult && iter.hasNext()) { + resultBuilder.append("\nResults are limited by " + maxResult + "."); + } + } catch (IOException e) { + // Extract error in the following order + // 1. catch FrontendException, FrontendException happens in the query compilation phase. + // 2. PigStats, This is execution error + // 3. Other errors. + if (e instanceof FrontendException) { + FrontendException fe = (FrontendException) e; + if (!fe.getMessage().contains("Backend error :")) { + return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); + } + } + PigStats stats = PigStats.get(); + if (stats != null) { + String errorMsg = PigUtils.extactJobStats(stats); + if (errorMsg != null) { + return new InterpreterResult(Code.ERROR, errorMsg); + } + } + return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + listenerMap.remove(context.getParagraphId()); + } + return new InterpreterResult(Code.SUCCESS, resultBuilder.toString()); + } + + @Override + public PigServer getPigServer() { + return this.pigServer; + } + + private PigInterpreter getPigInterpreter() { + LazyOpenInterpreter lazy = null; + PigInterpreter pig = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + pig = (PigInterpreter) p; + + if (lazy != null) { + lazy.open(); + } + return pig; + } +} diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java new file mode 100644 index 00000000000..1f88b2ee6cc --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.pig; + +import org.apache.pig.impl.plan.OperatorPlan; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; +import org.apache.pig.tools.pigstats.PigProgressNotificationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +/** + * + */ +public class PigScriptListener implements PigProgressNotificationListener { + + private static Logger LOGGER = LoggerFactory.getLogger(PigScriptListener.class); + + private Set jobIds = new HashSet(); + private int progress; + + @Override + public void initialPlanNotification(String scriptId, OperatorPlan plan) { + + } + + @Override + public void launchStartedNotification(String scriptId, int numJobsToLaunch) { + + } + + @Override + public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) { + + } + + @Override + public void jobStartedNotification(String scriptId, String assignedJobId) { + this.jobIds.add(assignedJobId); + } + + @Override + public void jobFinishedNotification(String scriptId, JobStats jobStats) { + + } + + @Override + public void jobFailedNotification(String scriptId, JobStats jobStats) { + + } + + @Override + public void outputCompletedNotification(String scriptId, OutputStats outputStats) { + + } + + @Override + public void progressUpdatedNotification(String scriptId, int progress) { + LOGGER.debug("scriptId:" + scriptId + ", progress:" + progress); + this.progress = progress; + } + + @Override + public void launchCompletedNotification(String scriptId, int numJobsSucceeded) { + + } + + public Set getJobIds() { + return jobIds; + } + + public int getProgress() { + return progress; + } +} diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java new file mode 100644 index 00000000000..bcabcee4751 --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.pig; + + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pig.PigRunner; +import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; +import org.apache.pig.tools.pigstats.InputStats; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; +import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; +import org.apache.pig.tools.pigstats.tez.TezDAGStats; +import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.Field; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class PigUtils { + + private static Logger LOGGER = LoggerFactory.getLogger(PigUtils.class); + + protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static File createTempPigScript(String content) throws IOException { + File tmpFile = File.createTempFile("zeppelin", "pig"); + LOGGER.debug("Create pig script file:" + tmpFile.getAbsolutePath()); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + return tmpFile.getAbsoluteFile(); + } + + public static File createTempPigScript(List lines) throws IOException { + return createTempPigScript(StringUtils.join(lines, "\n")); + } + + public static String extactJobStats(PigStats stats) { + if (stats instanceof SimplePigStats) { + return extractFromSimplePigStats((SimplePigStats) stats); + } else if (stats instanceof TezPigScriptStats) { + return extractFromTezPigStats((TezPigScriptStats) stats); + } else { + throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName()); + } + } + + public static String extractFromSimplePigStats(SimplePigStats stats) { + + try { + Field userIdField = PigStats.class.getDeclaredField("userId"); + userIdField.setAccessible(true); + String userId = (String) (userIdField.get(stats)); + Field startTimeField = PigStats.class.getDeclaredField("startTime"); + startTimeField.setAccessible(true); + long startTime = (Long) (startTimeField.get(stats)); + Field endTimeField = PigStats.class.getDeclaredField("endTime"); + endTimeField.setAccessible(true); + long endTime = (Long) (endTimeField.get(stats)); + + if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) { + LOGGER.warn("unknown return code, can't display the results"); + return null; + } + if (stats.getPigContext() == null) { + LOGGER.warn("unknown exec type, don't display the results"); + return null; + } + + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + StringBuilder sb = new StringBuilder(); + sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n"); + sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t") + .append(userId).append("\t") + .append(sdf.format(new Date(startTime))).append("\t") + .append(sdf.format(new Date(endTime))).append("\t") + .append(stats.getFeatures()).append("\n"); + sb.append("\n"); + if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { + sb.append("Success!\n"); + } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + sb.append("Some jobs have failed! Stop running all dependent jobs\n"); + } else { + sb.append("Failed!\n"); + } + sb.append("\n"); + + Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); + jobPlanField.setAccessible(true); + PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats); + + if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS + || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + sb.append("Job Stats (time in seconds):\n"); + sb.append(MRJobStats.SUCCESS_HEADER).append("\n"); + List arr = jobPlan.getSuccessfulJobs(); + for (JobStats js : arr) { + sb.append(js.getDisplayString()); + } + sb.append("\n"); + } + if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE + || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + sb.append("Failed Jobs:\n"); + sb.append(MRJobStats.FAILURE_HEADER).append("\n"); + List arr = jobPlan.getFailedJobs(); + for (JobStats js : arr) { + sb.append(js.getDisplayString()); + } + sb.append("\n"); + } + sb.append("Input(s):\n"); + for (InputStats is : stats.getInputStats()) { + sb.append(is.getDisplayString()); + } + sb.append("\n"); + sb.append("Output(s):\n"); + for (OutputStats ds : stats.getOutputStats()) { + sb.append(ds.getDisplayString()); + } + + sb.append("\nCounters:\n"); + sb.append("Total records written : " + stats.getRecordWritten()).append("\n"); + sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n"); + sb.append("Spillable Memory Manager spill count : " + + stats.getSMMSpillCount()).append("\n"); + sb.append("Total bags proactively spilled: " + + stats.getProactiveSpillCountObjects()).append("\n"); + sb.append("Total records proactively spilled: " + + stats.getProactiveSpillCountRecords()).append("\n"); + sb.append("\nJob DAG:\n").append(jobPlan.toString()); + + return "Script Statistics: \n" + sb.toString(); + } catch (Exception e) { + LOGGER.error("Can not extract message from SimplePigStats", e); + return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e); + } + } + + private static String extractFromTezPigStats(TezPigScriptStats stats) { + + try { + Field userIdField = PigStats.class.getDeclaredField("userId"); + userIdField.setAccessible(true); + String userId = (String) (userIdField.get(stats)); + Field startTimeField = PigStats.class.getDeclaredField("startTime"); + startTimeField.setAccessible(true); + long startTime = (Long) (startTimeField.get(stats)); + Field endTimeField = PigStats.class.getDeclaredField("endTime"); + endTimeField.setAccessible(true); + long endTime = (Long) (endTimeField.get(stats)); + + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + StringBuilder sb = new StringBuilder(); + sb.append("\n"); + sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion())); + sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion())); + sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion())); + sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId)); + sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName())); + sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime)))); + sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime)))); + sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures())); + sb.append("\n"); + if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { + sb.append("Success!\n"); + } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + sb.append("Some tasks have failed! Stop running all dependent tasks\n"); + } else { + sb.append("Failed!\n"); + } + sb.append("\n"); + + // Print diagnostic info in case of failure + if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE + || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + if (stats.getErrorMessage() != null) { + String[] lines = stats.getErrorMessage().split("\n"); + for (int i = 0; i < lines.length; i++) { + String s = lines[i].trim(); + if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) { + sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s)); + } + } + sb.append("\n"); + } + } + + Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); + tezDAGStatsMapField.setAccessible(true); + Map tezDAGStatsMap = + (Map) tezDAGStatsMapField.get(stats); + int count = 0; + for (TezDAGStats dagStats : tezDAGStatsMap.values()) { + sb.append("\n"); + sb.append("DAG " + count++ + ":\n"); + sb.append(dagStats.getDisplayString()); + sb.append("\n"); + } + + sb.append("Input(s):\n"); + for (InputStats is : stats.getInputStats()) { + sb.append(is.getDisplayString().trim()).append("\n"); + } + sb.append("\n"); + sb.append("Output(s):\n"); + for (OutputStats os : stats.getOutputStats()) { + sb.append(os.getDisplayString().trim()).append("\n"); + } + return "Script Statistics:\n" + sb.toString(); + } catch (Exception e) { + LOGGER.error("Can not extract message from SimplePigStats", e); + return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e); + } + } + + public static List extractJobIds(PigStats stat) { + if (stat instanceof SimplePigStats) { + return extractJobIdsFromSimplePigStats((SimplePigStats) stat); + } else if (stat instanceof TezPigScriptStats) { + return extractJobIdsFromTezPigStats((TezPigScriptStats) stat); + } else { + throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName()); + } + } + + public static List extractJobIdsFromSimplePigStats(SimplePigStats stat) { + List jobIds = new ArrayList<>(); + try { + Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); + jobPlanField.setAccessible(true); + PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat); + List arr = jobPlan.getJobList(); + for (JobStats js : arr) { + jobIds.add(js.getJobId()); + } + return jobIds; + } catch (Exception e) { + throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e); + } + } + + public static List extractJobIdsFromTezPigStats(TezPigScriptStats stat) { + List jobIds = new ArrayList<>(); + try { + Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); + tezDAGStatsMapField.setAccessible(true); + Map tezDAGStatsMap = + (Map) tezDAGStatsMapField.get(stat); + for (TezDAGStats dagStats : tezDAGStatsMap.values()) { + LOGGER.info("Tez JobId:" + dagStats.getJobId()); + jobIds.add(dagStats.getJobId()); + } + return jobIds; + } catch (Exception e) { + throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e); + } + } +} diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json new file mode 100644 index 00000000000..55f02462e51 --- /dev/null +++ b/pig/src/main/resources/interpreter-setting.json @@ -0,0 +1,46 @@ +[ + { + "group": "pig", + "name": "script", + "className": "org.apache.zeppelin.pig.PigInterpreter", + "properties": { + "zeppelin.pig.execType": { + "envName": null, + "propertyName": "zeppelin.pig.execType", + "defaultValue": "mapreduce", + "description": "local | mapreduce | tez" + }, + "zeppelin.pig.includeJobStats": { + "envName": null, + "propertyName": "zeppelin.pig.includeJobStats", + "defaultValue": "false", + "description": "flag to include job stats in output" + } + }, + "editor": { + "language": "pig" + } + }, + { + "group": "pig", + "name": "query", + "className": "org.apache.zeppelin.pig.PigQueryInterpreter", + "properties": { + "zeppelin.pig.execType": { + "envName": null, + "propertyName": "zeppelin.pig.execType", + "defaultValue": "mapreduce", + "description": "local | mapreduce | tez" + }, + "zeppelin.pig.maxResult": { + "envName": null, + "propertyName": "zeppelin.pig.maxResult", + "defaultValue": "20", + "description": "max row number for %pig.query" + } + }, + "editor": { + "language": "pig" + } + } +] diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java index 03bef545226..3d062d61579 100644 --- a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java @@ -6,120 +6,150 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.zeppelin.pig; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import java.util.Arrays; -import java.util.Properties; -import java.io.PrintWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.io.IOException; -import java.io.StringWriter; -import org.apache.commons.io.FileUtils; -import static org.apache.zeppelin.pig.PigInterpreter.*; +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import static org.junit.Assert.*; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class PigInterpreterTest { - private static PigInterpreter pig; - private static InterpreterContext context; - private static final String PASSWD_FILE = "/tmp/tmp_zeppelin_dummypasswd"; - private static final String USERS_FILE = "/tmp/tmp_zeppelin_dummyusers"; + private PigInterpreter pigInterpreter; + private InterpreterContext context; - @BeforeClass - public static void setUp() { + @Before + public void setUp() { Properties properties = new Properties(); - properties.put(PIG_START_EXE, DEFAULT_START_EXE); - properties.put(PIG_START_ARGS, DEFAULT_START_ARGS); - properties.put(PIG_TIMEOUT_MS, DEFAULT_TIMEOUT_MS); - - pig = new PigInterpreter(properties); - pig.open(); - - context = new InterpreterContext(null, null, null, null, null, null, null, null); + properties.put("zeppelin.pig.execType", "local"); + pigInterpreter = new PigInterpreter(properties); + pigInterpreter.open(); + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, + null, null); } - @AfterClass - public static void tearDown() { - pig.close(); - pig.destroy(); - try { - org.apache.commons.io.FileUtils.forceDelete(new File(PASSWD_FILE)); - org.apache.commons.io.FileUtils.forceDelete(new File(USERS_FILE)); - } catch (IOException e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - fail("Unable to cleanup temp pig files:\n" + sw.toString()); - } + @After + public void tearDown() { + pigInterpreter.close(); } - /** - * Extract users from a dummy passwd file - * https://pig.apache.org/docs/r0.10.0/start.html#run - */ @Test - public void testExtractUsers() { - PrintWriter out = null; - StringWriter sw = new StringWriter(); - - try { - out = new PrintWriter(new File(PASSWD_FILE)); - out.println("user1:pass1"); - out.println("user2:pass2"); - out.println("user3:pass3"); - out.println("user4:pass4"); - out.flush(); - } catch (FileNotFoundException e) { - e.printStackTrace(new PrintWriter(sw)); - fail("Unable to write to "+PASSWD_FILE+":\n" + sw.toString()); - } finally { - if (out != null){ - out.close(); - } - } - - try { - FileUtils.deleteDirectory(new File(USERS_FILE)); - } catch (IOException e) { - e.printStackTrace(new PrintWriter(sw)); - fail("Unable to delete: "+USERS_FILE+". Error: " + sw.toString()); - } - String pigScript = "A = load 'file://"+PASSWD_FILE+"' using PigStorage(':');"; - pigScript += "B = foreach A generate $0 as id;"; - pigScript += "store B into 'file://"+USERS_FILE+"';"; - - InterpreterResult result = pig.interpret(pigScript, context); - - String readusers = null; - try { - readusers = new String(Files.readAllBytes(Paths.get(USERS_FILE+"/part-m-00000"))); - } catch (IOException e) { - e.printStackTrace(new PrintWriter(sw)); - fail("Unable read output of pig job from: "+USERS_FILE+"/part-m-00000. Error:\n" + sw.toString()); - } - assertEquals(readusers, "user1\nuser2\nuser3\nuser4\n"); + public void testBasics() throws IOException { + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + assertTrue(result.message().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + assertTrue(result.message().contains("a: {id: int,name: bytearray}")); + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'")); + + // execution error + pigscript = "a = load 'invalid_path';" + + "dump a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + assertTrue(result.message().contains("Input path does not exist")); } + @Test + public void testIncludeJobStats() throws IOException { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "local"); + properties.put("zeppelin.pig.includeJobStats", "true"); + pigInterpreter = new PigInterpreter(properties); + pigInterpreter.open(); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + assertTrue(result.message().contains("Counters:")); + assertTrue(result.message().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + // no job is launched, so no jobStats + assertTrue(!result.message().contains("Counters:")); + assertTrue(result.message().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + // no job is launched, so no jobStats + assertTrue(!result.message().contains("Counters:")); + assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'")); + + // execution error + pigscript = "a = load 'invalid_path';" + + "dump a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + assertTrue(result.message().contains("Counters:")); + assertTrue(result.message().contains("Input path does not exist")); + } } diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java new file mode 100644 index 00000000000..00ece440542 --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class PigQueryInterpreterTest { + + private PigInterpreter pigInterpreter; + private PigQueryInterpreter pigQueryInterpreter; + private InterpreterContext context; + + @Before + public void setUp() { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "local"); + properties.put("zeppelin.pig.maxResult", "20"); + + pigInterpreter = new PigInterpreter(properties); + pigQueryInterpreter = new PigQueryInterpreter(properties); + List interpreters = new ArrayList(); + interpreters.add(pigInterpreter); + interpreters.add(pigQueryInterpreter); + InterpreterGroup group = new InterpreterGroup(); + group.put("note_id", interpreters); + pigInterpreter.setInterpreterGroup(group); + pigQueryInterpreter.setInterpreterGroup(group); + pigInterpreter.open(); + pigQueryInterpreter.open(); + + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, + null, null); + } + + @After + public void tearDown() { + pigInterpreter.close(); + pigQueryInterpreter.close(); + } + + @Test + public void testBasics() throws IOException { + String content = "andy\tmale\t10\n" + + "peter\tmale\t20\n" + + "amy\tfemale\t14\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // run script in PigInterpreter + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n" + + "a2 = load 'invalid_path' as (name, gender, age);\n" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)")); + + // run single line query in PigQueryInterpreter + String query = "foreach a generate name, age;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TABLE, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message()); + + // run multiple line query in PigQueryInterpreter + query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TABLE, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message()); + + // syntax error in PigQueryInterpereter + query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema")); + + // execution error in PigQueryInterpreter + query = "foreach a2 generate name, age;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().contains("Input path does not exist")); + } + + @Test + public void testMaxResult() throws IOException { + StringBuilder content = new StringBuilder(); + for (int i=0;i<30;++i) { + content.append(i + "\tname_" + i + "\n"); + } + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // run script in PigInterpreter + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // empty output + assertTrue(result.message().isEmpty()); + + // run single line query in PigQueryInterpreter + String query = "foreach a generate id;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TABLE, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("id\n0\n1\n2")); + assertTrue(result.message().contains("Results are limited by 20")); + } +} diff --git a/pig/src/test/resources/log4j.properties b/pig/src/test/resources/log4j.properties new file mode 100644 index 00000000000..8daee59d60d --- /dev/null +++ b/pig/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger = INFO, stdout + +log4j.appender.stdout = org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout = org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 583a5ef6474..414aed2a5bb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -545,7 +545,7 @@ public static enum ConfVars { ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," - + "scalding,jdbc,hbase,bigquery,beam"), + + "scalding,jdbc,hbase,bigquery,beam,pig"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), // use specified notebook (id) as homescreen From 05a3b9b0a76423660497dfb92cf773cec1687871 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sat, 8 Oct 2016 11:45:22 +0800 Subject: [PATCH 05/14] add pig.md --- docs/interpreter/pig.md | 83 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 docs/interpreter/pig.md diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md new file mode 100644 index 00000000000..d6c94ce69fc --- /dev/null +++ b/docs/interpreter/pig.md @@ -0,0 +1,83 @@ +--- +layout: page +title: "Pig Interpreter" +description: "" +group: manual +--- +{% include JB/setup %} + + +## Pig nterpreter for Apache Zeppelin +[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets. + +## Supported interpreter type + - %pig.script (default) - All the pig script can run in the type of interpreter, and display type if plain text. + - %pig.query - Almost the same as %pig.script. the only difference is that you don't need to add alias in the last statement. And the display type is table. + + +## Supported runtime mode + - Local + - MapReduce + - Tez (Only Tez 0.7 is supported) + +### How to setup Pig + +- Local Mode +Nothing needs to be done for local mode + +- MapReduce Mode +HADOOP_CONF_DIR needs to be specified in `zeppelin-env.sh` + +- Tez Mode +HADOOP_CONF_DIR and TEZ_CONF_DIR needs to be specified in `zeppelin-env.sh` + +### How to configure interpreter + +At the Interpreters menu, you have to create a new Pig interpreter and provide next properties: + + + + + + + + + + + + + + + + + + + + + + + +
PropertyDefaultDescription
zeppelin.pig.execTypemapreduceExecution mode for pig runtime. Local | mapreduce | tez
zeppelin.pig.includeJobStatsfalsewhether display jobStats info in %pig
zeppelin.pig.maxResult20max row number displayed in %pig.query
+ +### How to use + +**pig** + +``` +%pig + +raw_data = load 'dataset/sf_crime/train.csv' using PigStorage(',') as (Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y); +b = group raw_data all; +c = foreach b generate COUNT($1); +dump c; +``` + +**pig.query** +``` +b = foreach raw_data generate Category; +c = group b by Category; +foreach c generate group as category, COUNT($1) as count; +``` + + +Data is shared between %pig and %pig.query, so that you can do some common work in %pig, and do different kinds of query based on the data of %pig. From 39f161a36aad72c590a2feef4c8f7b4afefe72ec Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sat, 8 Oct 2016 16:09:33 +0800 Subject: [PATCH 06/14] address comments --- docs/interpreter/pig.md | 2 ++ pig/pom.xml | 14 -------------- .../org/apache/zeppelin/pig/PigInterpreter.java | 3 ++- .../apache/zeppelin/pig/PigQueryInterpreter.java | 1 + .../java/org/apache/zeppelin/pig/PigUtils.java | 4 +++- 5 files changed, 8 insertions(+), 16 deletions(-) diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index d6c94ce69fc..96146570e20 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -74,6 +74,8 @@ dump c; **pig.query** ``` +%pig.query + b = foreach raw_data generate Category; c = group b by Category; foreach c generate group as category, COUNT($1) as count; diff --git a/pig/pom.xml b/pig/pom.xml index d54d1ede3ac..78cf77199ca 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -122,20 +122,6 @@ test - - - - - - - - - - - - - - diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 1f43e23cc84..92cafc56a0c 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -94,6 +94,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr if (!fe.getMessage().contains("Backend error :")) { // If the error message contains "Backend error :", that means the exception is from // backend. + LOGGER.error("Fail to run pig script.", e); return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } @@ -101,7 +102,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr if (stats != null) { String errorMsg = PigUtils.extactJobStats(stats); if (errorMsg != null) { - LOGGER.debug("Error Message:" + errorMsg); + LOGGER.error("Fail to run pig script, " + errorMsg); return new InterpreterResult(Code.ERROR, errorMsg); } } diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java index 164b8015c92..dc121a35768 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -126,6 +126,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { if (e instanceof FrontendException) { FrontendException fe = (FrontendException) e; if (!fe.getMessage().contains("Backend error :")) { + LOGGER.error("Fail to run pig script.", e); return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java index bcabcee4751..d444e0279d8 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java @@ -267,6 +267,7 @@ public static List extractJobIdsFromSimplePigStats(SimplePigStats stat) } return jobIds; } catch (Exception e) { + LOGGER.error("Can not extract jobIds from SimpelPigStats", e); throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e); } } @@ -279,11 +280,12 @@ public static List extractJobIdsFromTezPigStats(TezPigScriptStats stat) Map tezDAGStatsMap = (Map) tezDAGStatsMapField.get(stat); for (TezDAGStats dagStats : tezDAGStatsMap.values()) { - LOGGER.info("Tez JobId:" + dagStats.getJobId()); + LOGGER.debug("Tez JobId:" + dagStats.getJobId()); jobIds.add(dagStats.getJobId()); } return jobIds; } catch (Exception e) { + LOGGER.error("Can not extract jobIds from TezPigScriptStats", e); throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e); } } From 5e2e222555dabfb66a7981e5d7d0d0f659ca1144 Mon Sep 17 00:00:00 2001 From: AhyoungRyu Date: Sun, 9 Oct 2016 21:35:07 +0900 Subject: [PATCH 07/14] Minor update for pig.md --- docs/interpreter/pig.md | 46 ++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index 96146570e20..44c1a5506c0 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -1,40 +1,52 @@ --- layout: page title: "Pig Interpreter" -description: "" +description: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs." group: manual --- {% include JB/setup %} -## Pig nterpreter for Apache Zeppelin +# Pig Interpreter for Apache Zeppelin + +

+ +## Overview [Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets. ## Supported interpreter type - - %pig.script (default) - All the pig script can run in the type of interpreter, and display type if plain text. - - %pig.query - Almost the same as %pig.script. the only difference is that you don't need to add alias in the last statement. And the display type is table. - + - `%pig.script` (default) + + All the pig script can run in the type of interpreter, and display type if plain text. + + - `%pig.query` + + Almost the same as `%pig.script`. The only difference is that you don't need to add alias in the last statement. And the display type is table. ## Supported runtime mode - Local - MapReduce - Tez (Only Tez 0.7 is supported) +## How to use + ### How to setup Pig - Local Mode -Nothing needs to be done for local mode + + Nothing needs to be done for local mode - MapReduce Mode -HADOOP_CONF_DIR needs to be specified in `zeppelin-env.sh` + + HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. - Tez Mode -HADOOP_CONF_DIR and TEZ_CONF_DIR needs to be specified in `zeppelin-env.sh` -### How to configure interpreter + HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. -At the Interpreters menu, you have to create a new Pig interpreter and provide next properties: +### How to configure interpreter +At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default. @@ -50,18 +62,18 @@ At the Interpreters menu, you have to create a new Pig interpreter and provide n - + - +
zeppelin.pig.includeJobStats falsewhether display jobStats info in %pigwhether display jobStats info in %pig
zeppelin.pig.maxResult 20max row number displayed in %pig.querymax row number displayed in %pig.query
-### How to use +### Example -**pig** +##### pig ``` %pig @@ -72,7 +84,8 @@ c = foreach b generate COUNT($1); dump c; ``` -**pig.query** +##### pig.query + ``` %pig.query @@ -81,5 +94,4 @@ c = group b by Category; foreach c generate group as category, COUNT($1) as count; ``` - -Data is shared between %pig and %pig.query, so that you can do some common work in %pig, and do different kinds of query based on the data of %pig. +Data is shared between `%pig` and `%pig.query`, so that you can do some common work in `%pig`, and do different kinds of query based on the data of `%pig`. From df7a6db47ca2db5b8d3ed625000d6a25f035a8e5 Mon Sep 17 00:00:00 2001 From: AhyoungRyu Date: Sun, 9 Oct 2016 21:35:25 +0900 Subject: [PATCH 08/14] Add pig.md to dropdown menu --- docs/_includes/themes/zeppelin/_navigation.html | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index a0e4485db88..9abcdb177d1 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -62,6 +62,7 @@
  • Lens
  • Livy
  • Markdown
  • +
  • Pig
  • Python
  • Postgresql, HAWQ
  • R
  • From fe014a72b445f6977d00356bfe780c40dade0167 Mon Sep 17 00:00:00 2001 From: AhyoungRyu Date: Sun, 9 Oct 2016 21:39:41 +0900 Subject: [PATCH 09/14] Fix docs title in front matter --- docs/interpreter/pig.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index 44c1a5506c0..aab48ecf104 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -1,6 +1,6 @@ --- layout: page -title: "Pig Interpreter" +title: "Pig Interpreter for Apache Zeppelin" description: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs." group: manual --- From 58b4b2f26dd8f852d453854a37764a824903d9cd Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sun, 9 Oct 2016 21:28:41 +0800 Subject: [PATCH 10/14] minor update of docs --- docs/interpreter/pig.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index aab48ecf104..2a0ce9460e6 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -17,7 +17,7 @@ group: manual ## Supported interpreter type - `%pig.script` (default) - All the pig script can run in the type of interpreter, and display type if plain text. + All the pig script can run in this type of interpreter, and display type is plain text. - `%pig.query` @@ -57,12 +57,12 @@ At the Interpreters menu, you have to create a new Pig interpreter. Pig interpre zeppelin.pig.execType mapreduce - Execution mode for pig runtime. Local | mapreduce | tez + Execution mode for pig runtime. local | mapreduce | tez zeppelin.pig.includeJobStats false - whether display jobStats info in %pig + whether display jobStats info in %pig.script zeppelin.pig.maxResult From c85a09031a49add7f48114afe3ea43e4b572b0f2 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 10 Oct 2016 07:45:03 +0800 Subject: [PATCH 11/14] add license --- pig/pom.xml | 10 ++++------ zeppelin-distribution/src/bin_license/LICENSE | 10 +++++++++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pig/pom.xml b/pig/pom.xml index 78cf77199ca..a4e5cbb38e1 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -57,15 +57,13 @@
    - org.apache.logging.log4j - log4j-api - 2.3 + org.slf4j + slf4j-api - org.apache.logging.log4j - log4j-core - 2.3 + org.slf4j + slf4j-log4j12 diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index 2ee668a130e..e39a2ad0fc3 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -156,7 +156,15 @@ The following components are provided under Apache License. (Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - http://tachyonproject.org/tachyon/) (Apache 2.0) Tachyon Project Client (org.tachyonproject:tachyon-client:0.6.4 - http://tachyonproject.org/tachyon-client/) (Apache 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/) - + (Apache 2.0) Apache Pig (org.apache.pig:0.16 - http://pig.apache.org) + (Apache 2.0) tez-api (org.apache.tez:tez-api:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-common (org.apache.tez:tez-common:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-dag (org.apache.tez:tez-dag:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-runtime-library (org.apache.tez:runtime-library:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-runtime-internals (org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org) + ======================================================================== MIT licenses ======================================================================== From e858301bdfcac727fea8ef306f42ecd252ddfe94 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 10 Oct 2016 08:27:08 +0800 Subject: [PATCH 12/14] address comments --- bin/interpreter.sh | 10 +++++++--- .../java/org/apache/zeppelin/pig/PigInterpreter.java | 7 +++---- .../org/apache/zeppelin/pig/PigQueryInterpreter.java | 4 +++- pig/src/main/resources/interpreter-setting.json | 2 +- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 894777dc716..4fb4b269265 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -164,9 +164,13 @@ elif [[ "${INTERPRETER_ID}" == "pig" ]]; then fi # autodetect TEZ_CONF_DIR - TEZ_CONF_DIR = ${TEZ_CONF_DIR:=/etc/tez/conf} - echo "TEZ_CONF_DIR:${TEZ_CONF_DIR}" - ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}" + if [[ -n "${TEZ_CONF_DIR}" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}" + elif [[ -d "/etc/tez/conf" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":/etc/tez/conf" + else + echo "TEZ_CONF_DIR is not set, configuration might not be loaded" + fi fi addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}" diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 92cafc56a0c..8cd1efc929e 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -59,7 +59,8 @@ public void open() { try { pigServer = new PigServer(execType); } catch (IOException e) { - throw new RuntimeException("Fail to launch PigServer", e); + LOGGER.error("Fail to initialize PigServer", e); + throw new RuntimeException("Fail to initialize PigServer", e); } } @@ -106,6 +107,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr return new InterpreterResult(Code.ERROR, errorMsg); } } + LOGGER.error("Fail to run pig script.", e); return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } finally { System.setOut(originalStdOut); @@ -122,9 +124,6 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr outputBuilder.append(jobStats); } } - if (!outputBuilder.toString().isEmpty() || !bytesOutput.toString().isEmpty()) { - outputBuilder.append("------------- Pig Output --------------\n"); - } outputBuilder.append(bytesOutput.toString()); return new InterpreterResult(Code.SUCCESS, outputBuilder.toString()); } diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java index dc121a35768..c763b7f9654 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -65,6 +65,7 @@ public void close() { @Override public InterpreterResult interpret(String st, InterpreterContext context) { + // '-' is invalid for pig alias String alias = "paragraph_" + context.getParagraphId().replace("-", "_"); String[] lines = st.split("\n"); List queries = new ArrayList(); @@ -126,7 +127,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { if (e instanceof FrontendException) { FrontendException fe = (FrontendException) e; if (!fe.getMessage().contains("Backend error :")) { - LOGGER.error("Fail to run pig script.", e); + LOGGER.error("Fail to run pig query.", e); return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } @@ -137,6 +138,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { return new InterpreterResult(Code.ERROR, errorMsg); } } + LOGGER.error("Fail to run pig query.", e); return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } finally { listenerMap.remove(context.getParagraphId()); diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json index 55f02462e51..27918ede1bf 100644 --- a/pig/src/main/resources/interpreter-setting.json +++ b/pig/src/main/resources/interpreter-setting.json @@ -35,7 +35,7 @@ "zeppelin.pig.maxResult": { "envName": null, "propertyName": "zeppelin.pig.maxResult", - "defaultValue": "20", + "defaultValue": "1000", "description": "max row number for %pig.query" } }, From a1b742bceeff34aad1196ce2de9235810b4ce961 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 10 Oct 2016 15:42:30 +0800 Subject: [PATCH 13/14] minor update on doc --- docs/interpreter/pig.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index 2a0ce9460e6..227656bba78 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -66,7 +66,7 @@ At the Interpreters menu, you have to create a new Pig interpreter. Pig interpre zeppelin.pig.maxResult - 20 + 1000 max row number displayed in %pig.query From 73a07f055941a542ccf3f91cfe52c99c18c8f8a0 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 11 Oct 2016 10:27:39 +0800 Subject: [PATCH 14/14] minor update --- conf/interpreter-list | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/interpreter-list b/conf/interpreter-list index 72349b4b0c8..38cb386d8cd 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -25,7 +25,6 @@ cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandr elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter file org.apache.zeppelin:zeppelin-file:0.6.1 HDFS file interpreter flink org.apache.zeppelin:zeppelin-flink_2.11:0.6.1 Flink interpreter built with Scala 2.11 -pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter hbase org.apache.zeppelin:zeppelin-hbase:0.6.1 Hbase interpreter ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.6.1 Ignite interpreter built with Scala 2.11 jdbc org.apache.zeppelin:zeppelin-jdbc:0.6.1 Jdbc interpreter @@ -33,6 +32,7 @@ kylin org.apache.zeppelin:zeppelin-kylin:0.6.1 Kylin in lens org.apache.zeppelin:zeppelin-lens:0.6.1 Lens interpreter livy org.apache.zeppelin:zeppelin-livy:0.6.1 Livy interpreter md org.apache.zeppelin:zeppelin-markdown:0.6.1 Markdown support +pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter postgresql org.apache.zeppelin:zeppelin-postgresql:0.6.1 Postgresql interpreter python org.apache.zeppelin:zeppelin-python:0.6.1 Python interpreter shell org.apache.zeppelin:zeppelin-shell:0.6.1 Shell command