Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ public static Schema DecodeSchema(ReadOnlyMemory<byte> buffer)
}

ByteBuffer schemaBuffer = ArrowReaderImplementation.CreateByteBuffer(buffer.Slice(bufferPosition));
var schema = MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer));
//DictionaryBatch not supported for now
DictionaryMemo dictionaryMemo = null;
var schema = MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer), ref dictionaryMemo);
return schema;
}

internal static Schema DecodeSchema(ByteBuffer schemaBuffer)
{
var schema = MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer));
//DictionaryBatch not supported for now
DictionaryMemo dictionaryMemo = null;
var schema = MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer), ref dictionaryMemo);
return schema;
}
}
Expand Down
29 changes: 26 additions & 3 deletions csharp/src/Apache.Arrow/Arrays/ArrayData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,52 @@ public sealed class ArrayData : IDisposable
public readonly int Offset;
public readonly ArrowBuffer[] Buffers;
public readonly ArrayData[] Children;
public readonly ArrayData Dictionary; //Only used for dictionary type

//This is left for compatibility with lower version binaries
//before the dictionary type was supported.
public ArrayData(
IArrowType dataType,
int length, int nullCount, int offset,
IEnumerable<ArrowBuffer> buffers, IEnumerable<ArrayData> children) :
this(dataType, length, nullCount, offset, buffers, children, null)
{ }

//This is left for compatibility with lower version binaries
//before the dictionary type was supported.
public ArrayData(
IArrowType dataType,
int length, int nullCount, int offset,
ArrowBuffer[] buffers, ArrayData[] children) :
this(dataType, length, nullCount, offset, buffers, children, null)
{ }

public ArrayData(
IArrowType dataType,
int length, int nullCount = 0, int offset = 0,
IEnumerable<ArrowBuffer> buffers = null, IEnumerable<ArrayData> children = null)
IEnumerable<ArrowBuffer> buffers = null, IEnumerable<ArrayData> children = null, ArrayData dictionary = null)
Comment thread
eerhardt marked this conversation as resolved.
{
DataType = dataType ?? NullType.Default;
Length = length;
NullCount = nullCount;
Offset = offset;
Buffers = buffers?.ToArray();
Children = children?.ToArray();
Dictionary = dictionary;
}

public ArrayData(
IArrowType dataType,
int length, int nullCount = 0, int offset = 0,
ArrowBuffer[] buffers = null, ArrayData[] children = null)
ArrowBuffer[] buffers = null, ArrayData[] children = null, ArrayData dictionary = null)
{
DataType = dataType ?? NullType.Default;
Length = length;
NullCount = nullCount;
Offset = offset;
Buffers = buffers;
Children = children;
Dictionary = dictionary;
}

public void Dispose()
Expand All @@ -74,6 +95,8 @@ public void Dispose()
child?.Dispose();
}
}

Dictionary?.Dispose();
}

public ArrayData Slice(int offset, int length)
Expand All @@ -86,7 +109,7 @@ public ArrayData Slice(int offset, int length)
length = Math.Min(Length - offset, length);
offset += Offset;

return new ArrayData(DataType, length, RecalculateNullCount, offset, Buffers, Children);
return new ArrayData(DataType, length, RecalculateNullCount, offset, Buffers, Children, Dictionary);
}
}
}
1 change: 1 addition & 0 deletions csharp/src/Apache.Arrow/Arrays/ArrowArrayFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static IArrowArray BuildArray(ArrayData data)
case ArrowTypeId.Decimal256:
return new Decimal256Array(data);
case ArrowTypeId.Dictionary:
return new DictionaryArray(data);
case ArrowTypeId.FixedSizedBinary:
case ArrowTypeId.HalfFloat:
case ArrowTypeId.Interval:
Expand Down
61 changes: 61 additions & 0 deletions csharp/src/Apache.Arrow/Arrays/DictionaryArray.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using Apache.Arrow.Types;

