Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions GVFS/GVFS.Common/NamedPipes/NamedPipeServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@

namespace GVFS.Common.NamedPipes
{
/// <summary>
/// The server side of a Named Pipe used for interprocess communication.
///
/// Named Pipe protocol:
/// The client / server process sends a "message" (or line) of data as a
/// sequence of bytes terminated by a 0x3 byte (ASCII control code for
/// End of text). Text is encoded as UTF-8 to be sent as bytes across the wire.
///
/// This format was chosen so that:
/// 1) A reasonable range of values can be transmitted across the pipe,
/// including null and bytes that represent newline characters.
/// 2) It would be easy to implement in multiple places, as we
/// have managed and native implementations.
/// </summary>
public class NamedPipeServer : IDisposable
{
// TODO(Mac) the limit is much shorter on macOS
Expand Down
63 changes: 24 additions & 39 deletions GVFS/GVFS.Common/NamedPipes/NamedPipeStreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,16 @@ namespace GVFS.Common.NamedPipes
/// </summary>
public class NamedPipeStreamReader
{
private const int DefaultBufferSize = 1024;
private const int InitialListSize = 1024;
private const byte TerminatorByte = 0x3;
private readonly byte[] buffer;

private int bufferSize;
private byte[] buffer;
private Stream stream;

public NamedPipeStreamReader(Stream stream, int bufferSize)
{
this.stream = stream;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
}

public NamedPipeStreamReader(Stream stream)
: this(stream, DefaultBufferSize)
{
this.stream = stream;
this.buffer = new byte[1];
}

/// <summary>
Expand All @@ -36,38 +29,23 @@ public NamedPipeStreamReader(Stream stream)
/// <returns>The message read from the stream, or null if the end of the input stream has been reached. </returns>
public string ReadMessage()
{
int bytesRead = this.Read();
if (bytesRead == 0)
byte currentByte;

bool streamOpen = this.TryReadByte(out currentByte);
if (!streamOpen)
{
// The end of the stream has been reached - return null to indicate this.
return null;
}

// If we have read in the entire message in the first read (mainline scenario),
// then just process the data directly from the buffer.
if (this.buffer[bytesRead - 1] == TerminatorByte)
{
return Encoding.UTF8.GetString(this.buffer, 0, bytesRead - 1);
}
List<byte> bytes = new List<byte>(InitialListSize);

// We need to process multiple chunks - collect data from multiple chunks
// into a single list
List<byte> bytes = new List<byte>(this.bufferSize * 2);

while (true)
do
{
bool encounteredTerminatorByte = this.buffer[bytesRead - 1] == TerminatorByte;
int lengthToCopy = encounteredTerminatorByte ? bytesRead - 1 : bytesRead;
bytes.Add(currentByte);
streamOpen = this.TryReadByte(out currentByte);

bytes.AddRange(new ArraySegment<byte>(this.buffer, 0, lengthToCopy));
if (encounteredTerminatorByte)
{
break;
}

bytesRead = this.Read();

if (bytesRead == 0)
if (!streamOpen)
{
// We have read a partial message (the last byte received does not indicate that
// this was the end of the message), but the stream has been closed. Throw an exception
Expand All @@ -76,17 +54,24 @@ public string ReadMessage()
throw new IOException("Incomplete message read from stream. The end of the stream was reached without the expected terminating byte.");
}
}
while (currentByte != TerminatorByte);

return Encoding.UTF8.GetString(bytes.ToArray());
}

/// <summary>
/// Read the next chunk of bytes from the stream.
/// Read a byte from the stream.
/// </summary>
/// <returns>The number of bytes read.</returns>
private int Read()
/// <param name="readByte">The byte read from the stream</param>
/// <returns>True if byte read, false if end of stream has been reached</returns>
private bool TryReadByte(out byte readByte)
{
return this.stream.Read(this.buffer, 0, this.buffer.Length);
this.buffer[0] = 0;

int numBytesRead = this.stream.Read(this.buffer, 0, 1);
readByte = this.buffer[0];

return numBytesRead == 1;
}
}
}
61 changes: 31 additions & 30 deletions GVFS/GVFS.UnitTests/Common/NamedPipeStreamReaderWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ namespace GVFS.UnitTests.Common
[TestFixture]
public class NamedPipeStreamReaderWriterTests
{
private const int BufferSize = 256;

private MemoryStream stream;
private NamedPipeStreamWriter streamWriter;
private NamedPipeStreamReader streamReader;
Expand All @@ -20,11 +18,10 @@ public void Setup()
{
this.stream = new MemoryStream();
this.streamWriter = new NamedPipeStreamWriter(this.stream);
this.streamReader = new NamedPipeStreamReader(this.stream, BufferSize);
this.streamReader = new NamedPipeStreamReader(this.stream);
}

[Test]
[Description("Verify that we can transmit multiple messages")]
public void CanWriteAndReadMessages()
{
string firstMessage = @"This is a new message";
Expand All @@ -41,24 +38,6 @@ public void CanWriteAndReadMessages()
}

[Test]
[Description("Verify that we can transmit a message that contains content that is the size of a NamedPipeStreamReader's buffer")]
public void CanSendBufferSizedContent()
{
string longMessage = new string('T', BufferSize);
this.TestTransmitMessage(longMessage);
}

[Test]
[Description("Verify that we can transmit message that is the same size a NamedPipeStreamReader's buffer")]
public void CanSendBufferSizedMessage()
{
int numBytesInMessageTerminator = 1;
string longMessage = new string('T', BufferSize - numBytesInMessageTerminator);
this.TestTransmitMessage(longMessage);
}

[Test]
[Description("Verify that the expected exception is thrown if message is not terminated with expected byte.")]
[Category(CategoryConstants.ExceptionExpected)]
public void ReadingPartialMessgeThrows()
{
Expand All @@ -71,19 +50,23 @@ public void ReadingPartialMessgeThrows()
}

[Test]
[Description("Verify that we can transmit message that is larger than the buffer")]
public void CanSendMultiBufferSizedMessage()
public void CanSendMessagesWithNewLines()
{
string longMessage = new string('T', BufferSize * 3);
this.TestTransmitMessage(longMessage);
string messageWithNewLines = "This is a \nstringwith\nnewlines";
this.TestTransmitMessage(messageWithNewLines);
}

[Test]
[Description("Verify that we can transmit message that newline characters")]
public void CanSendNewLines()
public void CanSendMultipleMessagesSequentially()
{
string messageWithNewLines = "This is a \nstringwith\nnewlines";
this.TestTransmitMessage(messageWithNewLines);
string[] messages = new string[]
{
"This is a new message",
"This is another message",
"This is the third message in a series of messages"
};

this.TestTransmitMessages(messages);
}

private void TestTransmitMessage(string message)
Expand All @@ -96,6 +79,24 @@ private void TestTransmitMessage(string message)
string readMessage = this.streamReader.ReadMessage();
readMessage.ShouldEqual(message, "The message read from the stream reader is not the same as the message that was sent.");
}

private void TestTransmitMessages(string[] messages)
{
long pos = this.ReadStreamPosition();

foreach (string message in messages)
{
this.streamWriter.WriteMessage(message);
}

this.SetStreamPosition(pos);

foreach (string message in messages)
{
string readMessage = this.streamReader.ReadMessage();
readMessage.ShouldEqual(message, "The message read from the stream reader is not the same as the message that was sent.");
}
}

private long ReadStreamPosition()
{
Expand Down