diff --git a/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs b/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs index e6487f0b..0c51c5dd 100644 --- a/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs +++ b/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs @@ -55,7 +55,7 @@ public static (byte[] Metadata, byte[] Value) Parse(ReadOnlySpan utf8Json) // Pass 2: stream values into a VariantValueWriter using the sorted field IDs. Utf8JsonReader emitter = new Utf8JsonReader(utf8Json); emitter.Read(); - VariantValueWriter writer = new VariantValueWriter(metadataBuilder, idRemap); + using VariantValueWriter writer = new VariantValueWriter(metadataBuilder, idRemap); WriteValue(ref emitter, writer); return (metadata, writer.ToArray()); } diff --git a/src/Apache.Arrow.Scalars/Variant/Buffer.cs b/src/Apache.Arrow.Scalars/Variant/Buffer.cs new file mode 100644 index 00000000..3cbfb62b --- /dev/null +++ b/src/Apache.Arrow.Scalars/Variant/Buffer.cs @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+using System;
+using System.Buffers;
+using System.Buffers.Binary;
+using System.Collections.Generic;
+
+namespace Apache.Arrow.Scalars.Variant
+{
+    /// <summary>
+    /// Mutable, non-thread-safe growable buffer designed for single-owner use as a field on a
+    /// class (or as a local owned by the same method that created it). The backing array is
+    /// rented from a caller-supplied <see cref="Stack{T}"/> so that capacity is "sticky" across
+    /// reuse cycles within one owner — unlike <see cref="ArrayPool{T}.Shared"/>, which buckets
+    /// by size class and may hand back a different array than the one last returned.
+    /// </summary>
+    /// <remarks>
+    /// <para><b>Warning — mutable value type.</b> A <c>Buffer&lt;T&gt;</c> must never be copied by
+    /// value. All mutation has to go through the original storage location, or a stale copy will
+    /// silently corrupt output by writing past its own view of the length into a shared backing array:
+    /// <list type="bullet">
+    /// <item><description>Pass by <c>ref</c>, not by value. Helper methods that take a
+    /// <c>Buffer&lt;T&gt;</c> parameter must declare it <c>ref Buffer&lt;T&gt;</c>.</description></item>
+    /// <item><description>Assign from a getter only through a <c>ref</c> local
+    /// (<c>ref Buffer&lt;T&gt; b = ref GetBuffer();</c>) — a plain <c>var</c> local is a copy.</description></item>
+    /// <item><description>Byte-specific helpers live on <see cref="ByteBufferExtensions"/> as
+    /// <c>ref this</c> extension methods precisely so they can't be invoked on a by-value
+    /// receiver. Do not add methods that would force a by-value receiver.</description></item>
+    /// </list>
+    /// </para>
+    /// <para><b>Explicit lifetime.</b> Pair every <see cref="Acquire"/> with <see cref="Release"/>
+    /// (typically at the begin/end of a scope). <see cref="Acquire"/> must be called before any
+    /// write; writes on a default-initialized buffer throw <see cref="NullReferenceException"/>.
+    /// Failing to <see cref="Release"/> leaks the backing array to the GC. For the same reason, the
+    /// local pool must also be cleaned up when done. This can be done with <see cref="DrainPool"/>.
+    /// </para>
+    /// </remarks>
+    internal struct Buffer<T>
+    {
+        private const int InitialCapacity = 64;
+
+        private T[] _buf;
+        private int _length;
+
+        /// <summary>Number of items currently written.</summary>
+        public int Length => _length;
+
+        /// <summary>
+        /// The backing array. Slots beyond <see cref="Length"/> are unspecified;
+        /// callers must respect the length when reading.
+        /// </summary>
+        public T[] RawBuffer => _buf;
+
+        /// <summary>
+        /// Rents a backing array from <paramref name="pool"/> (falling back to <see cref="ArrayPool{T}.Shared"/>
+        /// on pool miss) and resets length to zero. Must be called before any write.
+        /// </summary>
+        public void Acquire(Stack<T[]> pool)
+        {
+            _buf = pool.Count > 0 ? pool.Pop() : ArrayPool<T>.Shared.Rent(InitialCapacity);
+            _length = 0;
+        }
+
+        /// <summary>
+        /// Returns the backing array to <paramref name="pool"/> and clears
+        /// state. Safe to call on a default-initialized buffer (no-op).
+        /// </summary>
+        public void Release(Stack<T[]> pool)
+        {
+            if (_buf != null)
+            {
+                pool.Push(_buf);
+                _buf = null;
+                _length = 0;
+            }
+        }
+
+        /// <summary>
+        /// Returns every array stashed in <paramref name="pool"/> to <see cref="ArrayPool{T}.Shared"/>.
+        /// Use at end-of-life of the owner to release the per-owner cache built up by <see cref="Release"/>.
+        /// </summary>
+        public static void DrainPool(Stack<T[]> pool)
+        {
+            while (pool.Count > 0)
+            {
+                ArrayPool<T>.Shared.Return(pool.Pop());
+            }
+        }
+
+        /// <summary>
+        /// Returns a span over the unwritten tail of the backing array, at least <paramref name="sizeHint"/>
+        /// items long, growing the array if necessary. Call <see cref="Advance"/> after writing to commit.
+        /// </summary>
+        public Span<T> GetSpan(int sizeHint)
+        {
+            EnsureCapacity(_length + sizeHint);
+            return _buf.AsSpan(_length);
+        }
+
+        /// <summary>Advances the written length by <paramref name="count"/>.</summary>
+        public void Advance(int count) => _length += count;
+
+        /// <summary>Appends a single item.</summary>
+        public void Append(T value)
+        {
+            EnsureCapacity(_length + 1);
+            _buf[_length++] = value;
+        }
+
+        /// <summary>Appends a span of items.</summary>
+        public void Append(ReadOnlySpan<T> src)
+        {
+            EnsureCapacity(_length + src.Length);
+            src.CopyTo(_buf.AsSpan(_length));
+            _length += src.Length;
+        }
+
+        /// <summary>Appends a range from an array.</summary>
+        public void Append(T[] src, int start, int count)
+        {
+            EnsureCapacity(_length + count);
+            Array.Copy(src, start, _buf, _length, count);
+            _length += count;
+        }
+
+        /// <summary>Copies the written items into a freshly allocated array of exact length.</summary>
+        public T[] ToArray()
+        {
+            T[] result = new T[_length];
+            Array.Copy(_buf, 0, result, 0, _length);
+            return result;
+        }
+
+        /// <summary>Grows the backing array (by doubling) to hold at least <paramref name="required"/> items; throws rather than hangs on int overflow.</summary>
+        private void EnsureCapacity(int required)
+        {
+            // A negative value means the caller's "_length + n" wrapped past int.MaxValue.
+            if (required < 0)
+            {
+                throw new InvalidOperationException("Buffer cannot grow beyond int.MaxValue items.");
+            }
+            if (required > _buf.Length)
+            {
+                // Double in 64 bits (a 32-bit product wraps to 0 near 2^31 and never exits); past int range, rent exactly 'required'.
+                long newSize = Math.Max(_buf.Length, 1);
+                while (newSize < required)
+                {
+                    newSize *= 2;
+                }
+                T[] grown = ArrayPool<T>.Shared.Rent(newSize > int.MaxValue ? required : (int)newSize);
+                Array.Copy(_buf, 0, grown, 0, _length);
+                ArrayPool<T>.Shared.Return(_buf);
+                _buf = grown;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Byte-specific writers for <see cref="Buffer{T}"/> of <see cref="byte"/>. Declared as <c>ref this</c>
+    /// extension methods so invocation through a by-value receiver is a compile error, not a silent copy-and-desync.
+    /// </summary>
+    internal static class ByteBufferExtensions
+    {
+        /// <summary>Appends <paramref name="value"/> as 2 little-endian bytes.</summary>
+        public static void WriteInt16LE(this ref Buffer<byte> buf, short value)
+        {
+            BinaryPrimitives.WriteInt16LittleEndian(buf.GetSpan(2), value);
+            buf.Advance(2);
+        }
+
+        /// <summary>Appends <paramref name="value"/> as 4 little-endian bytes.</summary>
+        public static void WriteInt32LE(this ref Buffer<byte> buf, int value)
+        {
+            BinaryPrimitives.WriteInt32LittleEndian(buf.GetSpan(4), value);
+            buf.Advance(4);
+        }
+
+        /// <summary>Appends <paramref name="value"/> as 8 little-endian bytes.</summary>
+        public static void WriteInt64LE(this ref Buffer<byte> buf, long value)
+        {
+            BinaryPrimitives.WriteInt64LittleEndian(buf.GetSpan(8), value);
+            buf.Advance(8);
+        }
+
+        /// <summary>Appends the bit pattern of <paramref name="value"/> as 4 little-endian bytes.</summary>
+        public static void WriteFloatLE(this ref Buffer<byte> buf, float value)
+        {
+#if NET8_0_OR_GREATER
+            BinaryPrimitives.WriteSingleLittleEndian(buf.GetSpan(4), value);
+            buf.Advance(4);
+#else
+            int bits = System.Runtime.CompilerServices.Unsafe.As<float, int>(ref value);
+            buf.WriteInt32LE(bits);
+#endif
+        }
+
+        /// <summary>Appends the bit pattern of <paramref name="value"/> as 8 little-endian bytes.</summary>
+        public static void WriteDoubleLE(this ref Buffer<byte> buf, double value)
+        {
+#if NET8_0_OR_GREATER
+            BinaryPrimitives.WriteDoubleLittleEndian(buf.GetSpan(8), value);
+            buf.Advance(8);
+#else
+            long bits = BitConverter.DoubleToInt64Bits(value);
+            buf.WriteInt64LE(bits);
+#endif
+        }
+
+        /// <summary>Appends <paramref name="value"/> in <paramref name="byteWidth"/> bytes via <c>VariantEncodingHelper.WriteLittleEndianInt</c>.</summary>
+        public static void WriteSmallInt(this ref Buffer<byte> buf, int value, int byteWidth)
+        {
+            VariantEncodingHelper.WriteLittleEndianInt(buf.GetSpan(byteWidth), value, byteWidth);
+            buf.Advance(byteWidth);
+        }
+    }
+}
diff --git
a/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs b/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs index d5b83373..7b6dbc4b 100644 --- a/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs +++ b/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs @@ -34,7 +34,7 @@ public sealed class VariantBuilder CollectFieldNames(variant, metadataBuilder); byte[] metadata = metadataBuilder.Build(out int[] idRemap); - VariantValueWriter writer = new VariantValueWriter(metadataBuilder, idRemap); + using VariantValueWriter writer = new VariantValueWriter(metadataBuilder, idRemap); WriteValue(writer, variant); return (metadata, writer.ToArray()); } diff --git a/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs b/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs index dc54aa50..0daa47de 100644 --- a/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs +++ b/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs @@ -15,10 +15,8 @@ using System; using System.Buffers; -using System.Buffers.Binary; using System.Collections.Generic; using System.Data.SqlTypes; -using System.IO; using System.Text; namespace Apache.Arrow.Scalars.Variant @@ -34,22 +32,27 @@ namespace Apache.Arrow.Scalars.Variant /// Create a and every field name that will appear. /// Call to produce the metadata bytes and the ID remap. /// Create a with the metadata builder and remap, emit the value via the Write* / Begin* / End* methods, then call . + /// the writer to return its cached backing arrays to . Skipping Dispose leaks those arrays to the GC. 
/// /// - public sealed class VariantValueWriter + public sealed class VariantValueWriter : IDisposable { private const int StackAllocThreshold = 256; private readonly VariantMetadataBuilder _metadata; private readonly int[] _idRemap; - private readonly MemoryStream _root = new MemoryStream(); + + // Per-writer stacks of cached backing arrays, separate from + // ArrayPool.Shared so that capacity grown on one frame's buffer + // carries over to the next frame through the same writer without + // being redistributed by size class. + private readonly Stack _bytePool = new Stack(); + private readonly Stack _intPool = new Stack(); + + private Buffer _root; private readonly Stack _frameStack = new Stack(); - private readonly Stack _streamPool = new Stack(); private Frame _frame; - -#if !NET8_0_OR_GREATER - private readonly byte[] _scratch = new byte[16]; -#endif + private bool _disposed; /// /// Creates a writer that produces value bytes referencing the given metadata. @@ -67,6 +70,8 @@ public VariantValueWriter(VariantMetadataBuilder metadata, int[] idRemap) "the idRemap array length must match the metadata builder count used to create it.", nameof(idRemap)); } + + _root.Acquire(_bytePool); } /// @@ -74,6 +79,7 @@ public VariantValueWriter(VariantMetadataBuilder metadata, int[] idRemap) /// public byte[] ToArray() { + if (_disposed) throw new ObjectDisposedException(nameof(VariantValueWriter)); if (_frame != null) { throw new InvalidOperationException("Unclosed object or array at the top of the writer."); @@ -81,6 +87,40 @@ public byte[] ToArray() return _root.ToArray(); } + /// + /// Returns all cached backing arrays (the root buffer, any still-open + /// frame buffers, and the per-writer array pools) to + /// . The writer must not be used after + /// ; calls to or any + /// Write* / Begin* method will throw + /// . Idempotent. 
+ /// + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + // Release still-owned frame arrays into the per-writer pools. + if (_frame != null) ReleaseFrameArrays(_frame); + while (_frameStack.Count > 0) + { + Frame f = _frameStack.Pop(); + if (f != null) ReleaseFrameArrays(f); + } + _root.Release(_bytePool); + + // Drain the per-writer pools back to the process-wide shared pool. + Buffer.DrainPool(_bytePool); + Buffer.DrainPool(_intPool); + } + + private void ReleaseFrameArrays(Frame f) + { + f.Buffer.Release(_bytePool); + f.ValueStarts.Release(_intPool); + if (f is ObjectFrame obj) obj.FieldIds.Release(_intPool); + } + // --------------------------------------------------------------- // Object / array scope // --------------------------------------------------------------- @@ -89,10 +129,26 @@ public byte[] ToArray() public void BeginObject() { BeforeWriteValue(); - _frameStack.Push(_frame); - ObjectFrame frame = new ObjectFrame { Buffer = RentStream() }; - frame.FieldIds = ArrayPool.Shared.Rent(InitialFrameCapacity); - frame.ValueStarts = ArrayPool.Shared.Rent(InitialFrameCapacity); + ObjectFrame frame = new ObjectFrame(); + // Keep _frame / _frameStack untouched until every Acquire + the Push + // have succeeded. If any step throws, the catch releases whatever + // was acquired so far (Release is a no-op on un-Acquired buffers), + // and the writer's visible state is as if BeginObject was never + // called — Dispose sees no orphaned arrays. 
+ try + { + frame.Buffer.Acquire(_bytePool); + frame.ValueStarts.Acquire(_intPool); + frame.FieldIds.Acquire(_intPool); + _frameStack.Push(_frame); + } + catch + { + frame.FieldIds.Release(_intPool); + frame.ValueStarts.Release(_intPool); + frame.Buffer.Release(_bytePool); + throw; + } _frame = frame; } @@ -103,6 +159,7 @@ public void BeginObject() /// public void WriteFieldName(string name) { + if (_disposed) throw new ObjectDisposedException(nameof(VariantValueWriter)); if (!(_frame is ObjectFrame objFrame)) { throw new InvalidOperationException("WriteFieldName may only be called inside an object scope."); @@ -112,13 +169,14 @@ public void WriteFieldName(string name) throw new InvalidOperationException("A value must be written for the previous field before writing the next field name."); } int fieldId = _idRemap[_metadata.GetId(name)]; - AppendInt(ref objFrame.FieldIds, ref objFrame.FieldIdCount, fieldId); + objFrame.FieldIds.Append(fieldId); objFrame.PendingValue = true; } /// Ends the current object scope. public void EndObject() { + if (_disposed) throw new ObjectDisposedException(nameof(VariantValueWriter)); if (!(_frame is ObjectFrame objFrame)) { throw new InvalidOperationException("EndObject called without matching BeginObject."); @@ -129,36 +187,78 @@ public void EndObject() } _frame = _frameStack.Pop(); - MemoryStream output = _frame != null ? _frame.Buffer : _root; - WriteObjectBody(output, objFrame); - ArrayPool.Shared.Return(objFrame.FieldIds); - ArrayPool.Shared.Return(objFrame.ValueStarts); - ReturnStream(objFrame.Buffer); + // Once objFrame is popped it's no longer visible to Dispose, so + // WriteObjectBody must not leave its buffers unreleased on throw. 
+ try + { + if (_frame != null) + { + WriteObjectBody(ref _frame.Buffer, objFrame); + } + else + { + WriteObjectBody(ref _root, objFrame); + } + } + finally + { + objFrame.FieldIds.Release(_intPool); + objFrame.ValueStarts.Release(_intPool); + objFrame.Buffer.Release(_bytePool); + } } /// Begins writing an array. Pair with . public void BeginArray() { BeforeWriteValue(); - _frameStack.Push(_frame); - ArrayFrame frame = new ArrayFrame { Buffer = RentStream() }; - frame.ValueStarts = ArrayPool.Shared.Rent(InitialFrameCapacity); + ArrayFrame frame = new ArrayFrame(); + // See BeginObject: defer any visible state change until all the + // rent-and-push steps have succeeded; on throw, release whatever + // was acquired so nothing escapes Dispose's reach. + try + { + frame.Buffer.Acquire(_bytePool); + frame.ValueStarts.Acquire(_intPool); + _frameStack.Push(_frame); + } + catch + { + frame.ValueStarts.Release(_intPool); + frame.Buffer.Release(_bytePool); + throw; + } _frame = frame; } /// Ends the current array scope. public void EndArray() { + if (_disposed) throw new ObjectDisposedException(nameof(VariantValueWriter)); if (!(_frame is ArrayFrame arrFrame)) { throw new InvalidOperationException("EndArray called without matching BeginArray."); } _frame = _frameStack.Pop(); - MemoryStream output = _frame != null ? _frame.Buffer : _root; - WriteArrayBody(output, arrFrame); - ArrayPool.Shared.Return(arrFrame.ValueStarts); - ReturnStream(arrFrame.Buffer); + // Popped frame is no longer visible to Dispose; the finally makes + // sure its buffers are released even if WriteArrayBody throws. + try + { + if (_frame != null) + { + WriteArrayBody(ref _frame.Buffer, arrFrame); + } + else + { + WriteArrayBody(ref _root, arrFrame); + } + } + finally + { + arrFrame.ValueStarts.Release(_intPool); + arrFrame.Buffer.Release(_bytePool); + } } // --------------------------------------------------------------- @@ -168,48 +268,48 @@ public void EndArray() /// Writes a null value. 
public void WriteNull() { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.NullType)); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.NullType)); } /// Writes a boolean value. public void WriteBoolean(bool value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader( + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader( value ? VariantPrimitiveType.BooleanTrue : VariantPrimitiveType.BooleanFalse)); } /// Writes an 8-bit signed integer. public void WriteInt8(sbyte value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int8)); - ms.WriteByte((byte)value); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int8)); + buf.Append((byte)value); } /// Writes a 16-bit signed integer. public void WriteInt16(short value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int16)); - WriteInt16LE(ms, value); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int16)); + buf.WriteInt16LE(value); } /// Writes a 32-bit signed integer. public void WriteInt32(int value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int32)); - WriteInt32LE(ms, value); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int32)); + buf.WriteInt32LE(value); } /// Writes a 64-bit signed integer. 
public void WriteInt64(long value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int64)); - WriteInt64LE(ms, value); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int64)); + buf.WriteInt64LE(value); } /// @@ -239,24 +339,24 @@ public void WriteIntegerCompact(long value) /// Writes a 32-bit IEEE 754 float. public void WriteFloat(float value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Float)); - WriteFloatLE(ms, value); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Float)); + buf.WriteFloatLE(value); } /// Writes a 64-bit IEEE 754 double. public void WriteDouble(double value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Double)); - WriteDoubleLE(ms, value); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Double)); + buf.WriteDoubleLE(value); } /// Writes a Decimal4 (precision ≤ 9) value. public void WriteDecimal4(decimal value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal4)); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal4)); #if NET8_0_OR_GREATER Span bits = stackalloc int[4]; decimal.GetBits(value, bits); @@ -267,15 +367,15 @@ public void WriteDecimal4(decimal value) bool negative = (bits[3] & unchecked((int)0x80000000)) != 0; int unscaled = bits[0]; if (negative) unscaled = -unscaled; - ms.WriteByte(scale); - WriteInt32LE(ms, unscaled); + buf.Append(scale); + buf.WriteInt32LE(unscaled); } /// Writes a Decimal8 (precision ≤ 18) value. 
public void WriteDecimal8(decimal value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal8)); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal8)); #if NET8_0_OR_GREATER Span bits = stackalloc int[4]; decimal.GetBits(value, bits); @@ -286,15 +386,15 @@ public void WriteDecimal8(decimal value) bool negative = (bits[3] & unchecked((int)0x80000000)) != 0; long unscaled = ((long)bits[1] << 32) | (uint)bits[0]; if (negative) unscaled = -unscaled; - ms.WriteByte(scale); - WriteInt64LE(ms, unscaled); + buf.Append(scale); + buf.WriteInt64LE(unscaled); } /// Writes a Decimal16 (precision ≤ 38) value stored as . public void WriteDecimal16(SqlDecimal value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal16)); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal16)); bool positive = value.IsPositive; byte scale = (byte)value.Scale; @@ -314,160 +414,155 @@ public void WriteDecimal16(SqlDecimal value) lo = (long)uLo; } - ms.WriteByte(scale); - WriteInt64LE(ms, lo); - WriteInt64LE(ms, hi); + buf.Append(scale); + buf.WriteInt64LE(lo); + buf.WriteInt64LE(hi); } /// Writes a string. Uses the short-string encoding when the UTF-8 byte length is ≤ 63. 
public void WriteString(string value) { if (value == null) throw new ArgumentNullException(nameof(value)); - MemoryStream ms = BeforeWriteValue(); + ref Buffer buf = ref BeforeWriteValue(); int byteCount = Encoding.UTF8.GetByteCount(value); if (byteCount <= 63) { - ms.WriteByte(VariantEncodingHelper.MakeShortStringHeader(byteCount)); + buf.Append(VariantEncodingHelper.MakeShortStringHeader(byteCount)); } else { - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.String)); - WriteInt32LE(ms, byteCount); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.String)); + buf.WriteInt32LE(byteCount); } - // Encode UTF-8 directly into the MemoryStream's buffer. - int dataPos = (int)ms.Position; - int needed = dataPos + byteCount; - if (needed > ms.Length) - { - ms.SetLength(needed); - } - Encoding.UTF8.GetBytes(value, 0, value.Length, ms.GetBuffer(), dataPos); - ms.Position = needed; + // Encode UTF-8 directly into the buffer. + Span dest = buf.GetSpan(byteCount); +#if NET8_0_OR_GREATER + Encoding.UTF8.GetBytes(value, dest); +#else + Encoding.UTF8.GetBytes(value, 0, value.Length, buf.RawBuffer, buf.Length); +#endif + buf.Advance(byteCount); } /// Writes a binary blob. - public void WriteBinary(byte[] data) + public void WriteBinary(ReadOnlySpan data) { - if (data == null) throw new ArgumentNullException(nameof(data)); - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Binary)); - WriteInt32LE(ms, data.Length); - ms.Write(data, 0, data.Length); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Binary)); + buf.WriteInt32LE(data.Length); + buf.Append(data); } /// Writes a UUID. 
- public void WriteUuid(Guid value) + public unsafe void WriteUuid(Guid value) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Uuid)); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Uuid)); + Span raw = stackalloc byte[16]; + #if NET8_0_OR_GREATER - Span buf = stackalloc byte[16]; - value.TryWriteBytes(buf, bigEndian: true, out int _); - ms.Write(buf); + value.TryWriteBytes(raw, bigEndian: true, out _); #else - byte[] native = value.ToByteArray(); // Convert from .NET mixed-endian to big-endian (RFC 4122). - _scratch[0] = native[3]; _scratch[1] = native[2]; _scratch[2] = native[1]; _scratch[3] = native[0]; - _scratch[4] = native[5]; _scratch[5] = native[4]; - _scratch[6] = native[7]; _scratch[7] = native[6]; - Buffer.BlockCopy(native, 8, _scratch, 8, 8); - ms.Write(_scratch, 0, 16); + byte* guidPtr = (byte*)&value; + fixed (byte* bytePtr = raw) + { + bytePtr[0] = guidPtr[3]; + bytePtr[1] = guidPtr[2]; + bytePtr[2] = guidPtr[1]; + bytePtr[3] = guidPtr[0]; + bytePtr[4] = guidPtr[5]; + bytePtr[5] = guidPtr[4]; + bytePtr[6] = guidPtr[7]; + bytePtr[7] = guidPtr[6]; + ((long*)bytePtr)[1] = ((long*)guidPtr)[1]; + } #endif + buf.Append(raw); } /// Writes a date as days since the Unix epoch. public void WriteDateDays(int days) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Date)); - WriteInt32LE(ms, days); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Date)); + buf.WriteInt32LE(days); } /// Writes a timestamp (tz-adjusted microseconds since the Unix epoch). 
public void WriteTimestampMicros(long micros) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Timestamp)); - WriteInt64LE(ms, micros); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Timestamp)); + buf.WriteInt64LE(micros); } /// Writes a timestamp-without-timezone (microseconds since the Unix epoch). public void WriteTimestampNtzMicros(long micros) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtz)); - WriteInt64LE(ms, micros); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtz)); + buf.WriteInt64LE(micros); } /// Writes a time-without-timezone value (microseconds since midnight). public void WriteTimeNtzMicros(long micros) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimeNtz)); - WriteInt64LE(ms, micros); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimeNtz)); + buf.WriteInt64LE(micros); } /// Writes a timestamp with timezone (nanoseconds since the Unix epoch). public void WriteTimestampTzNanos(long nanos) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampTzNanos)); - WriteInt64LE(ms, nanos); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampTzNanos)); + buf.WriteInt64LE(nanos); } /// Writes a timestamp without timezone (nanoseconds since the Unix epoch). 
public void WriteTimestampNtzNanos(long nanos) { - MemoryStream ms = BeforeWriteValue(); - ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtzNanos)); - WriteInt64LE(ms, nanos); + ref Buffer buf = ref BeforeWriteValue(); + buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtzNanos)); + buf.WriteInt64LE(nanos); } // --------------------------------------------------------------- // Internal bookkeeping // --------------------------------------------------------------- - private MemoryStream BeforeWriteValue() + private ref Buffer BeforeWriteValue() { + if (_disposed) throw new ObjectDisposedException(nameof(VariantValueWriter)); if (_frame is ObjectFrame objFrame) { if (!objFrame.PendingValue) { throw new InvalidOperationException("A field name is required before writing an object field value. Call WriteFieldName first."); } - AppendInt(ref objFrame.ValueStarts, ref objFrame.ValueStartCount, (int)objFrame.Buffer.Position); + objFrame.ValueStarts.Append(objFrame.Buffer.Length); objFrame.PendingValue = false; - return objFrame.Buffer; + return ref objFrame.Buffer; } if (_frame is ArrayFrame arrFrame) { - AppendInt(ref arrFrame.ValueStarts, ref arrFrame.ValueStartCount, (int)arrFrame.Buffer.Position); - return arrFrame.Buffer; + arrFrame.ValueStarts.Append(arrFrame.Buffer.Length); + return ref arrFrame.Buffer; } - return _root; + return ref _root; } - private static void AppendInt(ref int[] array, ref int count, int value) + private static void WriteObjectBody(ref Buffer output, ObjectFrame frame) { - if (count == array.Length) - { - int[] grown = ArrayPool.Shared.Rent(array.Length * 2); - Array.Copy(array, 0, grown, 0, count); - ArrayPool.Shared.Return(array); - array = grown; - } - array[count++] = value; - } - - private void WriteObjectBody(MemoryStream output, ObjectFrame frame) - { - int fieldCount = frame.FieldIdCount; + int fieldCount = frame.FieldIds.Length; // Sentinel marks the end of the 
last value in the frame buffer. - AppendInt(ref frame.ValueStarts, ref frame.ValueStartCount, (int)frame.Buffer.Position); + frame.ValueStarts.Append(frame.Buffer.Length); - int[] fieldIds = frame.FieldIds; - int[] valueStarts = frame.ValueStarts; + int[] fieldIds = frame.FieldIds.RawBuffer; + int[] valueStarts = frame.ValueStarts.RawBuffer; // Sort indices so fields are emitted in sorted-field-id order. #if NET8_0_OR_GREATER @@ -506,169 +601,75 @@ private void WriteObjectBody(MemoryStream output, ObjectFrame frame) int offsetSize = VariantEncodingHelper.ByteWidthForValue(Math.Max(1, offsets[fieldCount])); bool isLarge = fieldCount > 255; - output.WriteByte(VariantEncodingHelper.MakeObjectHeader(fieldIdSize, offsetSize, isLarge)); + output.Append(VariantEncodingHelper.MakeObjectHeader(fieldIdSize, offsetSize, isLarge)); if (isLarge) { - WriteInt32LE(output, fieldCount); + output.WriteInt32LE(fieldCount); } else { - output.WriteByte((byte)fieldCount); + output.Append((byte)fieldCount); } for (int i = 0; i < fieldCount; i++) { - WriteSmallInt(output, fieldIds[sortOrder[i]], fieldIdSize); + output.WriteSmallInt(fieldIds[sortOrder[i]], fieldIdSize); } for (int i = 0; i <= fieldCount; i++) { - WriteSmallInt(output, offsets[i], offsetSize); + output.WriteSmallInt(offsets[i], offsetSize); } - byte[] valueBuffer = frame.Buffer.GetBuffer(); + byte[] valueBuffer = frame.Buffer.RawBuffer; for (int i = 0; i < fieldCount; i++) { int idx = sortOrder[i]; int start = valueStarts[idx]; int length = valueStarts[idx + 1] - start; - output.Write(valueBuffer, start, length); + output.Append(valueBuffer, start, length); } } - private void WriteArrayBody(MemoryStream output, ArrayFrame frame) + private static void WriteArrayBody(ref Buffer output, ArrayFrame frame) { - int elementCount = frame.ValueStartCount; + int elementCount = frame.ValueStarts.Length; // Sentinel marks the end of the last element. 
- AppendInt(ref frame.ValueStarts, ref frame.ValueStartCount, (int)frame.Buffer.Position); - int[] valueStarts = frame.ValueStarts; + frame.ValueStarts.Append(frame.Buffer.Length); + int[] valueStarts = frame.ValueStarts.RawBuffer; int offsetSize = VariantEncodingHelper.ByteWidthForValue(Math.Max(1, valueStarts[elementCount])); bool isLarge = elementCount > 255; - output.WriteByte(VariantEncodingHelper.MakeArrayHeader(offsetSize, isLarge)); + output.Append(VariantEncodingHelper.MakeArrayHeader(offsetSize, isLarge)); if (isLarge) { - WriteInt32LE(output, elementCount); + output.WriteInt32LE(elementCount); } else { - output.WriteByte((byte)elementCount); + output.Append((byte)elementCount); } for (int i = 0; i <= elementCount; i++) { - WriteSmallInt(output, valueStarts[i], offsetSize); + output.WriteSmallInt(valueStarts[i], offsetSize); } - output.Write(frame.Buffer.GetBuffer(), 0, (int)frame.Buffer.Position); + output.Append(frame.Buffer.RawBuffer, 0, frame.Buffer.Length); } - private MemoryStream RentStream() - { - if (_streamPool.Count > 0) - { - MemoryStream ms = _streamPool.Pop(); - ms.SetLength(0); - return ms; - } - return new MemoryStream(); - } - - private void ReturnStream(MemoryStream ms) - { - _streamPool.Push(ms); - } - - private void WriteSmallInt(MemoryStream ms, int value, int byteWidth) - { -#if NET8_0_OR_GREATER - Span buf = stackalloc byte[4]; - VariantEncodingHelper.WriteLittleEndianInt(buf, value, byteWidth); - ms.Write(buf.Slice(0, byteWidth)); -#else - VariantEncodingHelper.WriteLittleEndianInt(_scratch, value, byteWidth); - ms.Write(_scratch, 0, byteWidth); -#endif - } - - private void WriteInt16LE(MemoryStream ms, short value) - { -#if NET8_0_OR_GREATER - Span buf = stackalloc byte[2]; - BinaryPrimitives.WriteInt16LittleEndian(buf, value); - ms.Write(buf); -#else - BinaryPrimitives.WriteInt16LittleEndian(_scratch, value); - ms.Write(_scratch, 0, 2); -#endif - } - - private void WriteInt32LE(MemoryStream ms, int value) - { -#if 
NET8_0_OR_GREATER - Span buf = stackalloc byte[4]; - BinaryPrimitives.WriteInt32LittleEndian(buf, value); - ms.Write(buf); -#else - BinaryPrimitives.WriteInt32LittleEndian(_scratch, value); - ms.Write(_scratch, 0, 4); -#endif - } - - private void WriteInt64LE(MemoryStream ms, long value) - { -#if NET8_0_OR_GREATER - Span buf = stackalloc byte[8]; - BinaryPrimitives.WriteInt64LittleEndian(buf, value); - ms.Write(buf); -#else - BinaryPrimitives.WriteInt64LittleEndian(_scratch, value); - ms.Write(_scratch, 0, 8); -#endif - } - - private void WriteFloatLE(MemoryStream ms, float value) - { -#if NET8_0_OR_GREATER - Span buf = stackalloc byte[4]; - BinaryPrimitives.WriteSingleLittleEndian(buf, value); - ms.Write(buf); -#else - int bits = System.Runtime.CompilerServices.Unsafe.As(ref value); - BinaryPrimitives.WriteInt32LittleEndian(_scratch, bits); - ms.Write(_scratch, 0, 4); -#endif - } - - private void WriteDoubleLE(MemoryStream ms, double value) - { -#if NET8_0_OR_GREATER - Span buf = stackalloc byte[8]; - BinaryPrimitives.WriteDoubleLittleEndian(buf, value); - ms.Write(buf); -#else - long bits = BitConverter.DoubleToInt64Bits(value); - BinaryPrimitives.WriteInt64LittleEndian(_scratch, bits); - ms.Write(_scratch, 0, 8); -#endif - } - - private const int InitialFrameCapacity = 16; - private abstract class Frame { - public MemoryStream Buffer; - public int[] ValueStarts; - public int ValueStartCount; + public Buffer Buffer; + public Buffer ValueStarts; } private sealed class ObjectFrame : Frame { - public int[] FieldIds; - public int FieldIdCount; + public Buffer FieldIds; public bool PendingValue; } diff --git a/src/Apache.Arrow/Arrays/GuidArray.cs b/src/Apache.Arrow/Arrays/GuidArray.cs index 0f4c3e1f..cad9cfab 100644 --- a/src/Apache.Arrow/Arrays/GuidArray.cs +++ b/src/Apache.Arrow/Arrays/GuidArray.cs @@ -140,6 +140,9 @@ public static unsafe void GuidToRFC4122(Guid guid, Span bytes) if (bytes.Length != GuidType.ByteWidth) throw new ArgumentException("Byte span must be 
exactly 16 bytes long.", nameof(bytes)); +#if NET8_0_OR_GREATER + guid.TryWriteBytes(bytes, bigEndian: true, out _); +#else byte* guidPtr = (byte*)&guid; fixed (byte* bytePtr = bytes) { @@ -153,6 +156,7 @@ public static unsafe void GuidToRFC4122(Guid guid, Span bytes) bytePtr[7] = guidPtr[6]; ((long*)bytePtr)[1] = ((long*)guidPtr)[1]; } +#endif } /// @@ -164,6 +168,9 @@ public static unsafe Guid RFC4122ToGuid(ReadOnlySpan bytes) if (bytes.Length != GuidType.ByteWidth) throw new ArgumentException("Byte span must be exactly 16 bytes long.", nameof(bytes)); +#if NET8_0_OR_GREATER + Guid result = new Guid(bytes, bigEndian: true); +#else Guid result = new Guid(); byte* guidPtr = (byte*)&result; fixed (byte* bytePtr = bytes) @@ -177,8 +184,9 @@ public static unsafe Guid RFC4122ToGuid(ReadOnlySpan bytes) guidPtr[6] = bytePtr[7]; guidPtr[7] = bytePtr[6]; ((long*)guidPtr)[1] = ((long*)bytePtr)[1]; - return result; } +#endif + return result; } public Guid? GetGuid(int index) diff --git a/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs b/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs index 2282859b..8fe4c01b 100644 --- a/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs +++ b/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs @@ -152,6 +152,21 @@ public void Encode_Uuid() Assert.Equal(guid, reader.GetUuid()); } + [Fact] + public void Encode_Uuid_ProducesRfc4122BigEndianValueBytes() + { + // Round-tripping alone can't catch an endian bug in WriteUuid if + // VariantReader has the mirror bug. Pin the wire format directly: + // TestVectors.PrimitiveUuid is the canonical encoding of + // "550e8400-e29b-41d4-a716-446655440000" — 0x50 header + 16 bytes + // in RFC 4122 big-endian order (first three fields big-endian, + // last 8 bytes as-is). 
+ Guid guid = new Guid("550e8400-e29b-41d4-a716-446655440000"); + (byte[] _, byte[] value) = _builder.Encode(VariantValue.FromUuid(guid)); + + Assert.Equal(TestVectors.PrimitiveUuid.ToArray(), value); + } + [Fact] public void Encode_Decimal4() {