namespace Apache.Arrow
{
public class DictionaryArray : Array
{
public IArrowArray Dictionary { get; }
public IArrowArray Indices { get; }
public ArrowBuffer IndicesBuffer => Data.Buffers[1];

public DictionaryArray(ArrayData data) : base(data)
{
data.EnsureBufferCount(2);
data.EnsureDataType(ArrowTypeId.Dictionary);

if (data.Dictionary == null)
{
throw new ArgumentException($"{nameof(data.Dictionary)} must not be null");
}

var dicType = (DictionaryType)data.DataType;
data.Dictionary.EnsureDataType(dicType.ValueType.TypeId);
Comment thread
eerhardt marked this conversation as resolved.

var indicesData = new ArrayData(dicType.IndexType, data.Length, data.NullCount, data.Offset, data.Buffers, data.Children);

Indices = ArrowArrayFactory.BuildArray(indicesData);
Dictionary = ArrowArrayFactory.BuildArray(data.Dictionary);
}

public DictionaryArray(DictionaryType dataType, IArrowArray indicesArray, IArrowArray dictionary) :
base(new ArrayData(dataType, indicesArray.Length, indicesArray.Data.NullCount, indicesArray.Data.Offset, indicesArray.Data.Buffers, indicesArray.Data.Children, dictionary.Data))
{
Data.EnsureBufferCount(2);

indicesArray.Data.EnsureDataType(dataType.IndexType.TypeId);
dictionary.Data.EnsureDataType(dataType.ValueType.TypeId);

Indices = indicesArray;
Dictionary = dictionary;
}

public override void Accept(IArrowArrayVisitor visitor) => Accept(this, visitor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private static int ReadFooterLength(Memory<byte> buffer)
private void ReadSchema(Memory<byte> buffer)
{
// Deserialize the footer from the footer flatbuffer
_footer = new ArrowFooter(Flatbuf.Footer.GetRootAsFooter(CreateByteBuffer(buffer)));
_footer = new ArrowFooter(Flatbuf.Footer.GetRootAsFooter(CreateByteBuffer(buffer)), ref _dictionaryMemo);

Schema = _footer.Schema;
}
Expand Down
4 changes: 2 additions & 2 deletions csharp/src/Apache.Arrow/Ipc/ArrowFooter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public ArrowFooter(Schema schema, IEnumerable<Block> dictionaries, IEnumerable<B
#endif
}

public ArrowFooter(Flatbuf.Footer footer)
: this(Ipc.MessageSerializer.GetSchema(footer.Schema.GetValueOrDefault()), GetDictionaries(footer),
public ArrowFooter(Flatbuf.Footer footer, ref DictionaryMemo dictionaryMemo)
: this(Ipc.MessageSerializer.GetSchema(footer.Schema.GetValueOrDefault(), ref dictionaryMemo), GetDictionaries(footer),
GetRecordBatches(footer))
{ }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal sealed class ArrowMemoryReaderImplementation : ArrowReaderImplementatio
private readonly ReadOnlyMemory<byte> _buffer;
private int _bufferPosition;

public ArrowMemoryReaderImplementation(ReadOnlyMemory<byte> buffer)
public ArrowMemoryReaderImplementation(ReadOnlyMemory<byte> buffer) : base()
{
_buffer = buffer;
}
Expand Down Expand Up @@ -111,7 +111,7 @@ private void ReadSchema()
}

ByteBuffer schemaBuffer = CreateByteBuffer(_buffer.Slice(_bufferPosition));
Schema = MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemaBuffer));
Schema = MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemaBuffer), ref _dictionaryMemo);
_bufferPosition += schemaMessageLength;
}
}
Expand Down
65 changes: 61 additions & 4 deletions csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ internal abstract class ArrowReaderImplementation : IDisposable
public Schema Schema { get; protected set; }
protected bool HasReadSchema => Schema != null;

private protected DictionaryMemo _dictionaryMemo;
private protected DictionaryMemo DictionaryMemo => _dictionaryMemo ??= new DictionaryMemo();

