diff --git a/.github/workflows/javaTests.yml b/.github/workflows/javaTests.yml index c11f00ed4fa..ba7da60c8e4 100644 --- a/.github/workflows/javaTests.yml +++ b/.github/workflows/javaTests.yml @@ -29,6 +29,7 @@ on: - '*.html' - 'src/main/python/**' - 'dev/**' + - '.github/workflows/python.yml' branches: - main pull_request: @@ -38,6 +39,7 @@ on: - '*.html' - 'src/main/python/**' - 'dev/**' + - '.github/workflows/python.yml' branches: - main diff --git a/src/main/java/org/apache/sysds/api/PythonDMLScript.java b/src/main/java/org/apache/sysds/api/PythonDMLScript.java index 80f5ffcd755..3b1864d71dd 100644 --- a/src/main/java/org/apache/sysds/api/PythonDMLScript.java +++ b/src/main/java/org/apache/sysds/api/PythonDMLScript.java @@ -25,10 +25,26 @@ import org.apache.log4j.Logger; import org.apache.sysds.api.jmlc.Connection; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.runtime.util.UnixPipeUtils; import py4j.DefaultGatewayServerListener; import py4j.GatewayServer; import py4j.Py4JNetworkException; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + public class PythonDMLScript { @@ -36,6 +52,13 @@ public class PythonDMLScript { final private Connection _connection; public static GatewayServer GwS; + private static String fromPythonBase = "py2java"; + private static String toPythonBase = "java2py"; + public HashMap fromPython = null; + public HashMap toPython = null; + public String baseDir; + private static int BATCH_SIZE = 32*1024; + /** * Entry point for Python API. * @@ -78,6 +101,103 @@ public Connection getConnection() { return _connection; } + + public void openPipes(String path, int num) throws IOException { + fromPython = new HashMap<>(num * 2); + toPython = new HashMap<>(num * 2); + baseDir = path; + for (int i = 0; i < num; i++) { + BufferedInputStream pipe_in = UnixPipeUtils.openInput(path + "/" + fromPythonBase + "-" + i, i); + LOG.debug("PY2JAVA pipe "+i+" is ready!"); + fromPython.put(i, pipe_in); + + BufferedOutputStream pipe_out = UnixPipeUtils.openOutput(path + "/" + toPythonBase + "-" + i, i); + toPython.put(i, pipe_out); + } + } + + public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, Types.ValueType type) throws IOException { + long limit = (long) rlen * clen; + LOG.debug("trying to read matrix from "+id+" with "+rlen+" rows and "+clen+" columns. Total size: "+limit); + if(limit > Integer.MAX_VALUE) + throw new DMLRuntimeException("Dense NumPy array of size " + limit + + " cannot be converted to MatrixBlock"); + MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1); + if(fromPython != null){ + BufferedInputStream pipe = fromPython.get(id); + double[] denseBlock = new double[(int) limit]; + UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0); + mb.init(denseBlock, rlen, clen); + } else { + throw new DMLRuntimeException("FIFO Pipes are not initialized."); + } + mb.recomputeNonZeros(); + mb.examSparsity(); + LOG.debug("Reading from Python finished"); + return mb; + } + + public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, Types.ValueType type) throws ExecutionException, InterruptedException { + long limit = (long) rlen * clen; + if(limit > Integer.MAX_VALUE) + throw new DMLRuntimeException("Dense NumPy array of size " + limit + + " cannot be converted to MatrixBlock"); + MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1); + if(fromPython != null){ + ExecutorService pool = CommonThreadPool.get(); + double[] denseBlock = new double[(int) limit]; + int offsetOut = 0; + List> futures = new ArrayList<>(); + for (int i = 0; i < blockSizes.length; i++) { + BufferedInputStream pipe = fromPython.get(i); + int id = i, blockSize = blockSizes[i], _offsetOut = offsetOut; + Callable task = () -> { + UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut); + return null; + }; + + futures.add(pool.submit(task)); + offsetOut += blockSize; + } + // Wait for all tasks and propagate exceptions + for (Future f : futures) { + f.get(); + } + + mb.init(denseBlock, rlen, clen); + } else { + throw new DMLRuntimeException("FIFO Pipes are not initialized."); + } + mb.recomputeNonZeros(); + mb.examSparsity(); + return mb; + } + + public void startWritingMbToPipe(int id, MatrixBlock mb) throws IOException { + if (toPython != null) { + int rlen = mb.getNumRows(); + int clen = mb.getNumColumns(); + int numElem = rlen * clen; + LOG.debug("Trying to write matrix ["+baseDir + "-"+ id+"] with "+rlen+" rows and "+clen+" columns. Total size: "+numElem*8); + + BufferedOutputStream out = toPython.get(id); + long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, Types.ValueType.FP64, mb); + + LOG.debug("Writing of " + bytes +" Bytes to Python ["+baseDir + "-"+ id+"] finished"); + } else { + throw new DMLRuntimeException("FIFO Pipes are not initialized."); + } + } + + public void closePipes() throws IOException { + LOG.debug("Closing all pipes in Java"); + for (BufferedInputStream pipe : fromPython.values()) + pipe.close(); + for (BufferedOutputStream pipe : toPython.values()) + pipe.close(); + LOG.debug("Closed all pipes in Java"); + } + protected static class DMLGateWayListener extends DefaultGatewayServerListener { private static final Log LOG = LogFactory.getLog(DMLGateWayListener.class.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java b/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java new file mode 100644 index 00000000000..2dbbdd079f0 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; + +public class UnixPipeUtils { + private static final Log LOG = LogFactory.getLog(UnixPipeUtils.class.getName()); + + /** + * Opens a named pipe for input, reads 4 bytes as an int, compares it to the expected ID. + * If matched, returns the InputStream for further use. + * + * @param pipePath The filesystem path to the FIFO pipe + * @param expectedId The expected handshake ID + * @return BufferedInputStream if handshake succeeds + * @throws IOException if file access fails + * @throws IllegalStateException if handshake ID doesn't match + */ + + public static BufferedInputStream openInput(String pipePath, int expectedId) throws IOException { + File pipeFile = new File(pipePath); + if (!pipeFile.exists()) { + throw new FileNotFoundException("Pipe not found at path: " + pipePath); + } + + FileInputStream fis = new FileInputStream(pipeFile); + BufferedInputStream bis = new BufferedInputStream(fis); + + readHandshake(expectedId, bis); + + return bis; + } + + public static void readHandshake(int expectedId, BufferedInputStream bis) throws IOException { + // Read 4 bytes for handshake + byte[] buffer = new byte[4]; + int bytesRead = bis.read(buffer); + if (bytesRead != 4) { + bis.close(); + throw new IOException("Failed to read handshake integer from pipe"); + } + + // Convert bytes to int (assuming little-endian to match typical Python struct.pack) + int receivedId = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getInt(); + expectedId += 1000; + + if (receivedId != expectedId) { + bis.close(); + throw new IllegalStateException("Handshake ID mismatch: expected " + expectedId + ", got " + receivedId); + } + } + + public static BufferedOutputStream openOutput(String pipePath, int expectedId) throws IOException { + File pipeFile = new File(pipePath); + if (!pipeFile.exists()) { + throw new FileNotFoundException("Pipe not found at path: " + pipePath); + } + + FileOutputStream fos = new FileOutputStream(pipeFile); + BufferedOutputStream bos = new BufferedOutputStream(fos); + + writeHandshake(expectedId, bos); + + return bos; + } + + public static void writeHandshake(int expectedId, BufferedOutputStream bos) throws IOException { + // Convert int to 4-byte little-endian and send as handshake + byte[] handshake = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(expectedId + 1000).array(); + bos.write(handshake); + bos.flush(); + } + + public static void readNumpyArrayInBatches(BufferedInputStream in, int id, int batchSize, int numElem, + Types.ValueType type, double[] out, int offsetOut) + throws IOException { + int elemSize; + switch (type){ + case UINT8 -> elemSize = 1; + case INT32, FP32 -> elemSize = 4; + default -> elemSize = 8; + } + + try { + // Read start header + readHandshake(id, in); + long bytesRemaining = ((long) numElem) * elemSize; + byte[] buffer = new byte[batchSize]; + + while (bytesRemaining > 0) { + int currentBatchSize = (int) Math.min(batchSize, bytesRemaining); + int totalRead = 0; + + while (totalRead < currentBatchSize) { + int bytesRead = in.read(buffer, totalRead, currentBatchSize - totalRead); + if (bytesRead == -1) { + throw new EOFException("Unexpected end of stream in pipe #" + id + + ": expected " + currentBatchSize + " bytes, got " + totalRead); + } + totalRead += bytesRead; + } + + // Interpret bytes with value type and fill the dense MB + offsetOut = fillDoubleArrayFromByteArray(type, out, offsetOut, buffer, currentBatchSize); + bytesRemaining -= currentBatchSize; + } + + // Read end header + readHandshake(id, in); + + } catch (Exception e) { + LOG.error("Error occurred while reading data from pipe #" + id, e); + throw e; + } + } + + private static int fillDoubleArrayFromByteArray(Types.ValueType type, double[] out, int offsetOut, byte[] buffer, + int currentBatchSize) { + ByteBuffer bb = ByteBuffer.wrap(buffer, 0, currentBatchSize).order(ByteOrder.LITTLE_ENDIAN); + switch (type){ + default -> { + DoubleBuffer doubleBuffer = bb.asDoubleBuffer(); + int numDoubles = doubleBuffer.remaining(); + doubleBuffer.get(out, offsetOut, numDoubles); + offsetOut += numDoubles; + } + case FP32 -> { + FloatBuffer floatBuffer = bb.asFloatBuffer(); + int numFloats = floatBuffer.remaining(); + for (int i = 0; i < numFloats; i++) { + out[offsetOut++] = floatBuffer.get(); + } + } + case INT32 -> { + IntBuffer intBuffer = bb.asIntBuffer(); + int numInts = intBuffer.remaining(); + for (int i = 0; i < numInts; i++) { + out[offsetOut++] = intBuffer.get(); + } + } + case UINT8 -> { + for (int i = 0; i < currentBatchSize; i++) { + out[offsetOut++] = bb.get(i) & 0xFF; + } + } + } + return offsetOut; + } + + public static long writeNumpyArrayInBatches(BufferedOutputStream out, int id, int batchSize, int numElem, + Types.ValueType type, MatrixBlock mb) throws IOException { + int elemSize; + switch (type) { + case UINT8 -> elemSize = 1; + case INT32, FP32 -> elemSize = 4; + default -> elemSize = 8; + } + long totalBytesWritten = 0; + + // Write start header + writeHandshake(id, out); + + int bytesRemaining = numElem * elemSize; + int offset = 0; + + byte[] buffer = new byte[batchSize]; + + while (bytesRemaining > 0) { + int currentBatchSize = Math.min(batchSize, bytesRemaining); + + // Fill buffer from MatrixBlock into byte[] (typed) + int bytesWritten = fillByteArrayFromDoubleArray(type, mb, offset, buffer, currentBatchSize); + totalBytesWritten += bytesWritten; +// if (bytesWritten != currentBatchSize) { +// throw new IOException("Internal error: mismatched buffer fill size"); +// } + + out.write(buffer, 0, currentBatchSize); + offset += currentBatchSize / elemSize; + bytesRemaining -= currentBatchSize; + } + + out.flush(); + + // Write end header + writeHandshake(id, out); + return totalBytesWritten; + } + + private static int fillByteArrayFromDoubleArray(Types.ValueType type, MatrixBlock mb, int offsetIn, + byte[] buffer, int maxBytes) { + ByteBuffer bb = ByteBuffer.wrap(buffer, 0, maxBytes).order(ByteOrder.LITTLE_ENDIAN); + int r,c; + switch (type) { + default -> { // FP64 + DoubleBuffer doubleBuffer = bb.asDoubleBuffer(); + int count = Math.min(doubleBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn); + for (int i = 0; i < count; i++) { + r = (offsetIn + i) / mb.getNumColumns(); + c = (offsetIn + i) % mb.getNumColumns(); + doubleBuffer.put(mb.getDouble(r,c)); + } + return count * 8; + } + case FP32 -> { + FloatBuffer floatBuffer = bb.asFloatBuffer(); + int count = Math.min(floatBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn); + for (int i = 0; i < count; i++) { + r = (offsetIn + i) / mb.getNumColumns(); + c = (offsetIn + i) % mb.getNumColumns(); + floatBuffer.put((float) mb.getDouble(r,c)); + } + return count * 4; + } + case INT32 -> { + IntBuffer intBuffer = bb.asIntBuffer(); + int count = Math.min(intBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn); + for (int i = 0; i < count; i++) { + r = (offsetIn + i) / mb.getNumColumns(); + c = (offsetIn + i) % mb.getNumColumns(); + intBuffer.put((int) mb.getDouble(r,c)); + } + return count * 4; + } + case UINT8 -> { + int count = Math.min(maxBytes, mb.getNumRows() * mb.getNumColumns() - offsetIn); + for (int i = 0; i < count; i++) { + r = (offsetIn + i) / mb.getNumColumns(); + c = (offsetIn + i) % mb.getNumColumns(); + buffer[i] = (byte) ((int) mb.getDouble(r,c) & 0xFF); + } + return count; + } + } + } +} \ No newline at end of file diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index a559850c333..440c51c3c88 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -24,15 +24,19 @@ import json import logging import os +import uuid import socket import sys +import struct +import traceback from contextlib import contextmanager from glob import glob from queue import Queue from subprocess import PIPE, Popen from threading import Thread -from time import sleep +from time import sleep, time from typing import Dict, Iterable, Sequence, Tuple, Union +from concurrent.futures import ThreadPoolExecutor import numpy as np import pandas as pd @@ -66,7 +70,15 @@ class SystemDSContext(object): _log: logging.Logger __stdout: Queue = None __stderr: Queue = None + _FIFO_PATH = "/tmp/systemds/pipes/" + _FIFO_PY2JAVA_BASE = "py2java" + _FIFO_JAVA2PY_BASE = "java2py" + _FIFO_PY2JAVA_PIPES = [] + _FIFO_JAVA2PY_PIPES = [] + _data_transfer_mode = 0 + _multi_pipe_enabled = False _logging_initialized = False + _executor_pool = ThreadPoolExecutor(max_workers=os.cpu_count() * 2 or 4) def __init__( self, @@ -75,6 +87,8 @@ def __init__( capture_stdout: bool = False, logging_level: int = 20, py4j_logging_level: int = 50, + data_transfer_mode: int = 1, + multi_pipe_enabled: bool = False, ): """Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled Any new instance of this SystemDS Context, would start a separate new JVM. @@ -89,11 +103,106 @@ def __init__( The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL. :param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this, it can be verbose if not set high. + :param data_transfer_mode: default 0, """ + self.__setup_logging(logging_level, py4j_logging_level) self.__start(port, capture_stdout) self.capture_stats(capture_statistics) self._log.debug("Started JVM and SystemDS python context manager") + self.__setup_data_transfer(data_transfer_mode, multi_pipe_enabled) + + def __setup_data_transfer(self, data_transfer_mode=0, multi_pipe_enabled=False): + self._data_transfer_mode = data_transfer_mode + self._multi_pipe_enabled = multi_pipe_enabled + if os.name == "posix" and data_transfer_mode == 1: + num_pipes = (os.cpu_count() or 2) if multi_pipe_enabled else 1 + self.__make_fifo_named_pipes(num_pipes) + in_pipes, out_pipes = self.__init_pipes(num_pipes) + + self._log.info( + "Data transfer: Handshake done for {} IN / OUT Pipes".format(num_pipes) + ) + self._FIFO_PY2JAVA_PIPES = out_pipes + self._FIFO_JAVA2PY_PIPES = in_pipes + else: + self._data_transfer_mode = 0 + + def __init_pipes(self, num_pipes): + + def open_pipe_write(path, pipe_id): + self._log.debug("Opening {} for writing in Py".format(path)) + pipe = open(path, "wb") + handshake = struct.pack(" 1: self._log.warning("Multiple config files found selecting: " + files[0]) @@ -352,15 +475,37 @@ def __exit__(self, exc_type, exc_val, exc_tb): def close(self): """Close the connection to the java process and do necessary cleanup.""" if hasattr(self, "java_gateway"): - self.__kill_Popen(self.java_gateway.java_process) + if self._data_transfer_mode == 1: + self._log.debug("Closing all Pipes in Python") + for pipe in self._FIFO_JAVA2PY_PIPES: + pipe.close() + self._FIFO_JAVA2PY_PIPES = [] + for pipe in self._FIFO_PY2JAVA_PIPES: + pipe.close() + self._FIFO_PY2JAVA_PIPES = [] + self._log.debug("All Pipes are closed in Python") + + if self.java_gateway._gateway_client.is_connected: + self._log.debug("Closing all Pipes in Java") + self.java_gateway.entry_point.closePipes() + self._log.debug("All Pipes are closed in Java") + else: + self._log.debug("Java Gateway is not connected anymore") + self.__delete_tmp_files() + self.java_gateway.shutdown() + self.__kill_Popen(self.java_gateway.java_process) if hasattr(self, "__process"): logging.error("Has process variable") self.__kill_Popen(self.__process) if hasattr(self, "__stdout_thread") and self.__stdout_thread.is_alive(): self.__stdout_thread.join(0) + for line in self.get_stdout(): + print(line) if hasattr(self, "__stderr_thread") and self.__stderr_thread.is_alive(): self.__stderr_thread.join(0) + for line in self.get_stderr(): + print(line) def __kill_Popen(self, process: Popen): """Stop the process at the Popen. diff --git a/src/main/python/systemds/operator/nodes/matrix.py b/src/main/python/systemds/operator/nodes/matrix.py index 208e248ec60..562897e4635 100644 --- a/src/main/python/systemds/operator/nodes/matrix.py +++ b/src/main/python/systemds/operator/nodes/matrix.py @@ -109,7 +109,7 @@ def compute(self, verbose: bool = False, lineage: bool = False) -> np.array: def _parse_output_result_variables(self, result_variables): return matrix_block_to_numpy( - self.sds_context.java_gateway.jvm, + self.sds_context, result_variables.getMatrixBlock(self._script.out_var_name[0]), ) diff --git a/src/main/python/systemds/operator/nodes/multi_return.py b/src/main/python/systemds/operator/nodes/multi_return.py index a43c478a08c..523537602d5 100644 --- a/src/main/python/systemds/operator/nodes/multi_return.py +++ b/src/main/python/systemds/operator/nodes/multi_return.py @@ -81,7 +81,9 @@ def _parse_output_result_variables(self, result_variables): output = self._outputs[idx] if str(output) == "MatrixNode": result_var.append( - matrix_block_to_numpy(jvmV, result_variables.getMatrixBlock(v)) + matrix_block_to_numpy( + self.sds_context, result_variables.getMatrixBlock(v) + ) ) elif str(output) == "FrameNode": result_var.append( diff --git a/src/main/python/systemds/script_building/script.py b/src/main/python/systemds/script_building/script.py index 37bd4cdca0c..351d3fccfbc 100644 --- a/src/main/python/systemds/script_building/script.py +++ b/src/main/python/systemds/script_building/script.py @@ -95,7 +95,7 @@ def execute(self) -> JavaObject: exception_str = "Py4JNetworkError: no connection to JVM, most likely due to previous crash or closed JVM from calls to close()" trace_back_limit = 0 except Exception as e: - exception_str = str(e) + exception_str = e trace_back_limit = None self.sds_context.exception_and_close(exception_str, trace_back_limit) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 551a2332579..f077a7c807a 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -20,11 +20,62 @@ # ------------------------------------------------------------- import struct +import tempfile +import mmap +import time import numpy as np import pandas as pd import concurrent.futures from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView +import os + + +def format_bytes(size): + for unit in ["Bytes", "KB", "MB", "GB", "TB", "PB"]: + if size < 1024.0: + return f"{size:.2f} {unit}" + size /= 1024.0 + + +def pipe_transfer_header(pipe, pipe_id): + handshake = struct.pack(" 2147483647: + raise Exception("") + # If not numpy array then convert to numpy array if not isinstance(np_arr, np.ndarray): np_arr = np.asarray(np_arr, dtype=np.float64) jvm: JVMView = sds.java_gateway.jvm + ep = sds.java_gateway.entry_point - # flatten and prepare byte buffer. + # flatten and set value type if np_arr.dtype is np.dtype(np.uint8): arr = np_arr.ravel() value_type = jvm.org.apache.sysds.common.Types.ValueType.UINT8 @@ -56,31 +111,137 @@ def numpy_to_matrix_block(sds, np_arr: np.array): else: arr = np_arr.ravel().astype(np.float64) value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64 - buf = arr.tobytes() - # Send data to java. - try: + if sds._data_transfer_mode == 1: + mv = memoryview(arr).cast("B") + total_bytes = mv.nbytes + min_bytes_per_pipe = 1024 * 1024 * 1024 * 1 + batch_size_bytes = 32 * 1024 # pipe's ring buffer is 64KB + + # Using multiple pipes is disabled by default + use_single_pipe = ( + not sds._multi_pipe_enabled or total_bytes < 2 * min_bytes_per_pipe + ) + if use_single_pipe: + # sds._log.debug( + # "Using single FIFO pipe for reading {}".format(format_bytes(total_bytes)) + # ) + pipe_id = 0 + pipe = sds._FIFO_PY2JAVA_PIPES[pipe_id] + fut = sds._executor_pool.submit( + ep.startReadingMbFromPipe, pipe_id, rows, cols, value_type + ) + + pipe_transfer_header(pipe, pipe_id) # start + pipe_transfer_bytes(pipe, 0, total_bytes, batch_size_bytes, mv) + pipe_transfer_header(pipe, pipe_id) # end + + return fut.result() # Java returns MatrixBlock + else: + num_pipes = min( + len(sds._FIFO_PY2JAVA_PIPES), total_bytes // min_bytes_per_pipe + ) + # align blocks per element + num_elems = len(arr) + elem_size = np_arr.dtype.itemsize + min_elems_block = num_elems // num_pipes + left_over = num_elems % num_pipes + block_sizes = sds.java_gateway.new_array(jvm.int, num_pipes) + for i in range(num_pipes): + block_sizes[i] = min_elems_block + int(i < left_over) + + # run java readers in parallel + fut_java = sds._executor_pool.submit( + ep.startReadingMbFromPipes, block_sizes, rows, cols, value_type + ) + + # run writers in parallel + def _pipe_write_task(_pipe_id, _pipe, memview, start, end): + pipe_transfer_header(_pipe, _pipe_id) + pipe_transfer_bytes(_pipe, start, end, batch_size_bytes, memview) + pipe_transfer_header(_pipe, _pipe_id) + + cur = 0 + futures = [] + for i, size in enumerate(block_sizes): + pipe = sds._FIFO_PY2JAVA_PIPES[i] + start_byte = cur * elem_size + cur += size + end_byte = cur * elem_size + + fut = sds._executor_pool.submit( + _pipe_write_task, i, pipe, mv, start_byte, end_byte + ) + futures.append(fut) + + return fut_java.result() # Java returns MatrixBlock + else: + # prepare byte buffer. + buf = arr.tobytes() + + # Send data to java. j_class: JavaClass = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type) - except Exception as e: - sds.exception_and_close(e) -def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject): +def matrix_block_to_numpy(sds, mb: JavaObject): """Converts a MatrixBlock object in the JVM to a numpy array. - :param jvm: The current JVM instance running systemds. + :param sds: The current systemds context. :param mb: A pointer to the JVM's MatrixBlock object. """ + jvm: JVMView = sds.java_gateway.jvm + ep = sds.java_gateway.entry_point + + rows = mb.getNumRows() + cols = mb.getNumColumns() + try: + if sds._data_transfer_mode == 1: + dtype = np.float64 + + elem_size = np.dtype(dtype).itemsize + num_elements = rows * cols + total_bytes = num_elements * elem_size + batch_size_bytes = 32 * 1024 # 32 KB + + arr = np.empty(num_elements, dtype=dtype) + mv = memoryview(arr).cast("B") + + pipe_id = 0 + pipe = sds._FIFO_JAVA2PY_PIPES[pipe_id] - num_ros = mb.getNumRows() - num_cols = mb.getNumColumns() - buf = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convertMBtoPy4JDenseArr( - mb - ) - return np.frombuffer(buf, count=num_ros * num_cols, dtype=np.float64).reshape( - (num_ros, num_cols) - ) + sds._log.debug( + "Using single FIFO pipe for reading {}".format( + format_bytes(total_bytes) + ) + ) + + # Java starts writing to pipe in background + fut = sds._executor_pool.submit(ep.startWritingMbToPipe, pipe_id, mb) + + pipe_receive_header(pipe, pipe_id, sds._log) + sds._log.debug( + "Py4j task for writing {} [{}] is: done=[{}], running=[{}]".format( + format_bytes(total_bytes), sds._FIFO_PATH, fut.done(), fut.running() + ) + ) + pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log) + pipe_receive_header(pipe, pipe_id, sds._log) + + fut.result() + sds._log.debug("Reading is done for {}".format(format_bytes(total_bytes))) + return arr.reshape((rows, cols)) + + else: + buf = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convertMBtoPy4JDenseArr( + mb + ) + return np.frombuffer(buf, count=rows * cols, dtype=np.float64).reshape( + (rows, cols) + ) + except Exception as e: + sds.exception_and_close(e) + return None def convert(jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"): diff --git a/src/main/python/tests/matrix/test_block_converter.py b/src/main/python/tests/matrix/test_block_converter.py index 5fe4b205b61..3e132b42384 100644 --- a/src/main/python/tests/matrix/test_block_converter.py +++ b/src/main/python/tests/matrix/test_block_converter.py @@ -73,7 +73,7 @@ def test_random_nxk(self): def convert_back_and_forth(self, array): matrix_block = numpy_to_matrix_block(self.sds, array) # use the ability to call functions on matrix_block. - returned = matrix_block_to_numpy(self.sds.java_gateway.jvm, matrix_block) + returned = matrix_block_to_numpy(self.sds, matrix_block) self.assertTrue(np.allclose(array, returned)) diff --git a/src/main/python/tests/matrix/test_block_converter_unix_pipe.py b/src/main/python/tests/matrix/test_block_converter_unix_pipe.py new file mode 100644 index 00000000000..53a2eb4630f --- /dev/null +++ b/src/main/python/tests/matrix/test_block_converter_unix_pipe.py @@ -0,0 +1,104 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + + +import os +import shutil +import unittest +import pandas as pd +import numpy as np +from systemds.context import SystemDSContext + + +class TestMatrixBlockConverterUnixPipe(unittest.TestCase): + + sds: SystemDSContext = None + temp_dir: str = "tests/iotests/temp_write_csv/" + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext( + data_transfer_mode=1, logging_level=10, capture_stdout=True + ) + if not os.path.exists(cls.temp_dir): + os.makedirs(cls.temp_dir) + + @classmethod + def tearDownClass(cls): + cls.sds.close() + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + def test_python_to_java(self): + combinations = [ # (n_rows, n_cols) + (5, 0), + (5, 1), + (10, 10), + ] + + for n_rows, n_cols in combinations: + matrix = ( + np.random.random((n_rows, n_cols)) + if n_cols != 0 + else np.random.random(n_rows) + ) + # Transfer into SystemDS and write to CSV + matrix_sds = self.sds.from_numpy(matrix) + matrix_sds.write( + self.temp_dir + "into_systemds_matrix.csv", format="csv", header=False + ).compute(verbose=True) + + # Read the CSV file using pandas + result_df = pd.read_csv( + self.temp_dir + "into_systemds_matrix.csv", header=None + ) + matrix_out = result_df.to_numpy() + if n_cols == 0: + matrix_out = matrix_out.flatten() + # Verify the data + self.assertTrue(np.allclose(matrix_out, matrix)) + + def test_java_to_python(self): + combinations = [ # (n_rows, n_cols) + (5, 1), + (10, 10), + ] + + for n_rows, n_cols in combinations: + matrix = np.random.random((n_rows, n_cols)) + + # Create a CSV file to read into SystemDS + pd.DataFrame(matrix).to_csv( + self.temp_dir + "out_of_systemds_matrix.csv", header=False, index=False + ) + + matrix_sds = self.sds.read( + self.temp_dir + "out_of_systemds_matrix.csv", + data_type="matrix", + format="csv", + ) + matrix_out = matrix_sds.compute() + + # Verify the data + self.assertTrue(np.allclose(matrix_out, matrix)) + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java b/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java new file mode 100644 index 00000000000..650d6c1053f --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.utils; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.UnixPipeUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertArrayEquals; + + +@RunWith(Enclosed.class) +public class UnixPipeUtilsTest { + + @RunWith(Parameterized.class) + public static class ParameterizedTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{index}: type={0}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {Types.ValueType.FP64, 6, 48, 99, new MatrixBlock(2, 3, new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0})}, + {Types.ValueType.FP32, 6, 24, 88, new MatrixBlock(3, 2, new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0})}, + {Types.ValueType.INT32, 4, 16, 77, new MatrixBlock(2, 2, new double[]{0, -1, 2, -3})}, + {Types.ValueType.UINT8, 4, 4, 66, new MatrixBlock(2, 2, new double[]{0, 1, 2, 3})} + }); + } + + private final Types.ValueType type; + private final int numElem; + private final int batchSize; + private final int id; + private final MatrixBlock matrixBlock; + + + public ParameterizedTest(Types.ValueType type, int numElem, int batchSize, int id, MatrixBlock matrixBlock) { + this.type = type; + this.numElem = numElem; + this.batchSize = batchSize; + this.id = id; + this.matrixBlock = matrixBlock; + } + + @Test + public void testReadWriteNumpyArrayBatch() throws IOException { + File tempFile = folder.newFile("pipe_test_" + type.name()); + + try (BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + UnixPipeUtils.writeNumpyArrayInBatches(out, id, batchSize, numElem, type, matrixBlock); + } + + double[] output = new double[numElem]; + try (BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + UnixPipeUtils.readNumpyArrayInBatches(in, id, batchSize, numElem, type, output, 0); + } + + assertArrayEquals(matrixBlock.getDenseBlockValues(), output, 1e-9); + } + } + + public static class NonParameterizedTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test(expected = FileNotFoundException.class) + public void testOpenInputFileNotFound() throws IOException { + // instantiate class once for coverage + new UnixPipeUtils(); + + // Create a path that does not exist + File nonExistentFile = new File(folder.getRoot(), "nonexistent.pipe"); + + // This should throw FileNotFoundException + UnixPipeUtils.openInput(nonExistentFile.getAbsolutePath(), 123); + } + + @Test(expected = FileNotFoundException.class) + public void testOpenOutputFileNotFound() throws IOException { + // Create a path that does not exist + File nonExistentFile = new File(folder.getRoot(), "nonexistent.pipe"); + + // This should throw FileNotFoundException + UnixPipeUtils.openOutput(nonExistentFile.getAbsolutePath(), 123); + } + + + @Test + public void testOpenInputAndOutputHandshakeMatch() throws IOException { + File tempFile = folder.newFile("pipe_test1"); + int id = 42; + + // Write expected handshake + try (BufferedOutputStream bos = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {} + + // Read and validate handshake + try (BufferedInputStream bis = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + // success: no exception = handshake passed + } + } + + @Test(expected = IllegalStateException.class) + public void testOpenInputHandshakeMismatch() throws IOException { + File tempFile = folder.newFile("pipe_test2"); + int writeId = 123; + int wrongReadId = 456; + + try (BufferedOutputStream bos = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), writeId)) {} + + // Will throw due to ID mismatch + UnixPipeUtils.openInput(tempFile.getAbsolutePath(), wrongReadId); + } + + @Test(expected = IOException.class) + public void testOpenInputIncompleteHandshake() throws IOException { + File tempFile = folder.newFile("short_handshake.pipe"); + + // Write only 2 bytes instead of 4 + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + fos.write(new byte[]{0x01, 0x02}); + } + + UnixPipeUtils.openInput(tempFile.getAbsolutePath(), 100); + } + + @Test(expected = EOFException.class) + public void testReadNumpyArrayUnexpectedEOF() throws IOException { + File tempFile = folder.newFile("pipe_test5"); + int id = 12; + int numElem = 5; + int batchSize = 40; + Types.ValueType type = Types.ValueType.FP64; + + // Write partial data (handshake + 3 doubles instead of 5) + try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(tempFile))) { + ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(id + 1000); + out.write(bb.array()); + + // Write 3 doubles only + bb = ByteBuffer.allocate(8 * 3).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < 3; i++) + bb.putDouble(i + 1.0); + out.write(bb.array()); + + // no end handshake + out.flush(); + } + + double[] outArr = new double[numElem]; + try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(tempFile))) { + UnixPipeUtils.readNumpyArrayInBatches(in, id, batchSize, numElem, type, outArr, 0); + } + } + } +} diff --git a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java index 549d4cde7b6..b048e66e95c 100644 --- a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java +++ b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java @@ -23,16 +23,27 @@ import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; import org.apache.sysds.api.PythonDMLScript; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.UnixPipeUtils; import org.apache.sysds.test.LoggingUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import py4j.GatewayServer; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; import java.security.Permission; import java.util.List; +import static org.junit.Assert.assertArrayEquals; + /** Simple tests to verify startup of Python Gateway server happens without crashes */ public class StartupTest { @@ -58,18 +69,36 @@ public void tearDown() { } private void assertLogMessages(String... expectedMessages) { + assertLogMessages(true, expectedMessages); + } + + private void assertLogMessages(boolean strict, String... expectedMessages) { List log = LoggingUtils.reinsert(appender); log.stream().forEach(l -> System.out.println(l.getMessage())); - Assert.assertEquals("Unexpected number of log messages", expectedMessages.length, log.size()); + if (strict){ + Assert.assertEquals("Unexpected number of log messages", expectedMessages.length, log.size()); - for (int i = 0; i < expectedMessages.length; i++) { - // order does not matter - boolean found = false; + for (int i = 0; i < expectedMessages.length; i++) { + // order does not matter + boolean found = false; + for (String message : expectedMessages) { + found |= log.get(i).getMessage().toString().startsWith(message); + } + Assert.assertTrue("Unexpected log message: " + log.get(i).getMessage(),found); + } + } else { for (String message : expectedMessages) { - found |= log.get(i).getMessage().toString().startsWith(message); + // order does not matter + boolean found = false; + + for (LoggingEvent loggingEvent : log) { + found |= loggingEvent.getMessage().toString().startsWith(message); + } + Assert.assertTrue("Expected log message not found: " + message,found); } - Assert.assertTrue("Unexpected log message: " + log.get(i).getMessage(),found); } + + } @Test(expected = Exception.class) @@ -108,7 +137,7 @@ public void testStartupIncorrect_6() throws Exception { PythonDMLScript.main(new String[]{"-python", "4001"}); Thread.sleep(200); } catch (SecurityException e) { - assertLogMessages( + assertLogMessages(false, "GatewayServer started", "failed startup" ); @@ -125,6 +154,7 @@ public void testStartupCorrect() throws Exception { PythonDMLScript.GwS.shutdown(); Thread.sleep(200); assertLogMessages( + false, "GatewayServer started", "Starting JVM shutdown", "Shutdown done", @@ -132,6 +162,98 @@ public void testStartupCorrect() throws Exception { ); } + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testDataTransfer() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4003"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + + File in = folder.newFile("py2java-0"); + File out = folder.newFile("java2py-0"); + + // Init Test + BufferedOutputStream py2java = UnixPipeUtils.openOutput(in.getAbsolutePath(), 0); + script.openPipes(folder.getRoot().getPath(), 1); + BufferedInputStream java2py = UnixPipeUtils.openInput(out.getAbsolutePath(), 0); + + // Write Test + double[] data = new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0}; + MatrixBlock mb = new MatrixBlock(2, 3, data); + script.startWritingMbToPipe(0, mb); + double[] rcv_data = new double[data.length]; + UnixPipeUtils.readNumpyArrayInBatches(java2py, 0, 32, data.length, Types.ValueType.FP64, rcv_data, 0); + assertArrayEquals(data, rcv_data, 1e-9); + + // Read Test + UnixPipeUtils.writeNumpyArrayInBatches(py2java, 0, 32, data.length, Types.ValueType.FP64, mb); + MatrixBlock rcv_mb = script.startReadingMbFromPipe(0, 2, 3, Types.ValueType.FP64); + assertArrayEquals(data, rcv_mb.getDenseBlockValues(), 1e-9); + + + script.closePipes(); + + PythonDMLScript.GwS.shutdown(); + Thread.sleep(200); + } + + @Test + public void testDataTransferMultiPipes() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4004"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + + File in = folder.newFile("py2java-0"); + folder.newFile("java2py-0"); + File in2 = folder.newFile("py2java-1"); + folder.newFile("java2py-1"); + + // Init Test + BufferedOutputStream py2java = UnixPipeUtils.openOutput(in.getAbsolutePath(), 0); + BufferedOutputStream py2java2 = UnixPipeUtils.openOutput(in2.getAbsolutePath(), 1); + script.openPipes(folder.getRoot().getPath(), 2); + + // Read Test + double[] data = new double[]{1.0, 2.0, 3.0}; + MatrixBlock mb = new MatrixBlock(3, 1, data); + UnixPipeUtils.writeNumpyArrayInBatches(py2java, 0, 32, 3, Types.ValueType.FP64, mb); + UnixPipeUtils.writeNumpyArrayInBatches(py2java2, 1, 32, 3, Types.ValueType.FP64, mb); + MatrixBlock rcv_mb = script.startReadingMbFromPipes(new int[]{3,3}, 6, 1, Types.ValueType.FP64); + data = new double[]{1.0, 2.0, 3.0, 1.0, 2.0, 3.0}; + assertArrayEquals(data, rcv_mb.getDenseBlockValues(), 1e-9); + + script.closePipes(); + + PythonDMLScript.GwS.shutdown(); + Thread.sleep(200); + } + + @Test(expected = DMLRuntimeException.class) + public void testDataTransferNotInit1() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4005"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + script.startReadingMbFromPipe(0, 2, 3, Types.ValueType.FP64); + } + + @Test(expected = DMLRuntimeException.class) + public void testDataTransferNotInit2() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4006"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + script.startWritingMbToPipe(0, null); + } + + @Test(expected = DMLRuntimeException.class) + public void testDataTransferNotInit3() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4007"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + script.startReadingMbFromPipes(new int[]{3,3}, 2, 3, Types.ValueType.FP64); + } + @SuppressWarnings("removal") class NoExitSecurityManager extends SecurityManager { @Override @@ -142,4 +264,20 @@ public void checkExit(int status) { throw new SecurityException("Intercepted exit()"); } } + + @Test(expected = DMLRuntimeException.class) + public void testDataTransferMaxValue1() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4008"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + script.startReadingMbFromPipe(0, Integer.MAX_VALUE, 3, Types.ValueType.FP64); + } + + @Test(expected = DMLRuntimeException.class) + public void testDataTransferMaxValue2() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4009"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + script.startReadingMbFromPipes(new int[]{3,3}, Integer.MAX_VALUE, 2, Types.ValueType.FP64); + } }