diff --git a/matlab/src/cpp/arrow/matlab/error/error.h b/matlab/src/cpp/arrow/matlab/error/error.h index 425e089d9f2f..47bde56dacf8 100644 --- a/matlab/src/cpp/arrow/matlab/error/error.h +++ b/matlab/src/cpp/arrow/matlab/error/error.h @@ -247,6 +247,8 @@ static const char* IPC_RECORD_BATCH_WRITE_FAILED = static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed"; static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED = "arrow:io:ipc:FailedToOpenRecordBatchReader"; +static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE = + "arrow:io:ipc:InvalidConstructionType"; static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex"; static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed"; static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed"; diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc index f3c833484d38..1704eee8f7a7 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -17,6 +17,8 @@ #include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h" #include "arrow/io/file.h" +#include "arrow/io/memory.h" +#include "arrow/matlab/buffer/matlab_buffer.h" #include "arrow/matlab/error/error.h" #include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" @@ -36,14 +38,13 @@ RecordBatchStreamReader::RecordBatchStreamReader( REGISTER_METHOD(RecordBatchStreamReader, readTable); } -libmexclass::proxy::MakeResult RecordBatchStreamReader::make( +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile( const libmexclass::proxy::FunctionArguments& constructor_arguments) { namespace mda = ::matlab::data; using RecordBatchStreamReaderProxy = arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; const mda::StructArray opts = constructor_arguments[0]; - const mda::StringArray filename_mda = opts[0]["Filename"]; const auto filename_utf16 = std::u16string(filename_mda[0]); MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, @@ -60,6 +61,43 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make( return std::make_shared(std::move(reader)); } +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes( + const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamReaderProxy = + arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; + + const mda::StructArray opts = constructor_arguments[0]; + const ::matlab::data::TypedArray bytes_mda = opts[0]["Bytes"]; + const auto matlab_buffer = + std::make_shared(bytes_mda); + auto buffer_reader = std::make_shared(matlab_buffer); + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(buffer_reader), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + return std::make_shared(std::move(reader)); +} + +libmexclass::proxy::MakeResult RecordBatchStreamReader::make( + const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + const mda::StructArray opts = constructor_arguments[0]; + + // Dispatch to the appropriate static "make" method depending + // on the input type. + const mda::StringArray type_mda = opts[0]["Type"]; + const auto type_utf16 = std::u16string(type_mda[0]); + if (type_utf16 == u"Bytes") { + return RecordBatchStreamReader::fromBytes(constructor_arguments); + } else if (type_utf16 == u"File") { + return RecordBatchStreamReader::fromFile(constructor_arguments); + } else { + return libmexclass::error::Error{ + "arrow:io:ipc:InvalidConstructionType", + "Invalid construction type for RecordBatchStreamReader."}; + } +} + void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) { namespace mda = ::matlab::data; using SchemaProxy = arrow::matlab::tabular::proxy::Schema; diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h index 56fb29398782..0492c46dc04c 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h @@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy { static libmexclass::proxy::MakeResult make( const libmexclass::proxy::FunctionArguments& constructor_arguments); + static libmexclass::proxy::MakeResult fromFile( + const libmexclass::proxy::FunctionArguments& constructor_arguments); + static libmexclass::proxy::MakeResult fromBytes( + const libmexclass::proxy::FunctionArguments& constructor_arguments); protected: std::shared_ptr reader; diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m index 60ca38eba9ad..44f70815ed07 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -26,14 +26,34 @@ Schema end - methods - function obj = RecordBatchStreamReader(filename) + methods (Static) + function obj = fromBytes(bytes) + arguments + bytes(:, 1) uint8 + end + args = struct(Bytes=bytes, Type="Bytes"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + proxy = arrow.internal.proxy.create(proxyName, args); + obj = arrow.io.ipc.RecordBatchStreamReader(proxy); + end + + function obj = fromFile(filename) arguments filename(1, 1) string {mustBeNonzeroLengthText} end - args = struct(Filename=filename); + args = struct(Filename=filename, Type="File"); proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - obj.Proxy = arrow.internal.proxy.create(proxyName, args); + proxy = arrow.internal.proxy.create(proxyName, args); + obj = arrow.io.ipc.RecordBatchStreamReader(proxy); + end + end + + methods + function obj = RecordBatchStreamReader(proxy) + arguments + proxy(1, 1) libmexclass.proxy.Proxy + end + obj.Proxy = proxy; end function schema = get.Schema(obj) diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m index 6ca67197739a..ad52726f3cb7 100644 --- a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m +++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m @@ -25,9 +25,39 @@ end properties (TestParameter) + RecordBatchStreamReaderConstructorFcn = {@tRecordBatchStreamReader.FromBytes, @arrow.io.ipc.RecordBatchStreamReader.fromFile} RecordBatchReadFcn = {@read, @readRecordBatch} end + methods (Static) + + % Read the given file into memory as an array of bytes (uint8). + function bytes = readBytes(filename) + if ismissing(filename) + % Simulate the behavior of fromFile when a filename + % that is a missing string value is supplied. + error(message("MATLAB:validators:mustBeNonzeroLengthText", "")) + end + fid = fopen(filename, "r"); + try + bytes = fread(fid, "uint8=>uint8"); + catch e + % Simulate the behavior of fromFile when an invalid + % filename is supplied. + error(message("MATLAB:validators:mustBeNonzeroLengthText", "")) + end + fclose(fid); + end + + % Read the given file into memory as bytes and then construct a + % RecordBatchStreamReader from the bytes. + function reader = FromBytes(filename) + bytes = tRecordBatchStreamReader.readBytes(filename); + reader = arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes); + end + + end + methods(TestClassSetup) function setupDataFolder(testCase) @@ -82,19 +112,19 @@ function setupMultipleBatchStreamFile(testCase) methods (Test) - function ZeroLengthFilenameError(testCase) + function ZeroLengthFilenameError(testCase, RecordBatchStreamReaderConstructorFcn) % Verify RecordBatchStreamReader throws an exception with the % identifier MATLAB:validators:mustBeNonzeroLengthText if the % filename input argument given is a zero length string. - fcn = @() arrow.io.ipc.RecordBatchStreamReader(""); + fcn = @() RecordBatchStreamReaderConstructorFcn(""); testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); end - function MissingStringFilenameError(testCase) + function MissingStringFilenameError(testCase, RecordBatchStreamReaderConstructorFcn) % Verify RecordBatchStreamReader throws an exception with the % identifier MATLAB:validators:mustBeNonzeroLengthText if the % filename input argument given is a missing string. - fcn = @() arrow.io.ipc.RecordBatchStreamReader(string(missing)); + fcn = @() RecordBatchStreamReaderConstructorFcn(string(missing)); testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); end @@ -106,43 +136,43 @@ function FilenameInvalidTypeError(testCase) testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); end - function Schema(testCase) + function Schema(testCase, RecordBatchStreamReaderConstructorFcn) % Verify the getter method for Schema returns the % expected value. fieldA = arrow.field("A", arrow.string()); fieldB = arrow.field("B", arrow.float32()); expectedSchema = arrow.schema([fieldA fieldB]); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); testCase.verifyEqual(reader.Schema, expectedSchema); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); testCase.verifyEqual(reader.Schema, expectedSchema); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); testCase.verifyEqual(reader.Schema, expectedSchema); end - function SchemaNoSetter(testCase) + function SchemaNoSetter(testCase, RecordBatchStreamReaderConstructorFcn) % Verify the Schema property is not settable. fieldC = arrow.field("C", arrow.date32()); schema = arrow.schema(fieldC); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); testCase.verifyError(@() setfield(reader, "Schema", schema), "MATLAB:class:SetProhibited"); end - function ReadErrorIfEndOfStream(testCase, RecordBatchReadFcn) + function ReadErrorIfEndOfStream(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify read throws an execption with the identifier arrow:io:ipc:EndOfStream % on an Arrow IPC Stream file containing zero batches. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); fcn = @() RecordBatchReadFcn(reader); testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); end - function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn) + function ReadOneBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify read can successfully read an Arrow IPC Stream file % containing one batch. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); expectedMatlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); expected = arrow.recordBatch(expectedMatlabTable); @@ -153,10 +183,10 @@ function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn) testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); end - function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn) + function ReadMultipleBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify read can successfully read an Arrow IPC Stream file % containing mulitple batches. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); expectedMatlabTable1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); expected1 = arrow.recordBatch(expectedMatlabTable1); @@ -172,12 +202,12 @@ function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn) testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); end - function HasNext(testCase, RecordBatchReadFcn) + function HasNext(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify that the hasnext method returns true the correct % number of times depending on the number of record % batches in an Arrow IPC Stream format. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); % hasnext should return true 0 times for a 0 batch file. iterations = 0; while reader.hasnext() @@ -186,7 +216,7 @@ function HasNext(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 0); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); % hasnext should return true 1 time for a 1 batch file. iterations = 0; while reader.hasnext() @@ -195,7 +225,7 @@ function HasNext(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 1); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); % hasnext should return true 2 times for a 2 batch file. iterations = 0; while reader.hasnext() @@ -205,12 +235,12 @@ function HasNext(testCase, RecordBatchReadFcn) testCase.verifyEqual(iterations, 2); end - function Done(testCase, RecordBatchReadFcn) + function Done(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify that the done method returns false the correct % number of times depending on the number of record % batches in an Arrow IPC Stream format. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); % done should return false 0 times for a 0 batch file. iterations = 0; while ~reader.done() @@ -219,7 +249,7 @@ function Done(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 0); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); % done should return false 1 time for a 1 batch file. iterations = 0; while ~reader.done() @@ -228,7 +258,7 @@ function Done(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 1); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); % done should return false 2 times for a 2 batch file. iterations = 0; while ~reader.done() @@ -238,40 +268,40 @@ function Done(testCase, RecordBatchReadFcn) testCase.verifyEqual(iterations, 2); end - function ReadTableZeroBatchStreamFile(testCase) + function ReadTableZeroBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify read can successfully read an Arrow IPC Stream file % containing zero batches as an arrow.tabular.Table. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); matlabTable = table('Size', [0, 2], 'VariableTypes', ["string", "single"], 'VariableNames', ["A", "B"]); expected = arrow.table(matlabTable); actual = reader.readTable(); testCase.verifyEqual(actual, expected); end - function ReadTableOneBatchStreamFile(testCase) + function ReadTableOneBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify read can successfully read an Arrow IPC Stream file % containing one batch as an arrow.tabular.Table. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); matlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); expected = arrow.table(matlabTable); actual = reader.readTable(); testCase.verifyEqual(actual, expected); end - function ReadTableMultipleBatchStreamFile(testCase) + function ReadTableMultipleBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify read can successfully read an Arrow IPC Stream file % containing multiple batches as an arrow.tabular.Table. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); matlabTable = table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]); expected = arrow.table(matlabTable); actual = reader.readTable(); testCase.verifyEqual(actual, expected); end - function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn) + function ReadTableAfterReadRecordBatch(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify readTable returns only the remaining record batches % in an Arrow IPC Stream file after calling readRecordBatch first. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); testCase.verifyTrue(reader.hasnext()); testCase.verifyFalse(reader.done()); @@ -292,10 +322,10 @@ function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn) testCase.verifyTrue(reader.done()); end - function ReadTableMultipleCalls(testCase) + function ReadTableMultipleCalls(testCase, RecordBatchStreamReaderConstructorFcn) % Verify readTable returns an empty table if it is called % multiple times in a row. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); expected = arrow.table(... table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]) ... @@ -323,14 +353,22 @@ function ReadTableMultipleCalls(testCase) testCase.verifyTrue(reader.done()); end - function ErrorIfNotIpcStreamFile(testCase) + function ErrorIfNotIpcStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify RecordBatchStreamReader throws an exception with the % identifier arrow:io:ipc:FailedToOpenRecordBatchReader if % the provided file is not an Arrow IPC Stream file. - fcn = @() arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile); + fcn = @() RecordBatchStreamReaderConstructorFcn(testCase.RandomAccessFile); testCase.verifyError(fcn, "arrow:io:ipc:FailedToOpenRecordBatchReader"); end + function ErrorIfNotProxy(testCase) + % Verify the RecordBatchStreamReader constructor throws an exception + % with the identifier MATLAB:validation:UnableToConvert if the input + % is not a Proxy object. + fcn = @() arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile); + testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); + end + end end