From 10837a65d94c9ae9017b293255d367c6d7ce4803 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 12 May 2017 16:09:41 -0400 Subject: [PATCH 1/5] Add abstract stream writer and reader C++ APIs. Rename record batch stream reader and writer classes for better clarity --- c_glib/arrow-glib/stream-reader.cpp | 20 ++-- c_glib/arrow-glib/stream-reader.h | 2 +- c_glib/arrow-glib/stream-reader.hpp | 4 +- c_glib/arrow-glib/writer.cpp | 16 +-- c_glib/arrow-glib/writer.h | 2 +- ci/travis_before_script_cpp.sh | 2 + cpp/CMakeLists.txt | 8 ++ cpp/src/arrow/ipc/file-to-stream.cc | 8 +- cpp/src/arrow/ipc/ipc-read-write-test.cc | 24 ++-- cpp/src/arrow/ipc/json-integration-test.cc | 12 +- cpp/src/arrow/ipc/reader.cc | 54 ++++----- cpp/src/arrow/ipc/reader.h | 105 ++++++++++++----- cpp/src/arrow/ipc/stream-to-file.cc | 8 +- cpp/src/arrow/ipc/writer.cc | 49 ++++---- cpp/src/arrow/ipc/writer.h | 129 ++++++++++++++------- python/doc/source/api.rst | 8 +- python/doc/source/ipc.rst | 23 ++-- python/pyarrow/__init__.py | 7 +- python/pyarrow/includes/libarrow.pxd | 34 +++--- python/pyarrow/io.pxi | 47 +++++--- python/pyarrow/ipc.py | 14 ++- 21 files changed, 354 insertions(+), 222 deletions(-) diff --git a/c_glib/arrow-glib/stream-reader.cpp b/c_glib/arrow-glib/stream-reader.cpp index cc18cd84d314..d1bf0519b1ee 100644 --- a/c_glib/arrow-glib/stream-reader.cpp +++ b/c_glib/arrow-glib/stream-reader.cpp @@ -43,7 +43,7 @@ G_BEGIN_DECLS */ typedef struct GArrowStreamReaderPrivate_ { - std::shared_ptr stream_reader; + std::shared_ptr stream_reader; } GArrowStreamReaderPrivate; enum { @@ -85,7 +85,7 @@ garrow_stream_reader_set_property(GObject *object, switch (prop_id) { case PROP_STREAM_READER: priv->stream_reader = - *static_cast *>(g_value_get_pointer(value)); + *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -124,8 +124,8 @@ garrow_stream_reader_class_init(GArrowStreamReaderClass *klass) gobject_class->get_property = 
garrow_stream_reader_get_property; spec = g_param_spec_pointer("stream-reader", - "ipc::StreamReader", - "The raw std::shared *", + "ipc::InputStreamReader", + "The raw std::shared *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_STREAM_READER, spec); @@ -143,10 +143,10 @@ GArrowStreamReader * garrow_stream_reader_new(GArrowInputStream *stream, GError **error) { - std::shared_ptr arrow_stream_reader; + std::shared_ptr arrow_stream_reader; auto status = - arrow::ipc::StreamReader::Open(garrow_input_stream_get_raw(stream), - &arrow_stream_reader); + arrow::ipc::InputStreamReader::Open(garrow_input_stream_get_raw(stream), + &arrow_stream_reader); if (garrow_error_check(error, status, "[ipc][stream-reader][open]")) { return garrow_stream_reader_new_raw(&arrow_stream_reader); } else { @@ -179,7 +179,7 @@ garrow_stream_reader_get_schema(GArrowStreamReader *stream_reader) */ GArrowRecordBatch * garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader, - GError **error) + GError **error) { auto arrow_stream_reader = garrow_stream_reader_get_raw(stream_reader); @@ -202,7 +202,7 @@ garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader, G_END_DECLS GArrowStreamReader * -garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader) +garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader) { auto stream_reader = GARROW_STREAM_READER(g_object_new(GARROW_TYPE_STREAM_READER, @@ -211,7 +211,7 @@ garrow_stream_reader_new_raw(std::shared_ptr *arrow_st return stream_reader; } -std::shared_ptr +std::shared_ptr garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader) { GArrowStreamReaderPrivate *priv; diff --git a/c_glib/arrow-glib/stream-reader.h b/c_glib/arrow-glib/stream-reader.h index 2ea2c26a9e54..f6cdaea5f7a9 100644 --- a/c_glib/arrow-glib/stream-reader.h +++ b/c_glib/arrow-glib/stream-reader.h @@ -55,7 +55,7 @@ typedef struct _GArrowStreamReaderClass 
GArrowStreamReaderClass; /** * GArrowStreamReader: * - * It wraps `arrow::ipc::StreamReader`. + * It wraps `arrow::ipc::InputStreamReader`. */ struct _GArrowStreamReader { diff --git a/c_glib/arrow-glib/stream-reader.hpp b/c_glib/arrow-glib/stream-reader.hpp index ca8e6895a4fd..fdd147e88e31 100644 --- a/c_glib/arrow-glib/stream-reader.hpp +++ b/c_glib/arrow-glib/stream-reader.hpp @@ -24,5 +24,5 @@ #include -GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader); -std::shared_ptr garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader); +GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader); +std::shared_ptr garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader); diff --git a/c_glib/arrow-glib/writer.cpp b/c_glib/arrow-glib/writer.cpp index 625a19e3b0b0..a83feef21fc0 100644 --- a/c_glib/arrow-glib/writer.cpp +++ b/c_glib/arrow-glib/writer.cpp @@ -47,7 +47,7 @@ G_BEGIN_DECLS */ typedef struct GArrowStreamWriterPrivate_ { - std::shared_ptr stream_writer; + std::shared_ptr stream_writer; } GArrowStreamWriterPrivate; enum { @@ -89,7 +89,7 @@ garrow_stream_writer_set_property(GObject *object, switch (prop_id) { case PROP_STREAM_WRITER: priv->stream_writer = - *static_cast *>(g_value_get_pointer(value)); + *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -128,8 +128,8 @@ garrow_stream_writer_class_init(GArrowStreamWriterClass *klass) gobject_class->get_property = garrow_stream_writer_get_property; spec = g_param_spec_pointer("stream-writer", - "ipc::StreamWriter", - "The raw std::shared *", + "ipc::OutputStreamWriter", + "The raw std::shared *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_STREAM_WRITER, spec); @@ -149,11 +149,11 @@ garrow_stream_writer_new(GArrowOutputStream *sink, GArrowSchema *schema, GError **error) { - std::shared_ptr 
arrow_stream_writer; + std::shared_ptr arrow_stream_writer; auto status = - arrow::ipc::StreamWriter::Open(garrow_output_stream_get_raw(sink).get(), - garrow_schema_get_raw(schema), - &arrow_stream_writer); + arrow::ipc::OutputStreamWriter::Open(garrow_output_stream_get_raw(sink).get(), + garrow_schema_get_raw(schema), + &arrow_stream_writer); if (garrow_error_check(error, status, "[ipc][stream-writer][open]")) { return garrow_stream_writer_new_raw(&arrow_stream_writer); } else { diff --git a/c_glib/arrow-glib/writer.h b/c_glib/arrow-glib/writer.h index 2aaa776f8176..b0f95d4c8ea5 100644 --- a/c_glib/arrow-glib/writer.h +++ b/c_glib/arrow-glib/writer.h @@ -56,7 +56,7 @@ typedef struct _GArrowStreamWriterClass GArrowStreamWriterClass; /** * GArrowStreamWriter: * - * It wraps `arrow::ipc::StreamWriter`. + * It wraps `arrow::ipc::OutputStreamWriter`. */ struct _GArrowStreamWriter { diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 3f9f67c35928..7d4ecb709ed8 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -38,10 +38,12 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then cmake -DARROW_TEST_MEMCHECK=on \ $CMAKE_COMMON_FLAGS \ -DARROW_CXXFLAGS="-Wconversion -Werror" \ + -DARROW_NO_DEPRECATED_API=on \ $ARROW_CPP_DIR else cmake $CMAKE_COMMON_FLAGS \ -DARROW_CXXFLAGS=-Werror \ + -DARROW_NO_DEPRECATED_API=on \ $ARROW_CPP_DIR fi diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6b2ceec32777..0ad7ef560fb3 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -89,6 +89,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") "Build the Arrow micro benchmarks" OFF) + option(ARROW_NO_DEPRECATED_API + "Exclude deprecated APIs from build" + OFF) + option(ARROW_IPC "Build the Arrow IPC extensions" ON) @@ -154,6 +158,10 @@ include(BuildUtils) include(SetupCxxFlags) +if (ARROW_NO_DEPRECATED_API) + add_definitions(-DARROW_NO_DEPRECATED_API) +endif() + # Add common flags set(CMAKE_CXX_FLAGS 
"${CMAKE_CXX_FLAGS} ${CXX_COMMON_FLAGS}") set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}") diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc index 8161b191380d..a52716b72313 100644 --- a/cpp/src/arrow/ipc/file-to-stream.cc +++ b/cpp/src/arrow/ipc/file-to-stream.cc @@ -28,14 +28,14 @@ namespace arrow { // Reads a file on the file system and prints to stdout the stream version of it. Status ConvertToStream(const char* path) { std::shared_ptr in_file; - std::shared_ptr reader; + std::shared_ptr reader; RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file)); - RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader)); + RETURN_NOT_OK(ipc::BatchFileReader::Open(in_file, &reader)); io::StdoutStream sink; - std::shared_ptr writer; - RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(ipc::OutputStreamWriter::Open(&sink, reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr chunk; RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index b4a88b5519b7..c7cca9f45871 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture { if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); } RETURN_NOT_OK(mmap_->Seek(0)); - std::shared_ptr file_writer; - RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); + std::shared_ptr file_writer; + RETURN_NOT_OK(BatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true)); RETURN_NOT_OK(file_writer->Close()); int64_t offset; RETURN_NOT_OK(mmap_->Tell(&offset)); - std::shared_ptr file_reader; - RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader)); + std::shared_ptr file_reader; + 
RETURN_NOT_OK(BatchFileReader::Open(mmap_, offset, &file_reader)); return file_reader->GetRecordBatch(0, result); } @@ -487,8 +487,8 @@ class TestFileFormat : public ::testing::TestWithParam { Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) { // Write the file - std::shared_ptr writer; - RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(BatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); const int num_batches = static_cast(in_batches.size()); @@ -504,8 +504,8 @@ class TestFileFormat : public ::testing::TestWithParam { // Open the file auto buf_reader = std::make_shared(buffer_); - std::shared_ptr reader; - RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(BatchFileReader::Open(buf_reader, footer_offset, &reader)); EXPECT_EQ(num_batches, reader->num_record_batches()); for (int i = 0; i < num_batches; ++i) { @@ -553,8 +553,8 @@ class TestStreamFormat : public ::testing::TestWithParam { Status RoundTripHelper( const RecordBatch& batch, std::vector>* out_batches) { // Write the file - std::shared_ptr writer; - RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(OutputStreamWriter::Open(sink_.get(), batch.schema(), &writer)); int num_batches = 5; for (int i = 0; i < num_batches; ++i) { RETURN_NOT_OK(writer->WriteRecordBatch(batch)); @@ -565,8 +565,8 @@ class TestStreamFormat : public ::testing::TestWithParam { // Open the file auto buf_reader = std::make_shared(buffer_); - std::shared_ptr reader; - RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(InputStreamReader::Open(buf_reader, &reader)); std::shared_ptr chunk; while (true) { diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index aa95500003ec..5afd020aadca 100644 --- 
a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -76,8 +76,8 @@ static Status ConvertJsonToArrow( std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; } - std::shared_ptr writer; - RETURN_NOT_OK(ipc::FileWriter::Open(out_file.get(), reader->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(ipc::BatchFileWriter::Open(out_file.get(), reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr batch; @@ -96,8 +96,8 @@ static Status ConvertArrowToJson( RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file)); RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file)); - std::shared_ptr reader; - RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(ipc::BatchFileReader::Open(in_file, &reader)); if (FLAGS_verbose) { std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; @@ -137,8 +137,8 @@ static Status ValidateArrowVsJson( std::shared_ptr arrow_file; RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file)); - std::shared_ptr arrow_reader; - RETURN_NOT_OK(ipc::FileReader::Open(arrow_file, &arrow_reader)); + std::shared_ptr arrow_reader; + RETURN_NOT_OK(ipc::BatchFileReader::Open(arrow_file, &arrow_reader)); auto json_schema = json_reader->schema(); auto arrow_schema = arrow_reader->schema(); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index aea4c9cd5ec1..8241d6401676 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -156,7 +156,7 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona } // ---------------------------------------------------------------------- -// StreamReader implementation +// InputStreamReader implementation static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); @@ 
-176,10 +176,12 @@ static inline std::string message_type_name(Message::Type type) { return "unknown"; } -class StreamReader::StreamReaderImpl { +BatchStreamReader::~BatchStreamReader() {} + +class InputStreamReader::InputStreamReaderImpl { public: - StreamReaderImpl() {} - ~StreamReaderImpl() {} + InputStreamReaderImpl() {} + ~InputStreamReaderImpl() {} Status Open(const std::shared_ptr& stream) { stream_ = stream; @@ -267,33 +269,31 @@ class StreamReader::StreamReaderImpl { std::shared_ptr schema_; }; -StreamReader::StreamReader() { - impl_.reset(new StreamReaderImpl()); +InputStreamReader::InputStreamReader() { + impl_.reset(new InputStreamReaderImpl()); } -StreamReader::~StreamReader() {} - -Status StreamReader::Open(const std::shared_ptr& stream, - std::shared_ptr* reader) { +Status InputStreamReader::Open(const std::shared_ptr& stream, + std::shared_ptr* reader) { // Private ctor - *reader = std::shared_ptr(new StreamReader()); + *reader = std::shared_ptr(new InputStreamReader()); return (*reader)->impl_->Open(stream); } -std::shared_ptr StreamReader::schema() const { +std::shared_ptr InputStreamReader::schema() const { return impl_->schema(); } -Status StreamReader::GetNextRecordBatch(std::shared_ptr* batch) { +Status InputStreamReader::GetNextRecordBatch(std::shared_ptr* batch) { return impl_->GetNextRecordBatch(batch); } // ---------------------------------------------------------------------- // Reader implementation -class FileReader::FileReaderImpl { +class BatchFileReader::BatchFileReaderImpl { public: - FileReaderImpl() { dictionary_memo_ = std::make_shared(); } + BatchFileReaderImpl() { dictionary_memo_ = std::make_shared(); } Status ReadFooter() { int magic_size = static_cast(strlen(kArrowMagicBytes)); @@ -432,38 +432,38 @@ class FileReader::FileReaderImpl { std::shared_ptr schema_; }; -FileReader::FileReader() { - impl_.reset(new FileReaderImpl()); +BatchFileReader::BatchFileReader() { + impl_.reset(new BatchFileReaderImpl()); } 
-FileReader::~FileReader() {} +BatchFileReader::~BatchFileReader() {} -Status FileReader::Open(const std::shared_ptr& file, - std::shared_ptr* reader) { +Status BatchFileReader::Open(const std::shared_ptr& file, + std::shared_ptr* reader) { int64_t footer_offset; RETURN_NOT_OK(file->GetSize(&footer_offset)); return Open(file, footer_offset, reader); } -Status FileReader::Open(const std::shared_ptr& file, - int64_t footer_offset, std::shared_ptr* reader) { - *reader = std::shared_ptr(new FileReader()); +Status BatchFileReader::Open(const std::shared_ptr& file, + int64_t footer_offset, std::shared_ptr* reader) { + *reader = std::shared_ptr(new BatchFileReader()); return (*reader)->impl_->Open(file, footer_offset); } -std::shared_ptr FileReader::schema() const { +std::shared_ptr BatchFileReader::schema() const { return impl_->schema(); } -int FileReader::num_record_batches() const { +int BatchFileReader::num_record_batches() const { return impl_->num_record_batches(); } -MetadataVersion FileReader::version() const { +MetadataVersion BatchFileReader::version() const { return impl_->version(); } -Status FileReader::GetRecordBatch(int i, std::shared_ptr* batch) { +Status BatchFileReader::GetRecordBatch(int i, std::shared_ptr* batch) { return impl_->GetRecordBatch(i, batch); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 1972446743bc..90b85ae2c35e 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -44,29 +44,48 @@ class RandomAccessFile; namespace ipc { -class ARROW_EXPORT StreamReader { +/// \brief Abstract interface for reading stream of record batches +class ARROW_EXPORT BatchStreamReader { public: - ~StreamReader(); + virtual ~BatchStreamReader(); - // Open an stream. 
- static Status Open(const std::shared_ptr& stream, - std::shared_ptr* reader); + /// \return the shared schema of the record batches in the stream + virtual std::shared_ptr schema() const = 0; - std::shared_ptr schema() const; + /// Read the next record batch in the stream. Return nullptr for batch when + /// reaching end of stream + /// + /// \param(out) batch the next loaded batch, nullptr at end of stream + /// \return Status + virtual Status GetNextRecordBatch(std::shared_ptr* batch) = 0; +}; - // Returned batch is nullptr when end of stream reached - Status GetNextRecordBatch(std::shared_ptr* batch); +/// \class InputStreamReader +/// \brief Synchronous batch stream reader that reads from io::InputStream +class ARROW_EXPORT InputStreamReader : public BatchStreamReader { + public: + /// Create batch reader from InputStream + /// + /// \param(in) stream an input stream instance + /// \param(out) reader the created reader object + /// \return Status + static Status Open(const std::shared_ptr& stream, + std::shared_ptr* reader); + + std::shared_ptr schema() const override; + Status GetNextRecordBatch(std::shared_ptr* batch) override; private: - StreamReader(); + InputStreamReader(); - class ARROW_NO_EXPORT StreamReaderImpl; - std::unique_ptr impl_; + class ARROW_NO_EXPORT InputStreamReaderImpl; + std::unique_ptr impl_; }; -class ARROW_EXPORT FileReader { +/// \brief Reads the random access record batch file format +class ARROW_EXPORT BatchFileReader { public: - ~FileReader(); + ~BatchFileReader(); // Open a file-like object that is assumed to be self-contained; i.e., the // end of the file interface is the end of the Arrow file. Note that there @@ -74,7 +93,7 @@ class ARROW_EXPORT FileReader { // need only locate the end of the Arrow file stream to discover the metadata // and then proceed to read the data into memory. 
static Status Open(const std::shared_ptr& file, - std::shared_ptr* reader); + std::shared_ptr* reader); // If the file is embedded within some larger file or memory region, you can // pass the absolute memory offset to the end of the file (which contains the @@ -84,46 +103,80 @@ class ARROW_EXPORT FileReader { // @param file: the data source // @param footer_offset: the position of the end of the Arrow "file" static Status Open(const std::shared_ptr& file, - int64_t footer_offset, std::shared_ptr* reader); + int64_t footer_offset, std::shared_ptr* reader); /// The schema includes any dictionaries std::shared_ptr schema() const; + /// Returns number of record batches in the file int num_record_batches() const; + /// Returns MetadataVersion in the file metadata MetadataVersion version() const; - // Read a record batch from the file. Does not copy memory if the input - // source supports zero-copy. - // - // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an - // "always copy" option) + /// Read a record batch from the file. Does not copy memory if the input + /// source supports zero-copy. 
+ /// + /// \param(in) i the index of the record batch to return + /// \param(out) batch the read batch + /// \return Status Status GetRecordBatch(int i, std::shared_ptr* batch); private: - FileReader(); + BatchFileReader(); - class ARROW_NO_EXPORT FileReaderImpl; - std::unique_ptr impl_; + class ARROW_NO_EXPORT BatchFileReaderImpl; + std::unique_ptr impl_; }; -// Generic read functionsh; does not copy data if the input supports zero copy reads +// Generic read functions; does not copy data if the input supports zero copy reads + +/// Read record batch from file given metadata and schema +/// +/// \param(in) metadata a Message containing the record batch metadata +/// \param(in) schema the record batch schema +/// \param(in) file a random access file +/// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, const std::shared_ptr& schema, io::RandomAccessFile* file, std::shared_ptr* out); +/// Read record batch from file given metadata and schema +/// +/// \param(in) metadata a Message containing the record batch metadata +/// \param(in) schema the record batch schema +/// \param(in) file a random access file +/// \param(in) max_recursion_depth the maximum permitted nesting depth +/// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, const std::shared_ptr& schema, int max_recursion_depth, io::RandomAccessFile* file, std::shared_ptr* out); -/// Read encapsulated message and RecordBatch +/// Read record batch as encapsulated IPC message with metadata size prefix and +/// header +/// +/// \param(in) schema the record batch schema +/// \param(in) offset the file location of the start of the message +/// \param(in) file the file where the batch is located +/// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr& schema, int64_t offset, io::RandomAccessFile* file, std::shared_ptr* out); -/// EXPERIMENTAL: Read arrow::Tensor from a 
contiguous message +/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file +/// +/// \param(in) offset the file location of the start of the message +/// \param(in) file the file where the batch is located +/// \param(out) out the read tensor Status ARROW_EXPORT ReadTensor( int64_t offset, io::RandomAccessFile* file, std::shared_ptr* out); +/// Backwards-compatibility for Arrow < 0.4.0 +/// +#ifndef ARROW_NO_DEPRECATED_API +using StreamReader = InputStreamReader; +using FileReader = BatchFileReader; +#endif + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc index ec0ac435a9d0..d135e5416e37 100644 --- a/cpp/src/arrow/ipc/stream-to-file.cc +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -30,12 +30,12 @@ namespace arrow { // $ | stream-to-file > file.arrow Status ConvertToFile() { std::shared_ptr input(new io::StdinStream); - std::shared_ptr reader; - RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(ipc::InputStreamReader::Open(input, &reader)); io::StdoutStream sink; - std::shared_ptr writer; - RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(ipc::BatchFileWriter::Open(&sink, reader->schema(), &writer)); std::shared_ptr batch; while (true) { diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 78d6b9eb92b4..39dad2a9cc4e 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -580,18 +580,22 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) { return Status::OK(); } +// ---------------------------------------------------------------------- + +BatchStreamWriter::~BatchStreamWriter() {} + // ---------------------------------------------------------------------- // Stream writer implementation -class StreamWriter::StreamWriterImpl { +class OutputStreamWriter::OutputStreamWriterImpl { public: - StreamWriterImpl() 
+ OutputStreamWriterImpl() : dictionary_memo_(std::make_shared()), pool_(default_memory_pool()), position_(-1), started_(false) {} - virtual ~StreamWriterImpl() = default; + virtual ~OutputStreamWriterImpl() = default; Status Open(io::OutputStream* sink, const std::shared_ptr& schema) { sink_ = sink; @@ -721,37 +725,36 @@ class StreamWriter::StreamWriterImpl { std::vector record_batches_; }; -StreamWriter::StreamWriter() { - impl_.reset(new StreamWriterImpl()); +OutputStreamWriter::OutputStreamWriter() { + impl_.reset(new OutputStreamWriterImpl()); } -Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { +Status OutputStreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -StreamWriter::~StreamWriter() {} - -void StreamWriter::set_memory_pool(MemoryPool* pool) { +void OutputStreamWriter::set_memory_pool(MemoryPool* pool) { impl_->set_memory_pool(pool); } -Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out) { +Status OutputStreamWriter::Open(io::OutputStream* sink, + const std::shared_ptr& schema, std::shared_ptr* out) { // ctor is private - *out = std::shared_ptr(new StreamWriter()); + *out = std::shared_ptr(new OutputStreamWriter()); return (*out)->impl_->Open(sink, schema); } -Status StreamWriter::Close() { +Status OutputStreamWriter::Close() { return impl_->Close(); } // ---------------------------------------------------------------------- // File writer implementation -class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl { +class BatchFileWriter::BatchFileWriterImpl + : public OutputStreamWriter::OutputStreamWriterImpl { public: - using BASE = StreamWriter::StreamWriterImpl; + using BASE = OutputStreamWriter::OutputStreamWriterImpl; Status Start() override { RETURN_NOT_OK(WriteAligned( @@ -783,23 +786,23 @@ class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl { } 
}; -FileWriter::FileWriter() { - impl_.reset(new FileWriterImpl()); +BatchFileWriter::BatchFileWriter() { + impl_.reset(new BatchFileWriterImpl()); } -FileWriter::~FileWriter() {} +BatchFileWriter::~BatchFileWriter() {} -Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out) { - *out = std::shared_ptr(new FileWriter()); // ctor is private +Status BatchFileWriter::Open(io::OutputStream* sink, + const std::shared_ptr& schema, std::shared_ptr* out) { + *out = std::shared_ptr(new BatchFileWriter()); // ctor is private return (*out)->impl_->Open(sink, schema); } -Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { +Status BatchFileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -Status FileWriter::Close() { +Status BatchFileWriter::Close() { return impl_->Close(); } diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index b71becb8c73b..5be1a6f597d5 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -46,6 +46,81 @@ class OutputStream; namespace ipc { +/// \class BatchStreamWriter +/// \brief Abstract interface for writing a stream of record batches +class ARROW_EXPORT BatchStreamWriter { + public: + virtual ~BatchStreamWriter(); + + /// Write a record batch to the stream + /// + /// \param allow_64bit boolean permitting field lengths exceeding INT32_MAX + /// \return Status indicate success or failure + virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0; + + /// Perform any logic necessary to finish the stream + /// + /// \return Status indicate success or failure + virtual Status Close() = 0; + + /// In some cases, writing may require memory allocation. 
We use the default + /// memory pool, but provide the option to override + /// + /// \param pool the memory pool to use for required allocations + virtual void set_memory_pool(MemoryPool* pool) = 0; +}; + +/// \class OutputStreamWriter +/// \brief Synchronous batch stream writer that writes to io::OutputStream +class ARROW_EXPORT OutputStreamWriter : public BatchStreamWriter { + public: + /// Create a new writer from stream sink and schema. User is responsible for + /// closing the actual OutputStream. + /// + /// \param(in) sink output stream to write to + /// \param(in) schema the schema of the record batches to be written + /// \param(out) out the created stream writer + /// \return Status indicating success or failure + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); + + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; + Status Close() override; + void set_memory_pool(MemoryPool* pool) override; + + protected: + OutputStreamWriter(); + class ARROW_NO_EXPORT OutputStreamWriterImpl; + std::unique_ptr impl_; +}; + +/// \brief Creates the random access record batch file format +/// +/// Implements the random access file format, which structurally is a record +/// batch stream followed by a metadata footer at the end of the file. 
Magic +/// numbers are written at the start and end of the file +class ARROW_EXPORT BatchFileWriter : public OutputStreamWriter { + public: + virtual ~BatchFileWriter(); + + /// Create a new writer from stream sink and schema + /// + /// \param(in) sink output stream to write to + /// \param(in) schema the schema of the record batches to be written + /// \param(out) out the created stream writer + /// \return Status indicating success or failure + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); + + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; + Status Close() override; + + private: + BatchFileWriter(); + class ARROW_NO_EXPORT BatchFileWriterImpl; + std::unique_ptr impl_; +}; + /// Write the RecordBatch (collection of equal-length Arrow arrays) to the /// output stream in a contiguous block. The record batch metadata is written as /// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) @@ -58,13 +133,13 @@ namespace ipc { /// to the end of the body and end of the metadata / data header (suffixed by /// the header size) is returned in out-variables /// -/// @param(in) buffer_start_offset the start offset to use in the buffer metadata, +/// \param(in) buffer_start_offset the start offset to use in the buffer metadata, /// default should be 0 -/// @param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be +/// \param(in) allow_64bit permit field lengths exceeding INT32_MAX. 
May not be /// readable by other Arrow implementations -/// @param(out) metadata_length: the size of the length-prefixed flatbuffer +/// \param(out) metadata_length: the size of the length-prefixed flatbuffer /// including padding to a 64-byte boundary -/// @param(out) body_length: the size of the contiguous buffer block plus +/// \param(out) body_length: the size of the contiguous buffer block plus /// padding bytes Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, @@ -85,45 +160,6 @@ Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // write the tensor including metadata, padding, and data Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size); -class ARROW_EXPORT StreamWriter { - public: - virtual ~StreamWriter(); - - static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out); - - virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false); - - /// Perform any logic necessary to finish the stream. User is responsible for - /// closing the actual OutputStream - virtual Status Close(); - - // In some cases, writing may require memory allocation. We use the default - // memory pool, but provide the option to override - void set_memory_pool(MemoryPool* pool); - - protected: - StreamWriter(); - class ARROW_NO_EXPORT StreamWriterImpl; - std::unique_ptr impl_; -}; - -class ARROW_EXPORT FileWriter : public StreamWriter { - public: - virtual ~FileWriter(); - - static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out); - - Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; - Status Close() override; - - private: - FileWriter(); - class ARROW_NO_EXPORT FileWriterImpl; - std::unique_ptr impl_; -}; - /// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. 
This data /// may not be readable by all Arrow implementations Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch, @@ -135,6 +171,13 @@ Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch, Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length); +/// Backwards-compatibility for Arrow < 0.4.0 +/// +#ifndef ARROW_NO_DEPRECATED_API +using FileWriter = BatchFileWriter; +using StreamWriter = OutputStreamWriter; +#endif + } // namespace ipc } // namespace arrow diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index a8dd8c5e110a..810e57cc5719 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -177,10 +177,10 @@ Interprocess Communication and Messaging .. autosummary:: :toctree: generated/ - FileReader - FileWriter - StreamReader - StreamWriter + BatchFileReader + BatchFileWriter + BatchStreamReader + BatchStreamWriter .. _api.memory_pool: diff --git a/python/doc/source/ipc.rst b/python/doc/source/ipc.rst index e63e7455bb81..cce2ae8d8153 100644 --- a/python/doc/source/ipc.rst +++ b/python/doc/source/ipc.rst @@ -55,13 +55,13 @@ First, let's create a small record batch: batch.num_columns Now, we can begin writing a stream containing some number of these batches. For -this we use :class:`~pyarrow.StreamWriter`, which can write to a writeable +this we use :class:`~pyarrow.BatchStreamWriter`, which can write to a writeable ``NativeFile`` object or a writeable Python object: .. ipython:: python sink = pa.InMemoryOutputStream() - writer = pa.StreamWriter(sink, batch.schema) + writer = pa.BatchStreamWriter(sink, batch.schema) Here we used an in-memory Arrow buffer stream, but this could have been a socket or some other IO sink. @@ -80,11 +80,11 @@ particular stream. Now we can do: buf.size Now ``buf`` contains the complete stream as an in-memory byte buffer. 
We can -read such a stream with :class:`~pyarrow.StreamReader`: +read such a stream with :class:`~pyarrow.BatchStreamReader`: .. ipython:: python - reader = pa.StreamReader(buf) + reader = pa.BatchStreamReader(buf) reader.schema batches = [b for b in reader] @@ -103,13 +103,13 @@ batches are also zero-copy and do not allocate any new memory on read. Writing and Reading Random Access Files --------------------------------------- -The :class:`~pyarrow.FileWriter` has the same API as -:class:`~pyarrow.StreamWriter`: +The :class:`~pyarrow.BatchFileWriter` has the same API as +:class:`~pyarrow.BatchStreamWriter`: .. ipython:: python sink = pa.InMemoryOutputStream() - writer = pa.FileWriter(sink, batch.schema) + writer = pa.BatchFileWriter(sink, batch.schema) for i in range(10): writer.write_batch(batch) @@ -118,13 +118,14 @@ The :class:`~pyarrow.FileWriter` has the same API as buf = sink.get_result() buf.size -The difference between :class:`~pyarrow.FileReader` and -:class:`~pyarrow.StreamReader` is that the input source must have a ``seek`` -method for random access. The stream reader only requires read operations: +The difference between :class:`~pyarrow.BatchFileReader` and +:class:`~pyarrow.BatchStreamReader` is that the input source must have a +``seek`` method for random access. The stream reader only requires read +operations: .. 
ipython:: python - reader = pa.FileReader(buf) + reader = pa.BatchFileReader(buf) Because we have access to the entire payload, we know the number of record batches in the file, and can read any at random: diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 7d79811d9883..c5beb06b4008 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -101,7 +101,12 @@ def jemalloc_memory_pool(): from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem -from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter +from pyarrow.ipc import (BatchFileReader, BatchFileWriter, + BatchStreamReader, BatchStreamWriter) localfs = LocalFilesystem.get_instance() + + +# DEPRECATED +from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 3d56c14bae21..1cc6074c7773 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -547,38 +547,40 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil: cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: - cdef cppclass CStreamWriter " arrow::ipc::StreamWriter": - @staticmethod - CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CStreamWriter]* out) - + cdef cppclass CBatchStreamWriter " arrow::ipc::BatchStreamWriter": CStatus Close() CStatus WriteRecordBatch(const CRecordBatch& batch) - cdef cppclass CStreamReader " arrow::ipc::StreamReader": + cdef cppclass CBatchStreamReader " arrow::ipc::BatchStreamReader": + shared_ptr[CSchema] schema() + CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch) + cdef cppclass CInputStreamReader \ + " arrow::ipc::InputStreamReader"(CBatchStreamReader): @staticmethod CStatus Open(const shared_ptr[InputStream]& stream, - shared_ptr[CStreamReader]* out) + shared_ptr[CInputStreamReader]* out) - shared_ptr[CSchema] schema() - - CStatus 
GetNextRecordBatch(shared_ptr[CRecordBatch]* batch) - - cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter): + cdef cppclass COutputStreamWriter \ + " arrow::ipc::OutputStreamWriter"(CBatchStreamWriter): @staticmethod CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CFileWriter]* out) + shared_ptr[COutputStreamWriter]* out) - cdef cppclass CFileReader " arrow::ipc::FileReader": + cdef cppclass CBatchFileWriter \ + " arrow::ipc::BatchFileWriter"(CBatchStreamWriter): + @staticmethod + CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, + shared_ptr[CBatchFileWriter]* out) + cdef cppclass CBatchFileReader " arrow::ipc::BatchFileReader": @staticmethod CStatus Open(const shared_ptr[RandomAccessFile]& file, - shared_ptr[CFileReader]* out) + shared_ptr[CBatchFileReader]* out) @staticmethod CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file, - int64_t footer_offset, shared_ptr[CFileReader]* out) + int64_t footer_offset, shared_ptr[CBatchFileReader]* out) shared_ptr[CSchema] schema() diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index a0a96e72864d..91dbf00d04b9 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -916,9 +916,9 @@ cdef class HdfsFile(NativeFile): # ---------------------------------------------------------------------- # File and stream readers and writers -cdef class _StreamWriter: +cdef class _BatchStreamWriter: cdef: - shared_ptr[CStreamWriter] writer + shared_ptr[CBatchStreamWriter] writer shared_ptr[OutputStream] sink bint closed @@ -930,12 +930,17 @@ cdef class _StreamWriter: self.close() def _open(self, sink, Schema schema): + cdef: + shared_ptr[COutputStreamWriter] writer + get_writer(sink, &self.sink) with nogil: - check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema, - &self.writer)) + check_status( + COutputStreamWriter.Open(self.sink.get(), schema.sp_schema, + &writer)) + self.writer = writer self.closed = False def write_batch(self, 
RecordBatch batch): @@ -949,9 +954,9 @@ cdef class _StreamWriter: self.closed = True -cdef class _StreamReader: +cdef class _BatchStreamReader: cdef: - shared_ptr[CStreamReader] reader + shared_ptr[CBatchStreamReader] reader cdef readonly: Schema schema @@ -961,15 +966,17 @@ cdef class _StreamReader: def _open(self, source): cdef: - shared_ptr[RandomAccessFile] reader + shared_ptr[RandomAccessFile] file_handle shared_ptr[InputStream] in_stream + shared_ptr[CInputStreamReader] reader - get_reader(source, &reader) - in_stream = reader + get_reader(source, &file_handle) + in_stream = file_handle with nogil: - check_status(CStreamReader.Open(in_stream, &self.reader)) + check_status(CInputStreamReader.Open(in_stream, &reader)) + self.reader = reader self.schema = Schema() self.schema.init_schema(self.reader.get().schema()) @@ -1009,24 +1016,25 @@ cdef class _StreamReader: return pyarrow_wrap_table(table) -cdef class _FileWriter(_StreamWriter): +cdef class _BatchFileWriter(_BatchStreamWriter): def _open(self, sink, Schema schema): - cdef shared_ptr[CFileWriter] writer + cdef shared_ptr[CBatchFileWriter] writer get_writer(sink, &self.sink) with nogil: - check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema, - &writer)) + check_status( + CBatchFileWriter.Open(self.sink.get(), schema.sp_schema, + &writer)) # Cast to base class, because has same interface - self.writer = writer + self.writer = writer self.closed = False -cdef class _FileReader: +cdef class _BatchFileReader: cdef: - shared_ptr[CFileReader] reader + shared_ptr[CBatchFileReader] reader def __cinit__(self): pass @@ -1041,9 +1049,10 @@ cdef class _FileReader: with nogil: if offset != 0: - check_status(CFileReader.Open2(reader, offset, &self.reader)) + check_status(CBatchFileReader.Open2(reader, offset, + &self.reader)) else: - check_status(CFileReader.Open(reader, &self.reader)) + check_status(CBatchFileReader.Open(reader, &self.reader)) property num_record_batches: diff --git a/python/pyarrow/ipc.py 
b/python/pyarrow/ipc.py index c37a1ce7df1e..571a3c2dfcfd 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -20,7 +20,7 @@ import pyarrow.lib as lib -class StreamReader(lib._StreamReader): +class BatchStreamReader(lib._BatchStreamReader): """ Reader for the Arrow streaming binary format @@ -37,7 +37,7 @@ def __iter__(self): yield self.get_next_batch() -class StreamWriter(lib._StreamWriter): +class BatchStreamWriter(lib._BatchStreamWriter): """ Writer for the Arrow streaming binary format @@ -52,7 +52,7 @@ def __init__(self, sink, schema): self._open(sink, schema) -class FileReader(lib._FileReader): +class BatchFileReader(lib._BatchFileReader): """ Class for reading Arrow record batch data from the Arrow binary file format @@ -68,7 +68,7 @@ def __init__(self, source, footer_offset=None): self._open(source, footer_offset=footer_offset) -class FileWriter(lib._FileWriter): +class BatchFileWriter(lib._BatchFileWriter): """ Writer to create the Arrow binary file format @@ -81,3 +81,9 @@ class FileWriter(lib._FileWriter): """ def __init__(self, sink, schema): self._open(sink, schema) + + +StreamWriter = BatchStreamWriter +StreamReader = BatchStreamWriter +FileWriter = BatchFileWriter +FileReader = BatchFileReader From 22346d47d53e1b63f75b05f7e7613ff214e2b44d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 13 May 2017 16:01:54 -0400 Subject: [PATCH 2/5] Fix unit tests Change-Id: I38b1e570c69af59aac917e96845b7add947d7196 --- python/pyarrow/ipc.py | 2 +- python/pyarrow/tests/test_ipc.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 571a3c2dfcfd..f18ade563c86 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -84,6 +84,6 @@ def __init__(self, sink, schema): StreamWriter = BatchStreamWriter -StreamReader = BatchStreamWriter +StreamReader = BatchStreamReader FileWriter = BatchFileWriter FileReader = BatchFileReader diff --git a/python/pyarrow/tests/test_ipc.py 
b/python/pyarrow/tests/test_ipc.py index 02040678958e..fa996606ca44 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -99,12 +99,12 @@ def test_read_all(self): class TestStream(MessagingTest, unittest.TestCase): def _get_writer(self, sink, schema): - return pa.StreamWriter(sink, schema) + return pa.BatchStreamWriter(sink, schema) def test_simple_roundtrip(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.StreamReader(file_contents) + reader = pa.BatchStreamReader(file_contents) assert reader.schema.equals(batches[0].schema) From 04fa2854e285d8e15aece0a6980ca4eee1c79576 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 13 May 2017 16:34:49 -0400 Subject: [PATCH 3/5] Feedback on ipc reader/writer names. Add open_stream/open_file Python APIs Change-Id: I92cc2d5de55f625dee543bd7dc223225fd8f7977 --- cpp/src/arrow/ipc/file-to-stream.cc | 12 ++-- cpp/src/arrow/ipc/ipc-read-write-test.cc | 25 +++++---- cpp/src/arrow/ipc/json-integration-test.cc | 13 +++-- cpp/src/arrow/ipc/reader.cc | 52 ++++++++--------- cpp/src/arrow/ipc/reader.h | 36 ++++++------ cpp/src/arrow/ipc/stream-to-file.cc | 12 ++-- cpp/src/arrow/ipc/writer.cc | 65 ++++++++++++---------- cpp/src/arrow/ipc/writer.h | 39 ++++++------- python/doc/source/api.rst | 10 ++-- python/pyarrow/__init__.py | 10 ++-- python/pyarrow/includes/libarrow.pxd | 32 ++++++----- python/pyarrow/io.pxi | 41 +++++++------- python/pyarrow/ipc.py | 48 +++++++++++++--- python/pyarrow/tests/test_ipc.py | 20 +++---- 14 files changed, 232 insertions(+), 183 deletions(-) diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc index a52716b72313..39c720cb35aa 100644 --- a/cpp/src/arrow/ipc/file-to-stream.cc +++ b/cpp/src/arrow/ipc/file-to-stream.cc @@ -24,18 +24,19 @@ #include "arrow/util/io-util.h" namespace arrow { +namespace ipc { // Reads a file on the file system and prints to stdout the stream version of it. 
Status ConvertToStream(const char* path) { std::shared_ptr in_file; - std::shared_ptr reader; + std::shared_ptr reader; RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file)); - RETURN_NOT_OK(ipc::BatchFileReader::Open(in_file, &reader)); + RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader)); io::StdoutStream sink; - std::shared_ptr writer; - RETURN_NOT_OK(ipc::OutputStreamWriter::Open(&sink, reader->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr chunk; RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); @@ -44,6 +45,7 @@ Status ConvertToStream(const char* path) { return writer->Close(); } +} // namespace ipc } // namespace arrow int main(int argc, char** argv) { @@ -51,7 +53,7 @@ int main(int argc, char** argv) { std::cerr << "Usage: file-to-stream " << std::endl; return 1; } - arrow::Status status = arrow::ConvertToStream(argv[1]); + arrow::Status status = arrow::ipc::ConvertToStream(argv[1]); if (!status.ok()) { std::cerr << "Could not convert to stream: " << status.ToString() << std::endl; return 1; diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index c7cca9f45871..c99816c760e6 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture { if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); } RETURN_NOT_OK(mmap_->Seek(0)); - std::shared_ptr file_writer; - RETURN_NOT_OK(BatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); + std::shared_ptr file_writer; + RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer)); RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true)); RETURN_NOT_OK(file_writer->Close()); int64_t offset; RETURN_NOT_OK(mmap_->Tell(&offset)); - std::shared_ptr file_reader; - 
RETURN_NOT_OK(BatchFileReader::Open(mmap_, offset, &file_reader)); + std::shared_ptr file_reader; + RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader)); return file_reader->GetRecordBatch(0, result); } @@ -487,8 +487,9 @@ class TestFileFormat : public ::testing::TestWithParam { Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) { // Write the file - std::shared_ptr writer; - RETURN_NOT_OK(BatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK( + RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); const int num_batches = static_cast(in_batches.size()); @@ -504,8 +505,8 @@ class TestFileFormat : public ::testing::TestWithParam { // Open the file auto buf_reader = std::make_shared(buffer_); - std::shared_ptr reader; - RETURN_NOT_OK(BatchFileReader::Open(buf_reader, footer_offset, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader)); EXPECT_EQ(num_batches, reader->num_record_batches()); for (int i = 0; i < num_batches; ++i) { @@ -553,8 +554,8 @@ class TestStreamFormat : public ::testing::TestWithParam { Status RoundTripHelper( const RecordBatch& batch, std::vector>* out_batches) { // Write the file - std::shared_ptr writer; - RETURN_NOT_OK(OutputStreamWriter::Open(sink_.get(), batch.schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer)); int num_batches = 5; for (int i = 0; i < num_batches; ++i) { RETURN_NOT_OK(writer->WriteRecordBatch(batch)); @@ -565,8 +566,8 @@ class TestStreamFormat : public ::testing::TestWithParam { // Open the file auto buf_reader = std::make_shared(buffer_); - std::shared_ptr reader; - RETURN_NOT_OK(InputStreamReader::Open(buf_reader, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader)); std::shared_ptr chunk; while (true) { 
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 5afd020aadca..424755a94068 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -76,8 +76,9 @@ static Status ConvertJsonToArrow( std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; } - std::shared_ptr writer; - RETURN_NOT_OK(ipc::BatchFileWriter::Open(out_file.get(), reader->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK( + ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr batch; @@ -96,8 +97,8 @@ static Status ConvertArrowToJson( RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file)); RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file)); - std::shared_ptr reader; - RETURN_NOT_OK(ipc::BatchFileReader::Open(in_file, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader)); if (FLAGS_verbose) { std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; @@ -137,8 +138,8 @@ static Status ValidateArrowVsJson( std::shared_ptr arrow_file; RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file)); - std::shared_ptr arrow_reader; - RETURN_NOT_OK(ipc::BatchFileReader::Open(arrow_file, &arrow_reader)); + std::shared_ptr arrow_reader; + RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader)); auto json_schema = json_reader->schema(); auto arrow_schema = arrow_reader->schema(); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8241d6401676..078748ef1c49 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -156,7 +156,7 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona } // ---------------------------------------------------------------------- -// InputStreamReader implementation +// 
RecordBatchStreamReader implementation static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); @@ -176,12 +176,12 @@ static inline std::string message_type_name(Message::Type type) { return "unknown"; } -BatchStreamReader::~BatchStreamReader() {} +RecordBatchReader::~RecordBatchReader() {} -class InputStreamReader::InputStreamReaderImpl { +class RecordBatchStreamReader::RecordBatchStreamReaderImpl { public: - InputStreamReaderImpl() {} - ~InputStreamReaderImpl() {} + RecordBatchStreamReaderImpl() {} + ~RecordBatchStreamReaderImpl() {} Status Open(const std::shared_ptr& stream) { stream_ = stream; @@ -269,31 +269,31 @@ class InputStreamReader::InputStreamReaderImpl { std::shared_ptr schema_; }; -InputStreamReader::InputStreamReader() { - impl_.reset(new InputStreamReaderImpl()); +RecordBatchStreamReader::RecordBatchStreamReader() { + impl_.reset(new RecordBatchStreamReaderImpl()); } -Status InputStreamReader::Open(const std::shared_ptr& stream, - std::shared_ptr* reader) { +Status RecordBatchStreamReader::Open(const std::shared_ptr& stream, + std::shared_ptr* reader) { // Private ctor - *reader = std::shared_ptr(new InputStreamReader()); + *reader = std::shared_ptr(new RecordBatchStreamReader()); return (*reader)->impl_->Open(stream); } -std::shared_ptr InputStreamReader::schema() const { +std::shared_ptr RecordBatchStreamReader::schema() const { return impl_->schema(); } -Status InputStreamReader::GetNextRecordBatch(std::shared_ptr* batch) { +Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr* batch) { return impl_->GetNextRecordBatch(batch); } // ---------------------------------------------------------------------- // Reader implementation -class BatchFileReader::BatchFileReaderImpl { +class RecordBatchFileReader::RecordBatchFileReaderImpl { public: - BatchFileReaderImpl() { dictionary_memo_ = std::make_shared(); } + RecordBatchFileReaderImpl() 
{ dictionary_memo_ = std::make_shared(); } Status ReadFooter() { int magic_size = static_cast(strlen(kArrowMagicBytes)); @@ -432,38 +432,38 @@ class BatchFileReader::BatchFileReaderImpl { std::shared_ptr schema_; }; -BatchFileReader::BatchFileReader() { - impl_.reset(new BatchFileReaderImpl()); +RecordBatchFileReader::RecordBatchFileReader() { + impl_.reset(new RecordBatchFileReaderImpl()); } -BatchFileReader::~BatchFileReader() {} +RecordBatchFileReader::~RecordBatchFileReader() {} -Status BatchFileReader::Open(const std::shared_ptr& file, - std::shared_ptr* reader) { +Status RecordBatchFileReader::Open(const std::shared_ptr& file, + std::shared_ptr* reader) { int64_t footer_offset; RETURN_NOT_OK(file->GetSize(&footer_offset)); return Open(file, footer_offset, reader); } -Status BatchFileReader::Open(const std::shared_ptr& file, - int64_t footer_offset, std::shared_ptr* reader) { - *reader = std::shared_ptr(new BatchFileReader()); +Status RecordBatchFileReader::Open(const std::shared_ptr& file, + int64_t footer_offset, std::shared_ptr* reader) { + *reader = std::shared_ptr(new RecordBatchFileReader()); return (*reader)->impl_->Open(file, footer_offset); } -std::shared_ptr BatchFileReader::schema() const { +std::shared_ptr RecordBatchFileReader::schema() const { return impl_->schema(); } -int BatchFileReader::num_record_batches() const { +int RecordBatchFileReader::num_record_batches() const { return impl_->num_record_batches(); } -MetadataVersion BatchFileReader::version() const { +MetadataVersion RecordBatchFileReader::version() const { return impl_->version(); } -Status BatchFileReader::GetRecordBatch(int i, std::shared_ptr* batch) { +Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr* batch) { return impl_->GetRecordBatch(i, batch); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 90b85ae2c35e..8d40e5a01dd9 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -45,9 +45,9 @@ class 
RandomAccessFile; namespace ipc { /// \brief Abstract interface for reading stream of record batches -class ARROW_EXPORT BatchStreamReader { +class ARROW_EXPORT RecordBatchReader { public: - virtual ~BatchStreamReader(); + virtual ~RecordBatchReader(); /// \return the shared schema of the record batches in the stream virtual std::shared_ptr schema() const = 0; @@ -60,9 +60,9 @@ class ARROW_EXPORT BatchStreamReader { virtual Status GetNextRecordBatch(std::shared_ptr* batch) = 0; }; -/// \class InputStreamReader +/// \class RecordBatchStreamReader /// \brief Synchronous batch stream reader that reads from io::InputStream -class ARROW_EXPORT InputStreamReader : public BatchStreamReader { +class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { public: /// Create batch reader from InputStream /// @@ -70,22 +70,22 @@ class ARROW_EXPORT InputStreamReader : public BatchStreamReader { /// \param(out) reader the created reader object /// \return Status static Status Open(const std::shared_ptr& stream, - std::shared_ptr* reader); + std::shared_ptr* reader); std::shared_ptr schema() const override; Status GetNextRecordBatch(std::shared_ptr* batch) override; private: - InputStreamReader(); + RecordBatchStreamReader(); - class ARROW_NO_EXPORT InputStreamReaderImpl; - std::unique_ptr impl_; + class ARROW_NO_EXPORT RecordBatchStreamReaderImpl; + std::unique_ptr impl_; }; -/// \brief Reads the random access record batch file format -class ARROW_EXPORT BatchFileReader { +/// \brief Reads the record batch file format +class ARROW_EXPORT RecordBatchFileReader { public: - ~BatchFileReader(); + ~RecordBatchFileReader(); // Open a file-like object that is assumed to be self-contained; i.e., the // end of the file interface is the end of the Arrow file. Note that there @@ -93,7 +93,7 @@ class ARROW_EXPORT BatchFileReader { // need only locate the end of the Arrow file stream to discover the metadata // and then proceed to read the data into memory. 
static Status Open(const std::shared_ptr& file, - std::shared_ptr* reader); + std::shared_ptr* reader); // If the file is embedded within some larger file or memory region, you can // pass the absolute memory offset to the end of the file (which contains the @@ -103,7 +103,7 @@ class ARROW_EXPORT BatchFileReader { // @param file: the data source // @param footer_offset: the position of the end of the Arrow "file" static Status Open(const std::shared_ptr& file, - int64_t footer_offset, std::shared_ptr* reader); + int64_t footer_offset, std::shared_ptr* reader); /// The schema includes any dictionaries std::shared_ptr schema() const; @@ -123,10 +123,10 @@ class ARROW_EXPORT BatchFileReader { Status GetRecordBatch(int i, std::shared_ptr* batch); private: - BatchFileReader(); + RecordBatchFileReader(); - class ARROW_NO_EXPORT BatchFileReaderImpl; - std::unique_ptr impl_; + class ARROW_NO_EXPORT RecordBatchFileReaderImpl; + std::unique_ptr impl_; }; // Generic read functions; does not copy data if the input supports zero copy reads @@ -173,8 +173,8 @@ Status ARROW_EXPORT ReadTensor( /// Backwards-compatibility for Arrow < 0.4.0 /// #ifndef ARROW_NO_DEPRECATED_API -using StreamReader = BatchStreamReader; -using FileReader = BatchFileReader; +using StreamReader = RecordBatchReader; +using FileReader = RecordBatchFileReader; #endif } // namespace ipc diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc index d135e5416e37..b94205427dfe 100644 --- a/cpp/src/arrow/ipc/stream-to-file.cc +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -24,18 +24,19 @@ #include "arrow/util/io-util.h" namespace arrow { +namespace ipc { // Converts a stream from stdin to a file written to standard out. 
// A typical usage would be: // $ | stream-to-file > file.arrow Status ConvertToFile() { std::shared_ptr input(new io::StdinStream); - std::shared_ptr reader; - RETURN_NOT_OK(ipc::InputStreamReader::Open(input, &reader)); + std::shared_ptr reader; + RETURN_NOT_OK(RecordBatchStreamReader::Open(input, &reader)); io::StdoutStream sink; - std::shared_ptr writer; - RETURN_NOT_OK(ipc::BatchFileWriter::Open(&sink, reader->schema(), &writer)); + std::shared_ptr writer; + RETURN_NOT_OK(RecordBatchFileWriter::Open(&sink, reader->schema(), &writer)); std::shared_ptr batch; while (true) { @@ -46,10 +47,11 @@ Status ConvertToFile() { return writer->Close(); } +} // namespace ipc } // namespace arrow int main(int argc, char** argv) { - arrow::Status status = arrow::ConvertToFile(); + arrow::Status status = arrow::ipc::ConvertToFile(); if (!status.ok()) { std::cerr << "Could not convert to file: " << status.ToString() << std::endl; return 1; diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 39dad2a9cc4e..0f0791f2c880 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -88,9 +88,9 @@ static inline bool NeedTruncate( return offset != 0 || min_length < buffer->size(); } -class RecordBatchWriter : public ArrayVisitor { +class RecordBatchSerializer : public ArrayVisitor { public: - RecordBatchWriter(MemoryPool* pool, int64_t buffer_start_offset, + RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth, bool allow_64bit) : pool_(pool), max_recursion_depth_(max_recursion_depth), @@ -99,7 +99,7 @@ class RecordBatchWriter : public ArrayVisitor { DCHECK_GT(max_recursion_depth, 0); } - virtual ~RecordBatchWriter() = default; + virtual ~RecordBatchSerializer() = default; Status VisitArray(const Array& arr) { if (max_recursion_depth_ <= 0) { @@ -480,9 +480,9 @@ class RecordBatchWriter : public ArrayVisitor { bool allow_64bit_; }; -class DictionaryWriter : public RecordBatchWriter { +class 
DictionaryWriter : public RecordBatchSerializer { public: - using RecordBatchWriter::RecordBatchWriter; + using RecordBatchSerializer::RecordBatchSerializer; Status WriteMetadataMessage( int64_t num_rows, int64_t body_length, std::shared_ptr* out) override { @@ -500,7 +500,7 @@ class DictionaryWriter : public RecordBatchWriter { auto schema = std::make_shared(fields); RecordBatch batch(schema, dictionary->length(), {dictionary}); - return RecordBatchWriter::Write(batch, dst, metadata_length, body_length); + return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length); } private: @@ -521,7 +521,8 @@ Status AlignStreamPosition(io::OutputStream* stream) { Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool, int max_recursion_depth, bool allow_64bit) { - RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth, allow_64bit); + RecordBatchSerializer writer( + pool, buffer_start_offset, max_recursion_depth, allow_64bit); return writer.Write(batch, dst, metadata_length, body_length); } @@ -582,20 +583,20 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) { // ---------------------------------------------------------------------- -BatchStreamWriter::~BatchStreamWriter() {} +RecordBatchWriter::~RecordBatchWriter() {} // ---------------------------------------------------------------------- // Stream writer implementation -class OutputStreamWriter::OutputStreamWriterImpl { +class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { public: - OutputStreamWriterImpl() + RecordBatchStreamWriterImpl() : dictionary_memo_(std::make_shared()), pool_(default_memory_pool()), position_(-1), started_(false) {} - virtual ~OutputStreamWriterImpl() = default; + virtual ~RecordBatchStreamWriterImpl() = default; Status Open(io::OutputStream* sink, const std::shared_ptr& schema) { sink_ = sink; @@ -725,36 +726,38 @@ class 
OutputStreamWriter::OutputStreamWriterImpl { std::vector record_batches_; }; -OutputStreamWriter::OutputStreamWriter() { - impl_.reset(new OutputStreamWriterImpl()); +RecordBatchStreamWriter::RecordBatchStreamWriter() { + impl_.reset(new RecordBatchStreamWriterImpl()); } -Status OutputStreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { +Status RecordBatchStreamWriter::WriteRecordBatch( + const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -void OutputStreamWriter::set_memory_pool(MemoryPool* pool) { +void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) { impl_->set_memory_pool(pool); } -Status OutputStreamWriter::Open(io::OutputStream* sink, - const std::shared_ptr& schema, std::shared_ptr* out) { +Status RecordBatchStreamWriter::Open(io::OutputStream* sink, + const std::shared_ptr& schema, + std::shared_ptr* out) { // ctor is private - *out = std::shared_ptr(new OutputStreamWriter()); + *out = std::shared_ptr(new RecordBatchStreamWriter()); return (*out)->impl_->Open(sink, schema); } -Status OutputStreamWriter::Close() { +Status RecordBatchStreamWriter::Close() { return impl_->Close(); } // ---------------------------------------------------------------------- // File writer implementation -class BatchFileWriter::BatchFileWriterImpl - : public OutputStreamWriter::OutputStreamWriterImpl { +class RecordBatchFileWriter::RecordBatchFileWriterImpl + : public RecordBatchStreamWriter::RecordBatchStreamWriterImpl { public: - using BASE = OutputStreamWriter::OutputStreamWriterImpl; + using BASE = RecordBatchStreamWriter::RecordBatchStreamWriterImpl; Status Start() override { RETURN_NOT_OK(WriteAligned( @@ -786,23 +789,25 @@ class BatchFileWriter::BatchFileWriterImpl } }; -BatchFileWriter::BatchFileWriter() { - impl_.reset(new BatchFileWriterImpl()); +RecordBatchFileWriter::RecordBatchFileWriter() { + impl_.reset(new RecordBatchFileWriterImpl()); } -BatchFileWriter::~BatchFileWriter() {} 
+RecordBatchFileWriter::~RecordBatchFileWriter() {} -Status BatchFileWriter::Open(io::OutputStream* sink, - const std::shared_ptr& schema, std::shared_ptr* out) { - *out = std::shared_ptr(new BatchFileWriter()); // ctor is private +Status RecordBatchFileWriter::Open(io::OutputStream* sink, + const std::shared_ptr& schema, std::shared_ptr* out) { + *out = std::shared_ptr( + new RecordBatchFileWriter()); // ctor is private return (*out)->impl_->Open(sink, schema); } -Status BatchFileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { +Status RecordBatchFileWriter::WriteRecordBatch( + const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -Status BatchFileWriter::Close() { +Status RecordBatchFileWriter::Close() { return impl_->Close(); } diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 5be1a6f597d5..5db19e7e280c 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -46,11 +46,11 @@ class OutputStream; namespace ipc { -/// \class BatchStreamWriter +/// \class RecordBatchWriter /// \brief Abstract interface for writing a stream of record batches -class ARROW_EXPORT BatchStreamWriter { +class ARROW_EXPORT RecordBatchWriter { public: - virtual ~BatchStreamWriter(); + virtual ~RecordBatchWriter(); /// Write a record batch to the stream /// @@ -70,9 +70,10 @@ class ARROW_EXPORT BatchStreamWriter { virtual void set_memory_pool(MemoryPool* pool) = 0; }; -/// \class OutputStreamWriter -/// \brief Synchronous batch stream writer that writes to io::OutputStream -class ARROW_EXPORT OutputStreamWriter : public BatchStreamWriter { +/// \class RecordBatchStreamWriter +/// \brief Synchronous batch stream writer that writes the Arrow streaming +/// format +class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { public: /// Create a new writer from stream sink and schema. User is responsible for /// closing the actual OutputStream. 
@@ -82,26 +83,26 @@ class ARROW_EXPORT OutputStreamWriter : public BatchStreamWriter { /// \param(out) out the created stream writer /// \return Status indicating success or failure static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out); + std::shared_ptr* out); Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; void set_memory_pool(MemoryPool* pool) override; protected: - OutputStreamWriter(); - class ARROW_NO_EXPORT OutputStreamWriterImpl; - std::unique_ptr impl_; + RecordBatchStreamWriter(); + class ARROW_NO_EXPORT RecordBatchStreamWriterImpl; + std::unique_ptr impl_; }; -/// \brief Creates the random access record batch file format +/// \brief Creates the Arrow record batch file format /// /// Implements the random access file format, which structurally is a record /// batch stream followed by a metadata footer at the end of the file. Magic /// numbers are written at the start and end of the file -class ARROW_EXPORT BatchFileWriter : public OutputStreamWriter { +class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { public: - virtual ~BatchFileWriter(); + virtual ~RecordBatchFileWriter(); /// Create a new writer from stream sink and schema /// @@ -110,15 +111,15 @@ class ARROW_EXPORT BatchFileWriter : public OutputStreamWriter { /// \param(out) out the created stream writer /// \return Status indicating success or failure static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out); + std::shared_ptr* out); Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; private: - BatchFileWriter(); - class ARROW_NO_EXPORT BatchFileWriterImpl; - std::unique_ptr impl_; + RecordBatchFileWriter(); + class ARROW_NO_EXPORT RecordBatchFileWriterImpl; + std::unique_ptr impl_; }; /// Write the RecordBatch (collection of equal-length Arrow arrays) to the @@ -174,8 
+175,8 @@ Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst, /// Backwards-compatibility for Arrow < 0.4.0 /// #ifndef ARROW_NO_DEPRECATED_API -using FileWriter = BatchFileWriter; -using StreamWriter = OutputStreamWriter; +using FileWriter = RecordBatchFileWriter; +using StreamWriter = RecordBatchStreamWriter; #endif } // namespace ipc diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index 810e57cc5719..e7bea7013b9a 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -177,10 +177,12 @@ Interprocess Communication and Messaging .. autosummary:: :toctree: generated/ - BatchFileReader - BatchFileWriter - BatchStreamReader - BatchStreamWriter + RecordBatchFileReader + RecordBatchFileWriter + RecordBatchStreamReader + RecordBatchStreamWriter + open_file + open_stream .. _api.memory_pool: diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index c5beb06b4008..d6d2aa4a671e 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -101,12 +101,10 @@ def jemalloc_memory_pool(): from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem -from pyarrow.ipc import (BatchFileReader, BatchFileWriter, - BatchStreamReader, BatchStreamWriter) +from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter, + RecordBatchStreamReader, RecordBatchStreamWriter, + open_stream, + open_file) localfs = LocalFilesystem.get_instance() - - -# DEPRECATED -from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 1cc6074c7773..b03dd59dffca 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -547,40 +547,44 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil: cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: - cdef cppclass CBatchStreamWriter " arrow::ipc::BatchStreamWriter": + cdef cppclass 
CRecordBatchWriter \ + " arrow::ipc::RecordBatchWriter": CStatus Close() CStatus WriteRecordBatch(const CRecordBatch& batch) - cdef cppclass CBatchStreamReader " arrow::ipc::BatchStreamReader": + cdef cppclass CRecordBatchReader \ + " arrow::ipc::RecordBatchReader": shared_ptr[CSchema] schema() CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch) - cdef cppclass CInputStreamReader \ - " arrow::ipc::InputStreamReader"(CBatchStreamReader): + cdef cppclass CRecordBatchStreamReader \ + " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader): @staticmethod CStatus Open(const shared_ptr[InputStream]& stream, - shared_ptr[CInputStreamReader]* out) + shared_ptr[CRecordBatchStreamReader]* out) - cdef cppclass COutputStreamWriter \ - " arrow::ipc::OutputStreamWriter"(CBatchStreamWriter): + cdef cppclass CRecordBatchStreamWriter \ + " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter): @staticmethod CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[COutputStreamWriter]* out) + shared_ptr[CRecordBatchStreamWriter]* out) - cdef cppclass CBatchFileWriter \ - " arrow::ipc::BatchFileWriter"(CBatchStreamWriter): + cdef cppclass CRecordBatchFileWriter \ + " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter): @staticmethod CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CBatchFileWriter]* out) + shared_ptr[CRecordBatchFileWriter]* out) - cdef cppclass CBatchFileReader " arrow::ipc::BatchFileReader": + cdef cppclass CRecordBatchFileReader \ + " arrow::ipc::RecordBatchFileReader": @staticmethod CStatus Open(const shared_ptr[RandomAccessFile]& file, - shared_ptr[CBatchFileReader]* out) + shared_ptr[CRecordBatchFileReader]* out) @staticmethod CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file, - int64_t footer_offset, shared_ptr[CBatchFileReader]* out) + int64_t footer_offset, + shared_ptr[CRecordBatchFileReader]* out) shared_ptr[CSchema] schema() diff --git a/python/pyarrow/io.pxi 
b/python/pyarrow/io.pxi index 91dbf00d04b9..4cbf603c2a5d 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -916,9 +916,9 @@ cdef class HdfsFile(NativeFile): # ---------------------------------------------------------------------- # File and stream readers and writers -cdef class _BatchStreamWriter: +cdef class _RecordBatchWriter: cdef: - shared_ptr[CBatchStreamWriter] writer + shared_ptr[CRecordBatchWriter] writer shared_ptr[OutputStream] sink bint closed @@ -931,16 +931,17 @@ cdef class _BatchStreamWriter: def _open(self, sink, Schema schema): cdef: - shared_ptr[COutputStreamWriter] writer + shared_ptr[CRecordBatchStreamWriter] writer get_writer(sink, &self.sink) with nogil: check_status( - COutputStreamWriter.Open(self.sink.get(), schema.sp_schema, - &writer)) + CRecordBatchStreamWriter.Open(self.sink.get(), + schema.sp_schema, + &writer)) - self.writer = writer + self.writer = writer self.closed = False def write_batch(self, RecordBatch batch): @@ -954,9 +955,9 @@ cdef class _BatchStreamWriter: self.closed = True -cdef class _BatchStreamReader: +cdef class _RecordBatchReader: cdef: - shared_ptr[CBatchStreamReader] reader + shared_ptr[CRecordBatchReader] reader cdef readonly: Schema schema @@ -968,15 +969,15 @@ cdef class _BatchStreamReader: cdef: shared_ptr[RandomAccessFile] file_handle shared_ptr[InputStream] in_stream - shared_ptr[CInputStreamReader] reader + shared_ptr[CRecordBatchStreamReader] reader get_reader(source, &file_handle) in_stream = file_handle with nogil: - check_status(CInputStreamReader.Open(in_stream, &reader)) + check_status(CRecordBatchStreamReader.Open(in_stream, &reader)) - self.reader = reader + self.reader = reader self.schema = Schema() self.schema.init_schema(self.reader.get().schema()) @@ -1016,25 +1017,25 @@ cdef class _BatchStreamReader: return pyarrow_wrap_table(table) -cdef class _BatchFileWriter(_BatchStreamWriter): +cdef class _RecordBatchFileWriter(_RecordBatchWriter): def _open(self, sink, Schema schema): - 
cdef shared_ptr[CBatchFileWriter] writer + cdef shared_ptr[CRecordBatchFileWriter] writer get_writer(sink, &self.sink) with nogil: check_status( - CBatchFileWriter.Open(self.sink.get(), schema.sp_schema, + CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema, &writer)) # Cast to base class, because has same interface - self.writer = writer + self.writer = writer self.closed = False -cdef class _BatchFileReader: +cdef class _RecordBatchFileReader: cdef: - shared_ptr[CBatchFileReader] reader + shared_ptr[CRecordBatchFileReader] reader def __cinit__(self): pass @@ -1049,10 +1050,10 @@ cdef class _BatchFileReader: with nogil: if offset != 0: - check_status(CBatchFileReader.Open2(reader, offset, - &self.reader)) + check_status(CRecordBatchFileReader.Open2( + reader, offset, &self.reader)) else: - check_status(CBatchFileReader.Open(reader, &self.reader)) + check_status(CRecordBatchFileReader.Open(reader, &self.reader)) property num_record_batches: diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index f18ade563c86..8338de33a0d2 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -20,7 +20,7 @@ import pyarrow.lib as lib -class BatchStreamReader(lib._BatchStreamReader): +class RecordBatchStreamReader(lib._RecordBatchReader): """ Reader for the Arrow streaming binary format @@ -37,7 +37,7 @@ def __iter__(self): yield self.get_next_batch() -class BatchStreamWriter(lib._BatchStreamWriter): +class RecordBatchStreamWriter(lib._RecordBatchWriter): """ Writer for the Arrow streaming binary format @@ -52,7 +52,7 @@ def __init__(self, sink, schema): self._open(sink, schema) -class BatchFileReader(lib._BatchFileReader): +class RecordBatchFileReader(lib._RecordBatchFileReader): """ Class for reading Arrow record batch data from the Arrow binary file format @@ -68,7 +68,7 @@ def __init__(self, source, footer_offset=None): self._open(source, footer_offset=footer_offset) -class BatchFileWriter(lib._BatchFileWriter): +class 
RecordBatchFileWriter(lib._RecordBatchFileWriter): """ Writer to create the Arrow binary file format @@ -83,7 +83,39 @@ def __init__(self, sink, schema): self._open(sink, schema) -StreamWriter = BatchStreamWriter -StreamReader = BatchStreamReader -FileWriter = BatchFileWriter -FileReader = BatchFileReader +def open_stream(source): + """ + Create reader for Arrow streaming format + + Parameters + ---------- + source : str, pyarrow.NativeFile, or file-like Python object + Either a file path, or a readable file object + footer_offset : int, default None + If the file is embedded in some larger file, this is the byte offset to + the very end of the file data + + Returns + ------- + reader : RecordBatchStreamReader + """ + return RecordBatchStreamReader(source) + + +def open_file(source, footer_offset=None): + """ + Create reader for Arrow file format + + Parameters + ---------- + source : str, pyarrow.NativeFile, or file-like Python object + Either a file path, or a readable file object + footer_offset : int, default None + If the file is embedded in some larger file, this is the byte offset to + the very end of the file data + + Returns + ------- + reader : RecordBatchFileReader + """ + return RecordBatchFileReader(source, footer_offset=footer_offset) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index fa996606ca44..4d19804dac2a 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -70,13 +70,13 @@ class TestFile(MessagingTest, unittest.TestCase): # Also tests writing zero-copy NumPy array with additional padding def _get_writer(self, sink, schema): - return pa.FileWriter(sink, schema) + return pa.RecordBatchFileWriter(sink, schema) def test_simple_roundtrip(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.FileReader(file_contents) + reader = pa.open_file(file_contents) assert reader.num_record_batches == len(batches) @@ -89,7 +89,7 @@ def test_read_all(self): 
batches = self.write_batches() file_contents = self._get_source() - reader = pa.FileReader(file_contents) + reader = pa.open_file(file_contents) result = reader.read_all() expected = pa.Table.from_batches(batches) @@ -99,12 +99,12 @@ def test_read_all(self): class TestStream(MessagingTest, unittest.TestCase): def _get_writer(self, sink, schema): - return pa.BatchStreamWriter(sink, schema) + return pa.RecordBatchStreamWriter(sink, schema) def test_simple_roundtrip(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.BatchStreamReader(file_contents) + reader = pa.open_stream(file_contents) assert reader.schema.equals(batches[0].schema) @@ -121,7 +121,7 @@ def test_simple_roundtrip(self): def test_read_all(self): batches = self.write_batches() file_contents = self._get_source() - reader = pa.StreamReader(file_contents) + reader = pa.open_stream(file_contents) result = reader.read_all() expected = pa.Table.from_batches(batches) @@ -147,7 +147,7 @@ def run(self): connection, client_address = self._sock.accept() try: source = connection.makefile(mode='rb') - reader = pa.StreamReader(source) + reader = pa.open_stream(source) self._schema = reader.schema if self._do_read_all: self._table = reader.read_all() @@ -185,7 +185,7 @@ def _get_sink(self): return self._sock.makefile(mode='wb') def _get_writer(self, sink, schema): - return pa.StreamWriter(sink, schema) + return pa.RecordBatchStreamWriter(sink, schema) def test_simple_roundtrip(self): self.start_server(do_read_all=False) @@ -241,12 +241,12 @@ def test_get_record_batch_size(): def write_file(batch, sink): - writer = pa.FileWriter(sink, batch.schema) + writer = pa.RecordBatchFileWriter(sink, batch.schema) writer.write_batch(batch) writer.close() def read_file(source): - reader = pa.FileReader(source) + reader = pa.open_file(source) return [reader.get_batch(i) for i in range(reader.num_record_batches)] From a797ee3e934a3f959f5afe72464be43e9127b9e3 Mon Sep 17 00:00:00 2001 From: Wes 
McKinney Date: Sat, 13 May 2017 16:40:07 -0400 Subject: [PATCH 4/5] Fix glib Change-Id: I2b375c917e6004c3ffebf3657491d4756f792951 --- c_glib/arrow-glib/stream-reader.cpp | 18 +++++++++--------- c_glib/arrow-glib/stream-reader.hpp | 4 ++-- c_glib/arrow-glib/writer.cpp | 16 ++++++++-------- c_glib/arrow-glib/writer.h | 2 +- c_glib/arrow-glib/writer.hpp | 8 ++++---- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/c_glib/arrow-glib/stream-reader.cpp b/c_glib/arrow-glib/stream-reader.cpp index d1bf0519b1ee..19c36c20fbb5 100644 --- a/c_glib/arrow-glib/stream-reader.cpp +++ b/c_glib/arrow-glib/stream-reader.cpp @@ -43,7 +43,7 @@ G_BEGIN_DECLS */ typedef struct GArrowStreamReaderPrivate_ { - std::shared_ptr stream_reader; + std::shared_ptr stream_reader; } GArrowStreamReaderPrivate; enum { @@ -85,7 +85,7 @@ garrow_stream_reader_set_property(GObject *object, switch (prop_id) { case PROP_STREAM_READER: priv->stream_reader = - *static_cast *>(g_value_get_pointer(value)); + *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -124,8 +124,8 @@ garrow_stream_reader_class_init(GArrowStreamReaderClass *klass) gobject_class->get_property = garrow_stream_reader_get_property; spec = g_param_spec_pointer("stream-reader", - "ipc::InputStreamReader", - "The raw std::shared *", + "ipc::RecordBatchStreamReader", + "The raw std::shared *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_STREAM_READER, spec); @@ -143,10 +143,10 @@ GArrowStreamReader * garrow_stream_reader_new(GArrowInputStream *stream, GError **error) { - std::shared_ptr arrow_stream_reader; + std::shared_ptr arrow_stream_reader; auto status = - arrow::ipc::InputStreamReader::Open(garrow_input_stream_get_raw(stream), - &arrow_stream_reader); + arrow::ipc::RecordBatchStreamReader::Open(garrow_input_stream_get_raw(stream), + &arrow_stream_reader); if (garrow_error_check(error, 
status, "[ipc][stream-reader][open]")) { return garrow_stream_reader_new_raw(&arrow_stream_reader); } else { @@ -202,7 +202,7 @@ garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader, G_END_DECLS GArrowStreamReader * -garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader) +garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader) { auto stream_reader = GARROW_STREAM_READER(g_object_new(GARROW_TYPE_STREAM_READER, @@ -211,7 +211,7 @@ garrow_stream_reader_new_raw(std::shared_ptr *arr return stream_reader; } -std::shared_ptr +std::shared_ptr garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader) { GArrowStreamReaderPrivate *priv; diff --git a/c_glib/arrow-glib/stream-reader.hpp b/c_glib/arrow-glib/stream-reader.hpp index fdd147e88e31..5191b4edb1cd 100644 --- a/c_glib/arrow-glib/stream-reader.hpp +++ b/c_glib/arrow-glib/stream-reader.hpp @@ -24,5 +24,5 @@ #include -GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader); -std::shared_ptr garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader); +GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr *arrow_stream_reader); +std::shared_ptr garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader); diff --git a/c_glib/arrow-glib/writer.cpp b/c_glib/arrow-glib/writer.cpp index a83feef21fc0..092993b5c224 100644 --- a/c_glib/arrow-glib/writer.cpp +++ b/c_glib/arrow-glib/writer.cpp @@ -47,7 +47,7 @@ G_BEGIN_DECLS */ typedef struct GArrowStreamWriterPrivate_ { - std::shared_ptr stream_writer; + std::shared_ptr stream_writer; } GArrowStreamWriterPrivate; enum { @@ -89,7 +89,7 @@ garrow_stream_writer_set_property(GObject *object, switch (prop_id) { case PROP_STREAM_WRITER: priv->stream_writer = - *static_cast *>(g_value_get_pointer(value)); + *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); @@ -128,8 +128,8 @@ 
garrow_stream_writer_class_init(GArrowStreamWriterClass *klass) gobject_class->get_property = garrow_stream_writer_get_property; spec = g_param_spec_pointer("stream-writer", - "ipc::OutputStreamWriter", - "The raw std::shared *", + "ipc::RecordBatchStreamWriter", + "The raw std::shared *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_STREAM_WRITER, spec); @@ -149,11 +149,11 @@ garrow_stream_writer_new(GArrowOutputStream *sink, GArrowSchema *schema, GError **error) { - std::shared_ptr arrow_stream_writer; + std::shared_ptr arrow_stream_writer; auto status = - arrow::ipc::OutputStreamWriter::Open(garrow_output_stream_get_raw(sink).get(), - garrow_schema_get_raw(schema), - &arrow_stream_writer); + arrow::ipc::RecordBatchStreamWriter::Open(garrow_output_stream_get_raw(sink).get(), + garrow_schema_get_raw(schema), + &arrow_stream_writer); if (garrow_error_check(error, status, "[ipc][stream-writer][open]")) { return garrow_stream_writer_new_raw(&arrow_stream_writer); } else { diff --git a/c_glib/arrow-glib/writer.h b/c_glib/arrow-glib/writer.h index b0f95d4c8ea5..2f8e90cd2859 100644 --- a/c_glib/arrow-glib/writer.h +++ b/c_glib/arrow-glib/writer.h @@ -56,7 +56,7 @@ typedef struct _GArrowStreamWriterClass GArrowStreamWriterClass; /** * GArrowStreamWriter: * - * It wraps `arrow::ipc::OutputStreamWriter`. + * It wraps `arrow::ipc::RecordBatchStreamWriter`. 
*/ struct _GArrowStreamWriter { diff --git a/c_glib/arrow-glib/writer.hpp b/c_glib/arrow-glib/writer.hpp index 199f205e28fe..47f5e6839654 100644 --- a/c_glib/arrow-glib/writer.hpp +++ b/c_glib/arrow-glib/writer.hpp @@ -24,8 +24,8 @@ #include -GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr *arrow_stream_writer); -std::shared_ptr garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer); +GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr *arrow_stream_writer); +std::shared_ptr garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer); -GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr *arrow_file_writer); -arrow::ipc::FileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer); +GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr *arrow_file_writer); +arrow::ipc::RecordBatchFileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer); From d7b7c9ce14d5a7d617b15c6217251d36a4d37ddb Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 13 May 2017 17:02:08 -0400 Subject: [PATCH 5/5] Add missing dtors for pimpl pattern Change-Id: I0c06793275de0bf96be5602e5c99ae9cf7ad2b14 --- cpp/src/arrow/ipc/reader.cc | 2 ++ cpp/src/arrow/ipc/reader.h | 2 ++ cpp/src/arrow/ipc/writer.cc | 2 ++ cpp/src/arrow/ipc/writer.h | 2 ++ 4 files changed, 8 insertions(+) diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 078748ef1c49..2b7b90f2f2ea 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -273,6 +273,8 @@ RecordBatchStreamReader::RecordBatchStreamReader() { impl_.reset(new RecordBatchStreamReaderImpl()); } +RecordBatchStreamReader::~RecordBatchStreamReader() {} + Status RecordBatchStreamReader::Open(const std::shared_ptr& stream, std::shared_ptr* reader) { // Private ctor diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 8d40e5a01dd9..dd29a36d4059 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -64,6 
+64,8 @@ class ARROW_EXPORT RecordBatchReader { /// \brief Synchronous batch stream reader that reads from io::InputStream class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { public: + virtual ~RecordBatchStreamReader(); + /// Create batch reader from InputStream /// /// \param(in) stream an input stream instance diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 0f0791f2c880..ced071020212 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -730,6 +730,8 @@ RecordBatchStreamWriter::RecordBatchStreamWriter() { impl_.reset(new RecordBatchStreamWriterImpl()); } +RecordBatchStreamWriter::~RecordBatchStreamWriter() {} + Status RecordBatchStreamWriter::WriteRecordBatch( const RecordBatch& batch, bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 5db19e7e280c..899a1b2cc1e3 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -75,6 +75,8 @@ class ARROW_EXPORT RecordBatchWriter { /// format class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { public: + virtual ~RecordBatchStreamWriter(); + /// Create a new writer from stream sink and schema. User is responsible for /// closing the actual OutputStream. ///