public void Dispose()
{
Dispose(true);
Expand Down Expand Up @@ -79,6 +82,16 @@ private static bool MatchEnum(Flatbuf.MessageHeader messageHeader, Type flatBuff
}
}

/// <summary>
/// Create a record batch or dictionary batch from Flatbuf.Message.
/// </summary>
/// <remarks>
/// This method adds data to _dictionaryMemo and returns null when the message type is DictionaryBatch.
/// </remarks>>
/// <returns>
/// The record batch when the message type is RecordBatch.
/// Null when the message type is not RecordBatch.
/// </returns>
protected RecordBatch CreateArrowObjectFromMessage(
Flatbuf.Message message, ByteBuffer bodyByteBuffer, IMemoryOwner<byte> memoryOwner)
{
Expand All @@ -88,8 +101,8 @@ protected RecordBatch CreateArrowObjectFromMessage(
// TODO: Read schema and verify equality?
break;
case Flatbuf.MessageHeader.DictionaryBatch:
// TODO: not supported currently
Debug.WriteLine("Dictionaries are not yet supported.");
Flatbuf.DictionaryBatch dictionaryBatch = message.Header<Flatbuf.DictionaryBatch>().Value;
ReadDictionaryBatch(dictionaryBatch, bodyByteBuffer, memoryOwner);
break;
case Flatbuf.MessageHeader.RecordBatch:
Flatbuf.RecordBatch rb = message.Header<Flatbuf.RecordBatch>().Value;
Expand All @@ -109,6 +122,36 @@ internal static ByteBuffer CreateByteBuffer(ReadOnlyMemory<byte> buffer)
return new ByteBuffer(new ReadOnlyMemoryBufferAllocator(buffer), 0);
}

private void ReadDictionaryBatch(Flatbuf.DictionaryBatch dictionaryBatch, ByteBuffer bodyByteBuffer, IMemoryOwner<byte> memoryOwner)
{
long id = dictionaryBatch.Id;
IArrowType valueType = DictionaryMemo.GetDictionaryType(id);
Flatbuf.RecordBatch? recordBatch = dictionaryBatch.Data;

if (!recordBatch.HasValue)
{
throw new InvalidDataException("Dictionary must contain RecordBatch");
}

Field valueField = new Field("dummy", valueType, true);
var schema = new Schema(new[] { valueField }, default);
IList<IArrowArray> arrays = BuildArrays(schema, bodyByteBuffer, recordBatch.Value);

if (arrays.Count != 1)
{
throw new InvalidDataException("Dictionary record batch must contain only one field");
}

if (dictionaryBatch.IsDelta)
{
throw new NotImplementedException("Dictionary delta is not supported yet");
}
else
{
DictionaryMemo.AddOrReplaceDictionary(id, arrays[0]);
}
}

private List<IArrowArray> BuildArrays(
Schema schema,
ByteBuffer messageBuffer,
Expand Down Expand Up @@ -179,7 +222,14 @@ private ArrayData LoadPrimitiveField(

ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData);

return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children);
IArrowArray dictionary = null;
if (field.DataType.TypeId == ArrowTypeId.Dictionary)
{
long id = DictionaryMemo.GetId(field);
dictionary = DictionaryMemo.GetDictionary(id);
}

return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children, dictionary?.Data);
}

private ArrayData LoadVariableField(
Expand Down Expand Up @@ -218,7 +268,14 @@ private ArrayData LoadVariableField(
ArrowBuffer[] arrowBuff = new[] { nullArrowBuffer, offsetArrowBuffer, valueArrowBuffer };
ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData);

return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children);
IArrowArray dictionary = null;
if (field.DataType.TypeId == ArrowTypeId.Dictionary)
{
long id = DictionaryMemo.GetId(field);
dictionary = DictionaryMemo.GetDictionary(id);
}

return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children, dictionary?.Data);
}

private ArrayData[] GetChildren(
Expand Down
Loading