From 2c3b5d93ce7c77e9113a4901c1ab7f18c2fc58f5 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Tue, 14 Jan 2025 15:18:32 -0500 Subject: [PATCH 01/10] Add fromBytes and fromFile static "make" methods to arrow.io.ipc.RecordBatchStreamReader MATLAB class. --- .../ipc/proxy/record_batch_stream_reader.cc | 57 ++++++++++++++----- .../io/ipc/proxy/record_batch_stream_reader.h | 4 ++ .../+arrow/+io/+ipc/RecordBatchStreamReader.m | 22 +++++-- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc index f3c833484d38..cbd9c5562e29 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/matlab/buffer/matlab_buffer.h" #include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h" #include "arrow/io/file.h" +#include "arrow/io/memory.h" #include "arrow/matlab/error/error.h" #include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" @@ -36,6 +38,35 @@ RecordBatchStreamReader::RecordBatchStreamReader( REGISTER_METHOD(RecordBatchStreamReader, readTable); } +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(const libmexclass::proxy::FunctionArguments& constructor_arguments) { + const mda::StructArray opts = constructor_arguments[0]; + const mda::StringArray filename_mda = opts[0]["Filename"]; + const auto filename_utf16 = std::u16string(filename_mda[0]); + MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, + arrow::util::UTF16StringToUTF8(filename_utf16), + error::UNICODE_CONVERSION_ERROR_ID); + + MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), + error::FAILED_TO_OPEN_FILE_FOR_READ); + + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(input_stream), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + + return std::make_shared(std::move(reader)); +} + +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(const libmexclass::proxy::FunctionArguments& constructor_arguments) { + const mda::StructArray opts = constructor_arguments[0]; + const ::matlab::data::TypedArray bytes_mda = opts[0]["Bytes"]; + const auto matlab_buffer = std::make_shared(bytes_mda); + auto buffer_reader = std::make_shared(matlab_buffer); + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(buffer_reader), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + return std::make_shared(std::move(reader)); +} + libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const libmexclass::proxy::FunctionArguments& constructor_arguments) { namespace mda = ::matlab::data; @@ -44,20 +75,18 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const mda::StructArray opts = constructor_arguments[0]; - const mda::StringArray filename_mda = opts[0]["Filename"]; - const auto filename_utf16 = std::u16string(filename_mda[0]); - MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, - arrow::util::UTF16StringToUTF8(filename_utf16), - error::UNICODE_CONVERSION_ERROR_ID); - - MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), - error::FAILED_TO_OPEN_FILE_FOR_READ); - - MATLAB_ASSIGN_OR_ERROR(auto reader, - arrow::ipc::RecordBatchStreamReader::Open(input_stream), - error::IPC_RECORD_BATCH_READER_OPEN_FAILED); - - return std::make_shared(std::move(reader)); + // Dispatch to the appropriate static "make" method depending + // on the input type. + const mda::StringArray type_mda = opts[0]["Type"]; + const auto type_utf16 = std::u16string(type_mda[0]); + if (type_utf16.equals(u"Bytes")) { + return RecordBatchStreamReader::fromBytes(constructor_arguments); + } else if (type_utf16.equals(u"Filename")) { + return RecordBatchStreamReader::fromFile(constructor_arguments); + } else { + // TODO: Create static error id string + return libmexclass::error::Error{"arrow:some:test:id", "Invalid construction type for RecordBatchStreamReader."}; + } } void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) { diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h index 56fb29398782..0492c46dc04c 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h @@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy { static libmexclass::proxy::MakeResult make( const libmexclass::proxy::FunctionArguments& constructor_arguments); + static libmexclass::proxy::MakeResult fromFile( + const libmexclass::proxy::FunctionArguments& constructor_arguments); + static libmexclass::proxy::MakeResult fromBytes( + const libmexclass::proxy::FunctionArguments& constructor_arguments); protected: std::shared_ptr reader; diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m index 60ca38eba9ad..4ce58f120854 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -26,14 +26,26 @@ Schema end + methods (Static) + function obj = fromBytes(bytes) + args = struct(Bytes=bytes, Type="Bytes"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + obj.Proxy = arrow.internal.proxy.create(proxyName, args); + end + + function obj = fromFile(filename) + args = struct(Filename=filename, Type="File"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + obj.Proxy = arrow.internal.proxy.create(proxyName, args); + end + end + methods - function obj = RecordBatchStreamReader(filename) + function obj = RecordBatchStreamReader(proxy) arguments - filename(1, 1) string {mustBeNonzeroLengthText} + proxy(1, 1) libmexclass.proxy.Proxy end - args = struct(Filename=filename); - proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - obj.Proxy = arrow.internal.proxy.create(proxyName, args); + obj.Proxy = proxy; end function schema = get.Schema(obj) From 4510a11c11390a867eb646e899353180aa24fefa Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Tue, 14 Jan 2025 15:26:15 -0500 Subject: [PATCH 02/10] Fix compilation errors. --- .../ipc/proxy/record_batch_stream_reader.cc | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc index cbd9c5562e29..74dacb3beb61 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -39,6 +39,10 @@ RecordBatchStreamReader::RecordBatchStreamReader( } libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamReaderProxy = + arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; + const mda::StructArray opts = constructor_arguments[0]; const mda::StringArray filename_mda = opts[0]["Filename"]; const auto filename_utf16 = std::u16string(filename_mda[0]); @@ -57,10 +61,14 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(const libmexcla } libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamReaderProxy = + arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; + const mda::StructArray opts = constructor_arguments[0]; const ::matlab::data::TypedArray bytes_mda = opts[0]["Bytes"]; - const auto matlab_buffer = std::make_shared(bytes_mda); - auto buffer_reader = std::make_shared(matlab_buffer); + const auto matlab_buffer = std::make_shared(bytes_mda); + auto buffer_reader = std::make_shared(matlab_buffer); MATLAB_ASSIGN_OR_ERROR(auto reader, arrow::ipc::RecordBatchStreamReader::Open(buffer_reader), error::IPC_RECORD_BATCH_READER_OPEN_FAILED); @@ -69,19 +77,16 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(const libmexcl libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const libmexclass::proxy::FunctionArguments& constructor_arguments) { - namespace mda = ::matlab::data; - using RecordBatchStreamReaderProxy = - arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; - + namespace mda = ::matlab::data; const mda::StructArray opts = constructor_arguments[0]; // Dispatch to the appropriate static "make" method depending // on the input type. const mda::StringArray type_mda = opts[0]["Type"]; const auto type_utf16 = std::u16string(type_mda[0]); - if (type_utf16.equals(u"Bytes")) { + if (type_utf16 == u"Bytes") { return RecordBatchStreamReader::fromBytes(constructor_arguments); - } else if (type_utf16.equals(u"Filename")) { + } else if (type_utf16 == u"Filename") { return RecordBatchStreamReader::fromFile(constructor_arguments); } else { // TODO: Create static error id string From 1f4fa230ea0da4f48a535d5125f2d122cbc60f73 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Tue, 14 Jan 2025 15:30:15 -0500 Subject: [PATCH 03/10] Fix initialization of Proxy instance in fromFile and fromBytes. --- matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m index 4ce58f120854..505a07363c66 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -30,13 +30,15 @@ function obj = fromBytes(bytes) args = struct(Bytes=bytes, Type="Bytes"); proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - obj.Proxy = arrow.internal.proxy.create(proxyName, args); + proxy = arrow.internal.proxy.create(proxyName, args); + obj = arrow.io.ipc.RecordBatchStreamReader(proxy); end function obj = fromFile(filename) args = struct(Filename=filename, Type="File"); proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - obj.Proxy = arrow.internal.proxy.create(proxyName, args); + proxy = arrow.internal.proxy.create(proxyName, args); + obj = arrow.io.ipc.RecordBatchStreamReader(proxy); end end From 555629e691c34ada0a7081e798a3a6f8815b33d2 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Tue, 14 Jan 2025 15:38:27 -0500 Subject: [PATCH 04/10] Add error ID arrow:io:ipc:InvalidConstructionType. --- matlab/src/cpp/arrow/matlab/error/error.h | 1 + .../arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/matlab/src/cpp/arrow/matlab/error/error.h b/matlab/src/cpp/arrow/matlab/error/error.h index 425e089d9f2f..729e0f1d0503 100644 --- a/matlab/src/cpp/arrow/matlab/error/error.h +++ b/matlab/src/cpp/arrow/matlab/error/error.h @@ -247,6 +247,7 @@ static const char* IPC_RECORD_BATCH_WRITE_FAILED = static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed"; static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED = "arrow:io:ipc:FailedToOpenRecordBatchReader"; +static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE = "arrow:io:ipc:InvalidConstructionType"; static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex"; static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed"; static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed"; diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc index 74dacb3beb61..259f0271a946 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -86,11 +86,10 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const auto type_utf16 = std::u16string(type_mda[0]); if (type_utf16 == u"Bytes") { return RecordBatchStreamReader::fromBytes(constructor_arguments); - } else if (type_utf16 == u"Filename") { + } else if (type_utf16 == u"File") { return RecordBatchStreamReader::fromFile(constructor_arguments); } else { - // TODO: Create static error id string - return libmexclass::error::Error{"arrow:some:test:id", "Invalid construction type for RecordBatchStreamReader."}; + return libmexclass::error::Error{"arrow:io:ipc:InvalidConstructionType", "Invalid construction type for RecordBatchStreamReader."}; } } From abdc67da7d1196a9ec64e329f391c4d444cf59ac Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Wed, 15 Jan 2025 09:43:29 -0500 Subject: [PATCH 05/10] Add input argument validation to fromFile and fromBytes. --- .../matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m index 505a07363c66..bb0408439635 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -28,18 +28,24 @@ methods (Static) function obj = fromBytes(bytes) + arguments + bytes(:, 1) uint8 + end args = struct(Bytes=bytes, Type="Bytes"); proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; proxy = arrow.internal.proxy.create(proxyName, args); obj = arrow.io.ipc.RecordBatchStreamReader(proxy); - end + end function obj = fromFile(filename) + arguments + filename(1, 1) string {mustBeNonzeroLengthText} + end args = struct(Filename=filename, Type="File"); proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; proxy = arrow.internal.proxy.create(proxyName, args); obj = arrow.io.ipc.RecordBatchStreamReader(proxy); - end + end end methods From 6190e4b2beef7822316545cd5bfa21217671513f Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Wed, 15 Jan 2025 13:24:30 -0500 Subject: [PATCH 06/10] Parameterize RecordBatchStreamReader tests against fromFile and fromBytes constructor functions. --- .../arrow/io/ipc/tRecordBatchStreamReader.m | 95 ++++++++++++------- 1 file changed, 59 insertions(+), 36 deletions(-) diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m index 6ca67197739a..0f7b489cce86 100644 --- a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m +++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m @@ -25,9 +25,32 @@ end properties (TestParameter) + RecordBatchStreamReaderConstructorFcn = {@tRecordBatchStreamReader.FromBytes, @arrow.io.ipc.RecordBatchStreamReader.fromFile} RecordBatchReadFcn = {@read, @readRecordBatch} end + methods (Static) + % Read the given file into memory as an array of bytes (uint8) + % and then construct a RecordBatchStreamReader from the bytes. + function reader = FromBytes(filename) + if ismissing(filename) + % Simulate the behavior of fromFile when a filename + % that is a missing string value is supplied. + error(message("MATLAB:validators:mustBeNonzeroLengthText", "")) + end + fid = fopen(filename, "r"); + try + bytes = fread(fid, "uint8=>uint8"); + catch e + % Simulate the behavior of fromFile when an invalid + % filename is supplied. + error(message("MATLAB:validators:mustBeNonzeroLengthText", "")) + end + fclose(fid); + reader = arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes); + end + end + methods(TestClassSetup) function setupDataFolder(testCase) @@ -82,19 +105,19 @@ function setupMultipleBatchStreamFile(testCase) methods (Test) - function ZeroLengthFilenameError(testCase) + function ZeroLengthFilenameError(testCase, RecordBatchStreamReaderConstructorFcn) % Verify RecordBatchStreamReader throws an exception with the % identifier MATLAB:validators:mustBeNonzeroLengthText if the % filename input argument given is a zero length string. - fcn = @() arrow.io.ipc.RecordBatchStreamReader(""); + fcn = @() RecordBatchStreamReaderConstructorFcn(""); testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); end - function MissingStringFilenameError(testCase) + function MissingStringFilenameError(testCase, RecordBatchStreamReaderConstructorFcn) % Verify RecordBatchStreamReader throws an exception with the % identifier MATLAB:validators:mustBeNonzeroLengthText if the % filename input argument given is a missing string. - fcn = @() arrow.io.ipc.RecordBatchStreamReader(string(missing)); + fcn = @() RecordBatchStreamReaderConstructorFcn(string(missing)); testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); end @@ -106,43 +129,43 @@ function FilenameInvalidTypeError(testCase) testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); end - function Schema(testCase) + function Schema(testCase, RecordBatchStreamReaderConstructorFcn) % Verify the getter method for Schema returns the % expected value. fieldA = arrow.field("A", arrow.string()); fieldB = arrow.field("B", arrow.float32()); expectedSchema = arrow.schema([fieldA fieldB]); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); testCase.verifyEqual(reader.Schema, expectedSchema); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); testCase.verifyEqual(reader.Schema, expectedSchema); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); testCase.verifyEqual(reader.Schema, expectedSchema); end - function SchemaNoSetter(testCase) + function SchemaNoSetter(testCase, RecordBatchStreamReaderConstructorFcn) % Verify the Schema property is not settable. fieldC = arrow.field("C", arrow.date32()); schema = arrow.schema(fieldC); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); testCase.verifyError(@() setfield(reader, "Schema", schema), "MATLAB:class:SetProhibited"); end - function ReadErrorIfEndOfStream(testCase, RecordBatchReadFcn) + function ReadErrorIfEndOfStream(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify read throws an execption with the identifier arrow:io:ipc:EndOfStream % on an Arrow IPC Stream file containing zero batches. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); fcn = @() RecordBatchReadFcn(reader); testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); end - function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn) + function ReadOneBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify read can successfully read an Arrow IPC Stream file % containing one batch. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); expectedMatlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); expected = arrow.recordBatch(expectedMatlabTable); @@ -153,10 +176,10 @@ function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn) testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); end - function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn) + function ReadMultipleBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify read can successfully read an Arrow IPC Stream file % containing mulitple batches. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); expectedMatlabTable1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); expected1 = arrow.recordBatch(expectedMatlabTable1); @@ -172,12 +195,12 @@ function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn) testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); end - function HasNext(testCase, RecordBatchReadFcn) + function HasNext(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify that the hasnext method returns true the correct % number of times depending on the number of record % batches in an Arrow IPC Stream format. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); % hasnext should return true 0 times for a 0 batch file. iterations = 0; while reader.hasnext() @@ -186,7 +209,7 @@ function HasNext(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 0); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); % hasnext should return true 1 time for a 1 batch file. iterations = 0; while reader.hasnext() @@ -195,7 +218,7 @@ function HasNext(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 1); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); % hasnext should return true 2 times for a 2 batch file. iterations = 0; while reader.hasnext() @@ -205,12 +228,12 @@ function HasNext(testCase, RecordBatchReadFcn) testCase.verifyEqual(iterations, 2); end - function Done(testCase, RecordBatchReadFcn) + function Done(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify that the done method returns false the correct % number of times depending on the number of record % batches in an Arrow IPC Stream format. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); % done should return false 0 times for a 0 batch file. iterations = 0; while ~reader.done() @@ -219,7 +242,7 @@ function Done(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 0); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); % done should return false 1 time for a 1 batch file. iterations = 0; while ~reader.done() @@ -228,7 +251,7 @@ function Done(testCase, RecordBatchReadFcn) end testCase.verifyEqual(iterations, 1); - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); % done should return false 2 times for a 2 batch file. iterations = 0; while ~reader.done() @@ -238,40 +261,40 @@ function Done(testCase, RecordBatchReadFcn) testCase.verifyEqual(iterations, 2); end - function ReadTableZeroBatchStreamFile(testCase) + function ReadTableZeroBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify read can successfully read an Arrow IPC Stream file % containing zero batches as an arrow.tabular.Table. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.ZeroBatchStreamFile); matlabTable = table('Size', [0, 2], 'VariableTypes', ["string", "single"], 'VariableNames', ["A", "B"]); expected = arrow.table(matlabTable); actual = reader.readTable(); testCase.verifyEqual(actual, expected); end - function ReadTableOneBatchStreamFile(testCase) + function ReadTableOneBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify read can successfully read an Arrow IPC Stream file % containing one batch as an arrow.tabular.Table. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.OneBatchStreamFile); matlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); expected = arrow.table(matlabTable); actual = reader.readTable(); testCase.verifyEqual(actual, expected); end - function ReadTableMultipleBatchStreamFile(testCase) + function ReadTableMultipleBatchStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify read can successfully read an Arrow IPC Stream file % containing multiple batches as an arrow.tabular.Table. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); matlabTable = table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]); expected = arrow.table(matlabTable); actual = reader.readTable(); testCase.verifyEqual(actual, expected); end - function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn) + function ReadTableAfterReadRecordBatch(testCase, RecordBatchStreamReaderConstructorFcn, RecordBatchReadFcn) % Verify readTable returns only the remaining record batches % in an Arrow IPC Stream file after calling readRecordBatch first. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); testCase.verifyTrue(reader.hasnext()); testCase.verifyFalse(reader.done()); @@ -292,10 +315,10 @@ function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn) testCase.verifyTrue(reader.done()); end - function ReadTableMultipleCalls(testCase) + function ReadTableMultipleCalls(testCase, RecordBatchStreamReaderConstructorFcn) % Verify readTable returns an empty table if it is called % multiple times in a row. - reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + reader = RecordBatchStreamReaderConstructorFcn(testCase.MultipleBatchStreamFile); expected = arrow.table(... table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]) ... @@ -323,11 +346,11 @@ function ReadTableMultipleCalls(testCase) testCase.verifyTrue(reader.done()); end - function ErrorIfNotIpcStreamFile(testCase) + function ErrorIfNotIpcStreamFile(testCase, RecordBatchStreamReaderConstructorFcn) % Verify RecordBatchStreamReader throws an exception with the % identifier arrow:io:ipc:FailedToOpenRecordBatchReader if % the provided file is not an Arrow IPC Stream file. - fcn = @() arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile); + fcn = @() RecordBatchStreamReaderConstructorFcn(testCase.RandomAccessFile); testCase.verifyError(fcn, "arrow:io:ipc:FailedToOpenRecordBatchReader"); end From 7d76b8ed534d3d07f20a3e8148b4f968aa5d4051 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Wed, 15 Jan 2025 13:27:38 -0500 Subject: [PATCH 07/10] Add readBytes helper method. --- matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m index 0f7b489cce86..208a3c280a4c 100644 --- a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m +++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m @@ -30,9 +30,9 @@ end methods (Static) - % Read the given file into memory as an array of bytes (uint8) - % and then construct a RecordBatchStreamReader from the bytes. - function reader = FromBytes(filename) + + % Read the given file into memory as an array of bytes (uint8). + function bytes = readBytes(filename) if ismissing(filename) % Simulate the behavior of fromFile when a filename % that is a missing string value is supplied. @@ -47,8 +47,15 @@ error(message("MATLAB:validators:mustBeNonzeroLengthText", "")) end fclose(fid); + end + + % Read the given file into memory as bytes and then construct a + % RecordBatchStreamReader from the bytes. + function reader = FromBytes(filename) + bytes = tRecordBatchStreamReader.readBytes(filename); reader = arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes); end + end methods(TestClassSetup) From a0b58c3a542f6253aec669db339796c69955eaf8 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Wed, 15 Jan 2025 16:12:03 -0500 Subject: [PATCH 08/10] Add test to verify that an error is thrown if a value that is not a Proxy object is supplied to the RecordBatchStreamReader constructor. --- matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m index 208a3c280a4c..ad52726f3cb7 100644 --- a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m +++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m @@ -361,6 +361,14 @@ function ErrorIfNotIpcStreamFile(testCase, RecordBatchStreamReaderConstructorFcn testCase.verifyError(fcn, "arrow:io:ipc:FailedToOpenRecordBatchReader"); end + function ErrorIfNotProxy(testCase) + % Verify the RecordBatchStreamReader constructor throws an exception + % with the identifier MATLAB:validation:UnableToConvert if the input + % is not a Proxy object. + fcn = @() arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile); + testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); + end + end end From 2c9de9bd1ba9be05bfce44fe21628b3e0f148ba4 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Wed, 15 Jan 2025 16:33:05 -0500 Subject: [PATCH 09/10] Fix linting errors from CI. --- matlab/src/cpp/arrow/matlab/error/error.h | 3 +- .../ipc/proxy/record_batch_stream_reader.cc | 71 ++++++++++--------- 2 files changed, 40 insertions(+), 34 deletions(-) diff --git a/matlab/src/cpp/arrow/matlab/error/error.h b/matlab/src/cpp/arrow/matlab/error/error.h index 729e0f1d0503..47bde56dacf8 100644 --- a/matlab/src/cpp/arrow/matlab/error/error.h +++ b/matlab/src/cpp/arrow/matlab/error/error.h @@ -247,7 +247,8 @@ static const char* IPC_RECORD_BATCH_WRITE_FAILED = static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed"; static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED = "arrow:io:ipc:FailedToOpenRecordBatchReader"; -static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE = "arrow:io:ipc:InvalidConstructionType"; +static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE = + "arrow:io:ipc:InvalidConstructionType"; static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex"; static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed"; static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed"; diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc index 259f0271a946..1704eee8f7a7 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/matlab/buffer/matlab_buffer.h" #include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" +#include "arrow/matlab/buffer/matlab_buffer.h" #include "arrow/matlab/error/error.h" #include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" @@ -38,46 +38,49 @@ RecordBatchStreamReader::RecordBatchStreamReader( REGISTER_METHOD(RecordBatchStreamReader, readTable); } -libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(const libmexclass::proxy::FunctionArguments& constructor_arguments) { - namespace mda = ::matlab::data; - using RecordBatchStreamReaderProxy = +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile( + const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamReaderProxy = arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; - const mda::StructArray opts = constructor_arguments[0]; - const mda::StringArray filename_mda = opts[0]["Filename"]; - const auto filename_utf16 = std::u16string(filename_mda[0]); - MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, - arrow::util::UTF16StringToUTF8(filename_utf16), - error::UNICODE_CONVERSION_ERROR_ID); + const mda::StructArray opts = constructor_arguments[0]; + const mda::StringArray filename_mda = opts[0]["Filename"]; + const auto filename_utf16 = std::u16string(filename_mda[0]); + MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, + arrow::util::UTF16StringToUTF8(filename_utf16), + error::UNICODE_CONVERSION_ERROR_ID); - MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), - error::FAILED_TO_OPEN_FILE_FOR_READ); + MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), + error::FAILED_TO_OPEN_FILE_FOR_READ); - MATLAB_ASSIGN_OR_ERROR(auto reader, - arrow::ipc::RecordBatchStreamReader::Open(input_stream), - error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(input_stream), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); - return std::make_shared(std::move(reader)); + return std::make_shared(std::move(reader)); } -libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(const libmexclass::proxy::FunctionArguments& constructor_arguments) { - namespace mda = ::matlab::data; - using RecordBatchStreamReaderProxy = - arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; - - const mda::StructArray opts = constructor_arguments[0]; - const ::matlab::data::TypedArray bytes_mda = opts[0]["Bytes"]; - const auto matlab_buffer = std::make_shared(bytes_mda); - auto buffer_reader = std::make_shared(matlab_buffer); - MATLAB_ASSIGN_OR_ERROR(auto reader, - arrow::ipc::RecordBatchStreamReader::Open(buffer_reader), - error::IPC_RECORD_BATCH_READER_OPEN_FAILED); - return std::make_shared(std::move(reader)); +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes( + const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamReaderProxy = + arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; + + const mda::StructArray opts = constructor_arguments[0]; + const ::matlab::data::TypedArray bytes_mda = opts[0]["Bytes"]; + const auto matlab_buffer = + std::make_shared(bytes_mda); + auto buffer_reader = std::make_shared(matlab_buffer); + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(buffer_reader), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + return std::make_shared(std::move(reader)); } libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const libmexclass::proxy::FunctionArguments& constructor_arguments) { - namespace mda = ::matlab::data; + namespace mda = ::matlab::data; const mda::StructArray opts = constructor_arguments[0]; // Dispatch to the appropriate static "make" method depending @@ -85,11 +88,13 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const mda::StringArray type_mda = opts[0]["Type"]; const auto type_utf16 = std::u16string(type_mda[0]); if (type_utf16 == u"Bytes") { - return RecordBatchStreamReader::fromBytes(constructor_arguments); + return RecordBatchStreamReader::fromBytes(constructor_arguments); } else if (type_utf16 == u"File") { - return RecordBatchStreamReader::fromFile(constructor_arguments); + return RecordBatchStreamReader::fromFile(constructor_arguments); } else { - return libmexclass::error::Error{"arrow:io:ipc:InvalidConstructionType", "Invalid construction type for RecordBatchStreamReader."}; + return libmexclass::error::Error{ + "arrow:io:ipc:InvalidConstructionType", + "Invalid construction type for RecordBatchStreamReader."}; } } From e8124967aa3431320b2c29d1aa24566c31d484c3 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Wed, 15 Jan 2025 16:37:29 -0500 Subject: [PATCH 10/10] Fix indentation. --- .../+arrow/+io/+ipc/RecordBatchStreamReader.m | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m index bb0408439635..44f70815ed07 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -27,25 +27,25 @@ end methods (Static) - function obj = fromBytes(bytes) - arguments - bytes(:, 1) uint8 - end - args = struct(Bytes=bytes, Type="Bytes"); - proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - proxy = arrow.internal.proxy.create(proxyName, args); - obj = arrow.io.ipc.RecordBatchStreamReader(proxy); + function obj = fromBytes(bytes) + arguments + bytes(:, 1) uint8 end + args = struct(Bytes=bytes, Type="Bytes"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + proxy = arrow.internal.proxy.create(proxyName, args); + obj = arrow.io.ipc.RecordBatchStreamReader(proxy); + end - function obj = fromFile(filename) - arguments - filename(1, 1) string {mustBeNonzeroLengthText} - end - args = struct(Filename=filename, Type="File"); - proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - proxy = arrow.internal.proxy.create(proxyName, args); - obj = arrow.io.ipc.RecordBatchStreamReader(proxy); + function obj = fromFile(filename) + arguments + filename(1, 1) string {mustBeNonzeroLengthText} end + args = struct(Filename=filename, Type="File"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + proxy = arrow.internal.proxy.create(proxyName, args); + obj = arrow.io.ipc.RecordBatchStreamReader(proxy); + end end methods