ARROW-6870: [C#] Add Support for Dictionary Arrays and Dictionary Encoding #10527
HashidaTKS wants to merge 15 commits into
- Implement Dictionary serialization for ArrowStreamReader/Writer
cc: @eerhardt
eerhardt
left a comment
There was a problem hiding this comment.
This looks like a good start. Thanks @HashidaTKS!
Here's my first round of comments.
    public ArrowReaderImplementation()
    {
        _dictionaryMemo = new DictionaryMemo();
There was a problem hiding this comment.
Can we lazy load this until it is needed? It currently allocates 3 Dictionary<K, V> objects, which are unnecessary if the payload doesn't contain any dictionaries.
There was a problem hiding this comment.
I have created a LazyCreator class and changed DictionaryMemo _dictionaryMemo to LazyCreator<DictionaryMemo> _lazyDictionaryMemo.
However, I am wondering whether this is the best way to lazy load, because it needs to allocate a LazyCreator object.
Is there a better approach?
There was a problem hiding this comment.
- I wouldn't create a new class for this; .NET already has https://docs.microsoft.com/dotnet/api/system.lazy-1 for it.
- Even better is to not use Lazy<T> at all, and just check for null before using it. You can make a private property to make this really easy:
    private DictionaryMemo _dictionaryMemo;
    private DictionaryMemo DictionaryMemo => _dictionaryMemo ??= new DictionaryMemo();
    // later in code that needs to use DictionaryMemo:
    DictionaryMemo.GetDictionaryType(id);
This way, nothing new is allocated when we don't see a Dictionary batch in the payload.
There was a problem hiding this comment.
Thank you, I reimplemented it using the second approach.
      private readonly IMemoryOwner<byte> _memoryOwner;
    - private readonly IList<IArrowArray> _arrays;
    + internal readonly IReadOnlyList<IArrowArray> _arrays;
There was a problem hiding this comment.
Instead of exposing this field, can you make an internal property?
    - internal readonly IReadOnlyList<IArrowArray> _arrays;
    + private readonly IList<IArrowArray> _arrays;
    + internal IReadOnlyList<IArrowArray> Arrays => _arrays;
There was a problem hiding this comment.
Fixed it.
There was already a public property named Arrays so I named the internal property _Arrays.
    _fieldTypeBuilder = new ArrowTypeFlatbufferBuilder(Builder);
    _options = options ?? IpcOptions.Default;
    _dictionaryMemo = new DictionaryMemo();
There was a problem hiding this comment.
Can this be lazy-initialized until it is needed? That way every ArrowStreamWriter doesn't need to create it, if they don't use Dictionaries.
        _idToDictionary[id] = dictionary;
    }

    public void AddDictionaryDelta(long id, IArrowArray dictionary)
There was a problem hiding this comment.
Can this be deleted until it is supported? It is dead code.
There was a problem hiding this comment.
This hasn't been deleted (yet)
There was a problem hiding this comment.
Sorry, I didn't commit the change.
- Separate ArrayData constructors
- Remove unused methods / constructors
- Load DictionaryMemo lazily
- Hide _arrays of RecordBatch
- Fix tests
      reader.ReadNextRecordBatch();

    - Assert.Equal(1, memoryPool.Statistics.Allocations);
    + Assert.Equal(2, memoryPool.Statistics.Allocations);
There was a problem hiding this comment.
Memo:
_allocator is called when reading dictionary batches as well as record batches.
The expected value of memoryPool.Statistics.Allocations is therefore 2 in this context.
There was a problem hiding this comment.
Why change this test at all? If you want to add a new test for dictionaries - we should add a new one, and leave the existing test.
There was a problem hiding this comment.
I fixed it to use [Theory].
@eerhardt I have addressed the feedback.
- Change a LazyCreator constructor position
- Fix a typo
- Slight refactoring for LazyCreator.Instance
Added some minor changes.
- refactor ArrayData constructors
Modified ArrayData constructors. I'm sorry to commit many times after requesting a review.
Sorry for the delay @HashidaTKS. I didn't forget about this. I will get back to it this week, hopefully sometime today.
@eerhardt
    internal IReadOnlyList<IArrowArray> _Arrays => (IReadOnlyList<IArrowArray>)_arrays;

    private readonly IMemoryOwner<byte> _memoryOwner;
    private readonly IList<IArrowArray> _arrays;
There was a problem hiding this comment.
    - internal IReadOnlyList<IArrowArray> _Arrays => (IReadOnlyList<IArrowArray>)_arrays;
    - private readonly IMemoryOwner<byte> _memoryOwner;
    - private readonly IList<IArrowArray> _arrays;
    + internal IReadOnlyList<IArrowArray> ArrayList => _arrays;
    + private readonly IMemoryOwner<byte> _memoryOwner;
    + private readonly List<IArrowArray> _arrays;
- Using the coding style from https://github.com/dotnet/runtime/blob/main/docs/coding-guidelines/coding-style.md, only fields should have the _ prefix.
- We shouldn't need to cast here. Just have the private field as a List<IArrowArray>.
    public Stream BaseStream { get; }
    private readonly bool _leaveOpen;
    private readonly MemoryAllocator _allocator;
    private protected bool HasReadInitialDictionary { get; set; }
There was a problem hiding this comment.
    - private protected bool HasReadInitialDictionary { get; set; }
    + private bool HasReadInitialDictionary { get; set; }
Nothing outside this class uses this property.
    public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen) : base()
There was a problem hiding this comment.
    - public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen) : base()
    + public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen)
- Needless extra line.
- No need for the explicit call to base.
        }
    }

    protected void ReadInitialDictionaries()
There was a problem hiding this comment.
    - protected void ReadInitialDictionaries()
    + private void ReadInitialDictionaries()
        childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value, lazyDictionaryMemo);
    }

    Flatbuf.DictionaryEncoding? de = flatbufField.Dictionary;
There was a problem hiding this comment.
(nit) Can you spell out de? It doesn't make a great reading experience to figure out what de means.
    - Flatbuf.DictionaryEncoding? de = flatbufField.Dictionary;
    + Flatbuf.DictionaryEncoding? dictionaryEncoding = flatbufField.Dictionary;
    Flatbuf.Int? indexTypeAsInt = de.Value.IndexType;
    if (!indexTypeAsInt.HasValue)
    {
        throw new InvalidDataException("Dictionary type not defined");
There was a problem hiding this comment.
Would this be a more correct message?
    - throw new InvalidDataException("Dictionary type not defined");
    + throw new InvalidDataException("Dictionary IndexType not defined");
      }

    - internal static Schema GetSchema(Flatbuf.Schema schema)
    + internal static Schema GetSchema(Flatbuf.Schema schema, LazyCreator<DictionaryMemo> lazyDictionaryMemo)
There was a problem hiding this comment.
Along with the comment about removing LazyCreator, this could be changed to:
    - internal static Schema GetSchema(Flatbuf.Schema schema, LazyCreator<DictionaryMemo> lazyDictionaryMemo)
    + internal static Schema GetSchema(Flatbuf.Schema schema, ref DictionaryMemo dictionaryMemo)
And then if this code needs to use the dictionaryMemo and it is null, it gets created when it is needed. And the ref gets set.
      {
    -     public static RecordBatch CreateSampleRecordBatch(int length)
    +     //TODO: Remove the createDictionaryArray argument after all writer/reader supports DictionaryType serialization
    +     public static RecordBatch CreateSampleRecordBatch(int length, bool createDictionaryArray = false)
There was a problem hiding this comment.
A general comment about createDictionaryArray - I think the existing tests should be left alone. That way we are still testing as much as we can without any dictionaries. Then we create new tests for dictionary support.
An easy way to do this is to change a bunch of tests from [Fact] to [Theory] and pass in true and false into the test for bool createDictionaryArray. That way the test runs twice, once with a dictionary, and once without. That should give us the best test coverage.
There was a problem hiding this comment.
I understand.
I fixed tests to use [Theory]
- Fix access modifiers
- Fix tests
- Change the way of the lazy creation for dictionaryMemo
There was a problem hiding this comment.
@eerhardt
Thank you for the review!
I have responded to what you have pointed out.
- Fix access modifiers and the code format
- Sorry for my carelessness...
- Fix tests
- Change the way of the lazy creation for dictionaryMemo
      public static class TestData
      {
    -     public static RecordBatch CreateSampleRecordBatch(int length)
    +     //TODO: Remove the createDictionaryArray argument after all writer/reader supports DictionaryType serialization
There was a problem hiding this comment.
I was planning to always create a dictionary array after ArrowFileWriter/Reader support DictionaryType serialization.
But considering your feedback comments, now I think it is better to leave createDictionaryArray even after that.
I will remove this TODO.
I removed the TODO comment.
      public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
      {
    -     RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
    +     RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: true);
- change the place of the HasCreatedDictionaryMemo property
- Remove needless extra lines
        HasReadInitialDictionary = true;
    }

    private async ValueTask ReadInitialDictionariesAsync(CancellationToken cancellationToken)
There was a problem hiding this comment.
(nit) the other methods in this file have the Async method first, and then the synchronous method after it. Can this go above ReadInitialDictionaries()?
    ReadInitialDictionaries();

    return ReadArrowObject();
There was a problem hiding this comment.
The spec says: "DictionaryBatch and RecordBatch messages may be interleaved".
This appears to only support DictionaryBatches above RecordBatches. If we read all the "initial" dictionaries, and then read a RecordBatch, the next time ReadRecordBatch() is called, if there is a DictionaryBatch, null will be returned. Is that intentional?
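A minimal sketch of the interleaving concern, in Python to match the test script used elsewhere in this thread: a reader that keeps a memo of dictionaries and updates it whenever a dictionary message arrives, wherever that is in the stream. The tuple-based message model and the name `read_record_batches` are hypothetical stand-ins for illustration, not the Arrow IPC format or the C# reader's API.

```python
def read_record_batches(messages):
    """Yield decoded record batches from a toy message stream.

    Each message is a tuple: ("dictionary", dict_id, values) or
    ("record", dict_id, indices). This is a hypothetical model, not the
    Arrow IPC wire format.
    """
    memo = {}  # dictionary id -> current dictionary values
    for message in messages:
        kind = message[0]
        if kind == "dictionary":
            _, dict_id, values = message
            # A later dictionary with the same id replaces the earlier one.
            memo[dict_id] = list(values)
        elif kind == "record":
            _, dict_id, indices = message
            dictionary = memo[dict_id]
            # Null indices stay null; others are looked up in the memo.
            yield [None if i is None else dictionary[i] for i in indices]
        else:
            raise ValueError(f"unknown message kind: {kind!r}")


stream = [
    ("dictionary", 0, ["foo"]),
    ("record", 0, [0, 0, None]),
    ("dictionary", 0, ["foo", "bar"]),  # arrives between record batches
    ("record", 0, [0, 1, 0]),
]
decoded = list(read_record_batches(stream))
```

Because the memo is consulted for every record batch rather than filled once up front, a dictionary that arrives after the first record batch is still picked up.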
There was a problem hiding this comment.
Sorry, it slipped my mind. That is not intentional.
I modified the reader so that it can read interleaved dictionaries.
During the modification, I realized that ReadInitialDictionariesAsync and ReadInitialDictionaries are no longer needed, so I removed them.
Related to this, ArrowStreamWriter does not support writing interleaved dictionaries for now.
I would like to implement it in another PR because this PR will become too large.
How is that?
There was a problem hiding this comment.
I tested reading interleaved dictionaries by creating a test file with python and reading the test file with C#, and it seemed to work fine.
The code for creating the test file is below; it is based on the test code of the Python implementation.
https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_ipc.py#L417
import pyarrow as pa

# Dictionary-encoded strings with int8 indices.
ty = pa.dictionary(pa.int8(), pa.utf8())
data = [["foo", "foo", None],
        ["foo", "bar", "foo"],
        ["foo", "bar"],
        ["foo", None, "bar", "quux"],
        ["bar", "quux"],
        ]
# One record batch per list, each with a single dictionary-typed column.
batches = [
    pa.RecordBatch.from_arrays([pa.array(v, type=ty)], names=['dicts'])
    for v in data]
schema = batches[0].schema

def write_batches():
    with pa.RecordBatchStreamWriter("./dictionary_batch_test.batch", schema = schema) as writer:
        for batch in batches:
            writer.write_batch(batch)

st = write_batches()
We should add C# tests for this after ArrowStreamWriter supports writing interleaved dictionaries.
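To illustrate why a stream built from the data above would contain dictionaries between record batches, the sketch below dictionary-encodes each batch independently and reports where the dictionary changes. It assumes each batch's dictionary lists values in order of first appearance; `dictionary_encode` is a hypothetical helper and may not match how pyarrow builds or orders its dictionaries.

```python
def dictionary_encode(values):
    """Return (dictionary, indices) for a list of strings and None values.

    Hypothetical helper: values enter the dictionary in order of first
    appearance, and None is kept as a null index.
    """
    dictionary = []
    positions = {}
    indices = []
    for value in values:
        if value is None:
            indices.append(None)
            continue
        if value not in positions:
            positions[value] = len(dictionary)
            dictionary.append(value)
        indices.append(positions[value])
    return dictionary, indices


data = [["foo", "foo", None],
        ["foo", "bar", "foo"],
        ["foo", "bar"],
        ["foo", None, "bar", "quux"],
        ["bar", "quux"],
        ]
encoded = [dictionary_encode(values) for values in data]
dictionaries = [dictionary for dictionary, _ in encoded]

# Zero-based positions of batches whose dictionary differs from the previous one.
changed = [i for i in range(1, len(dictionaries))
           if dictionaries[i] != dictionaries[i - 1]]
```

Under that assumption the dictionary changes before the second, fourth, and fifth batches, so a reader that only loads dictionaries ahead of the first record batch would not see the later ones.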
There was a problem hiding this comment.
"I would like to implement it in another PR because this PR will become too large. How is that?"
That is perfectly reasonable to me. I think the current functionality is sufficient for the first PR.
There was a problem hiding this comment.
"I tested reading interleaved dictionaries by creating a test file with python"
In my spare time, I've been trying to set up the integration tests, so we can test the C# implementation against the other languages. Hopefully I can get the initial PR for that up soon.
- Avoid a needless allocation
- Remove a needless ctor
- Support reading the replacement dictionaries
- Add tests for writing and reading dictionaries used in NestedType arrays
- Fix a bug when reading dictionaries used in NestedType arrays
- Regard indexType as signed int32 if it is null
- DictionaryArray with children
- Add a test for this
- Fix a serialization bug about this
There was a problem hiding this comment.
@eerhardt
Thank you!
I have responded to what you have pointed out.
- Avoid needless allocation
- Remove a needless ctor
- Support reading the replacement dictionaries
- Add tests for writing and reading dictionaries used in NestedType arrays
- Add tests for writing and reading ListType dictionaries
- Fix a bug when reading dictionaries used in NestedType arrays
- Fix a bug when writing ListType dictionaries
- Regard indexType as signed int32 if it is null
Cc: @eerhardt Would you please re-review this when you have time?
eerhardt
left a comment
There was a problem hiding this comment.
I just had a couple very minor comments. Once those are addressed, I think we can merge this.
Thank you for the great work here, @HashidaTKS! And thank you for your patience (I had been busy).
- Fix minor issues
@eerhardt I appreciate your review and support!
- do-while -> while
This is an implementation of DictionaryBatch (de)serialization for the streaming format.
The following features are missing for now; I plan to implement them in a future PR.
- isDelta is not supported yet