@@ -79,48 +79,44 @@ export const observeRpcStream = <A, E, R>(
7979 stream : Stream . Stream < A , E , R > ,
8080 traceAttributes ?: Readonly < Record < string , unknown > > ,
8181) : Stream . Stream < A , E , R > => {
82- const instrumented = Stream . unwrap (
83- Effect . gen ( function * ( ) {
84- const startedAt = Date . now ( ) ;
85- return stream . pipe ( Stream . onExit ( ( exit ) => recordRpcStreamMetrics ( method , startedAt , exit ) ) ) ;
86- } ) ,
87- ) ;
82+ const makeInstrumented = Effect . gen ( function * ( ) {
83+ const startedAt = Date . now ( ) ;
84+ return stream . pipe ( Stream . onExit ( ( exit ) => recordRpcStreamMetrics ( method , startedAt , exit ) ) ) ;
85+ } ) ;
8886
8987 return shouldTraceRpc ( method )
90- ? instrumented . pipe (
88+ ? Stream . unwrap ( makeInstrumented ) . pipe (
9189 Stream . withSpan ( `${ RPC_SPAN_PREFIX } .${ method } ` , {
9290 attributes : rpcSpanAttributes ( method , traceAttributes ) ,
9391 } ) ,
9492 )
95- : instrumented ;
93+ : Stream . unwrap ( makeInstrumented . pipe ( Effect . withTracerEnabled ( false ) ) ) ;
9694} ;
9795
9896export const observeRpcStreamEffect = < A , StreamError , StreamContext , EffectError , EffectContext > (
9997 method : string ,
10098 effect : Effect . Effect < Stream . Stream < A , StreamError , StreamContext > , EffectError , EffectContext > ,
10199 traceAttributes ?: Readonly < Record < string , unknown > > ,
102100) : Stream . Stream < A , StreamError | EffectError , StreamContext | EffectContext > => {
103- const instrumented = Stream . unwrap (
104- Effect . gen ( function * ( ) {
105- const startedAt = Date . now ( ) ;
106- const exit = yield * Effect . exit ( effect ) ;
101+ const makeInstrumented = Effect . gen ( function * ( ) {
102+ const startedAt = Date . now ( ) ;
103+ const exit = yield * Effect . exit ( effect ) ;
107104
108- if ( Exit . isFailure ( exit ) ) {
109- yield * recordRpcStreamMetrics ( method , startedAt , exit ) ;
110- return yield * Effect . failCause ( exit . cause ) ;
111- }
105+ if ( Exit . isFailure ( exit ) ) {
106+ yield * recordRpcStreamMetrics ( method , startedAt , exit ) ;
107+ return yield * Effect . failCause ( exit . cause ) ;
108+ }
112109
113- return exit . value . pipe (
114- Stream . onExit ( ( streamExit ) => recordRpcStreamMetrics ( method , startedAt , streamExit ) ) ,
115- ) ;
116- } ) ,
117- ) ;
110+ return exit . value . pipe (
111+ Stream . onExit ( ( streamExit ) => recordRpcStreamMetrics ( method , startedAt , streamExit ) ) ,
112+ ) ;
113+ } ) ;
118114
119115 return shouldTraceRpc ( method )
120- ? instrumented . pipe (
116+ ? Stream . unwrap ( makeInstrumented ) . pipe (
121117 Stream . withSpan ( `${ RPC_SPAN_PREFIX } .${ method } ` , {
122118 attributes : rpcSpanAttributes ( method , traceAttributes ) ,
123119 } ) ,
124120 )
125- : instrumented ;
121+ : Stream . unwrap ( makeInstrumented . pipe ( Effect . withTracerEnabled ( false ) ) ) ;
126122} ;
0 commit comments