Skip to content

Commit fa081fb

Browse files
committed
chore: rebuild masque h2 conn read logic
1 parent 0f71808 commit fa081fb

File tree

1 file changed

+18
-59
lines changed

1 file changed

+18
-59
lines changed

transport/masque/client_h2.go

Lines changed: 18 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,10 @@ func dialH2(ctx context.Context, client *http.Client, template *uritemplate.Temp
105105
stream := &h2DatagramStream{
106106
requestBody: pw,
107107
responseBody: rsp.Body,
108-
recvBuf: make([]byte, 0, 4096),
108+
cancel: cancel,
109109
}
110110
return &h2IpConn{
111111
str: stream,
112-
cancel: cancel,
113112
closeChan: make(chan struct{}),
114113
}, rsp, nil
115114
}
@@ -128,8 +127,6 @@ func authorityFromURL(u *url.URL) string {
128127
type h2IpConn struct {
129128
str *h2DatagramStream
130129

131-
cancel context.CancelFunc
132-
133130
mu sync.Mutex
134131

135132
closeChan chan struct{}
@@ -238,7 +235,6 @@ func (c *h2IpConn) Close() error {
238235
}
239236
c.mu.Unlock()
240237
err := c.str.Close()
241-
c.cancel()
242238
return err
243239
}
244240

@@ -262,43 +258,40 @@ func calculateIPv4Checksum(header [ipv4HeaderLen]byte) uint16 {
262258
type h2DatagramStream struct {
263259
requestBody *io.PipeWriter
264260
responseBody io.ReadCloser
261+
cancel context.CancelFunc
265262

266263
readMu sync.Mutex
267264
writeMu sync.Mutex
268-
recvBuf []byte
269265
}
270266

271267
func (s *h2DatagramStream) ReceiveDatagram(_ context.Context) ([]byte, error) {
272268
s.readMu.Lock()
273269
defer s.readMu.Unlock()
274270

271+
reader := quicvarint.NewReader(s.responseBody)
275272
for {
276-
capsuleType, payload, consumed, ok, err := parseCapsule(s.recvBuf)
273+
capsuleType, err := quicvarint.Read(reader)
277274
if err != nil {
278275
return nil, err
279276
}
280-
if ok {
281-
s.recvBuf = s.recvBuf[consumed:]
282-
if capsuleType != h2DatagramCapsuleType {
283-
continue
284-
}
285-
return payload, nil
277+
payloadLen, err := quicvarint.Read(reader)
278+
if err != nil {
279+
return nil, err
286280
}
287-
288-
buf := make([]byte, 4096)
289-
n, readErr := s.responseBody.Read(buf)
290-
if n > 0 {
291-
s.recvBuf = append(s.recvBuf, buf[:n]...)
292-
continue
281+
payload := make([]byte, payloadLen)
282+
_, err = io.ReadFull(reader, payload)
283+
if err != nil {
284+
return nil, err
293285
}
294-
if readErr != nil {
295-
return nil, readErr
286+
if capsuleType != h2DatagramCapsuleType {
287+
continue
296288
}
289+
return payload, nil
297290
}
298291
}
299292

300293
func (s *h2DatagramStream) SendDatagram(data []byte) error {
301-
frame := make([]byte, 0, 2*quicvarint.Len(0)+len(data))
294+
frame := make([]byte, 0, quicvarint.Len(h2DatagramCapsuleType)+quicvarint.Len(uint64(len(data)))+len(data))
302295
frame = quicvarint.Append(frame, h2DatagramCapsuleType)
303296
frame = quicvarint.Append(frame, uint64(len(data)))
304297
frame = append(frame, data...)
@@ -314,41 +307,7 @@ func (s *h2DatagramStream) SendDatagram(data []byte) error {
314307

315308
func (s *h2DatagramStream) Close() error {
316309
_ = s.requestBody.Close()
317-
return s.responseBody.Close()
318-
}
319-
320-
func parseCapsule(buf []byte) (capsuleType uint64, payload []byte, consumed int, ok bool, err error) {
321-
capsuleType, typeLen, ok := parseVarint(buf)
322-
if !ok {
323-
return 0, nil, 0, false, nil
324-
}
325-
payloadLen, payloadLenLen, ok := parseVarint(buf[typeLen:])
326-
if !ok {
327-
return 0, nil, 0, false, nil
328-
}
329-
headerLen := typeLen + payloadLenLen
330-
totalLen := headerLen + int(payloadLen)
331-
if totalLen < headerLen {
332-
return 0, nil, 0, false, errors.New("connect-ip: malformed capsule length")
333-
}
334-
if len(buf) < totalLen {
335-
return 0, nil, 0, false, nil
336-
}
337-
return capsuleType, buf[headerLen:totalLen], totalLen, true, nil
338-
}
339-
340-
func parseVarint(buf []byte) (v uint64, n int, ok bool) {
341-
if len(buf) == 0 {
342-
return 0, 0, false
343-
}
344-
prefix := buf[0] >> 6
345-
n = 1 << prefix
346-
if len(buf) < n {
347-
return 0, 0, false
348-
}
349-
v = uint64(buf[0] & 0x3f)
350-
for i := 1; i < n; i++ {
351-
v = (v << 8) | uint64(buf[i])
352-
}
353-
return v, n, true
310+
err := s.responseBody.Close()
311+
s.cancel()
312+
return err
354313
}

0 commit comments

Comments
 (0)