From aebf7a01196b7aa063d261f8816014ac17d6ea95 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 5 Jan 2017 17:18:19 -0800 Subject: [PATCH 01/21] Plumb through the jvm output, we could test this by overloading sys.stdout and launching an instance of spark and verifying we got the log info (TODO) --- python/pyspark/java_gateway.py | 37 ++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 3c783ae541a1f..469dfc4705c58 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import print_function + import atexit import os import sys @@ -24,6 +26,8 @@ import socket import platform from subprocess import Popen, PIPE +from threading import Thread + if sys.version >= '3': xrange = range @@ -39,6 +43,9 @@ def launch_gateway(conf=None): :param conf: spark configuration passed to spark-submit :return: """ + # If running in ijupyter we need to copy through stdout/stderr + grab_jvm_output = type(sys.stderr) != file + if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) else: @@ -70,14 +77,36 @@ def launch_gateway(conf=None): # Launch the Java gateway. # We open a pipe to stdin so that the Java gateway can die when the pipe is broken + proc_kwargs = {"env": env, "stdin": PIPE} if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: + # However, preexec_fn not supported on Windows def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env) - else: - # preexec_fn not supported on Windows - proc = Popen(command, stdin=PIPE, env=env) + + proc_kwargs["preexec_fn"] = preexec_func + + # If we need to copy stderr/stdout through, set up a pipe. 
+ if grab_jvm_output: + proc_kwargs["stderr"] = PIPE + proc_kwargs["stdout"] = PIPE + + proc = Popen(command, **proc_kwargs) + + def connect(input_pipe, out_pipe): + """Connect the input pipe to the output. We can't use os.dup for IPython + or directly write to them (see https://github.com/ipython/ipython/pull/3072/).""" + for line in iter(input_pipe.readline, b''): + print(line, file=out_pipe) + input_pipe.close() + + if grab_jvm_output: + t = Thread(target=connect, args=(proc.stdout, sys.stdout)) + t.daemon = True + t.start() + t = Thread(target=connect, args=(proc.stderr, sys.stderr)) + t.daemon = True + t.start() gateway_port = None # We use select() here in order to avoid blocking indefinitely if the subprocess dies From 3e8d7eb2e33dbddbf7bcae2e97baaad7a6bc1908 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 5 Jan 2017 22:59:39 -0800 Subject: [PATCH 02/21] Python3 support, isn't necessarily a file type. Instead just look for overriden stdout --- python/pyspark/java_gateway.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 469dfc4705c58..f5fdcfd76ecbc 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -43,8 +43,9 @@ def launch_gateway(conf=None): :param conf: spark configuration passed to spark-submit :return: """ - # If running in ijupyter we need to copy through stdout/stderr - grab_jvm_output = type(sys.stderr) != file + # If sys.stdout has been changed the child processes JVM will not respect that + # so grab the jvm output and copy it over. This happens with Jupyter and similar systems. + grab_jvm_output = sys.stdout != sys.__stdout__ if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) From ca308b15c1b0b0571c3dee61d049739d84216c32 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 12 Jan 2017 11:01:20 -0800 Subject: [PATCH 03/21] Only redirect if in ZMQInteractiveShell (e.g. 
notebook) or StringIO (e.g. testing) and leave things like doctest mode alone [otherwise we'd have to rewrite all of the doctests] --- python/pyspark/java_gateway.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index f5fdcfd76ecbc..b1d785e31cabe 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -17,6 +17,7 @@ from __future__ import print_function + import atexit import os import sys @@ -44,8 +45,10 @@ def launch_gateway(conf=None): :return: """ # If sys.stdout has been changed the child processes JVM will not respect that - # so grab the jvm output and copy it over. This happens with Jupyter and similar systems. - grab_jvm_output = sys.stdout != sys.__stdout__ + # so grab the jvm output and copy it over if we are in a notebook. + redirect_shells = ["ZMQInteractiveShell", "StringIO"] + grab_jvm_output = (sys.stdout != sys.__stdout__ and + sys.stdout.__class__.__name__ in redirect_shells) if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) From 011863189c301d2e7673d906e238f3e99d8d28bb Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 12 Jan 2017 11:02:24 -0800 Subject: [PATCH 04/21] We can't test this in the normal test classes since the JVM is already started, but we can verify it if we are launched without PYSAPRK_GATEWAY_PORT set --- dev/pip-sanity-check.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/dev/pip-sanity-check.py b/dev/pip-sanity-check.py index 430c2ab52766a..36c65525f6dab 100644 --- a/dev/pip-sanity-check.py +++ b/dev/pip-sanity-check.py @@ -17,20 +17,51 @@ from __future__ import print_function -from pyspark.sql import SparkSession +import os import sys +from pyspark.sql import SparkSession + +if sys.version >= "3": + from io import StringIO +else: + from StringIO import StringIO + if __name__ == "__main__": + 
gateway_already_started = "PYSPARK_GATEWAY_PORT" in os.environ + if not gateway_already_started: + _old_stdout = sys.stdout + _old_stderr = sys.stderr + # Verify stdout/stderr overwrite support for jupyter + sys.stdout = new_stdout = StringIO() + sys.stderr = new_stderr = StringIO() + spark = SparkSession\ .builder\ .appName("PipSanityCheck")\ .getOrCreate() + print("Spark context created") sc = spark.sparkContext rdd = sc.parallelize(range(100), 10) value = rdd.reduce(lambda x, y: x + y) + if (value != 4950): print("Value {0} did not match expected value.".format(value), file=sys.stderr) sys.exit(-1) + + if not gateway_already_started: + try: + rdd2 = rdd.map(lambda x: str(x).startsWith("expected error")) + rdd2.collect() + except: + pass + sys.stdout = _old_stdout + sys.stderr = _old_stderr + logs = new_stderr.getvalue() + new_stdout.getvalue() + + if logs.find("'str' object has no attribute 'startsWith'") == -1: + print("Failed to find helpful error message, redirect failed?") + sys.exit(-1) print("Successfully ran pip sanity check") spark.stop() From 733d5b955f28c77925c29ce5f2e39f2ce6359fbc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Feb 2017 08:13:30 -0800 Subject: [PATCH 05/21] Update the sanity check to standalone tests --- dev/run-pip-tests | 4 ++-- .../pyspark/standalone_tests.py | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) rename dev/pip-sanity-check.py => python/pyspark/standalone_tests.py (93%) diff --git a/dev/run-pip-tests b/dev/run-pip-tests index af1b1feb70cd1..e1e0765568f62 100755 --- a/dev/run-pip-tests +++ b/dev/run-pip-tests @@ -104,9 +104,9 @@ for python in "${PYTHON_EXECS[@]}"; do cd / echo "Run basic sanity check on pip installed version with spark-submit" - spark-submit "$FWDIR"/dev/pip-sanity-check.py + spark-submit "$FWDIR"/python/standalone_tests.py echo "Run basic sanity check with import based" - python "$FWDIR"/dev/pip-sanity-check.py + python "$FWDIR"/python/standalone_tests.py echo "Run the tests for 
context.py" python "$FWDIR"/python/pyspark/context.py diff --git a/dev/pip-sanity-check.py b/python/pyspark/standalone_tests.py similarity index 93% rename from dev/pip-sanity-check.py rename to python/pyspark/standalone_tests.py index 6eedc4a0423fb..0603e945d237f 100644 --- a/dev/pip-sanity-check.py +++ b/python/pyspark/standalone_tests.py @@ -15,6 +15,11 @@ # limitations under the License. # +""" +Standalone tests for PySpark - can be used to quickly test PySpark pip installation. When launched +without spark-submit verifies Jupyter redirection. +""" + from __future__ import print_function import os From 4aea95541bad8df89cc5b6dd13552ba0c4a6dca8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Feb 2017 08:37:28 -0800 Subject: [PATCH 06/21] Quick first pass at setting up standalone tests as well --- dev/sparktestsupport/modules.py | 6 ++++++ python/run-tests.py | 25 ++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 10ad1fe3aa2c6..644004d541ae8 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -343,6 +343,12 @@ def __hash__(self): ] ) +pyspark_standalone = Module( + name="pyspark-standalone", + dependencies=[pyspark_core], + source_file_regexes=["python/pyspark/(java_gateway.py|context.py)"], + python_test_goals=["pyspark.standalone_tests"] +) pyspark_sql = Module( name="pyspark-sql", diff --git a/python/run-tests.py b/python/run-tests.py index 38b3bb84c10be..8b11f3c1c6002 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -63,7 +63,11 @@ def print_red(text): raise Exception("Cannot find assembly build directory, please build Spark first.") -def run_individual_python_test(test_name, pyspark_python): +def run_generic_test(test_name, pyspark_python, launch_cmd): + """ + Run a generic python test. launch_cmd should be set to pyspark for normal tests or the same as + pyspark_python for standalone tests. 
+ """ env = dict(os.environ) env.update({ 'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH, @@ -77,7 +81,7 @@ def run_individual_python_test(test_name, pyspark_python): try: per_test_output = tempfile.TemporaryFile() retcode = subprocess.Popen( - [os.path.join(SPARK_HOME, "bin/pyspark"), test_name], + [launch_cmd, test_name], stderr=per_test_output, stdout=per_test_output, env=env).wait() except: LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python) @@ -109,6 +113,18 @@ def run_individual_python_test(test_name, pyspark_python): per_test_output.close() LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration) +def run_standalone_python_test(test_name, pyspark_python): + """ + Runs a standalone python test. This verifies PySpark launch behaviour when starting the JVM from + Python side instead of JVM starting Python. + """ + launch_cmd=pyspark_python + run_generic_test(test_name, pyspark_python, launch_cmd) + +def run_individual_python_test(test_name, pyspark_python): + """Run a Python test launching the JVM first.""" + launch_cmd = os.path.join(SPARK_HOME, "bin/pyspark") + run_generic_test(test_name, pyspark_python, launch_cmd) def get_default_python_executables(): python_execs = [x for x in ["python2.6", "python3.4", "pypy"] if which(x)] @@ -196,7 +212,10 @@ def process_queue(task_queue): except Queue.Empty: break try: - run_individual_python_test(test_goal, python_exec) + if test_goal == 'pyspark.standalone': + run_standalone_python_test(test_goal, python_exec) + else: + run_individual_python_test(test_goal, python_exec) finally: task_queue.task_done() From 15d999bb901aa0a0eef73ff50f2ba3d24c4d3f72 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Mar 2017 16:42:20 -0700 Subject: [PATCH 07/21] pep8 fixes --- python/pyspark/java_gateway.py | 2 +- python/run-tests.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 
b1d785e31cabe..603c65cf6781c 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -27,7 +27,7 @@ import socket import platform from subprocess import Popen, PIPE -from threading import Thread +from threading import Thread if sys.version >= '3': diff --git a/python/run-tests.py b/python/run-tests.py index 338a669d1d1bc..f317a581113b8 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -113,19 +113,21 @@ def run_generic_test(test_name, pyspark_python, launch_cmd): per_test_output.close() LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration) + def run_standalone_python_test(test_name, pyspark_python): """ Runs a standalone python test. This verifies PySpark launch behaviour when starting the JVM from Python side instead of JVM starting Python. """ - launch_cmd=pyspark_python - run_generic_test(test_name, pyspark_python, launch_cmd) + run_generic_test(test_name, pyspark_python, launch_cmd=pyspark_python) + def run_individual_python_test(test_name, pyspark_python): """Run a Python test launching the JVM first.""" launch_cmd = os.path.join(SPARK_HOME, "bin/pyspark") run_generic_test(test_name, pyspark_python, launch_cmd) + def get_default_python_executables(): python_execs = [x for x in ["python2.6", "python3.4", "pypy"] if which(x)] if "python2.6" not in python_execs: From a01f8f82361157d70c512587b5fc0ed4d6855987 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Mar 2017 16:49:08 -0700 Subject: [PATCH 08/21] Fix pip tests to use the correct launch --- dev/run-pip-tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/run-pip-tests b/dev/run-pip-tests index e1e0765568f62..e9d9e18fccc16 100755 --- a/dev/run-pip-tests +++ b/dev/run-pip-tests @@ -104,9 +104,9 @@ for python in "${PYTHON_EXECS[@]}"; do cd / echo "Run basic sanity check on pip installed version with spark-submit" - spark-submit "$FWDIR"/python/standalone_tests.py + spark-submit 
"$FWDIR"/python/pyspark/standalone_tests.py echo "Run basic sanity check with import based" - python "$FWDIR"/python/standalone_tests.py + python "$FWDIR"/python/pyspark/standalone_tests.py echo "Run the tests for context.py" python "$FWDIR"/python/pyspark/context.py From 2bfaa16176be0d8a61062480fd3472a7c345e4b0 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 17 Apr 2017 20:38:39 -0700 Subject: [PATCH 09/21] Only do the pyspark ml tests if numpy is present --- python/pyspark/standalone_tests.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py index 0603e945d237f..ac82e3e83db20 100644 --- a/python/pyspark/standalone_tests.py +++ b/python/pyspark/standalone_tests.py @@ -26,8 +26,12 @@ import sys from pyspark.sql import SparkSession -from pyspark.ml.param import Params -from pyspark.mllib.linalg import * + +if 'numpy' in sys.modules: + from pyspark.ml.param import Params + from pyspark.mllib.linalg import * +else: + print("Skipping pyspark ml import tests, missing numpy") if sys.version >= "3": from io import StringIO From a2caf9757e5123cffd773a41e47a92a898f4c777 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 9 May 2017 17:05:29 -0700 Subject: [PATCH 10/21] Update standalone tests --- python/pyspark/standalone_tests.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py index ac82e3e83db20..017df6ddd3930 100644 --- a/python/pyspark/standalone_tests.py +++ b/python/pyspark/standalone_tests.py @@ -25,13 +25,6 @@ import os import sys -from pyspark.sql import SparkSession - -if 'numpy' in sys.modules: - from pyspark.ml.param import Params - from pyspark.mllib.linalg import * -else: - print("Skipping pyspark ml import tests, missing numpy") if sys.version >= "3": from io import StringIO @@ -41,12 +34,22 @@ if __name__ == "__main__": gateway_already_started = 
"PYSPARK_GATEWAY_PORT" in os.environ if not gateway_already_started: + print("redirecting stdout and stderr") _old_stdout = sys.stdout _old_stderr = sys.stderr # Verify stdout/stderr overwrite support for jupyter sys.stdout = new_stdout = StringIO() sys.stderr = new_stderr = StringIO() + + from pyspark.sql import SparkSession + if 'numpy' in sys.modules: + from pyspark.ml.param import Params + from pyspark.mllib.linalg import * + else: + print("Skipping pyspark ml import tests, missing numpy") + + spark = SparkSession\ .builder\ .appName("PipSanityCheck")\ @@ -72,6 +75,7 @@ if logs.find("'str' object has no attribute 'startsWith'") == -1: print("Failed to find helpful error message, redirect failed?") + print("logs were {0}".format(logs)) sys.exit(-1) print("Successfully ran pip sanity check") From 1166b9e8a719331c606e65c0e67741703d3c854c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 12 May 2017 03:24:40 -0700 Subject: [PATCH 11/21] Remove extra blank lines (style) --- python/pyspark/standalone_tests.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py index 017df6ddd3930..aebff56761c8e 100644 --- a/python/pyspark/standalone_tests.py +++ b/python/pyspark/standalone_tests.py @@ -41,7 +41,6 @@ sys.stdout = new_stdout = StringIO() sys.stderr = new_stderr = StringIO() - from pyspark.sql import SparkSession if 'numpy' in sys.modules: from pyspark.ml.param import Params @@ -49,7 +48,6 @@ else: print("Skipping pyspark ml import tests, missing numpy") - spark = SparkSession\ .builder\ .appName("PipSanityCheck")\ From 7fcc83976f091f2e57fa4074a54574b7c311e855 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 12 May 2017 07:12:10 -0700 Subject: [PATCH 12/21] debug failure --- python/pyspark/java_gateway.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 603c65cf6781c..b8dace59145cb 100644 --- 
a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -47,8 +47,11 @@ def launch_gateway(conf=None): # If sys.stdout has been changed the child processes JVM will not respect that # so grab the jvm output and copy it over if we are in a notebook. redirect_shells = ["ZMQInteractiveShell", "StringIO"] + print("Launching gateway") grab_jvm_output = (sys.stdout != sys.__stdout__ and sys.stdout.__class__.__name__ in redirect_shells) + print("Grab jvm output was %s %s" % + (grab_jvm_output, type(sys.stdout), sys.stdout.__class__)) if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) From 08632fdcee127bf43cf90f44139925e2c26b4946 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 13 May 2017 03:37:01 -0700 Subject: [PATCH 13/21] Change debug --- python/pyspark/java_gateway.py | 6 +++--- python/pyspark/standalone_tests.py | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index b8dace59145cb..79b15e24780e1 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -47,11 +47,11 @@ def launch_gateway(conf=None): # If sys.stdout has been changed the child processes JVM will not respect that # so grab the jvm output and copy it over if we are in a notebook. 
redirect_shells = ["ZMQInteractiveShell", "StringIO"] - print("Launching gateway") grab_jvm_output = (sys.stdout != sys.__stdout__ and sys.stdout.__class__.__name__ in redirect_shells) - print("Grab jvm output was %s %s" % - (grab_jvm_output, type(sys.stdout), sys.stdout.__class__)) + if (grab_jvm_output): + print("Grab jvm output was %s %s" % + (grab_jvm_output, type(sys.stdout), sys.stdout.__class__)) if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py index aebff56761c8e..6c4ebe9a2f485 100644 --- a/python/pyspark/standalone_tests.py +++ b/python/pyspark/standalone_tests.py @@ -40,6 +40,7 @@ # Verify stdout/stderr overwrite support for jupyter sys.stdout = new_stdout = StringIO() sys.stderr = new_stderr = StringIO() + print("Redirected to {0} / {1}".format(sys.stdout, sys.stderr)) from pyspark.sql import SparkSession if 'numpy' in sys.modules: From 64aee29beadb840601f390b727ae1bccdb9aa88a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 13 May 2017 18:17:48 -0700 Subject: [PATCH 14/21] Check for the pypy error string as well, also catch re/throw errors during standalone tests after reseting the stderr pipe so the user can see it --- python/pyspark/java_gateway.py | 3 - python/pyspark/standalone_tests.py | 90 +++++++++++++++++------------- 2 files changed, 51 insertions(+), 42 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 79b15e24780e1..603c65cf6781c 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -49,9 +49,6 @@ def launch_gateway(conf=None): redirect_shells = ["ZMQInteractiveShell", "StringIO"] grab_jvm_output = (sys.stdout != sys.__stdout__ and sys.stdout.__class__.__name__ in redirect_shells) - if (grab_jvm_output): - print("Grab jvm output was %s %s" % - (grab_jvm_output, type(sys.stdout), sys.stdout.__class__)) if "PYSPARK_GATEWAY_PORT" in 
os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py index 6c4ebe9a2f485..31955c9449310 100644 --- a/python/pyspark/standalone_tests.py +++ b/python/pyspark/standalone_tests.py @@ -33,49 +33,61 @@ if __name__ == "__main__": gateway_already_started = "PYSPARK_GATEWAY_PORT" in os.environ - if not gateway_already_started: - print("redirecting stdout and stderr") - _old_stdout = sys.stdout - _old_stderr = sys.stderr - # Verify stdout/stderr overwrite support for jupyter - sys.stdout = new_stdout = StringIO() - sys.stderr = new_stderr = StringIO() - print("Redirected to {0} / {1}".format(sys.stdout, sys.stderr)) + try: + if not gateway_already_started: + print("Running redirection tests since not in existing gateway") + _old_stdout = sys.stdout + _old_stderr = sys.stderr + # Verify stdout/stderr overwrite support for jupyter + sys.stdout = new_stdout = StringIO() + sys.stderr = new_stderr = StringIO() + print("Redirected to {0} / {1}".format(sys.stdout, sys.stderr), file=_old_stdout) + else: + print("Skipping redirection tests since gateway already exists") - from pyspark.sql import SparkSession - if 'numpy' in sys.modules: - from pyspark.ml.param import Params - from pyspark.mllib.linalg import * - else: - print("Skipping pyspark ml import tests, missing numpy") + from pyspark.sql import SparkSession + if 'numpy' in sys.modules: + from pyspark.ml.param import Params + from pyspark.mllib.linalg import * + else: + print("Skipping pyspark ml import tests, missing numpy") - spark = SparkSession\ - .builder\ - .appName("PipSanityCheck")\ - .getOrCreate() - print("Spark context created") - sc = spark.sparkContext - rdd = sc.parallelize(range(100), 10) - value = rdd.reduce(lambda x, y: x + y) + spark = SparkSession\ + .builder\ + .appName("PipSanityCheck")\ + .getOrCreate() + print("Spark context created") + sc = spark.sparkContext + rdd = sc.parallelize(range(100), 10) + value 
= rdd.reduce(lambda x, y: x + y) - if (value != 4950): - print("Value {0} did not match expected value.".format(value), file=sys.stderr) - sys.exit(-1) + if (value != 4950): + print("Value {0} did not match expected value.".format(value), file=sys.__stderr__) + sys.exit(-1) - if not gateway_already_started: - try: - rdd2 = rdd.map(lambda x: str(x).startsWith("expected error")) - rdd2.collect() - except: - pass - sys.stdout = _old_stdout - sys.stderr = _old_stderr - logs = new_stderr.getvalue() + new_stdout.getvalue() + if not gateway_already_started: + try: + rdd2 = rdd.map(lambda x: str(x).startsWith("expected error")) + rdd2.collect() + except: + pass - if logs.find("'str' object has no attribute 'startsWith'") == -1: - print("Failed to find helpful error message, redirect failed?") - print("logs were {0}".format(logs)) - sys.exit(-1) - print("Successfully ran pip sanity check") + sys.stdout = _old_stdout + sys.stderr = _old_stderr + logs = new_stderr.getvalue() + new_stdout.getvalue() + + if logs.find("'str' object has no attribute 'startsWith'") == -1 and \ + logs.find("SystemError: unknown opcode") == -1: + print("Failed to find helpful error message, redirect failed?") + print("logs were {0}".format(logs)) + sys.exit(-1) + else: + print("Redirection tests passed") + print("Successfully ran pip sanity check") + except Exception as inst: + # If there is an uncaught exception print it, restore the stderr + print("Exception during testing, {0}".format(inst), file=sys.__stderr__) + sys.stderr = sys.__stderr__ + raise spark.stop() From bb36f50ee7abc49569a28264dc60f947b54f5a28 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 20 May 2017 00:52:04 -0700 Subject: [PATCH 15/21] hmmm lets makesure grab jvm is true --- python/pyspark/java_gateway.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 603c65cf6781c..fbc383bbc0f80 100644 --- a/python/pyspark/java_gateway.py +++ 
b/python/pyspark/java_gateway.py @@ -49,6 +49,8 @@ def launch_gateway(conf=None): redirect_shells = ["ZMQInteractiveShell", "StringIO"] grab_jvm_output = (sys.stdout != sys.__stdout__ and sys.stdout.__class__.__name__ in redirect_shells) + if grab_jvm_output: + print("Grabbing JVM output cause magic.....") if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) From 24ff579c6663e8a34bb577fb15405b4dcc325ec6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 20 May 2017 03:26:10 -0700 Subject: [PATCH 16/21] hmmm.... --- python/pyspark/java_gateway.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index fbc383bbc0f80..5f62aa343155a 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -54,6 +54,8 @@ def launch_gateway(conf=None): if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) + if grab_jvm_output: + print("Gateway already launched, can not grab output") else: SPARK_HOME = _find_spark_home() # Launch the Py4j gateway using Spark's run command so that we pick up the From c450d2447693769e3ad8f7c70e8830e878ac91c8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 20 May 2017 03:38:09 -0700 Subject: [PATCH 17/21] hmmm.... --- python/pyspark/java_gateway.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 5f62aa343155a..b72e3e17b4c3f 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -104,6 +104,7 @@ def preexec_func(): def connect(input_pipe, out_pipe): """Connect the input pipe to the output. 
We can't use os.dup for IPython or directly write to them (see https://github.com/ipython/ipython/pull/3072/).""" + print("Connecting pipes....") for line in iter(input_pipe.readline, b''): print(line, file=out_pipe) input_pipe.close() From e0c3a6b778f70d7dec94484a187f9de46ab3b11c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 20 May 2017 09:16:30 -0700 Subject: [PATCH 18/21] Buffer the stderr/stdout --- python/pyspark/java_gateway.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index b72e3e17b4c3f..c32f482c5c1d3 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -98,6 +98,7 @@ def preexec_func(): if grab_jvm_output: proc_kwargs["stderr"] = PIPE proc_kwargs["stdout"] = PIPE + proc_kwargs["bufsize"] = 1 proc = Popen(command, **proc_kwargs) From 009e7c4b40c7726d5f51dc6fca5a5b131badab69 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 21 May 2017 07:31:28 -0700 Subject: [PATCH 19/21] Close FDs. 
We should really use dup2 if sys.stdout / sys.stderr have fileno --- python/pyspark/java_gateway.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index c32f482c5c1d3..9f5495731a407 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -99,6 +99,7 @@ def preexec_func(): proc_kwargs["stderr"] = PIPE proc_kwargs["stdout"] = PIPE proc_kwargs["bufsize"] = 1 + proc_kwargs["close_fds"] = True proc = Popen(command, **proc_kwargs) @@ -108,6 +109,7 @@ def connect(input_pipe, out_pipe): print("Connecting pipes....") for line in iter(input_pipe.readline, b''): print(line, file=out_pipe) + print("Pipe finished...") input_pipe.close() if grab_jvm_output: From ae450e89178f4bfd835d3377e7acbd216f5e7fbb Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 16 Jan 2018 20:00:34 -0800 Subject: [PATCH 20/21] Skip on pypy --- python/pyspark/java_gateway.py | 9 ++++----- python/pyspark/standalone_tests.py | 4 +++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 4bae76e1066bb..9d09844dfd352 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -26,6 +26,7 @@ import shlex import socket import platform +import warnings from subprocess import Popen, PIPE from threading import Thread @@ -49,13 +50,13 @@ def launch_gateway(conf=None): redirect_shells = ["ZMQInteractiveShell", "StringIO"] grab_jvm_output = (sys.stdout != sys.__stdout__ and sys.stdout.__class__.__name__ in redirect_shells) - if grab_jvm_output: - print("Grabbing JVM output cause magic.....") if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) if grab_jvm_output: - print("Gateway already launched, can not grab output") + warnings.warn( + "Gateway already launched, can not grab output. 
JVM messages may not be delivered.", + RuntimeWarning) else: SPARK_HOME = _find_spark_home() # Launch the Py4j gateway using Spark's run command so that we pick up the @@ -106,10 +107,8 @@ def preexec_func(): def connect(input_pipe, out_pipe): """Connect the input pipe to the output. We can't use os.dup for IPython or directly write to them (see https://github.com/ipython/ipython/pull/3072/).""" - print("Connecting pipes....") for line in iter(input_pipe.readline, b''): print(line, file=out_pipe) - print("Pipe finished...") input_pipe.close() if grab_jvm_output: diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py index 31955c9449310..b9b183af9d0e5 100644 --- a/python/pyspark/standalone_tests.py +++ b/python/pyspark/standalone_tests.py @@ -34,7 +34,7 @@ if __name__ == "__main__": gateway_already_started = "PYSPARK_GATEWAY_PORT" in os.environ try: - if not gateway_already_started: + if not gateway_already_started and not hasattr(sys, "pypy_translation_info"): print("Running redirection tests since not in existing gateway") _old_stdout = sys.stdout _old_stderr = sys.stderr @@ -42,6 +42,8 @@ sys.stdout = new_stdout = StringIO() sys.stderr = new_stderr = StringIO() print("Redirected to {0} / {1}".format(sys.stdout, sys.stderr), file=_old_stdout) + elif hasattr(sys, "pypy_translation_info"): + print("Skipping redirection tests in pypy") else: print("Skipping redirection tests since gateway already exists") From 33a2bab430335a883fbf7205fc6a64890002bb27 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 16 Jan 2018 20:47:04 -0800 Subject: [PATCH 21/21] Log a warning if we are running in a notebook in PyPy and we want to redirect the output --- python/pyspark/java_gateway.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 9d09844dfd352..c6a55b9a5df77 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -51,11 
+51,18 @@ def launch_gateway(conf=None): grab_jvm_output = (sys.stdout != sys.__stdout__ and sys.stdout.__class__.__name__ in redirect_shells) + if hasattr(sys, "pypy_translation_info") and grab_jvm_output: + warnings.warn( + "Unable to grab JVM output with PyPy. " + "JVM log messages may not be delivered to the notebook.") + grab_jvm_output = False + if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) if grab_jvm_output: warnings.warn( - "Gateway already launched, can not grab output. JVM messages may not be delivered.", + "Gateway already launched, can not grab output. " + "JVM messages may not be delivered to the notebook.", RuntimeWarning) else: SPARK_HOME = _find_spark_home()