1
2
3
4
5
6
7 package http2
8
9 import (
10 "bufio"
11 "bytes"
12 "compress/flate"
13 "compress/gzip"
14 "context"
15 "crypto/rand"
16 "crypto/tls"
17 "errors"
18 "fmt"
19 "io"
20 "io/fs"
21 "log"
22 "math"
23 "math/bits"
24 mathrand "math/rand"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal"
28 "net/http/internal/httpcommon"
29 "net/textproto"
30 "slices"
31 "strconv"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "time"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http2/hpack"
39 "golang.org/x/net/idna"
40 )
41
42 const (
43
44
45 transportDefaultConnFlow = 1 << 30
46
47
48
49
50 transportDefaultStreamFlow = 4 << 20
51
52 defaultUserAgent = "Go-http-client/2.0"
53
54
55
56
57 initialMaxConcurrentStreams = 100
58
59
60
61 defaultMaxConcurrentStreams = 1000
62 )
63
64
65
66
67
68 type Transport struct {
69
70
71
72
73
74
75
76 DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
77
78
79
80
81
82
83
84
85
86 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
87
88
89
90 TLSClientConfig *tls.Config
91
92
93
94 ConnPool ClientConnPool
95
96
97
98
99
100
101
102
103
104 DisableCompression bool
105
106
107
108 AllowHTTP bool
109
110
111
112
113
114
115
116
117 MaxHeaderListSize uint32
118
119
120
121
122
123
124
125
126 MaxReadFrameSize uint32
127
128
129
130
131
132
133 MaxDecoderHeaderTableSize uint32
134
135
136
137
138
139 MaxEncoderHeaderTableSize uint32
140
141
142
143
144
145
146
147
148
149 StrictMaxConcurrentStreams bool
150
151
152
153
154
155 IdleConnTimeout time.Duration
156
157
158
159
160
161
162
163 ReadIdleTimeout time.Duration
164
165
166
167
168 PingTimeout time.Duration
169
170
171
172
173 WriteByteTimeout time.Duration
174
175
176
177
178
179 CountError func(errType string)
180
181 t1 TransportConfig
182
183 connPoolOnce sync.Once
184 connPoolOrDef ClientConnPool
185
186 *transportTestHooks
187 }
188
189
190
191
192
193 type transportTestHooks struct {
194 newclientconn func(*ClientConn)
195 }
196
197 func (t *Transport) maxHeaderListSize() uint32 {
198 n := t.t1.MaxResponseHeaderBytes()
199 if n > 0 {
200 n = adjustHTTP1MaxHeaderSize(n)
201 }
202 if n <= 0 {
203 return 10 << 20
204 }
205 if n >= 0xffffffff {
206 return 0
207 }
208 return uint32(n)
209 }
210
211 func (t *Transport) disableCompression() bool {
212 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression())
213 }
214
215 func NewTransport(t1 TransportConfig) *Transport {
216 connPool := new(clientConnPool)
217 t2 := &Transport{
218 ConnPool: noDialClientConnPool{connPool},
219 t1: t1,
220 }
221 connPool.t = t2
222 return t2
223 }
224
225 func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
226 connPool, ok := t.ConnPool.(noDialClientConnPool)
227 if !ok {
228 go c.Close()
229 return nil
230 }
231 addr := authorityAddr(scheme, authority)
232 used, err := connPool.addConnIfNeeded(addr, t, c)
233 if !used {
234 go c.Close()
235 }
236 return err
237 }
238
239
240
241 type unencryptedTransport Transport
242
243 func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
244 return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
245 }
246
247 func (t *Transport) connPool() ClientConnPool {
248 t.connPoolOnce.Do(t.initConnPool)
249 return t.connPoolOrDef
250 }
251
252 func (t *Transport) initConnPool() {
253 if t.ConnPool != nil {
254 t.connPoolOrDef = t.ConnPool
255 } else {
256 t.connPoolOrDef = &clientConnPool{t: t}
257 }
258 }
259
260
261
262 type ClientConn struct {
263 t *Transport
264 tconn net.Conn
265 tlsState *tls.ConnectionState
266 atomicReused uint32
267 singleUse bool
268 getConnCalled bool
269
270
271 readerDone chan struct{}
272 readerErr error
273
274 idleTimeout time.Duration
275 idleTimer *time.Timer
276
277 mu sync.Mutex
278 cond *sync.Cond
279 flow outflow
280 inflow inflow
281 doNotReuse bool
282 closing bool
283 closed bool
284 closedOnIdle bool
285 seenSettings bool
286 seenSettingsChan chan struct{}
287 wantSettingsAck bool
288 goAway *GoAwayFrame
289 goAwayDebug string
290 streams map[uint32]*clientStream
291 streamsReserved int
292 nextStreamID uint32
293 pendingRequests int
294 pings map[[8]byte]chan struct{}
295 br *bufio.Reader
296 lastActive time.Time
297 lastIdle time.Time
298
299 maxFrameSize uint32
300 maxConcurrentStreams uint32
301 peerMaxHeaderListSize uint64
302 peerMaxHeaderTableSize uint32
303 initialWindowSize uint32
304 initialStreamRecvWindowSize int32
305 readIdleTimeout time.Duration
306 pingTimeout time.Duration
307 extendedConnectAllowed bool
308 strictMaxConcurrentStreams bool
309
310
311
312
313
314
315
316
317
318 rstStreamPingsBlocked bool
319
320
321
322
323
324
325
326 pendingResets int
327
328
329
330
331
332
333 readBeforeStreamID uint32
334
335
336
337
338 reqHeaderMu chan struct{}
339
340
341
342
343
344 internalStateHook func()
345
346
347
348
349 wmu sync.Mutex
350 bw *bufio.Writer
351 fr *Framer
352 werr error
353 hbuf bytes.Buffer
354 henc *hpack.Encoder
355 }
356
357
358
359 type clientStream struct {
360 cc *ClientConn
361
362
363 ctx context.Context
364 reqCancel <-chan struct{}
365
366 trace *httptrace.ClientTrace
367 ID uint32
368 bufPipe pipe
369 requestedGzip bool
370 isHead bool
371
372 abortOnce sync.Once
373 abort chan struct{}
374 abortErr error
375
376 peerClosed chan struct{}
377 donec chan struct{}
378 on100 chan struct{}
379
380 respHeaderRecv chan struct{}
381 res *ClientResponse
382
383 flow outflow
384 inflow inflow
385 bytesRemain int64
386 readErr error
387
388 reqBody io.ReadCloser
389 reqBodyContentLength int64
390 reqBodyClosed chan struct{}
391
392
393 sentEndStream bool
394 sentHeaders bool
395
396
397 firstByte bool
398 pastHeaders bool
399 pastTrailers bool
400 readClosed bool
401 readAborted bool
402 totalHeaderSize int64
403
404 trailer Header
405 resTrailer *Header
406
407 staticResp ClientResponse
408 }
409
410 var got1xxFuncForTests func(int, textproto.MIMEHeader) error
411
412
413
414 func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
415 if fn := got1xxFuncForTests; fn != nil {
416 return fn
417 }
418 return traceGot1xxResponseFunc(cs.trace)
419 }
420
421 func (cs *clientStream) abortStream(err error) {
422 cs.cc.mu.Lock()
423 defer cs.cc.mu.Unlock()
424 cs.abortStreamLocked(err)
425 }
426
427 func (cs *clientStream) abortStreamLocked(err error) {
428 cs.abortOnce.Do(func() {
429 cs.abortErr = err
430 close(cs.abort)
431 })
432 if cs.reqBody != nil {
433 cs.closeReqBodyLocked()
434 }
435
436 if cs.cc.cond != nil {
437
438 cs.cc.cond.Broadcast()
439 }
440 }
441
442 func (cs *clientStream) abortRequestBodyWrite() {
443 cc := cs.cc
444 cc.mu.Lock()
445 defer cc.mu.Unlock()
446 if cs.reqBody != nil && cs.reqBodyClosed == nil {
447 cs.closeReqBodyLocked()
448 cc.cond.Broadcast()
449 }
450 }
451
452 func (cs *clientStream) closeReqBodyLocked() {
453 if cs.reqBodyClosed != nil {
454 return
455 }
456 cs.reqBodyClosed = make(chan struct{})
457 reqBodyClosed := cs.reqBodyClosed
458 go func() {
459 cs.reqBody.Close()
460 close(reqBodyClosed)
461 }()
462 }
463
464 type stickyErrWriter struct {
465 conn net.Conn
466 timeout time.Duration
467 err *error
468 }
469
470 func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
471 if *sew.err != nil {
472 return 0, *sew.err
473 }
474 n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
475 *sew.err = err
476 return n, err
477 }
478
479
480
481
482
483
484
485 type noCachedConnError struct{}
486
487 func (noCachedConnError) IsHTTP2NoCachedConnError() {}
488 func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
489
490
491
492
493 func isNoCachedConnError(err error) bool {
494 _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
495 return ok
496 }
497
498 var ErrNoCachedConn error = noCachedConnError{}
499
500
501 type RoundTripOpt struct {
502
503
504
505
506 OnlyCachedConn bool
507
508 allowHTTP bool
509 }
510
511 func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
512 return t.RoundTripOpt(req, RoundTripOpt{})
513 }
514
515
516
517 func authorityAddr(scheme string, authority string) (addr string) {
518 host, port, err := net.SplitHostPort(authority)
519 if err != nil {
520 host = authority
521 port = ""
522 }
523 if port == "" {
524 port = "443"
525 if scheme == "http" {
526 port = "80"
527 }
528 }
529 if a, err := idna.ToASCII(host); err == nil {
530 host = a
531 }
532
533 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
534 return host + ":" + port
535 }
536 return net.JoinHostPort(host, port)
537 }
538
539
540 func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
541 switch req.URL.Scheme {
542 case "https":
543
544 case "http":
545 if !t.AllowHTTP && !opt.allowHTTP {
546 return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
547 }
548 default:
549 return nil, errors.New("http2: unsupported scheme")
550 }
551
552 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
553 for retry := 0; ; retry++ {
554 cc, err := t.connPool().GetClientConn(req, addr)
555 if err != nil {
556 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
557 return nil, err
558 }
559 reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
560 traceGotConn(req, cc, reused)
561 res, err := cc.RoundTrip(req)
562 if err != nil && retry <= 6 {
563 roundTripErr := err
564 if req, err = shouldRetryRequest(req, err); err == nil {
565
566 if retry == 0 {
567 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
568 continue
569 }
570 backoff := float64(uint(1) << (uint(retry) - 1))
571 backoff += backoff * (0.1 * mathrand.Float64())
572 d := time.Second * time.Duration(backoff)
573 tm := time.NewTimer(d)
574 select {
575 case <-tm.C:
576 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
577 continue
578 case <-req.Context.Done():
579 tm.Stop()
580 err = req.Context.Err()
581 }
582 }
583 }
584 if err == errClientConnNotEstablished {
585
586
587
588
589
590
591
592
593
594
595 if cc.idleTimer != nil {
596 cc.idleTimer.Stop()
597 }
598 t.connPool().MarkDead(cc)
599 }
600 if err != nil {
601 t.vlogf("RoundTrip failure: %v", err)
602 return nil, err
603 }
604 return res, nil
605 }
606 }
607
608 func (t *Transport) IdleConnStrsForTesting() []string {
609 pool, ok := t.connPool().(noDialClientConnPool)
610 if !ok {
611 return nil
612 }
613
614 var ret []string
615 pool.mu.Lock()
616 defer pool.mu.Unlock()
617 for k, ccs := range pool.conns {
618 for _, cc := range ccs {
619 if cc.idleState().canTakeNewRequest {
620 ret = append(ret, k)
621 }
622 }
623 }
624 slices.Sort(ret)
625 return ret
626 }
627
628
629
630
631 func (t *Transport) CloseIdleConnections() {
632 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
633 cp.closeIdleConnections()
634 }
635 }
636
637 var (
638 errClientConnClosed = errors.New("http2: client conn is closed")
639 errClientConnUnusable = errors.New("http2: client conn not usable")
640 errClientConnNotEstablished = errors.New("http2: client conn could not be established")
641 errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
642 errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
643 )
644
645
646
647
648
649 func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
650 if !canRetryError(err) {
651 return nil, err
652 }
653
654
655 if req.Body == nil || req.Body == NoBody {
656 return req, nil
657 }
658
659
660
661 if req.GetBody != nil {
662 body, err := req.GetBody()
663 if err != nil {
664 return nil, err
665 }
666 newReq := req.Clone()
667 newReq.Body = body
668 return newReq, nil
669 }
670
671
672
673
674 if err == errClientConnUnusable {
675 return req, nil
676 }
677
678 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
679 }
680
681 func canRetryError(err error) bool {
682 if err == errClientConnUnusable || err == errClientConnGotGoAway {
683 return true
684 }
685 if se, ok := err.(StreamError); ok {
686 return se.Code == ErrCodeRefusedStream
687 }
688 return false
689 }
690
691 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
692 if t.transportTestHooks != nil {
693 return t.newClientConn(nil, singleUse, nil)
694 }
695 host, _, err := net.SplitHostPort(addr)
696 if err != nil {
697 return nil, err
698 }
699 tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
700 if err != nil {
701 return nil, err
702 }
703 return t.newClientConn(tconn, singleUse, nil)
704 }
705
706 func (t *Transport) newTLSConfig(host string) *tls.Config {
707 cfg := new(tls.Config)
708 if t.TLSClientConfig != nil {
709 *cfg = *t.TLSClientConfig.Clone()
710 }
711 if !slices.Contains(cfg.NextProtos, NextProtoTLS) {
712 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
713 }
714 if cfg.ServerName == "" {
715 cfg.ServerName = host
716 }
717 return cfg
718 }
719
720 func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
721 if t.DialTLSContext != nil {
722 return t.DialTLSContext(ctx, network, addr, tlsCfg)
723 } else if t.DialTLS != nil {
724 return t.DialTLS(network, addr, tlsCfg)
725 }
726
727 tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
728 if err != nil {
729 return nil, err
730 }
731 state := tlsCn.ConnectionState()
732 if p := state.NegotiatedProtocol; p != NextProtoTLS {
733 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
734 }
735 if !state.NegotiatedProtocolIsMutual {
736 return nil, errors.New("http2: could not negotiate protocol mutually")
737 }
738 return tlsCn, nil
739 }
740
741
742
743 func (t *Transport) disableKeepAlives() bool {
744 return t.t1 != nil && t.t1.DisableKeepAlives()
745 }
746
747 func (t *Transport) expectContinueTimeout() time.Duration {
748 if t.t1 == nil {
749 return 0
750 }
751 return t.t1.ExpectContinueTimeout()
752 }
753
754 func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
755 cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
756 if err != nil {
757 return NetHTTPClientConn{}, err
758 }
759
760
761
762 cc.strictMaxConcurrentStreams = true
763
764 return NetHTTPClientConn{cc}, nil
765 }
766
767 func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
768 conf := configFromTransport(t)
769 cc := &ClientConn{
770 t: t,
771 tconn: c,
772 readerDone: make(chan struct{}),
773 nextStreamID: 1,
774 maxFrameSize: 16 << 10,
775 initialWindowSize: 65535,
776 initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
777 maxConcurrentStreams: initialMaxConcurrentStreams,
778 strictMaxConcurrentStreams: conf.StrictMaxConcurrentRequests,
779 peerMaxHeaderListSize: 0xffffffffffffffff,
780 streams: make(map[uint32]*clientStream),
781 singleUse: singleUse,
782 seenSettingsChan: make(chan struct{}),
783 wantSettingsAck: true,
784 readIdleTimeout: conf.SendPingTimeout,
785 pingTimeout: conf.PingTimeout,
786 pings: make(map[[8]byte]chan struct{}),
787 reqHeaderMu: make(chan struct{}, 1),
788 lastActive: time.Now(),
789 internalStateHook: internalStateHook,
790 }
791 if t.transportTestHooks != nil {
792 t.transportTestHooks.newclientconn(cc)
793 c = cc.tconn
794 }
795 if VerboseLogs {
796 t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
797 }
798
799 cc.cond = sync.NewCond(&cc.mu)
800 cc.flow.add(int32(initialWindowSize))
801
802
803
804 cc.bw = bufio.NewWriter(stickyErrWriter{
805 conn: c,
806 timeout: conf.WriteByteTimeout,
807 err: &cc.werr,
808 })
809 cc.br = bufio.NewReader(c)
810 cc.fr = NewFramer(cc.bw, cc.br)
811 cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
812 if t.CountError != nil {
813 cc.fr.countError = t.CountError
814 }
815 maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
816 cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
817 cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
818
819 cc.henc = hpack.NewEncoder(&cc.hbuf)
820 cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
821 cc.peerMaxHeaderTableSize = initialHeaderTableSize
822
823 if cs, ok := c.(connectionStater); ok {
824 state := cs.ConnectionState()
825 cc.tlsState = &state
826 }
827
828 initialSettings := []Setting{
829 {ID: SettingEnablePush, Val: 0},
830 {ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
831 }
832 initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
833 if max := t.maxHeaderListSize(); max != 0 {
834 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
835 }
836 if maxHeaderTableSize != initialHeaderTableSize {
837 initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
838 }
839
840 cc.bw.Write(clientPreface)
841 cc.fr.WriteSettings(initialSettings...)
842 cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
843 cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
844 cc.bw.Flush()
845 if cc.werr != nil {
846 cc.Close()
847 return nil, cc.werr
848 }
849
850
851 if d := t.idleConnTimeout(); d != 0 {
852 cc.idleTimeout = d
853 cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
854 }
855
856 go cc.readLoop()
857 return cc, nil
858 }
859
860 func (cc *ClientConn) healthCheck() {
861 pingTimeout := cc.pingTimeout
862
863
864 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
865 defer cancel()
866 cc.vlogf("http2: Transport sending health check")
867 err := cc.Ping(ctx)
868 if err != nil {
869 cc.vlogf("http2: Transport health check failure: %v", err)
870 cc.closeForLostPing()
871 } else {
872 cc.vlogf("http2: Transport health check success")
873 }
874 }
875
876
877 func (cc *ClientConn) SetDoNotReuse() {
878 cc.mu.Lock()
879 defer cc.mu.Unlock()
880 cc.doNotReuse = true
881 }
882
883 func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
884 cc.mu.Lock()
885 defer cc.mu.Unlock()
886
887 old := cc.goAway
888 cc.goAway = f
889
890
891 if cc.goAwayDebug == "" {
892 cc.goAwayDebug = string(f.DebugData())
893 }
894 if old != nil && old.ErrCode != ErrCodeNo {
895 cc.goAway.ErrCode = old.ErrCode
896 }
897 last := f.LastStreamID
898 for streamID, cs := range cc.streams {
899 if streamID <= last {
900
901
902
903 continue
904 }
905 if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
906
907
908
909 cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
910 } else {
911
912
913 cs.abortStreamLocked(errClientConnGotGoAway)
914 }
915 }
916 }
917
918
919
920
921
922
923 func (cc *ClientConn) CanTakeNewRequest() bool {
924 cc.mu.Lock()
925 defer cc.mu.Unlock()
926 return cc.canTakeNewRequestLocked()
927 }
928
929
930
931
932 func (cc *ClientConn) ReserveNewRequest() bool {
933 cc.mu.Lock()
934 defer cc.mu.Unlock()
935 if st := cc.idleStateLocked(); !st.canTakeNewRequest {
936 return false
937 }
938 cc.streamsReserved++
939 return true
940 }
941
942
943 type ClientConnState struct {
944
945 Closed bool
946
947
948
949
950
951 Closing bool
952
953
954 StreamsActive int
955
956
957
958 StreamsReserved int
959
960
961
962
963 StreamsPending int
964
965
966
967
968 MaxConcurrentStreams uint32
969
970
971
972 LastIdle time.Time
973 }
974
975
976 func (cc *ClientConn) State() ClientConnState {
977 cc.wmu.Lock()
978 maxConcurrent := cc.maxConcurrentStreams
979 if !cc.seenSettings {
980 maxConcurrent = 0
981 }
982 cc.wmu.Unlock()
983
984 cc.mu.Lock()
985 defer cc.mu.Unlock()
986 return ClientConnState{
987 Closed: cc.closed,
988 Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
989 StreamsActive: len(cc.streams) + cc.pendingResets,
990 StreamsReserved: cc.streamsReserved,
991 StreamsPending: cc.pendingRequests,
992 LastIdle: cc.lastIdle,
993 MaxConcurrentStreams: maxConcurrent,
994 }
995 }
996
997
998
999 type clientConnIdleState struct {
1000 canTakeNewRequest bool
1001 }
1002
1003 func (cc *ClientConn) idleState() clientConnIdleState {
1004 cc.mu.Lock()
1005 defer cc.mu.Unlock()
1006 return cc.idleStateLocked()
1007 }
1008
1009 func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
1010 if cc.singleUse && cc.nextStreamID > 1 {
1011 return
1012 }
1013 var maxConcurrentOkay bool
1014 if cc.strictMaxConcurrentStreams {
1015
1016
1017
1018
1019 maxConcurrentOkay = true
1020 } else {
1021
1022
1023
1024
1025
1026
1027 maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
1028 }
1029
1030 st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
1031
1032
1033
1034
1035
1036
1037
1038
1039 if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
1040 st.canTakeNewRequest = true
1041 }
1042
1043 return
1044 }
1045
1046 func (cc *ClientConn) isUsableLocked() bool {
1047 return cc.goAway == nil &&
1048 !cc.closed &&
1049 !cc.closing &&
1050 !cc.doNotReuse &&
1051 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
1052 !cc.tooIdleLocked()
1053 }
1054
1055
1056
1057
1058
1059
1060
1061 func (cc *ClientConn) canReserveLocked() bool {
1062 if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
1063 return false
1064 }
1065 if !cc.isUsableLocked() {
1066 return false
1067 }
1068 return true
1069 }
1070
1071
1072
1073 func (cc *ClientConn) currentRequestCountLocked() int {
1074 return len(cc.streams) + cc.streamsReserved + cc.pendingResets
1075 }
1076
1077 func (cc *ClientConn) canTakeNewRequestLocked() bool {
1078 st := cc.idleStateLocked()
1079 return st.canTakeNewRequest
1080 }
1081
1082
1083 func (cc *ClientConn) availableLocked() int {
1084 if !cc.canTakeNewRequestLocked() {
1085 return 0
1086 }
1087 return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
1088 }
1089
1090
1091
1092 func (cc *ClientConn) tooIdleLocked() bool {
1093
1094
1095
1096
1097 return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
1098 }
1099
1100
1101
1102
1103
1104
1105
1106 func (cc *ClientConn) onIdleTimeout() {
1107 cc.closeIfIdle()
1108 }
1109
1110 func (cc *ClientConn) closeConn() {
1111 t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
1112 defer t.Stop()
1113 cc.tconn.Close()
1114 cc.maybeCallStateHook()
1115 }
1116
1117
1118
1119 func (cc *ClientConn) forceCloseConn() {
1120 tc, ok := cc.tconn.(*tls.Conn)
1121 if !ok {
1122 return
1123 }
1124 if nc := tc.NetConn(); nc != nil {
1125 nc.Close()
1126 }
1127 }
1128
1129 func (cc *ClientConn) closeIfIdle() {
1130 cc.mu.Lock()
1131 if len(cc.streams) > 0 || cc.streamsReserved > 0 {
1132 cc.mu.Unlock()
1133 return
1134 }
1135 cc.closed = true
1136 cc.closedOnIdle = true
1137 nextID := cc.nextStreamID
1138
1139 cc.mu.Unlock()
1140
1141 if VerboseLogs {
1142 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
1143 }
1144 cc.closeConn()
1145 }
1146
1147 func (cc *ClientConn) isDoNotReuseAndIdle() bool {
1148 cc.mu.Lock()
1149 defer cc.mu.Unlock()
1150 return cc.doNotReuse && len(cc.streams) == 0
1151 }
1152
1153 var shutdownEnterWaitStateHook = func() {}
1154
1155
1156 func (cc *ClientConn) Shutdown(ctx context.Context) error {
1157 if err := cc.sendGoAway(); err != nil {
1158 return err
1159 }
1160
1161 done := make(chan struct{})
1162 cancelled := false
1163 go func() {
1164 cc.mu.Lock()
1165 defer cc.mu.Unlock()
1166 for {
1167 if len(cc.streams) == 0 || cc.closed {
1168 cc.closed = true
1169 close(done)
1170 break
1171 }
1172 if cancelled {
1173 break
1174 }
1175 cc.cond.Wait()
1176 }
1177 }()
1178 shutdownEnterWaitStateHook()
1179 select {
1180 case <-done:
1181 cc.closeConn()
1182 return nil
1183 case <-ctx.Done():
1184 cc.mu.Lock()
1185
1186 cancelled = true
1187 cc.cond.Broadcast()
1188 cc.mu.Unlock()
1189 return ctx.Err()
1190 }
1191 }
1192
1193 func (cc *ClientConn) sendGoAway() error {
1194 cc.mu.Lock()
1195 closing := cc.closing
1196 cc.closing = true
1197 maxStreamID := cc.nextStreamID
1198 cc.mu.Unlock()
1199 if closing {
1200
1201 return nil
1202 }
1203
1204 cc.wmu.Lock()
1205 defer cc.wmu.Unlock()
1206
1207 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1208 return err
1209 }
1210 if err := cc.bw.Flush(); err != nil {
1211 return err
1212 }
1213
1214 return nil
1215 }
1216
1217
1218
1219 func (cc *ClientConn) closeForError(err error) {
1220 cc.mu.Lock()
1221 cc.closed = true
1222 for _, cs := range cc.streams {
1223 cs.abortStreamLocked(err)
1224 }
1225 cc.cond.Broadcast()
1226 cc.mu.Unlock()
1227 cc.closeConn()
1228 }
1229
1230
1231
1232
1233 func (cc *ClientConn) Close() error {
1234 cc.closeForError(errClientConnForceClosed)
1235 return nil
1236 }
1237
1238
1239 func (cc *ClientConn) closeForLostPing() {
1240 err := errors.New("http2: client connection lost")
1241 if f := cc.t.CountError; f != nil {
1242 f("conn_close_lost_ping")
1243 }
1244 cc.closeForError(err)
1245 }
1246
1247
1248
1249 var errRequestCanceled = internal.ErrRequestCanceled
1250
1251 func (cc *ClientConn) responseHeaderTimeout() time.Duration {
1252 if cc.t.t1 != nil {
1253 return cc.t.t1.ResponseHeaderTimeout()
1254 }
1255
1256
1257
1258
1259 return 0
1260 }
1261
1262
1263
1264
1265 func actualContentLength(req *ClientRequest) int64 {
1266 if req.Body == nil || req.Body == NoBody {
1267 return 0
1268 }
1269 if req.ContentLength != 0 {
1270 return req.ContentLength
1271 }
1272 return -1
1273 }
1274
1275 func (cc *ClientConn) decrStreamReservations() {
1276 cc.mu.Lock()
1277 defer cc.mu.Unlock()
1278 cc.decrStreamReservationsLocked()
1279 }
1280
1281 func (cc *ClientConn) decrStreamReservationsLocked() {
1282 if cc.streamsReserved > 0 {
1283 cc.streamsReserved--
1284 }
1285 }
1286
1287 func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
1288 return cc.roundTrip(req, nil)
1289 }
1290
1291 func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
1292 ctx := req.Context
1293 req.stream = clientStream{
1294 cc: cc,
1295 ctx: ctx,
1296 reqCancel: req.Cancel,
1297 isHead: req.Method == "HEAD",
1298 reqBody: req.Body,
1299 reqBodyContentLength: actualContentLength(req),
1300 trace: httptrace.ContextClientTrace(ctx),
1301 peerClosed: make(chan struct{}),
1302 abort: make(chan struct{}),
1303 respHeaderRecv: make(chan struct{}),
1304 donec: make(chan struct{}),
1305 resTrailer: req.ResTrailer,
1306 }
1307 cs := &req.stream
1308
1309 cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())
1310
1311 go cs.doRequest(req, streamf)
1312
1313 waitDone := func() error {
1314 select {
1315 case <-cs.donec:
1316 return nil
1317 case <-ctx.Done():
1318 return ctx.Err()
1319 case <-cs.reqCancel:
1320 return errRequestCanceled
1321 }
1322 }
1323
1324 handleResponseHeaders := func() (*ClientResponse, error) {
1325 res := cs.res
1326 if res.StatusCode > 299 {
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336 cs.abortRequestBodyWrite()
1337 }
1338 res.TLS = cc.tlsState
1339 if res.Body == NoBody && actualContentLength(req) == 0 {
1340
1341
1342
1343 if err := waitDone(); err != nil {
1344 return nil, err
1345 }
1346 }
1347 return res, nil
1348 }
1349
1350 cancelRequest := func(cs *clientStream, err error) error {
1351 cs.cc.mu.Lock()
1352 bodyClosed := cs.reqBodyClosed
1353 cs.cc.mu.Unlock()
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367 if bodyClosed != nil {
1368 <-bodyClosed
1369 }
1370 return err
1371 }
1372
1373 for {
1374 select {
1375 case <-cs.respHeaderRecv:
1376 return handleResponseHeaders()
1377 case <-cs.abort:
1378 select {
1379 case <-cs.respHeaderRecv:
1380
1381
1382
1383
1384 return handleResponseHeaders()
1385 default:
1386 waitDone()
1387 return nil, cs.abortErr
1388 }
1389 case <-ctx.Done():
1390 err := ctx.Err()
1391 cs.abortStream(err)
1392 return nil, cancelRequest(cs, err)
1393 case <-cs.reqCancel:
1394 cs.abortStream(errRequestCanceled)
1395 return nil, cancelRequest(cs, errRequestCanceled)
1396 }
1397 }
1398 }
1399
1400
1401
1402
1403 func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
1404 err := cs.writeRequest(req, streamf)
1405 cs.cleanupWriteRequest(err)
1406 }
1407
1408 var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
1409
1410
1411
1412
1413
1414
1415
1416
1417 func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
1418 cc := cs.cc
1419 ctx := cs.ctx
1420
1421
1422
1423 var isExtendedConnect bool
1424 if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
1425 isExtendedConnect = true
1426 }
1427
1428
1429
1430
1431 if cc.reqHeaderMu == nil {
1432 panic("RoundTrip on uninitialized ClientConn")
1433 }
1434 if isExtendedConnect {
1435 select {
1436 case <-cs.reqCancel:
1437 return errRequestCanceled
1438 case <-ctx.Done():
1439 return ctx.Err()
1440 case <-cc.seenSettingsChan:
1441 if !cc.extendedConnectAllowed {
1442 return errExtendedConnectNotSupported
1443 }
1444 }
1445 }
1446 select {
1447 case cc.reqHeaderMu <- struct{}{}:
1448 case <-cs.reqCancel:
1449 return errRequestCanceled
1450 case <-ctx.Done():
1451 return ctx.Err()
1452 }
1453
1454 cc.mu.Lock()
1455 if cc.idleTimer != nil {
1456 cc.idleTimer.Stop()
1457 }
1458 cc.decrStreamReservationsLocked()
1459 if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
1460 cc.mu.Unlock()
1461 <-cc.reqHeaderMu
1462 return err
1463 }
1464 cc.addStreamLocked(cs)
1465 if isConnectionCloseRequest(req) {
1466 cc.doNotReuse = true
1467 }
1468 cc.mu.Unlock()
1469
1470 if streamf != nil {
1471 streamf(cs)
1472 }
1473
1474 continueTimeout := cc.t.expectContinueTimeout()
1475 if continueTimeout != 0 {
1476 if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
1477 continueTimeout = 0
1478 } else {
1479 cs.on100 = make(chan struct{}, 1)
1480 }
1481 }
1482
1483
1484
1485
1486
1487 err = cs.encodeAndWriteHeaders(req)
1488 <-cc.reqHeaderMu
1489 if err != nil {
1490 return err
1491 }
1492
1493 hasBody := cs.reqBodyContentLength != 0
1494 if !hasBody {
1495 cs.sentEndStream = true
1496 } else {
1497 if continueTimeout != 0 {
1498 traceWait100Continue(cs.trace)
1499 timer := time.NewTimer(continueTimeout)
1500 select {
1501 case <-timer.C:
1502 err = nil
1503 case <-cs.on100:
1504 err = nil
1505 case <-cs.abort:
1506 err = cs.abortErr
1507 case <-ctx.Done():
1508 err = ctx.Err()
1509 case <-cs.reqCancel:
1510 err = errRequestCanceled
1511 }
1512 timer.Stop()
1513 if err != nil {
1514 traceWroteRequest(cs.trace, err)
1515 return err
1516 }
1517 }
1518
1519 if err = cs.writeRequestBody(req); err != nil {
1520 if err != errStopReqBodyWrite {
1521 traceWroteRequest(cs.trace, err)
1522 return err
1523 }
1524 } else {
1525 cs.sentEndStream = true
1526 }
1527 }
1528
1529 traceWroteRequest(cs.trace, err)
1530
1531 var respHeaderTimer <-chan time.Time
1532 var respHeaderRecv chan struct{}
1533 if d := cc.responseHeaderTimeout(); d != 0 {
1534 timer := time.NewTimer(d)
1535 defer timer.Stop()
1536 respHeaderTimer = timer.C
1537 respHeaderRecv = cs.respHeaderRecv
1538 }
1539
1540
1541
1542 for {
1543 select {
1544 case <-cs.peerClosed:
1545 return nil
1546 case <-respHeaderTimer:
1547 return errTimeout
1548 case <-respHeaderRecv:
1549 respHeaderRecv = nil
1550 respHeaderTimer = nil
1551 case <-cs.abort:
1552 return cs.abortErr
1553 case <-ctx.Done():
1554 return ctx.Err()
1555 case <-cs.reqCancel:
1556 return errRequestCanceled
1557 }
1558 }
1559 }
1560
1561 func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
1562 cc := cs.cc
1563 ctx := cs.ctx
1564
1565 cc.wmu.Lock()
1566 defer cc.wmu.Unlock()
1567
1568
1569 select {
1570 case <-cs.abort:
1571 return cs.abortErr
1572 case <-ctx.Done():
1573 return ctx.Err()
1574 case <-cs.reqCancel:
1575 return errRequestCanceled
1576 default:
1577 }
1578
1579
1580
1581
1582
1583
1584 cc.hbuf.Reset()
1585 res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
1586 cc.writeHeader(name, value)
1587 })
1588 if err != nil {
1589 return fmt.Errorf("http2: %w", err)
1590 }
1591 hdrs := cc.hbuf.Bytes()
1592
1593
1594 endStream := !res.HasBody && !res.HasTrailers
1595 cs.sentHeaders = true
1596 err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
1597 traceWroteHeaders(cs.trace)
1598 return err
1599 }
1600
1601 func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
1602 return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
1603 Request: httpcommon.Request{
1604 Header: req.Header,
1605 Trailer: req.Trailer,
1606 URL: req.URL,
1607 Host: req.Host,
1608 Method: req.Method,
1609 ActualContentLength: actualContentLength(req),
1610 },
1611 AddGzipHeader: addGzipHeader,
1612 PeerMaxHeaderListSize: peerMaxHeaderListSize,
1613 DefaultUserAgent: defaultUserAgent,
1614 }, headerf)
1615 }
1616
1617
1618
1619
1620
1621 func (cs *clientStream) cleanupWriteRequest(err error) {
1622 cc := cs.cc
1623
1624 if cs.ID == 0 {
1625
1626 cc.decrStreamReservations()
1627 }
1628
1629
1630
1631
1632
1633 cc.mu.Lock()
1634 mustCloseBody := false
1635 if cs.reqBody != nil && cs.reqBodyClosed == nil {
1636 mustCloseBody = true
1637 cs.reqBodyClosed = make(chan struct{})
1638 }
1639 bodyClosed := cs.reqBodyClosed
1640 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
1641
1642 readSinceStream := cc.readBeforeStreamID > cs.ID
1643 cc.mu.Unlock()
1644 if mustCloseBody {
1645 cs.reqBody.Close()
1646 close(bodyClosed)
1647 }
1648 if bodyClosed != nil {
1649 <-bodyClosed
1650 }
1651
1652 if err != nil && cs.sentEndStream {
1653
1654
1655
1656 select {
1657 case <-cs.peerClosed:
1658 err = nil
1659 default:
1660 }
1661 }
1662 if err != nil {
1663 cs.abortStream(err)
1664 if cs.sentHeaders {
1665 if se, ok := err.(StreamError); ok {
1666 if se.Cause != errFromPeer {
1667 cc.writeStreamReset(cs.ID, se.Code, false, err)
1668 }
1669 } else {
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687 ping := false
1688 if !closeOnIdle && !readSinceStream {
1689 cc.mu.Lock()
1690
1691
1692 if !cc.rstStreamPingsBlocked {
1693 if cc.pendingResets == 0 {
1694 ping = true
1695 }
1696 cc.pendingResets++
1697 }
1698 cc.mu.Unlock()
1699 }
1700 cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
1701 }
1702 }
1703 cs.bufPipe.CloseWithError(err)
1704 } else {
1705 if cs.sentHeaders && !cs.sentEndStream {
1706 cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
1707 }
1708 cs.bufPipe.CloseWithError(errRequestCanceled)
1709 }
1710 if cs.ID != 0 {
1711 cc.forgetStreamID(cs.ID)
1712 }
1713
1714 cc.wmu.Lock()
1715 werr := cc.werr
1716 cc.wmu.Unlock()
1717 if werr != nil {
1718 cc.Close()
1719 }
1720
1721 close(cs.donec)
1722 cc.maybeCallStateHook()
1723 }
1724
1725
1726
1727 func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
1728 for {
1729 if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
1730
1731
1732 return errClientConnNotEstablished
1733 }
1734 cc.lastActive = time.Now()
1735 if cc.closed || !cc.canTakeNewRequestLocked() {
1736 return errClientConnUnusable
1737 }
1738 cc.lastIdle = time.Time{}
1739 if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
1740 return nil
1741 }
1742 cc.pendingRequests++
1743 cc.cond.Wait()
1744 cc.pendingRequests--
1745 select {
1746 case <-cs.abort:
1747 return cs.abortErr
1748 default:
1749 }
1750 }
1751 }
1752
1753
1754 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1755 first := true
1756 for len(hdrs) > 0 && cc.werr == nil {
1757 chunk := hdrs
1758 if len(chunk) > maxFrameSize {
1759 chunk = chunk[:maxFrameSize]
1760 }
1761 hdrs = hdrs[len(chunk):]
1762 endHeaders := len(hdrs) == 0
1763 if first {
1764 cc.fr.WriteHeaders(HeadersFrameParam{
1765 StreamID: streamID,
1766 BlockFragment: chunk,
1767 EndStream: endStream,
1768 EndHeaders: endHeaders,
1769 })
1770 first = false
1771 } else {
1772 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1773 }
1774 }
1775 cc.bw.Flush()
1776 return cc.werr
1777 }
1778
1779
1780 var (
1781
1782 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1783
1784
1785 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1786
1787 errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
1788 )
1789
1790
1791
1792
1793
1794
1795 func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1796 const max = 512 << 10
1797 n := min(int64(maxFrameSize), max)
1798 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1799
1800
1801
1802
1803 n = cl + 1
1804 }
1805 if n < 1 {
1806 return 1
1807 }
1808 return int(n)
1809 }
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819 var bufPools [7]sync.Pool
1820 func bufPoolIndex(size int) int {
1821 if size <= 16384 {
1822 return 0
1823 }
1824 size -= 1
1825 bits := bits.Len(uint(size))
1826 index := bits - 14
1827 if index >= len(bufPools) {
1828 return len(bufPools) - 1
1829 }
1830 return index
1831 }
1832
1833 func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
1834 cc := cs.cc
1835 body := cs.reqBody
1836 sentEnd := false
1837
1838 hasTrailers := req.Trailer != nil
1839 remainLen := cs.reqBodyContentLength
1840 hasContentLen := remainLen != -1
1841
1842 cc.mu.Lock()
1843 maxFrameSize := int(cc.maxFrameSize)
1844 cc.mu.Unlock()
1845
1846
1847 scratchLen := cs.frameScratchBufferLen(maxFrameSize)
1848 var buf []byte
1849 index := bufPoolIndex(scratchLen)
1850 if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
1851 defer bufPools[index].Put(bp)
1852 buf = *bp
1853 } else {
1854 buf = make([]byte, scratchLen)
1855 defer bufPools[index].Put(&buf)
1856 }
1857
1858 var sawEOF bool
1859 for !sawEOF {
1860 n, err := body.Read(buf)
1861 if hasContentLen {
1862 remainLen -= int64(n)
1863 if remainLen == 0 && err == nil {
1864
1865
1866
1867
1868
1869
1870
1871 var scratch [1]byte
1872 var n1 int
1873 n1, err = body.Read(scratch[:])
1874 remainLen -= int64(n1)
1875 }
1876 if remainLen < 0 {
1877 err = errReqBodyTooLong
1878 return err
1879 }
1880 }
1881 if err != nil {
1882 cc.mu.Lock()
1883 bodyClosed := cs.reqBodyClosed != nil
1884 cc.mu.Unlock()
1885 switch {
1886 case bodyClosed:
1887 return errStopReqBodyWrite
1888 case err == io.EOF:
1889 sawEOF = true
1890 err = nil
1891 default:
1892 return err
1893 }
1894 }
1895
1896 remain := buf[:n]
1897 for len(remain) > 0 && err == nil {
1898 var allowed int32
1899 allowed, err = cs.awaitFlowControl(len(remain))
1900 if err != nil {
1901 return err
1902 }
1903 cc.wmu.Lock()
1904 data := remain[:allowed]
1905 remain = remain[allowed:]
1906 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1907 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1908 if err == nil {
1909
1910
1911
1912
1913
1914
1915 err = cc.bw.Flush()
1916 }
1917 cc.wmu.Unlock()
1918 }
1919 if err != nil {
1920 return err
1921 }
1922 }
1923
1924 if sentEnd {
1925
1926
1927
1928 return nil
1929 }
1930
1931
1932
1933
1934 cc.mu.Lock()
1935 trailer := req.Trailer
1936 err = cs.abortErr
1937 cc.mu.Unlock()
1938 if err != nil {
1939 return err
1940 }
1941
1942 cc.wmu.Lock()
1943 defer cc.wmu.Unlock()
1944 var trls []byte
1945 if len(trailer) > 0 {
1946 trls, err = cc.encodeTrailers(trailer)
1947 if err != nil {
1948 return err
1949 }
1950 }
1951
1952
1953
1954 if len(trls) > 0 {
1955 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1956 } else {
1957 err = cc.fr.WriteData(cs.ID, true, nil)
1958 }
1959 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1960 err = ferr
1961 }
1962 return err
1963 }
1964
1965
1966
1967
1968
1969 func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1970 cc := cs.cc
1971 ctx := cs.ctx
1972 cc.mu.Lock()
1973 defer cc.mu.Unlock()
1974 for {
1975 if cc.closed {
1976 return 0, errClientConnClosed
1977 }
1978 if cs.reqBodyClosed != nil {
1979 return 0, errStopReqBodyWrite
1980 }
1981 select {
1982 case <-cs.abort:
1983 return 0, cs.abortErr
1984 case <-ctx.Done():
1985 return 0, ctx.Err()
1986 case <-cs.reqCancel:
1987 return 0, errRequestCanceled
1988 default:
1989 }
1990 if a := cs.flow.available(); a > 0 {
1991 take := a
1992 if int(take) > maxBytes {
1993
1994 take = int32(maxBytes)
1995 }
1996 if take > int32(cc.maxFrameSize) {
1997 take = int32(cc.maxFrameSize)
1998 }
1999 cs.flow.take(take)
2000 return take, nil
2001 }
2002 cc.cond.Wait()
2003 }
2004 }
2005
2006
2007 func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
2008 cc.hbuf.Reset()
2009
2010 hlSize := uint64(0)
2011 for k, vv := range trailer {
2012 for _, v := range vv {
2013 hf := hpack.HeaderField{Name: k, Value: v}
2014 hlSize += uint64(hf.Size())
2015 }
2016 }
2017 if hlSize > cc.peerMaxHeaderListSize {
2018 return nil, errRequestHeaderListSize
2019 }
2020
2021 for k, vv := range trailer {
2022 lowKey, ascii := httpcommon.LowerHeader(k)
2023 if !ascii {
2024
2025
2026 continue
2027 }
2028
2029
2030 for _, v := range vv {
2031 cc.writeHeader(lowKey, v)
2032 }
2033 }
2034 return cc.hbuf.Bytes(), nil
2035 }
2036
2037 func (cc *ClientConn) writeHeader(name, value string) {
2038 if VerboseLogs {
2039 log.Printf("http2: Transport encoding header %q = %q", name, value)
2040 }
2041 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2042 }
2043
2044 type resAndError struct {
2045 _ incomparable
2046 res *ClientResponse
2047 err error
2048 }
2049
2050
2051 func (cc *ClientConn) addStreamLocked(cs *clientStream) {
2052 cs.flow.add(int32(cc.initialWindowSize))
2053 cs.flow.setConnFlow(&cc.flow)
2054 cs.inflow.init(cc.initialStreamRecvWindowSize)
2055 cs.ID = cc.nextStreamID
2056 cc.nextStreamID += 2
2057 cc.streams[cs.ID] = cs
2058 if cs.ID == 0 {
2059 panic("assigned stream ID 0")
2060 }
2061 }
2062
2063 func (cc *ClientConn) forgetStreamID(id uint32) {
2064 cc.mu.Lock()
2065 slen := len(cc.streams)
2066 delete(cc.streams, id)
2067 if len(cc.streams) != slen-1 {
2068 panic("forgetting unknown stream id")
2069 }
2070 cc.lastActive = time.Now()
2071 if len(cc.streams) == 0 && cc.idleTimer != nil {
2072 cc.idleTimer.Reset(cc.idleTimeout)
2073 cc.lastIdle = time.Now()
2074 }
2075
2076
2077 cc.cond.Broadcast()
2078
2079 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
2080 if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
2081 if VerboseLogs {
2082 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
2083 }
2084 cc.closed = true
2085 defer cc.closeConn()
2086 }
2087
2088 cc.mu.Unlock()
2089 }
2090
2091
2092 type clientConnReadLoop struct {
2093 _ incomparable
2094 cc *ClientConn
2095 }
2096
2097
2098 func (cc *ClientConn) readLoop() {
2099 rl := &clientConnReadLoop{cc: cc}
2100 defer rl.cleanup()
2101 cc.readerErr = rl.run()
2102 if ce, ok := cc.readerErr.(ConnectionError); ok {
2103 cc.wmu.Lock()
2104 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2105 cc.wmu.Unlock()
2106 }
2107 }
2108
2109
2110
2111 type GoAwayError struct {
2112 LastStreamID uint32
2113 ErrCode ErrCode
2114 DebugData string
2115 }
2116
2117 func (e GoAwayError) Error() string {
2118 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2119 e.LastStreamID, e.ErrCode, e.DebugData)
2120 }
2121
2122 func isEOFOrNetReadError(err error) bool {
2123 if err == io.EOF {
2124 return true
2125 }
2126 ne, ok := err.(*net.OpError)
2127 return ok && ne.Op == "read"
2128 }
2129
2130 func (rl *clientConnReadLoop) cleanup() {
2131 cc := rl.cc
2132 defer cc.closeConn()
2133 defer close(cc.readerDone)
2134
2135 if cc.idleTimer != nil {
2136 cc.idleTimer.Stop()
2137 }
2138
2139
2140
2141
2142 err := cc.readerErr
2143 cc.mu.Lock()
2144 if cc.goAway != nil && isEOFOrNetReadError(err) {
2145 err = GoAwayError{
2146 LastStreamID: cc.goAway.LastStreamID,
2147 ErrCode: cc.goAway.ErrCode,
2148 DebugData: cc.goAwayDebug,
2149 }
2150 } else if err == io.EOF {
2151 err = io.ErrUnexpectedEOF
2152 }
2153 cc.closed = true
2154
2155
2156
2157
2158
2159
2160
2161 unusedWaitTime := 5 * time.Second
2162 if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
2163 unusedWaitTime = cc.idleTimeout
2164 }
2165 idleTime := time.Now().Sub(cc.lastActive)
2166 if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
2167 cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
2168 cc.t.connPool().MarkDead(cc)
2169 })
2170 } else {
2171 cc.mu.Unlock()
2172 cc.t.connPool().MarkDead(cc)
2173 cc.mu.Lock()
2174 }
2175
2176 for _, cs := range cc.streams {
2177 select {
2178 case <-cs.peerClosed:
2179
2180
2181 default:
2182 cs.abortStreamLocked(err)
2183 }
2184 }
2185 cc.cond.Broadcast()
2186 cc.mu.Unlock()
2187
2188 if !cc.seenSettings {
2189
2190
2191 cc.extendedConnectAllowed = true
2192 close(cc.seenSettingsChan)
2193 }
2194 }
2195
2196
2197
2198 func (cc *ClientConn) countReadFrameError(err error) {
2199 f := cc.t.CountError
2200 if f == nil || err == nil {
2201 return
2202 }
2203 if ce, ok := err.(ConnectionError); ok {
2204 errCode := ErrCode(ce)
2205 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2206 return
2207 }
2208 if errors.Is(err, io.EOF) {
2209 f("read_frame_eof")
2210 return
2211 }
2212 if errors.Is(err, io.ErrUnexpectedEOF) {
2213 f("read_frame_unexpected_eof")
2214 return
2215 }
2216 if errors.Is(err, ErrFrameTooLarge) {
2217 f("read_frame_too_large")
2218 return
2219 }
2220 f("read_frame_other")
2221 }
2222
2223 func (rl *clientConnReadLoop) run() error {
2224 cc := rl.cc
2225 gotSettings := false
2226 readIdleTimeout := cc.readIdleTimeout
2227 var t *time.Timer
2228 if readIdleTimeout != 0 {
2229 t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
2230 }
2231 for {
2232 f, err := cc.fr.ReadFrame()
2233 if t != nil {
2234 t.Reset(readIdleTimeout)
2235 }
2236 if err != nil {
2237 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
2238 }
2239 if se, ok := err.(StreamError); ok {
2240 if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
2241 if se.Cause == nil {
2242 se.Cause = cc.fr.errDetail
2243 }
2244 rl.endStreamError(cs, se)
2245 }
2246 continue
2247 } else if err != nil {
2248 cc.countReadFrameError(err)
2249 return err
2250 }
2251 if VerboseLogs {
2252 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
2253 }
2254 if !gotSettings {
2255 if _, ok := f.(*SettingsFrame); !ok {
2256 cc.logf("protocol error: received %T before a SETTINGS frame", f)
2257 return ConnectionError(ErrCodeProtocol)
2258 }
2259 gotSettings = true
2260 }
2261
2262 switch f := f.(type) {
2263 case *MetaHeadersFrame:
2264 err = rl.processHeaders(f)
2265 case *DataFrame:
2266 err = rl.processData(f)
2267 case *GoAwayFrame:
2268 err = rl.processGoAway(f)
2269 case *RSTStreamFrame:
2270 err = rl.processResetStream(f)
2271 case *SettingsFrame:
2272 err = rl.processSettings(f)
2273 case *PushPromiseFrame:
2274 err = rl.processPushPromise(f)
2275 case *WindowUpdateFrame:
2276 err = rl.processWindowUpdate(f)
2277 case *PingFrame:
2278 err = rl.processPing(f)
2279 default:
2280 cc.logf("Transport: unhandled response frame type %T", f)
2281 }
2282 if err != nil {
2283 if VerboseLogs {
2284 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
2285 }
2286 return err
2287 }
2288 }
2289 }
2290
2291 func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
2292 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2293 if cs == nil {
2294
2295
2296
2297 return nil
2298 }
2299 if cs.readClosed {
2300 rl.endStreamError(cs, StreamError{
2301 StreamID: f.StreamID,
2302 Code: ErrCodeProtocol,
2303 Cause: errors.New("protocol error: headers after END_STREAM"),
2304 })
2305 return nil
2306 }
2307 if !cs.firstByte {
2308 if cs.trace != nil {
2309
2310
2311
2312
2313 traceFirstResponseByte(cs.trace)
2314 }
2315 cs.firstByte = true
2316 }
2317 if !cs.pastHeaders {
2318 cs.pastHeaders = true
2319 } else {
2320 return rl.processTrailers(cs, f)
2321 }
2322
2323 res, err := rl.handleResponse(cs, f)
2324 if err != nil {
2325 if _, ok := err.(ConnectionError); ok {
2326 return err
2327 }
2328
2329 rl.endStreamError(cs, StreamError{
2330 StreamID: f.StreamID,
2331 Code: ErrCodeProtocol,
2332 Cause: err,
2333 })
2334 return nil
2335 }
2336 if res == nil {
2337
2338 return nil
2339 }
2340 cs.res = res
2341 close(cs.respHeaderRecv)
2342 if f.StreamEnded() {
2343 rl.endStream(cs)
2344 }
2345 return nil
2346 }
2347
2348
2349
2350
2351
2352
2353
2354 func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
2355 if f.Truncated {
2356 return nil, errResponseHeaderListSize
2357 }
2358
2359 status := f.PseudoValue("status")
2360 if status == "" {
2361 return nil, errors.New("malformed response from server: missing status pseudo header")
2362 }
2363 statusCode, err := strconv.Atoi(status)
2364 if err != nil {
2365 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
2366 }
2367
2368 regularFields := f.RegularFields()
2369 strs := make([]string, len(regularFields))
2370 header := make(Header, len(regularFields))
2371 res := &cs.staticResp
2372 cs.staticResp = ClientResponse{
2373 Header: header,
2374 StatusCode: statusCode,
2375 Status: status,
2376 }
2377 for _, hf := range regularFields {
2378 key := httpcommon.CanonicalHeader(hf.Name)
2379 if key == "Trailer" {
2380 t := res.Trailer
2381 if t == nil {
2382 t = make(Header)
2383 res.Trailer = t
2384 }
2385 foreachHeaderElement(hf.Value, func(v string) {
2386 t[httpcommon.CanonicalHeader(v)] = nil
2387 })
2388 } else {
2389 vv := header[key]
2390 if vv == nil && len(strs) > 0 {
2391
2392
2393
2394
2395 vv, strs = strs[:1:1], strs[1:]
2396 vv[0] = hf.Value
2397 header[key] = vv
2398 } else {
2399 header[key] = append(vv, hf.Value)
2400 }
2401 }
2402 }
2403
2404 if statusCode >= 100 && statusCode <= 199 {
2405 if f.StreamEnded() {
2406 return nil, errors.New("1xx informational response with END_STREAM flag")
2407 }
2408 if fn := cs.get1xxTraceFunc(); fn != nil {
2409
2410
2411
2412 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2413 return nil, err
2414 }
2415 } else {
2416
2417
2418
2419
2420
2421
2422
2423 limit := int64(cs.cc.t.maxHeaderListSize())
2424 if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
2425 limit = t1.MaxResponseHeaderBytes()
2426 }
2427 for _, h := range f.Fields {
2428 cs.totalHeaderSize += int64(h.Size())
2429 }
2430 if cs.totalHeaderSize > limit {
2431 if VerboseLogs {
2432 log.Printf("http2: 1xx informational responses too large")
2433 }
2434 return nil, errors.New("header list too large")
2435 }
2436 }
2437 if statusCode == 100 {
2438 traceGot100Continue(cs.trace)
2439 select {
2440 case cs.on100 <- struct{}{}:
2441 default:
2442 }
2443 }
2444 cs.pastHeaders = false
2445 return nil, nil
2446 }
2447
2448 res.ContentLength = -1
2449 if clens := res.Header["Content-Length"]; len(clens) == 1 {
2450 if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2451 res.ContentLength = int64(cl)
2452 } else {
2453
2454
2455 }
2456 } else if len(clens) > 1 {
2457
2458
2459 } else if f.StreamEnded() && !cs.isHead {
2460 res.ContentLength = 0
2461 }
2462
2463 if cs.isHead {
2464 res.Body = NoBody
2465 return res, nil
2466 }
2467
2468 if f.StreamEnded() {
2469 if res.ContentLength > 0 {
2470 res.Body = missingBody{}
2471 } else {
2472 res.Body = NoBody
2473 }
2474 return res, nil
2475 }
2476
2477 cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
2478 cs.bytesRemain = res.ContentLength
2479 res.Body = transportResponseBody{cs}
2480
2481 if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
2482 res.Header.Del("Content-Encoding")
2483 res.Header.Del("Content-Length")
2484 res.ContentLength = -1
2485 res.Body = &gzipReader{body: res.Body}
2486 res.Uncompressed = true
2487 }
2488 return res, nil
2489 }
2490
2491 func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2492 if cs.pastTrailers {
2493
2494 return ConnectionError(ErrCodeProtocol)
2495 }
2496 cs.pastTrailers = true
2497 if !f.StreamEnded() {
2498
2499
2500 return ConnectionError(ErrCodeProtocol)
2501 }
2502 if len(f.PseudoFields()) > 0 {
2503
2504
2505 return ConnectionError(ErrCodeProtocol)
2506 }
2507
2508 trailer := make(Header)
2509 for _, hf := range f.RegularFields() {
2510 key := httpcommon.CanonicalHeader(hf.Name)
2511 trailer[key] = append(trailer[key], hf.Value)
2512 }
2513 cs.trailer = trailer
2514
2515 rl.endStream(cs)
2516 return nil
2517 }
2518
2519
2520
2521 type transportResponseBody struct {
2522 cs *clientStream
2523 }
2524
2525 func (b transportResponseBody) Read(p []byte) (n int, err error) {
2526 cs := b.cs
2527 cc := cs.cc
2528
2529 if cs.readErr != nil {
2530 return 0, cs.readErr
2531 }
2532 n, err = b.cs.bufPipe.Read(p)
2533 if cs.bytesRemain != -1 {
2534 if int64(n) > cs.bytesRemain {
2535 n = int(cs.bytesRemain)
2536 if err == nil {
2537 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2538 cs.abortStream(err)
2539 }
2540 cs.readErr = err
2541 return int(cs.bytesRemain), err
2542 }
2543 cs.bytesRemain -= int64(n)
2544 if err == io.EOF && cs.bytesRemain > 0 {
2545 err = io.ErrUnexpectedEOF
2546 cs.readErr = err
2547 return n, err
2548 }
2549 }
2550 if n == 0 {
2551
2552 return
2553 }
2554
2555 cc.mu.Lock()
2556 connAdd := cc.inflow.add(n)
2557 var streamAdd int32
2558 if err == nil {
2559 streamAdd = cs.inflow.add(n)
2560 }
2561 cc.mu.Unlock()
2562
2563 if connAdd != 0 || streamAdd != 0 {
2564 cc.wmu.Lock()
2565 defer cc.wmu.Unlock()
2566 if connAdd != 0 {
2567 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2568 }
2569 if streamAdd != 0 {
2570 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2571 }
2572 cc.bw.Flush()
2573 }
2574 return
2575 }
2576
2577 var errClosedResponseBody = errors.New("http2: response body closed")
2578
2579 func (b transportResponseBody) Close() error {
2580 cs := b.cs
2581 cc := cs.cc
2582
2583 cs.bufPipe.BreakWithError(errClosedResponseBody)
2584 cs.abortStream(errClosedResponseBody)
2585
2586 unread := cs.bufPipe.Len()
2587 if unread > 0 {
2588 cc.mu.Lock()
2589
2590 connAdd := cc.inflow.add(unread)
2591 cc.mu.Unlock()
2592
2593
2594
2595 cc.wmu.Lock()
2596
2597 if connAdd > 0 {
2598 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2599 }
2600 cc.bw.Flush()
2601 cc.wmu.Unlock()
2602 }
2603
2604 select {
2605 case <-cs.donec:
2606 case <-cs.ctx.Done():
2607
2608
2609
2610 return nil
2611 case <-cs.reqCancel:
2612 return errRequestCanceled
2613 }
2614 return nil
2615 }
2616
2617 func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2618 cc := rl.cc
2619 cs := rl.streamByID(f.StreamID, headerOrDataFrame)
2620 data := f.Data()
2621 if cs == nil {
2622 cc.mu.Lock()
2623 neverSent := cc.nextStreamID
2624 cc.mu.Unlock()
2625 if f.StreamID >= neverSent {
2626
2627 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2628 return ConnectionError(ErrCodeProtocol)
2629 }
2630
2631
2632
2633
2634
2635
2636 if f.Length > 0 {
2637 cc.mu.Lock()
2638 ok := cc.inflow.take(f.Length)
2639 connAdd := cc.inflow.add(int(f.Length))
2640 cc.mu.Unlock()
2641 if !ok {
2642 return ConnectionError(ErrCodeFlowControl)
2643 }
2644 if connAdd > 0 {
2645 cc.wmu.Lock()
2646 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2647 cc.bw.Flush()
2648 cc.wmu.Unlock()
2649 }
2650 }
2651 return nil
2652 }
2653 if cs.readClosed {
2654 cc.logf("protocol error: received DATA after END_STREAM")
2655 rl.endStreamError(cs, StreamError{
2656 StreamID: f.StreamID,
2657 Code: ErrCodeProtocol,
2658 })
2659 return nil
2660 }
2661 if !cs.pastHeaders {
2662 cc.logf("protocol error: received DATA before a HEADERS frame")
2663 rl.endStreamError(cs, StreamError{
2664 StreamID: f.StreamID,
2665 Code: ErrCodeProtocol,
2666 })
2667 return nil
2668 }
2669 if f.Length > 0 {
2670 if cs.isHead && len(data) > 0 {
2671 cc.logf("protocol error: received DATA on a HEAD request")
2672 rl.endStreamError(cs, StreamError{
2673 StreamID: f.StreamID,
2674 Code: ErrCodeProtocol,
2675 })
2676 return nil
2677 }
2678
2679 cc.mu.Lock()
2680 if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
2681 cc.mu.Unlock()
2682 return ConnectionError(ErrCodeFlowControl)
2683 }
2684
2685
2686 var refund int
2687 if pad := int(f.Length) - len(data); pad > 0 {
2688 refund += pad
2689 }
2690
2691 didReset := false
2692 var err error
2693 if len(data) > 0 {
2694 if _, err = cs.bufPipe.Write(data); err != nil {
2695
2696
2697 didReset = true
2698 refund += len(data)
2699 }
2700 }
2701
2702 sendConn := cc.inflow.add(refund)
2703 var sendStream int32
2704 if !didReset {
2705 sendStream = cs.inflow.add(refund)
2706 }
2707 cc.mu.Unlock()
2708
2709 if sendConn > 0 || sendStream > 0 {
2710 cc.wmu.Lock()
2711 if sendConn > 0 {
2712 cc.fr.WriteWindowUpdate(0, uint32(sendConn))
2713 }
2714 if sendStream > 0 {
2715 cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
2716 }
2717 cc.bw.Flush()
2718 cc.wmu.Unlock()
2719 }
2720
2721 if err != nil {
2722 rl.endStreamError(cs, err)
2723 return nil
2724 }
2725 }
2726
2727 if f.StreamEnded() {
2728 rl.endStream(cs)
2729 }
2730 return nil
2731 }
2732
2733 func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2734
2735
2736 if !cs.readClosed {
2737 cs.readClosed = true
2738
2739
2740
2741
2742 rl.cc.mu.Lock()
2743 defer rl.cc.mu.Unlock()
2744 cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2745 close(cs.peerClosed)
2746 }
2747 }
2748
2749 func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2750 cs.readAborted = true
2751 cs.abortStream(err)
2752 }
2753
2754 func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
2755 cs.readAborted = true
2756 cs.abortStreamLocked(err)
2757 }
2758
2759
2760 const (
2761 headerOrDataFrame = true
2762 notHeaderOrDataFrame = false
2763 )
2764
2765
2766
2767 func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
2768 rl.cc.mu.Lock()
2769 defer rl.cc.mu.Unlock()
2770 if headerOrData {
2771
2772
2773 rl.cc.rstStreamPingsBlocked = false
2774 }
2775 rl.cc.readBeforeStreamID = rl.cc.nextStreamID
2776 cs := rl.cc.streams[id]
2777 if cs != nil && !cs.readAborted {
2778 return cs
2779 }
2780 return nil
2781 }
2782
2783 func (cs *clientStream) copyTrailers() {
2784 for k, vv := range cs.trailer {
2785 t := cs.resTrailer
2786 if *t == nil {
2787 *t = make(Header)
2788 }
2789 (*t)[k] = vv
2790 }
2791 }
2792
2793 func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2794 cc := rl.cc
2795 cc.t.connPool().MarkDead(cc)
2796 if f.ErrCode != 0 {
2797
2798 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2799 if fn := cc.t.CountError; fn != nil {
2800 fn("recv_goaway_" + f.ErrCode.stringToken())
2801 }
2802 }
2803 cc.setGoAway(f)
2804 return nil
2805 }
2806
2807 func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2808 cc := rl.cc
2809
2810
2811 cc.wmu.Lock()
2812 defer cc.wmu.Unlock()
2813
2814 if err := rl.processSettingsNoWrite(f); err != nil {
2815 return err
2816 }
2817 if !f.IsAck() {
2818 cc.fr.WriteSettingsAck()
2819 cc.bw.Flush()
2820 }
2821 return nil
2822 }
2823
2824 func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
2825 cc := rl.cc
2826 defer cc.maybeCallStateHook()
2827 cc.mu.Lock()
2828 defer cc.mu.Unlock()
2829
2830 if f.IsAck() {
2831 if cc.wantSettingsAck {
2832 cc.wantSettingsAck = false
2833 return nil
2834 }
2835 return ConnectionError(ErrCodeProtocol)
2836 }
2837
2838 var seenMaxConcurrentStreams bool
2839 err := f.ForeachSetting(func(s Setting) error {
2840 switch s.ID {
2841 case SettingMaxFrameSize:
2842 cc.maxFrameSize = s.Val
2843 case SettingMaxConcurrentStreams:
2844 cc.maxConcurrentStreams = s.Val
2845 seenMaxConcurrentStreams = true
2846 case SettingMaxHeaderListSize:
2847 cc.peerMaxHeaderListSize = uint64(s.Val)
2848 case SettingInitialWindowSize:
2849
2850
2851
2852
2853 if s.Val > math.MaxInt32 {
2854 return ConnectionError(ErrCodeFlowControl)
2855 }
2856
2857
2858
2859
2860 delta := int32(s.Val) - int32(cc.initialWindowSize)
2861 for _, cs := range cc.streams {
2862 cs.flow.add(delta)
2863 }
2864 cc.cond.Broadcast()
2865
2866 cc.initialWindowSize = s.Val
2867 case SettingHeaderTableSize:
2868 cc.henc.SetMaxDynamicTableSize(s.Val)
2869 cc.peerMaxHeaderTableSize = s.Val
2870 case SettingEnableConnectProtocol:
2871 if err := s.Valid(); err != nil {
2872 return err
2873 }
2874
2875
2876
2877
2878
2879
2880
2881
2882 if !cc.seenSettings {
2883 cc.extendedConnectAllowed = s.Val == 1
2884 }
2885 default:
2886 cc.vlogf("Unhandled Setting: %v", s)
2887 }
2888 return nil
2889 })
2890 if err != nil {
2891 return err
2892 }
2893
2894 if !cc.seenSettings {
2895 if !seenMaxConcurrentStreams {
2896
2897
2898
2899
2900 cc.maxConcurrentStreams = defaultMaxConcurrentStreams
2901 }
2902 close(cc.seenSettingsChan)
2903 cc.seenSettings = true
2904 }
2905
2906 return nil
2907 }
2908
2909 func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2910 cc := rl.cc
2911 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2912 if f.StreamID != 0 && cs == nil {
2913 return nil
2914 }
2915
2916 cc.mu.Lock()
2917 defer cc.mu.Unlock()
2918
2919 fl := &cc.flow
2920 if cs != nil {
2921 fl = &cs.flow
2922 }
2923 if !fl.add(int32(f.Increment)) {
2924
2925 if cs != nil {
2926 rl.endStreamErrorLocked(cs, StreamError{
2927 StreamID: f.StreamID,
2928 Code: ErrCodeFlowControl,
2929 })
2930 return nil
2931 }
2932
2933 return ConnectionError(ErrCodeFlowControl)
2934 }
2935 cc.cond.Broadcast()
2936 return nil
2937 }
2938
2939 func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2940 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2941 if cs == nil {
2942
2943 return nil
2944 }
2945 serr := streamError(cs.ID, f.ErrCode)
2946 serr.Cause = errFromPeer
2947 if f.ErrCode == ErrCodeProtocol {
2948 rl.cc.SetDoNotReuse()
2949 }
2950 if fn := cs.cc.t.CountError; fn != nil {
2951 fn("recv_rststream_" + f.ErrCode.stringToken())
2952 }
2953 cs.abortStream(serr)
2954
2955 cs.bufPipe.CloseWithError(serr)
2956 return nil
2957 }
2958
2959
2960 func (cc *ClientConn) Ping(ctx context.Context) error {
2961 c := make(chan struct{})
2962
2963 var p [8]byte
2964 for {
2965 if _, err := rand.Read(p[:]); err != nil {
2966 return err
2967 }
2968 cc.mu.Lock()
2969
2970 if _, found := cc.pings[p]; !found {
2971 cc.pings[p] = c
2972 cc.mu.Unlock()
2973 break
2974 }
2975 cc.mu.Unlock()
2976 }
2977 var pingError error
2978 errc := make(chan struct{})
2979 go func() {
2980 cc.wmu.Lock()
2981 defer cc.wmu.Unlock()
2982 if pingError = cc.fr.WritePing(false, p); pingError != nil {
2983 close(errc)
2984 return
2985 }
2986 if pingError = cc.bw.Flush(); pingError != nil {
2987 close(errc)
2988 return
2989 }
2990 }()
2991 select {
2992 case <-c:
2993 return nil
2994 case <-errc:
2995 return pingError
2996 case <-ctx.Done():
2997 return ctx.Err()
2998 case <-cc.readerDone:
2999
3000 return cc.readerErr
3001 }
3002 }
3003
3004 func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
3005 if f.IsAck() {
3006 cc := rl.cc
3007 defer cc.maybeCallStateHook()
3008 cc.mu.Lock()
3009 defer cc.mu.Unlock()
3010
3011 if c, ok := cc.pings[f.Data]; ok {
3012 close(c)
3013 delete(cc.pings, f.Data)
3014 }
3015 if cc.pendingResets > 0 {
3016
3017 cc.pendingResets = 0
3018 cc.rstStreamPingsBlocked = true
3019 cc.cond.Broadcast()
3020 }
3021 return nil
3022 }
3023 cc := rl.cc
3024 cc.wmu.Lock()
3025 defer cc.wmu.Unlock()
3026 if err := cc.fr.WritePing(true, f.Data); err != nil {
3027 return err
3028 }
3029 return cc.bw.Flush()
3030 }
3031
3032 func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
3033
3034
3035
3036
3037
3038
3039
3040 return ConnectionError(ErrCodeProtocol)
3041 }
3042
3043
3044
3045 func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
3046
3047
3048
3049
3050 cc.wmu.Lock()
3051 cc.fr.WriteRSTStream(streamID, code)
3052 if ping {
3053 var payload [8]byte
3054 rand.Read(payload[:])
3055 cc.fr.WritePing(false, payload)
3056 }
3057 cc.bw.Flush()
3058 cc.wmu.Unlock()
3059 }
3060
3061 var (
3062 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
3063 errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
3064 )
3065
3066 func (cc *ClientConn) logf(format string, args ...any) {
3067 cc.t.logf(format, args...)
3068 }
3069
3070 func (cc *ClientConn) vlogf(format string, args ...any) {
3071 cc.t.vlogf(format, args...)
3072 }
3073
3074 func (t *Transport) vlogf(format string, args ...any) {
3075 if VerboseLogs {
3076 t.logf(format, args...)
3077 }
3078 }
3079
3080 func (t *Transport) logf(format string, args ...any) {
3081 log.Printf(format, args...)
3082 }
3083
3084 type missingBody struct{}
3085
3086 func (missingBody) Close() error { return nil }
3087 func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
3088
3089 type erringRoundTripper struct{ err error }
3090
3091 func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
3092 func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
3093
3094 var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
3095
3096
3097
3098
3099
3100 type gzipReader struct {
3101 _ incomparable
3102 body io.ReadCloser
3103 mu sync.Mutex
3104 zr *gzip.Reader
3105 zerr error
3106 }
3107
3108 type eofReader struct{}
3109
3110 func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
3111 func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
3112
3113 var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3114
3115
3116 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3117 zr := gzipPool.Get().(*gzip.Reader)
3118 if err := zr.Reset(r); err != nil {
3119 gzipPoolPut(zr)
3120 return nil, err
3121 }
3122 return zr, nil
3123 }
3124
3125
3126 func gzipPoolPut(zr *gzip.Reader) {
3127
3128
3129 var r flate.Reader = eofReader{}
3130 zr.Reset(r)
3131 gzipPool.Put(zr)
3132 }
3133
3134
3135
3136 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3137 gz.mu.Lock()
3138 defer gz.mu.Unlock()
3139 if gz.zerr != nil {
3140 return nil, gz.zerr
3141 }
3142 if gz.zr == nil {
3143 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3144 if gz.zerr != nil {
3145 return nil, gz.zerr
3146 }
3147 }
3148 ret := gz.zr
3149 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3150 return ret, nil
3151 }
3152
3153
3154 func (gz *gzipReader) release(zr *gzip.Reader) {
3155 gz.mu.Lock()
3156 defer gz.mu.Unlock()
3157 if gz.zerr == errConcurrentReadOnResBody {
3158 gz.zr, gz.zerr = zr, nil
3159 } else {
3160 gzipPoolPut(zr)
3161 }
3162 }
3163
3164
3165
3166 func (gz *gzipReader) close() {
3167 gz.mu.Lock()
3168 defer gz.mu.Unlock()
3169 if gz.zerr == nil && gz.zr != nil {
3170 gzipPoolPut(gz.zr)
3171 gz.zr = nil
3172 }
3173 gz.zerr = fs.ErrClosed
3174 }
3175
3176 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3177 zr, err := gz.acquire()
3178 if err != nil {
3179 return 0, err
3180 }
3181 defer gz.release(zr)
3182
3183 return zr.Read(p)
3184 }
3185
3186 func (gz *gzipReader) Close() error {
3187 gz.close()
3188
3189 return gz.body.Close()
3190 }
3191
3192
3193
3194 func isConnectionCloseRequest(req *ClientRequest) bool {
3195 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3196 }
3197
3198
3199
3200 type NetHTTPClientConn struct {
3201 cc *ClientConn
3202 }
3203
3204 func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
3205 return cc.cc.RoundTrip(req)
3206 }
3207
3208 func (cc NetHTTPClientConn) Close() error {
3209 return cc.cc.Close()
3210 }
3211
3212 func (cc NetHTTPClientConn) Err() error {
3213 cc.cc.mu.Lock()
3214 defer cc.cc.mu.Unlock()
3215 if cc.cc.closed {
3216 return errors.New("connection closed")
3217 }
3218 return nil
3219 }
3220
3221 func (cc NetHTTPClientConn) Reserve() error {
3222 defer cc.cc.maybeCallStateHook()
3223 cc.cc.mu.Lock()
3224 defer cc.cc.mu.Unlock()
3225 if !cc.cc.canReserveLocked() {
3226 return errors.New("connection is unavailable")
3227 }
3228 cc.cc.streamsReserved++
3229 return nil
3230 }
3231
3232 func (cc NetHTTPClientConn) Release() {
3233 defer cc.cc.maybeCallStateHook()
3234 cc.cc.mu.Lock()
3235 defer cc.cc.mu.Unlock()
3236
3237
3238
3239
3240 if cc.cc.streamsReserved > 0 {
3241 cc.cc.streamsReserved--
3242 }
3243 }
3244
3245 func (cc NetHTTPClientConn) Available() int {
3246 cc.cc.mu.Lock()
3247 defer cc.cc.mu.Unlock()
3248 return cc.cc.availableLocked()
3249 }
3250
3251 func (cc NetHTTPClientConn) InFlight() int {
3252 cc.cc.mu.Lock()
3253 defer cc.cc.mu.Unlock()
3254 return cc.cc.currentRequestCountLocked()
3255 }
3256
3257 func (cc *ClientConn) maybeCallStateHook() {
3258 if cc.internalStateHook != nil {
3259 cc.internalStateHook()
3260 }
3261 }
3262
3263 func (t *Transport) idleConnTimeout() time.Duration {
3264
3265
3266
3267 if t.IdleConnTimeout != 0 {
3268 return t.IdleConnTimeout
3269 }
3270
3271 if t.t1 != nil {
3272 return t.t1.IdleConnTimeout()
3273 }
3274
3275 return 0
3276 }
3277
3278 func traceGetConn(req *ClientRequest, hostPort string) {
3279 trace := httptrace.ContextClientTrace(req.Context)
3280 if trace == nil || trace.GetConn == nil {
3281 return
3282 }
3283 trace.GetConn(hostPort)
3284 }
3285
3286 func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
3287 trace := httptrace.ContextClientTrace(req.Context)
3288 if trace == nil || trace.GotConn == nil {
3289 return
3290 }
3291 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3292 ci.Reused = reused
3293 cc.mu.Lock()
3294 ci.WasIdle = len(cc.streams) == 0 && reused
3295 if ci.WasIdle && !cc.lastActive.IsZero() {
3296 ci.IdleTime = time.Since(cc.lastActive)
3297 }
3298 cc.mu.Unlock()
3299
3300 trace.GotConn(ci)
3301 }
3302
3303 func traceWroteHeaders(trace *httptrace.ClientTrace) {
3304 if trace != nil && trace.WroteHeaders != nil {
3305 trace.WroteHeaders()
3306 }
3307 }
3308
3309 func traceGot100Continue(trace *httptrace.ClientTrace) {
3310 if trace != nil && trace.Got100Continue != nil {
3311 trace.Got100Continue()
3312 }
3313 }
3314
3315 func traceWait100Continue(trace *httptrace.ClientTrace) {
3316 if trace != nil && trace.Wait100Continue != nil {
3317 trace.Wait100Continue()
3318 }
3319 }
3320
3321 func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3322 if trace != nil && trace.WroteRequest != nil {
3323 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3324 }
3325 }
3326
3327 func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3328 if trace != nil && trace.GotFirstResponseByte != nil {
3329 trace.GotFirstResponseByte()
3330 }
3331 }
3332
3333 func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
3334 if trace != nil {
3335 return trace.Got1xxResponse
3336 }
3337 return nil
3338 }
3339
3340
3341
3342 func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3343 dialer := &tls.Dialer{
3344 Config: cfg,
3345 }
3346 cn, err := dialer.DialContext(ctx, network, addr)
3347 if err != nil {
3348 return nil, err
3349 }
3350 tlsCn := cn.(*tls.Conn)
3351 return tlsCn, nil
3352 }
3353
View as plain text