@@ -129,6 +129,7 @@ public static TcpQactiveProvider Server(IPEndPoint endPoint, ITcpQactiveProvider
129129 return new TcpQactiveProvider ( endPoint , transportInitializer ) ;
130130 }
131131
132+ [ System . Diagnostics . CodeAnalysis . SuppressMessage ( "Microsoft.Maintainability" , "CA1506:AvoidExcessiveClassCoupling" , Justification = "Seems as simple as it's going to get." ) ]
132133 [ System . Diagnostics . CodeAnalysis . SuppressMessage ( "Microsoft.Reliability" , "CA2000:Dispose objects before losing scope" , Justification = "The SocketAsyncEventArgs instance is either disposed before returning or by the observable's Finally operator." ) ]
133134 public override IObservable < TResult > Connect < TResult > ( Func < IQbservableProtocol , Expression > prepareExpression )
134135 {
@@ -181,7 +182,7 @@ public override IObservable<TResult> Connect<TResult>(Func<IQbservableProtocol,
181182
182183 var s = Observable . Using (
183184 ( ) => new NetworkStream ( e2 . ConnectSocket , ownsSocket : false ) ,
184- stream => ReadObservable < TResult > ( stream , prepareExpression , cancel . Token ) )
185+ stream => GetObservable < TResult > ( stream , prepareExpression , cancel . Token ) )
185186 . SubscribeSafe ( innerObserver ) ;
186187
187188 return new CompositeDisposable ( s , cancel ) ;
@@ -205,7 +206,7 @@ public override IObservable<TResult> Connect<TResult>(Func<IQbservableProtocol,
205206 }
206207 }
207208
208- private IObservable < TResult > ReadObservable < TResult > ( Stream stream , Func < IQbservableProtocol , Expression > prepareExpression , CancellationToken cancel )
209+ private IObservable < TResult > GetObservable < TResult > ( Stream stream , Func < IQbservableProtocol , Expression > prepareExpression , CancellationToken cancel )
209210 {
210211 Contract . Requires ( stream != null ) ;
211212 Contract . Requires ( prepareExpression != null ) ;
@@ -218,6 +219,7 @@ from result in protocol
218219 select result ;
219220 }
220221
222+ [ System . Diagnostics . CodeAnalysis . SuppressMessage ( "Microsoft.Maintainability" , "CA1506:AvoidExcessiveClassCoupling" , Justification = "Seems as simple as it's going to get." ) ]
221223 public override IObservable < ClientTermination > Listen ( QbservableServiceOptions options , Func < IQbservableProtocol , IParameterizedQbservableProvider > providerFactory )
222224 {
223225 return from listener in Observable . Return ( new TcpListener ( EndPoint ) )
@@ -251,84 +253,96 @@ from client in Observable.FromAsync(listener.AcceptTcpClientAsync).Repeat()
251253 Stopped ( ) ;
252254 } )
253255 let capturedId = new CapturedId ( Id + " C" + Interlocked . Increment ( ref lastServerClientNumber ) + " " + client . Client . RemoteEndPoint )
254- from result in Observable . StartAsync ( async cancel =>
255- {
256- ReceivingConnection ( idOverride : capturedId . Value ) ;
256+ from termination in Observable . StartAsync ( cancel => AcceptAsync ( client , capturedId , options , providerFactory , cancel ) )
257+ . Finally ( ( ) => Shutdown ( client . Client , capturedId . Value ) )
258+ select termination ;
259+ }
257260
258- // These default settings enable a proper graceful shutdown. DisconnectAsync is used instead of Close on the server-side to request
259- // that the client terminates the connection ASAP. This is important because it prevents the server-side socket from going into a
260- // TIME_WAIT state rather than the client. The linger option is meant to ensure that any outgoing data, such as an exception, is
261- // completely transmitted to the client before the socket terminates. The seconds specified is arbitrary, though chosen to be large
262- // enough to transfer any remaining data successfully and small enough to cause a timely disconnection. A custom prepareSocket
263- // implementation can always change it via SetSocketOption, if necessary.
264- //
265- // https://msdn.microsoft.com/en-us/library/system.net.sockets.socket.disconnect(v=vs.110).aspx
266- client . LingerState . Enabled = true ;
267- client . LingerState . LingerTime = LingerTimeInSeconds ;
261+ private async Task < TcpClientTermination > AcceptAsync (
262+ TcpClient client ,
263+ CapturedId capturedId ,
264+ QbservableServiceOptions options ,
265+ Func < IQbservableProtocol , IParameterizedQbservableProvider > providerFactory ,
266+ CancellationToken cancel )
267+ {
268+ Contract . Requires ( client != null ) ;
269+ Contract . Requires ( capturedId != null ) ;
270+ Contract . Requires ( options != null ) ;
271+ Contract . Requires ( providerFactory != null ) ;
268272
269- prepareSocket ( client . Client ) ;
273+ ReceivingConnection ( idOverride : capturedId . Value ) ;
270274
271- var watch = Stopwatch . StartNew ( ) ;
275+ // These default settings enable a proper graceful shutdown. DisconnectAsync is used instead of Close on the server-side to request
276+ // that the client terminates the connection ASAP. This is important because it prevents the server-side socket from going into a
277+ // TIME_WAIT state rather than the client. The linger option is meant to ensure that any outgoing data, such as an exception, is
278+ // completely transmitted to the client before the socket terminates. The seconds specified is arbitrary, though chosen to be large
279+ // enough to transfer any remaining data successfully and small enough to cause a timely disconnection. A custom prepareSocket
280+ // implementation can always change it via SetSocketOption, if necessary.
281+ //
282+ // https://msdn.microsoft.com/en-us/library/system.net.sockets.socket.disconnect(v=vs.110).aspx
283+ client . LingerState . Enabled = true ;
284+ client . LingerState . LingerTime = LingerTimeInSeconds ;
272285
273- var localEndPoint = client . Client . LocalEndPoint ;
274- var remoteEndPoint = client . Client . RemoteEndPoint ;
286+ prepareSocket ( client . Client ) ;
275287
276- var exceptions = new List < ExceptionDispatchInfo > ( ) ;
277- var shutdownReason = QbservableProtocolShutdownReason . None ;
288+ var watch = Stopwatch . StartNew ( ) ;
278289
279- try
280- {
281- using ( var stream = new NetworkStream ( client . Client , ownsSocket : false ) )
282- using ( var protocol = await NegotiateServerAsync ( capturedId . Value , stream , formatterFactory ( ) , options , cancel ) . ConfigureAwait ( false ) )
283- {
284- capturedId . Value = protocol . ClientId ;
285-
286- var provider = providerFactory ( protocol ) ;
287-
288- ReceivedConnection ( idOverride : capturedId . Value ) ;
289-
290- try
291- {
292- await protocol . ExecuteServerAsync ( provider ) . ConfigureAwait ( false ) ;
293- }
294- catch ( OperationCanceledException )
295- {
296- }
297- catch ( Exception ex )
298- {
299- exceptions . Add ( ExceptionDispatchInfo . Capture ( ex ) ) ;
300- }
301- finally
302- {
303- shutdownReason = protocol . ShutdownReason ;
304- }
305-
306- var protocolExceptions = protocol . Exceptions ;
307-
308- if ( protocolExceptions != null )
309- {
310- foreach ( var exception in protocolExceptions )
311- {
312- exceptions . Add ( exception ) ;
313- }
314- }
315- }
316- }
317- catch ( OperationCanceledException )
318- {
319- shutdownReason = QbservableProtocolShutdownReason . ProtocolNegotiationCanceled ;
320- }
321- catch ( Exception ex )
322- {
323- shutdownReason = QbservableProtocolShutdownReason . ProtocolNegotiationError ;
290+ var localEndPoint = client . Client . LocalEndPoint ;
291+ var remoteEndPoint = client . Client . RemoteEndPoint ;
324292
325- exceptions . Add ( ExceptionDispatchInfo . Capture ( ex ) ) ;
326- }
293+ var exceptions = new List < ExceptionDispatchInfo > ( ) ;
294+ var shutdownReason = QbservableProtocolShutdownReason . None ;
327295
328- return new TcpClientTermination ( localEndPoint , remoteEndPoint , watch . Elapsed , shutdownReason , exceptions ) ;
329- } )
330- . Finally ( ( ) => Shutdown ( client . Client , capturedId . Value ) )
331- select result ;
296+ try
297+ {
298+ using ( var stream = new NetworkStream ( client . Client , ownsSocket : false ) )
299+ using ( var protocol = await NegotiateServerAsync ( capturedId . Value , stream , formatterFactory ( ) , options , cancel ) . ConfigureAwait ( false ) )
300+ {
301+ capturedId . Value = protocol . ClientId ;
302+
303+ var provider = providerFactory ( protocol ) ;
304+
305+ ReceivedConnection ( idOverride : capturedId . Value ) ;
306+
307+ try
308+ {
309+ await protocol . ExecuteServerAsync ( provider ) . ConfigureAwait ( false ) ;
310+ }
311+ catch ( OperationCanceledException )
312+ {
313+ }
314+ catch ( Exception ex )
315+ {
316+ exceptions . Add ( ExceptionDispatchInfo . Capture ( ex ) ) ;
317+ }
318+ finally
319+ {
320+ shutdownReason = protocol . ShutdownReason ;
321+ }
322+
323+ var protocolExceptions = protocol . Exceptions ;
324+
325+ if ( protocolExceptions != null )
326+ {
327+ foreach ( var exception in protocolExceptions )
328+ {
329+ exceptions . Add ( exception ) ;
330+ }
331+ }
332+ }
333+ }
334+ catch ( OperationCanceledException )
335+ {
336+ shutdownReason = QbservableProtocolShutdownReason . ProtocolNegotiationCanceled ;
337+ }
338+ catch ( Exception ex )
339+ {
340+ shutdownReason = QbservableProtocolShutdownReason . ProtocolNegotiationError ;
341+
342+ exceptions . Add ( ExceptionDispatchInfo . Capture ( ex ) ) ;
343+ }
344+
345+ return new TcpClientTermination ( localEndPoint , remoteEndPoint , watch . Elapsed , shutdownReason , exceptions ) ;
332346 }
333347
334348 private async Task < IStreamQbservableProtocol > NegotiateClientAsync ( Stream stream , IRemotingFormatter formatter , CancellationToken cancel )
@@ -464,7 +478,7 @@ private async void Shutdown(Socket socket, object id = null)
464478 }
465479
466480 [ System . Diagnostics . CodeAnalysis . SuppressMessage ( "Microsoft.Reliability" , "CA2000:Dispose objects before losing scope" , Justification = "The SocketAsyncEventArgs instance is either disposed before returning or by the observable's Finally operator." ) ]
467- private async Task DisconnectAsync ( Socket socket )
481+ private static async Task DisconnectAsync ( Socket socket )
468482 {
469483 Contract . Requires ( socket != null ) ;
470484
0 commit comments