From 865e6ab5970d79613f19a0afadb0c953efc2730b Mon Sep 17 00:00:00 2001 From: Raj Bains Date: Wed, 2 Sep 2015 23:50:56 -0700 Subject: [PATCH 1/3] Add File Interpreter, HDFS Interpreter and Tests --- file/pom.xml | 157 ++++++++++++ .../apache/zeppelin/file/FileInterpreter.java | 170 +++++++++++++ .../org/apache/zeppelin/file/HDFSCommand.java | 151 ++++++++++++ .../zeppelin/file/HDFSFileInterpreter.java | 225 ++++++++++++++++++ .../file/HDFSFileInterpreterTest.java | 210 ++++++++++++++++ pom.xml | 1 + 6 files changed, 914 insertions(+) create mode 100644 file/pom.xml create mode 100644 file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java create mode 100644 file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java create mode 100644 file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java create mode 100644 file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java diff --git a/file/pom.xml b/file/pom.xml new file mode 100644 index 00000000000..bfc6dd84bb7 --- /dev/null +++ b/file/pom.xml @@ -0,0 +1,157 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-file + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: File Manager + http://www.apache.org + + + 0.14.0 + 2.6.0 + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.1 + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + org.apache.hadoop + hadoop-common + ${hive.hadoop.version} + + + + org.apache.hadoop + hadoop-hdfs + ${hive.hadoop.version} + + + + + + junit + junit + test + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.18.1 + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + 
${project.build.directory}/../../interpreter/file + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/file + false + false + true + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + diff --git a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java new file mode 100644 index 00000000000..a1c45503e50 --- /dev/null +++ b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.file; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; + +/** + * File interpreter for Zeppelin. + * + * @author rajbains + */ +public abstract class FileInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(FileInterpreter.class); + String currentDir = null; + CommandArgs args = null; + + public FileInterpreter(Properties property) { + super(property); + currentDir = new String("/"); + } + + /** + * Handling the arguments of the command + */ + public class CommandArgs { + public String input = null; + public String command = null; + public ArrayList args = null; + public HashSet flags = null; + + public CommandArgs(String cmd) { + input = cmd; + args = new ArrayList(); + flags = new HashSet(); + } + + private void parseArg(String arg) { + if (arg.charAt(0) == '-') { // handle flags + for (int i = 0; i < arg.length(); i++) { + Character c = arg.charAt(i); + flags.add(c); + } + } else { // handle other args + args.add(arg); + } + } + + public void parseArgs() { + if (input == null) + return; + StringTokenizer st = new StringTokenizer(input); + if (st.hasMoreTokens()) { + command = st.nextToken(); + while (st.hasMoreTokens()) + parseArg(st.nextToken()); + } + } + } + + // Functions that each file system implementation must override + + public abstract String listAll(String path); + + public abstract boolean isDirectory(String path); + + // Combine paths, takes care of arguments such as .. 
+ + private String getNewPath(String argument){ + Path arg = Paths.get(argument); + Path ret = arg.isAbsolute() ? arg : Paths.get(currentDir, argument); + return ret.normalize().toString(); + } + + // Handle the command handling uniformly across all file systems + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Run File command '" + cmd + "'"); + + args = new CommandArgs(cmd); + args.parseArgs(); + + if (args.command == null) { + logger.info("Error: No command"); + return new InterpreterResult(Code.ERROR, Type.TEXT, "No command"); + } + + // Simple parsing of the command + + if (args.command.equals("cd")) { + + String newPath = !args.args.isEmpty() ? getNewPath(args.args.get(0)) : currentDir; + if (!isDirectory(newPath)) + return new InterpreterResult(Code.ERROR, Type.TEXT, "Invalid Directory"); + + currentDir = newPath; + return new InterpreterResult(Code.SUCCESS, Type.TEXT, "OK"); + + } else if (args.command.equals("ls")) { + + String newPath = !args.args.isEmpty() ? 
getNewPath(args.args.get(0)) : currentDir; + if (!isDirectory(newPath)) + return new InterpreterResult(Code.ERROR, Type.TEXT, "Invalid List Directory"); + + String results = listAll(newPath); + return new InterpreterResult(Code.SUCCESS, Type.TEXT, results); + + } else if (args.command.equals("pwd")) { + + return new InterpreterResult(Code.SUCCESS, Type.TEXT, currentDir); + + } else { + + return new InterpreterResult(Code.ERROR, Type.TEXT, "Unknown command"); + + } + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + FileInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List completion(String buf, int cursor) { + return null; + } +} diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java new file mode 100644 index 00000000000..b3e8ad4ba9d --- /dev/null +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.file; + +import java.net.URL; +import java.net.HttpURLConnection; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import javax.ws.rs.core.UriBuilder; +import org.slf4j.Logger; + +/** + * Definition and HTTP invocation methods for all WebHDFS commands + * + * @author rajbains + * + */ +public class HDFSCommand { + + /** + * Type of HTTP request + */ + public enum HttpType { + GET, + PUT + } + + /** + * Definition of WebHDFS operator + */ + public class Op { + public String op; + public HttpType cmd; + public int minArgs; + + public Op(String op, HttpType cmd, int minArgs) { + this.op = op; + this.cmd = cmd; + this.minArgs = minArgs; + } + } + + /** + * Definition of argument to an operator + */ + public class Arg { + public String key; + public String value; + + public Arg(String key, String value) { + this.key = key; + this.value = value; + } + } + + // How to connect to WebHDFS + String url = null; + String user = null; + Logger logger; + + // Define all the commands available + public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0); + public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0); + + public HDFSCommand(String url, String user, Logger logger) { + super(); + this.url = url; + this.user = user; + this.logger = logger; + } + + public String checkArgs(Op op, String path, Arg[] args) throws Exception { + if (op == null || + path == null || + (op.minArgs > 0 && + (args == null || + args.length != op.minArgs))) + { + String a = ""; + a = (op != null) ? a + op.op + "\n" : a; + a = (path != null) ? a + path + "\n" : a; + a = (args != null) ? 
a + args + "\n" : a; + return a; + } + return null; + } + + + // The operator that runs all commands + public String runCommand(Op op, String path, Arg[] args) throws Exception { + + // Check arguments + String error = checkArgs(op, path, args); + if (error != null) { + logger.error("Bad arguments to command: " + error); + return "ERROR: BAD ARGS"; + } + + // Build URI + UriBuilder builder = UriBuilder + .fromPath(url) + .path(path) + .queryParam("op", op.op); + + if (args != null) { + for (Arg a : args) { + builder = builder.queryParam(a.key, a.value); + } + } + java.net.URI uri = builder.build(); + + // Connect and get response string + URL hdfsUrl = uri.toURL(); + HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection(); + + if (op.cmd == HttpType.GET) { + con.setRequestMethod("GET"); + int responseCode = con.getResponseCode(); + logger.info("Sending 'GET' request to URL : " + hdfsUrl); + logger.info("Response Code : " + responseCode); + + BufferedReader in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + String inputLine; + StringBuffer response = new StringBuffer(); + + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + return response.toString(); + } + return null; + } +} diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java new file mode 100644 index 00000000000..fcabc3797c9 --- /dev/null +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.file; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; +import com.google.gson.Gson; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; + +/** + * HDFS implementation of File interpreter for Zeppelin. + * + * @author rajbains + * + */ +public class HDFSFileInterpreter extends FileInterpreter { + static final String HDFS_URL = "hdfs.url"; + static final String HDFS_USER = "hdfs.user"; + + static { + Interpreter.register( + "hdfs", + "hdfs", + HDFSFileInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(HDFS_URL, "http://c6401.ambari.apache.org:50070/webhdfs/v1/", + "The URL for WebHDFS") + .add(HDFS_USER, "hdfs", "The WebHDFS user").build()); + } + + Exception exceptionOnConnect = null; + HDFSCommand cmd = null; + Gson gson = null; + + public void prepare() { + String userName = getProperty(HDFS_USER); + String hdfsUrl = getProperty(HDFS_URL); + cmd = new HDFSCommand(hdfsUrl, userName, logger); + gson = new Gson(); + } + + public HDFSFileInterpreter(Properties property){ + super(property); + prepare(); + } + + /** + * Status of one file + * + * matches returned JSON + */ + public class OneFileStatus { + public long accessTime; + public int blockSize; + public int childrenNum; + public int fileId; + public String group; + public long length; + public long modificationTime; + public String owner; + public String pathSuffix; + public String permission; + public int replication; + public int 
storagePolicy; + public String type; + public String toString() { + String str = ""; + str += "\nAccessTime = " + accessTime; + str += "\nBlockSize = " + blockSize; + str += "\nChildrenNum = " + childrenNum; + str += "\nFileId = " + fileId; + str += "\nGroup = " + group; + str += "\nLength = " + length; + str += "\nModificationTime = " + modificationTime; + str += "\nOwner = " + owner; + str += "\nPathSuffix = " + pathSuffix; + str += "\nPermission = " + permission; + str += "\nReplication = " + replication; + str += "\nStoragePolicy = " + storagePolicy; + str += "\nType = " + type; + return str; + } + } + + /** + * Status of one file + * + * matches returned JSON + */ + public class SingleFileStatus { + public OneFileStatus FileStatus; + } + + /** + * Status of all files in a directory + * + * matches returned JSON + */ + public class MultiFileStatus { + public OneFileStatus[] FileStatus; + } + + /** + * Status of all files in a directory + * + * matches returned JSON + */ + public class AllFileStatus { + public MultiFileStatus FileStatuses; + } + + // tests whether we're able to connect to HDFS + + private void testConnection() { + try { + if (isDirectory("/")) + logger.info("Successfully created WebHDFS connection"); + } catch (Exception e) { + logger.error("testConnection: Cannot open WebHDFS connection. Bad URL: " + "/", e); + exceptionOnConnect = e; + } + } + + @Override + public void open() { + testConnection(); + } + + @Override + public void close() { + } + + private String listDir(String path) throws Exception { + return cmd.runCommand(cmd.listStatus, path, null); + } + + private String listPermission(OneFileStatus fs){ + String s = ""; + s += fs.type.equalsIgnoreCase("Directory") ? 'd' : '-'; + int p = Integer.parseInt(fs.permission, 16); + s += ((p & 0x400) == 0) ? '-' : 'r'; + s += ((p & 0x200) == 0) ? '-' : 'w'; + s += ((p & 0x100) == 0) ? '-' : 'x'; + s += ((p & 0x40) == 0) ? '-' : 'r'; + s += ((p & 0x20) == 0) ? '-' : 'w'; + s += ((p & 0x10) == 0) ? 
'-' : 'x'; + s += ((p & 0x4) == 0) ? '-' : 'r'; + s += ((p & 0x2) == 0) ? '-' : 'w'; + s += ((p & 0x1) == 0) ? '-' : 'x'; + return s; + } + private String listDate(OneFileStatus fs) { + return new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(fs.modificationTime)); + } + private String ListOne(String path, OneFileStatus fs) { + if (args.flags.contains(new Character('l'))) { + String s = ""; + s += listPermission(fs) + "\t "; + s += ((fs.replication == 0) ? "-" : fs.replication) + "\t "; + s += fs.owner + "\t "; + s += fs.group + "\t "; + s += fs.length + "\t "; + s += listDate(fs) + " GMT\t "; + s += (path.length() == 1) ? path + fs.pathSuffix : path + '/' + fs.pathSuffix; + return s; + } + return fs.pathSuffix; + } + + public String listAll(String path) { + String all = ""; + if (exceptionOnConnect != null) + return all; + try { + String sfs = listDir(path); + if (sfs != null) { + AllFileStatus allFiles = gson.fromJson(sfs, AllFileStatus.class); + + if (allFiles != null && + allFiles.FileStatuses != null && + allFiles.FileStatuses.FileStatus != null) + { + for (OneFileStatus fs : allFiles.FileStatuses.FileStatus) + all = all + ListOne(path, fs) + '\n'; + } + } + } catch (Exception e) { + logger.error("listall: listDir " + path, e); + } + return all; + } + + public boolean isDirectory(String path) { + boolean ret = false; + if (exceptionOnConnect != null) + return ret; + try { + String str = cmd.runCommand(cmd.getFileStatus, path, null); + SingleFileStatus sfs = gson.fromJson(str, SingleFileStatus.class); + if (sfs != null) + return sfs.FileStatus.type.equals("DIRECTORY"); + } catch (Exception e) { + logger.error("IsDirectory: " + path, e); + } + return ret; + } +} diff --git a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java new file mode 100644 index 00000000000..29eee87bdf1 --- /dev/null +++ 
b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.file; + +import com.google.gson.Gson; +import junit.framework.TestCase; +import static org.junit.Assert.*; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.Test; +import org.slf4j.Logger; +import java.util.HashMap; +import java.util.Properties; +import java.lang.Override; +import java.lang.String; + + +/** + * Tests Hive Interpreter by running pre-determined commands against mock file system + * + * Created by rajbains on 8/29/15. + */ +public class HDFSFileInterpreterTest extends TestCase { + + @Test + public void test() { + HDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties()); + t.open(); + + // We have info for /, /user, /tmp, /mr-history/done + + // Ensure + // 1. ls -l works + // 2. paths (. and ..) are correctly handled + // 3. 
flags and arguments to commands are correctly handled + + InterpreterResult result1 = t.interpret("ls -l /", null); + assertEquals(result1.type(), InterpreterResult.Type.TEXT); + + InterpreterResult result2 = t.interpret("ls -l /./user/..", null); + assertEquals(result2.type(), InterpreterResult.Type.TEXT); + + assertEquals(result1.message(), result2.message()); + + // Ensure you can do cd and after that the ls uses current directory correctly + + InterpreterResult result3 = t.interpret("cd user", null); + assertEquals(result3.type(), InterpreterResult.Type.TEXT); + assertEquals(result3.message(), "OK"); + + InterpreterResult result4 = t.interpret("ls", null); + assertEquals(result4.type(), InterpreterResult.Type.TEXT); + + InterpreterResult result5 = t.interpret("ls /user", null); + assertEquals(result5.type(), InterpreterResult.Type.TEXT); + + assertEquals(result4.message(), result5.message()); + + // Ensure pwd works correctly + + InterpreterResult result6 = t.interpret("pwd", null); + assertEquals(result6.type(), InterpreterResult.Type.TEXT); + assertEquals(result6.message(), "/user"); + + // Move a couple of levels and check we're in the right place + + InterpreterResult result7 = t.interpret("cd ../mr-history/done", null); + assertEquals(result7.type(), InterpreterResult.Type.TEXT); + assertEquals(result7.message(), "OK"); + + InterpreterResult result8 = t.interpret("ls -l ", null); + assertEquals(result8.type(), InterpreterResult.Type.TEXT); + + InterpreterResult result9 = t.interpret("ls -l /mr-history/done", null); + assertEquals(result9.type(), InterpreterResult.Type.TEXT); + + assertEquals(result8.message(), result9.message()); + + InterpreterResult result10 = t.interpret("cd ../..", null); + assertEquals(result10.type(), InterpreterResult.Type.TEXT); + assertEquals(result7.message(), "OK"); + + InterpreterResult result11 = t.interpret("ls -l ", null); + assertEquals(result11.type(), InterpreterResult.Type.TEXT); + + // we should be back to first result 
after all this navigation + assertEquals(result1.message(), result11.message()); + + t.close(); + } + } + + /** + * Store command results from curl against a real file system + */ + class MockFileSystem { + HashMap mfs = new HashMap(); + void addListStatusData() { + mfs.put("/?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16389,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1438548219672,\"owner\":\"yarn\",\"pathSuffix\":\"app-logs\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16395,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548030045,\"owner\":\"hdfs\",\"pathSuffix\":\"hdp\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16390,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985336,\"owner\":\"mapred\",\"pathSuffix\":\"mapred\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":2,\"fileId\":16392,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985346,\"owner\":\"hdfs\",\"pathSuffix\":\"mr-history\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16400,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"system\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548150089,\"owner\":\"hdfs\",\"pathSuffix\":\"tmp\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + 
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547921792,\"owner\":\"hdfs\",\"pathSuffix\":\"user\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + "]}}" + ); + mfs.put("/user?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + " {\"accessTime\":0,\"blockSize\":0,\"childrenNum\":4,\"fileId\":16388,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253161263,\"owner\":\"ambari-qa\",\"pathSuffix\":\"ambari-qa\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + " ]}}" + ); + mfs.put("/tmp?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + " {\"accessTime\":1441253097489,\"blockSize\":134217728,\"childrenNum\":0,\"fileId\":16400,\"group\":\"hdfs\",\"length\":1645,\"modificationTime\":1441253097517,\"owner\":\"hdfs\",\"pathSuffix\":\"ida8c06540_date040315\",\"permission\":\"755\",\"replication\":3,\"storagePolicy\":0,\"type\":\"FILE\"}\n" + + " ]}}" + ); + mfs.put("/mr-history/done?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16433,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197481,\"owner\":\"mapred\",\"pathSuffix\":\"2015\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + "]}}" + ); + } + void addGetFileStatusData() { + mfs.put("/?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":7,\"fileId\":16385,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/user?op=GETFILESTATUS", + 
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253043188,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/tmp?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253097489,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/mr-history/done?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480,\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + } + public void addMockData(HDFSCommand.Op op) { + if (op.op.equals("LISTSTATUS")) { + addListStatusData(); + } else if (op.op.equals("GETFILESTATUS")) { + addGetFileStatusData(); + } + // do nothing + } + public String get(String key) { + return mfs.get(key); + } + } + + /** + * Run commands against mock file system that simulates webhdfs responses + */ + class MockHDFSCommand extends HDFSCommand { + MockFileSystem fs = null; + + public MockHDFSCommand(String url, String user, Logger logger) { + super(url, user, logger); + fs = new MockFileSystem(); + fs.addMockData(getFileStatus); + fs.addMockData(listStatus); + } + + @Override + public String runCommand(Op op, String path, Arg[] args) throws Exception { + + String error = checkArgs(op, path, args); + assertNull(error); + + String c = path + "?op=" + op.op; + + if (args != null) { + for (Arg a : args) { + c += "&" + a.key + "=" + a.value; + } + } + return fs.get(c); + } + } + + /** + * Mock Interpreter - uses Mock HDFS command + */ + class MockHDFSFileInterpreter extends HDFSFileInterpreter { + + 
@Override + public void prepare() { + // Run commands against mock File System instead of WebHDFS + cmd = new MockHDFSCommand("", "", logger); + gson = new Gson(); + } + + public MockHDFSFileInterpreter(Properties property) { + super(property); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index ff651603f0e..7ba509a4a67 100755 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,7 @@ geode postgresql tajo + file flink ignite kylin From 7d61e5fb6ecb278f99fbec1af72fe983366e6a5a Mon Sep 17 00:00:00 2001 From: Raj Bains Date: Fri, 4 Sep 2015 21:29:59 -0700 Subject: [PATCH 2/3] This is the first reviewed version of File Interpreter that adds basic ls, cd and pwd functionality against WebHDFS. It addresses ZEPPELIN-198 --- conf/zeppelin-site.xml.template | 2 +- file/pom.xml | 24 ------------------- .../apache/zeppelin/file/FileInterpreter.java | 1 - .../org/apache/zeppelin/file/HDFSCommand.java | 13 ++++++---- .../zeppelin/file/HDFSFileInterpreter.java | 20 ++++++++-------- .../file/HDFSFileInterpreterTest.java | 5 ++-- 6 files changed, 22 insertions(+), 43 deletions(-) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 90989478f97..fece8bada8a 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -85,7 +85,7 @@ zeppelin.interpreters - 
org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter Comma separated interpreter configurations. 
First interpreter become a default diff --git a/file/pom.xml b/file/pom.xml index bfc6dd84bb7..01d2b7b4ba6 100644 --- a/file/pom.xml +++ b/file/pom.xml @@ -32,10 +32,6 @@ Zeppelin: File Manager http://www.apache.org - - 0.14.0 - 2.6.0 - org.apache.zeppelin @@ -44,12 +40,6 @@ provided - - org.apache.commons - commons-exec - 1.1 - - org.slf4j slf4j-api @@ -60,20 +50,6 @@ slf4j-log4j12 - - org.apache.hadoop - hadoop-common - ${hive.hadoop.version} - - - - org.apache.hadoop - hadoop-hdfs - ${hive.hadoop.version} - - - - junit junit diff --git a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java index a1c45503e50..867b8c70224 100644 --- a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java +++ b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java @@ -34,7 +34,6 @@ /** * File interpreter for Zeppelin. * - * @author rajbains */ public abstract class FileInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(FileInterpreter.class); diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java index b3e8ad4ba9d..94508cd0364 100644 --- a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java @@ -28,8 +28,6 @@ /** * Definition and HTTP invocation methods for all WebHDFS commands * - * @author rajbains - * */ public class HDFSCommand { @@ -72,16 +70,18 @@ public Arg(String key, String value) { // How to connect to WebHDFS String url = null; String user = null; + int maxLength = 0; Logger logger; // Define all the commands available public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0); public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0); - public HDFSCommand(String url, String user, Logger logger) { + public HDFSCommand(String url, String user, Logger logger, int maxLength) { super(); 
this.url = url; this.user = user; + this.maxLength = maxLength; this.logger = logger; } @@ -140,8 +140,13 @@ public String runCommand(Op op, String path, Arg[] args) throws Exception { String inputLine; StringBuffer response = new StringBuffer(); + int i = 0; while ((inputLine = in.readLine()) != null) { - response.append(inputLine); + if (inputLine.length() < maxLength) + response.append(inputLine); + i++; + if (i >= maxLength) + break; } in.close(); return response.toString(); diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java index fcabc3797c9..598eb4d076b 100644 --- a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java @@ -28,22 +28,21 @@ /** * HDFS implementation of File interpreter for Zeppelin. * - * @author rajbains - * */ public class HDFSFileInterpreter extends FileInterpreter { static final String HDFS_URL = "hdfs.url"; static final String HDFS_USER = "hdfs.user"; + static final String HDFS_MAXLENGTH = "hdfs.maxlength"; static { Interpreter.register( - "hdfs", - "hdfs", - HDFSFileInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .add(HDFS_URL, "http://c6401.ambari.apache.org:50070/webhdfs/v1/", - "The URL for WebHDFS") - .add(HDFS_USER, "hdfs", "The WebHDFS user").build()); + "hdfs", + "hdfs", + HDFSFileInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(HDFS_URL, "http://localhost:50070/webhdfs/v1/", "The URL for WebHDFS") + .add(HDFS_USER, "hdfs", "The WebHDFS user") + .add(HDFS_MAXLENGTH, "1000", "Maximum number of lines of results fetched").build()); } Exception exceptionOnConnect = null; @@ -53,7 +52,8 @@ public class HDFSFileInterpreter extends FileInterpreter { public void prepare() { String userName = getProperty(HDFS_USER); String hdfsUrl = getProperty(HDFS_URL); - cmd = new HDFSCommand(hdfsUrl, userName, logger); + int 
i = Integer.parseInt(getProperty(HDFS_MAXLENGTH)); + cmd = new HDFSCommand(hdfsUrl, userName, logger, i); gson = new Gson(); } diff --git a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java index 29eee87bdf1..3c87fa69cfa 100644 --- a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java +++ b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java @@ -31,9 +31,8 @@ /** - * Tests Hive Interpreter by running pre-determined commands against mock file system + * Tests Interpreter by running pre-determined commands against mock file system * - * Created by rajbains on 8/29/15. */ public class HDFSFileInterpreterTest extends TestCase { @@ -168,7 +167,7 @@ class MockHDFSCommand extends HDFSCommand { MockFileSystem fs = null; public MockHDFSCommand(String url, String user, Logger logger) { - super(url, user, logger); + super(url, user, logger, 1000); fs = new MockFileSystem(); fs.addMockData(getFileStatus); fs.addMockData(listStatus); From 70507a84b16463bbd94b749129d849204fdd36f9 Mon Sep 17 00:00:00 2001 From: Raj Bains Date: Thu, 10 Sep 2015 12:10:41 -0700 Subject: [PATCH 3/3] Add Documentation and a missing dependency for HDFS File Browser --- docs/docs/index.md | 1 + docs/docs/interpreter/hdfs.md | 68 +++++++++++++++++++++++++++++++++++ file/pom.xml | 6 ++++ 3 files changed, 75 insertions(+) create mode 100644 docs/docs/interpreter/hdfs.md diff --git a/docs/docs/index.md b/docs/docs/index.md index 5fa9c6d19b6..bbd1f74cef9 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -20,6 +20,7 @@ group: nav-right **[Interpreters in zeppelin](manual/interpreters.html)** * [flink](../docs/pleasecontribute.html) +* [hdfs](./interpreter/hdfs.html) * [hive](../docs/pleasecontribute.html) * [ignite](../docs/pleasecontribute.html) * [lens](../docs/pleasecontribute.html) diff --git a/docs/docs/interpreter/hdfs.md b/docs/docs/interpreter/hdfs.md 
new file mode 100644 index 00000000000..60a493c428d --- /dev/null +++ b/docs/docs/interpreter/hdfs.md @@ -0,0 +1,68 @@ +--- +layout: page +title: "HDFS File Interpreter" +description: "" +group: manual +--- +{% include JB/setup %} + + +## HDFS File Interpreter for Apache Zeppelin + +
+ + + + + + + + + + + +
NameClassDescription
%hdfsHDFSFileInterpreterProvides File System commands for HDFS
+ +
+This interpreter connects to HDFS using the HTTP WebHDFS interface. +It supports the basic shell file commands applied to HDFS; currently, only browsing is supported. +* You can use ls [PATH] and ls -l [PATH] to list a directory. If the path is missing, then the current directory is listed. +* You can use cd [PATH] to change your current directory by giving a relative or an absolute path. +* You can invoke pwd to see your current directory. + +### Create Interpreter + +You can create the HDFS browser by pointing it to the WebHDFS interface of your Hadoop cluster. + +### Configuration +You can modify the configuration of the HDFS interpreter from the `Interpreter` section. The HDFS interpreter exposes the following properties: + + + + + + + + + + + + + + + + + + + + + + +
Property NameDescriptionDefault Value
hdfs.urlThe URL for WebHDFShttp://localhost:50070/webhdfs/v1/
hdfs.userThe WebHDFS userhdfs
hdfs.maxlengthMaximum number of lines of results fetched1000
+ + +#### WebHDFS REST API +You can confirm that you're able to access the WebHDFS API by running a curl command against the WebHDFS endpoint provided to the interpreter. + +Here is an example: +$> curl "http://localhost:50070/webhdfs/v1/?op=LISTSTATUS" \ No newline at end of file diff --git a/file/pom.xml b/file/pom.xml index 01d2b7b4ba6..8ff45385c3a 100644 --- a/file/pom.xml +++ b/file/pom.xml @@ -40,6 +40,12 @@ provided
+ + javax.ws.rs + javax.ws.rs-api + 2.0 + + org.slf4j slf4j-api