Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal/ascii"
28 "net/textproto"
29 "net/url"
30 "reflect"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35 _ "unsafe"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http/httpproxy"
39 )
40
41
42
43
44
45
// DefaultTransport is the package-level RoundTripper value. It is a
// *Transport configured with environment-based proxy selection
// (ProxyFromEnvironment), a dialer using a 30-second timeout and
// 30-second keep-alive, HTTP/2 attempts enabled, at most 100 idle
// connections overall, a 90-second idle connection timeout, a
// 10-second TLS handshake timeout and a 1-second ExpectContinueTimeout.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
58
59
60
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit
// used when Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Transport is a RoundTripper implementation for the "http" and
// "https" schemes; further schemes can be added with RegisterProtocol.
// It keeps idle connections for reuse, can cap the number of
// connections per host, and can send requests through a proxy.
//
// The exported fields are configuration. The unexported fields hold
// the connection pool and bookkeeping state; each group is guarded by
// the mutex declared at the top of the group.
type Transport struct {
	// idleMu guards the idle-connection pool fields below.
	idleMu sync.Mutex
	// closeIdle is set by CloseIdleConnections and cleared by
	// queueForIdleConn; while set, tryPutIdleConn refuses to pool.
	closeIdle bool
	// idleConn holds idle connections per key, with the most recently
	// used connection at the end of each slice.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnWait holds requests waiting for an idle connection.
	idleConnWait map[connectMethodKey]wantConnQueue
	// idleLRU tracks idle connections so the oldest can be evicted
	// when MaxIdleConns is exceeded.
	idleLRU connLRU

	// reqMu guards reqCanceler, which maps in-flight requests to the
	// cancel function used by CancelRequest.
	reqMu       sync.Mutex
	reqCanceler map[*Request]context.CancelCauseFunc

	// altMu serializes writers of altProto. altProto stores a
	// map[string]RoundTripper keyed by scheme; readers load it
	// without holding altMu.
	altMu    sync.Mutex
	altProto atomic.Value

	// connsPerHostMu guards the per-host connection accounting below.
	connsPerHostMu sync.Mutex
	// connsPerHost counts connections per key; it is only maintained
	// when MaxConnsPerHost > 0.
	connsPerHost map[connectMethodKey]int
	// connsPerHostWait holds requests waiting for a per-host slot.
	connsPerHostWait map[connectMethodKey]wantConnQueue
	// dialsInProgress holds the wantConns with a dial under way.
	dialsInProgress wantConnQueue

	// Proxy, when non-nil, is called for every request to choose the
	// proxy URL. A non-nil error aborts the request. Per the net/http
	// documentation, a nil URL with a nil error means no proxy.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is not referenced in this part of the
	// file. Per the net/http documentation it is called with the
	// proxy's response to a CONNECT request; a non-nil error fails
	// the request.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, when non-nil, is used to create unencrypted
	// connections and takes precedence over Dial. Returning
	// (nil, nil) is reported as an error.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is used to create unencrypted connections when DialContext
	// is nil. When both are nil a zero net.Dialer is used. The
	// net/http documentation marks Dial as deprecated in favor of
	// DialContext.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext, when non-nil, is the custom TLS dialer and
	// takes precedence over DialTLS. Returning (nil, nil) is reported
	// as an error.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is the custom TLS dialer used when DialTLSContext is
	// nil. The net/http documentation marks it as deprecated in favor
	// of DialTLSContext.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration cloned for each
	// tls.Client created by the transport.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds the TLS handshake. Zero means no
	// timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, when true, prevents connections from being
	// returned to or taken from the idle pool.
	DisableKeepAlives bool

	// DisableCompression is not referenced in this part of the file.
	// Per the net/http documentation it stops the transport from
	// transparently requesting and decoding gzip responses.
	DisableCompression bool

	// MaxIdleConns caps the idle connections kept across all hosts.
	// Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost caps the idle connections kept per host.
	// Zero selects DefaultMaxIdleConnsPerHost; a negative value makes
	// tryPutIdleConn refuse every connection.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost caps the connections per host; requests beyond
	// the cap wait for a slot. Zero or negative means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout is how long an idle connection may stay in the
	// pool before it is closed. Zero means no limit.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is not referenced in this part of the
	// file. Per the net/http documentation it bounds the wait for
	// response headers after the request has been written.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is not referenced in this part of the
	// file. Per the net/http documentation it bounds the wait for a
	// server's first response headers after sending
	// "Expect: 100-continue".
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps an ALPN protocol name to a function that
	// takes over the TLS connection. A non-nil map without an "h2"
	// entry keeps HTTP/2 out of the default protocol set.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader is cloned by Clone; it is not otherwise
	// referenced in this part of the file. Per the net/http
	// documentation it holds headers sent to proxies during CONNECT.
	ProxyConnectHeader Header

	// GetProxyConnectHeader is not referenced in this part of the
	// file. Per the net/http documentation it returns the headers to
	// send to proxyURL during a CONNECT request to target.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes limits the size of response headers.
	// Zero or negative selects the 10 MiB default.
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the write buffer size. Zero or negative
	// selects the 4 KiB default.
	WriteBufferSize int

	// ReadBufferSize is the read buffer size. Zero or negative
	// selects the 4 KiB default.
	ReadBufferSize int

	// nextProtoOnce guards onceSetNextProtoDefaults. h2transport is
	// the HTTP/2 transport chosen there (nil if none), and
	// tlsNextProtoWasNil records whether TLSNextProto was nil at that
	// moment.
	nextProtoOnce      sync.Once
	h2transport        h2Transport
	tlsNextProtoWasNil bool

	// ForceAttemptHTTP2 keeps HTTP/2 in the default protocol set even
	// when a custom TLS config or dial function is installed.
	ForceAttemptHTTP2 bool

	// HTTP2 holds HTTP/2 settings. It is copied by Clone and not
	// otherwise referenced in this part of the file.
	HTTP2 *HTTP2Config

	// Protocols, when non-nil, is the exact protocol set to use and
	// overrides the defaults derived from the other fields.
	Protocols *Protocols
}
314
315 func (t *Transport) writeBufferSize() int {
316 if t.WriteBufferSize > 0 {
317 return t.WriteBufferSize
318 }
319 return 4 << 10
320 }
321
322 func (t *Transport) readBufferSize() int {
323 if t.ReadBufferSize > 0 {
324 return t.ReadBufferSize
325 }
326 return 4 << 10
327 }
328
329 func (t *Transport) maxHeaderResponseSize() int64 {
330 if t.MaxResponseHeaderBytes > 0 {
331 return t.MaxResponseHeaderBytes
332 }
333 return 10 << 20
334 }
335
336
337 func (t *Transport) Clone() *Transport {
338 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
339 t2 := &Transport{
340 Proxy: t.Proxy,
341 OnProxyConnectResponse: t.OnProxyConnectResponse,
342 DialContext: t.DialContext,
343 Dial: t.Dial,
344 DialTLS: t.DialTLS,
345 DialTLSContext: t.DialTLSContext,
346 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
347 DisableKeepAlives: t.DisableKeepAlives,
348 DisableCompression: t.DisableCompression,
349 MaxIdleConns: t.MaxIdleConns,
350 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
351 MaxConnsPerHost: t.MaxConnsPerHost,
352 IdleConnTimeout: t.IdleConnTimeout,
353 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
354 ExpectContinueTimeout: t.ExpectContinueTimeout,
355 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
356 GetProxyConnectHeader: t.GetProxyConnectHeader,
357 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
358 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
359 WriteBufferSize: t.WriteBufferSize,
360 ReadBufferSize: t.ReadBufferSize,
361 }
362 if t.TLSClientConfig != nil {
363 t2.TLSClientConfig = t.TLSClientConfig.Clone()
364 }
365 if t.HTTP2 != nil {
366 t2.HTTP2 = &HTTP2Config{}
367 *t2.HTTP2 = *t.HTTP2
368 }
369 if t.Protocols != nil {
370 t2.Protocols = &Protocols{}
371 *t2.Protocols = *t.Protocols
372 }
373 if !t.tlsNextProtoWasNil {
374 npm := maps.Clone(t.TLSNextProto)
375 if npm == nil {
376 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
377 }
378 t2.TLSNextProto = npm
379 }
380 return t2
381 }
382
383
384
385
386
387
388
// h2Transport is the only capability Transport needs from an HTTP/2
// transport: the ability to close its idle connections.
type h2Transport interface {
	CloseIdleConnections()
}
392
393 func (t *Transport) hasCustomTLSDialer() bool {
394 return t.DialTLS != nil || t.DialTLSContext != nil
395 }
396
// http2client is the GODEBUG setting named "http2client". When its
// value is "0" the transport does not enable HTTP/2.
var http2client = godebug.New("http2client")
398
399
400
// onceSetNextProtoDefaults initializes t's HTTP/2 state. It is run
// through t.nextProtoOnce, so it executes at most once per Transport.
func (t *Transport) onceSetNextProtoDefaults() {
	// Record whether the user left TLSNextProto unset; Clone copies
	// TLSNextProto only when this is false.
	t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
	if http2client.Value() == "0" {
		// HTTP/2 is disabled via GODEBUG; count the non-default use.
		http2client.IncNonDefault()
		return
	}

	// If an "https" alternate protocol is registered and it is a
	// struct with exactly one field whose value implements
	// h2Transport, adopt that field as the HTTP/2 transport instead
	// of configuring the bundled one.
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
		if v := rv.Field(0); v.CanInterface() {
			if h2i, ok := v.Interface().(h2Transport); ok {
				t.h2transport = h2i
				return
			}
		}
	}

	if _, ok := t.TLSNextProto["h2"]; ok {
		// An "h2" handler is already installed; leave it alone.
		return
	}
	protocols := t.protocols()
	if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
		// Neither form of HTTP/2 is enabled for this transport.
		return
	}
	if omitBundledHTTP2 {
		return
	}
	t2, err := http2configureTransports(t)
	if err != nil {
		log.Printf("Error enabling Transport HTTP/2 support: %v", err)
		return
	}
	t.h2transport = t2

	// Derive the HTTP/2 header list limit from MaxResponseHeaderBytes
	// when the HTTP/2 transport has none of its own, clamping to the
	// largest value a uint32 can hold.
	if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
		const h2max = 1<<32 - 1
		if limit1 >= h2max {
			t2.MaxHeaderListSize = h2max
		} else {
			t2.MaxHeaderListSize = uint32(limit1)
		}
	}

	// Adjust the ALPN list in place to match the enabled protocols.
	// Note that this modifies t.TLSClientConfig rather than a clone.
	t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
}
463
464 func (t *Transport) protocols() Protocols {
465 if t.Protocols != nil {
466 return *t.Protocols
467 }
468 var p Protocols
469 p.SetHTTP1(true)
470 switch {
471 case t.TLSNextProto != nil:
472
473
474 if t.TLSNextProto["h2"] != nil {
475 p.SetHTTP2(true)
476 }
477 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
478
479
480
481
482
483
484 case http2client.Value() == "0":
485 default:
486 p.SetHTTP2(true)
487 }
488 return p
489 }
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
508 return envProxyFunc()(req.URL)
509 }
510
511
512
513 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
514 return func(*Request) (*url.URL, error) {
515 return fixedURL, nil
516 }
517 }
518
519
520
521
522 type transportRequest struct {
523 *Request
524 extra Header
525 trace *httptrace.ClientTrace
526
527 ctx context.Context
528 cancel context.CancelCauseFunc
529
530 mu sync.Mutex
531 err error
532 }
533
534 func (tr *transportRequest) extraHeaders() Header {
535 if tr.extra == nil {
536 tr.extra = make(Header)
537 }
538 return tr.extra
539 }
540
541 func (tr *transportRequest) setError(err error) {
542 tr.mu.Lock()
543 if tr.err == nil {
544 tr.err = err
545 }
546 tr.mu.Unlock()
547 }
548
549
550
551 func (t *Transport) useRegisteredProtocol(req *Request) bool {
552 if req.URL.Scheme == "https" && req.requiresHTTP1() {
553
554
555
556
557 return false
558 }
559 return true
560 }
561
562
563
564
565 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
566 if !t.useRegisteredProtocol(req) {
567 return nil
568 }
569 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
570 return altProto[req.URL.Scheme]
571 }
572
573 func validateHeaders(hdrs Header) string {
574 for k, vv := range hdrs {
575 if !httpguts.ValidHeaderFieldName(k) {
576 return fmt.Sprintf("field name %q", k)
577 }
578 for _, v := range vv {
579 if !httpguts.ValidHeaderFieldValue(v) {
580
581
582 return fmt.Sprintf("field value for %q", k)
583 }
584 }
585 }
586 return ""
587 }
588
589
// roundTrip sends req and returns its response. It validates the
// request, offers it to a registered alternate round tripper, and
// otherwise obtains a connection and performs the exchange, retrying
// on a fresh connection when the failure is one shouldRetryRequest
// accepts and the body can be rewound.
//
// On every error return path visible here the request body is closed.
func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		// Validate the outgoing headers.
		if err := validateHeaders(req.Header); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid header %s", err)
		}

		// Validate the outgoing trailers too.
		if err := validateHeaders(req.Trailer); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid trailer %s", err)
		}
	}

	// Keep the caller's request for Response.Request and cancel
	// registration; work on a copy whose body tracks reads and closes
	// so it can be rewound for a retry.
	origReq := req
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate declined the request; rewind the body in case
		// it consumed any of it and fall through to the normal path.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	// Derive a context this round trip can cancel on its own, with a
	// cause, independent of the caller's context.
	ctx, cancel := context.WithCancelCause(req.Context())

	// Honor the legacy Request.Cancel channel, if set.
	if origReq.Cancel != nil {
		go awaitLegacyCancel(ctx, cancel, origReq)
	}

	// Register the request so Transport.CancelRequest can find it.
	// The returned cancel also removes the registration when called.
	cancel = t.prepareTransportCancel(origReq, cancel)

	// Cancel the context on any error return. On success the context
	// is left for later cancellation (see the pconn.alt case below).
	defer func() {
		if err != nil {
			cancel(err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, context.Cause(ctx)
		default:
		}

		// treq is rebuilt on every attempt because req may have been
		// replaced by rewindBody.
		treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Get a pooled or newly dialed connection for this connect
		// method.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// The connection is handled by an alternate round tripper
			// (the HTTP/2 path).
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			if pconn.alt != nil {
				// On the alternate path nothing else here will cancel
				// the context, so release it now.
				// NOTE(review): the non-alt path presumably cancels
				// once the response body is consumed — confirm in
				// persistConn.roundTrip.
				cancel(errRequestDone)
			}
			resp.Request = origReq
			return resp, nil
		}

		// The attempt failed: clean up and decide whether to retry.
		if http2isNoCachedConnError(err) {
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Not retrying: unwrap the internal marker error types so
			// the caller sees the underlying error.
			if e, ok := err.(nothingWrittenError); ok {
				err = e.error
			}
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
				// Close the body if the connection did not already
				// do so.
				req.closeBody()
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Rewind the body for the retry, if that is possible.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
750
751 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
752 select {
753 case <-req.Cancel:
754 cancel(errRequestCanceled)
755 case <-ctx.Done():
756 }
757 }
758
// errCannotRewind is returned by rewindBody when the request body has
// been read or closed and the request has no GetBody function.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
760
// readTrackingBody wraps a request body and remembers whether Read
// or Close has been called on it, which rewindBody uses to decide
// whether the body must be replaced before a retry.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool        // set by the first Read
	didClose atomic.Bool // set by the first Close
}

// Read marks the body as read and forwards to the wrapped body.
func (r *readTrackingBody) Read(data []byte) (int, error) {
	r.didRead = true
	n, err := r.ReadCloser.Read(data)
	return n, err
}

// Close closes the wrapped body the first time it is called. Later
// calls do nothing and return nil.
func (r *readTrackingBody) Close() error {
	if firstClose := r.didClose.CompareAndSwap(false, true); firstClose {
		return r.ReadCloser.Close()
	}
	return nil
}
778
779
780
781
782
783 func setupRewindBody(req *Request) *Request {
784 if req.Body == nil || req.Body == NoBody {
785 return req
786 }
787 newReq := *req
788 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
789 return &newReq
790 }
791
792
793
794
795
796 func rewindBody(req *Request) (rewound *Request, err error) {
797 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
798 return req, nil
799 }
800 if !req.Body.(*readTrackingBody).didClose.Load() {
801 req.closeBody()
802 }
803 if req.GetBody == nil {
804 return nil, errCannotRewind
805 }
806 body, err := req.GetBody()
807 if err != nil {
808 return nil, err
809 }
810 newReq := *req
811 newReq.Body = &readTrackingBody{ReadCloser: body}
812 return &newReq, nil
813 }
814
815
816
817
818 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
819 if http2isNoCachedConnError(err) {
820
821
822
823
824
825
826 return true
827 }
828 if err == errMissingHost {
829
830 return false
831 }
832 if !pc.isReused() {
833
834
835
836
837
838
839
840 return false
841 }
842 if _, ok := err.(nothingWrittenError); ok {
843
844
845 return req.outgoingLength() == 0 || req.GetBody != nil
846 }
847 if !req.isReplayable() {
848
849 return false
850 }
851 if _, ok := err.(transportReadFromServerError); ok {
852
853
854 return true
855 }
856 if err == errServerClosedIdle {
857
858
859
860 return true
861 }
862 return false
863 }
864
865
// ErrSkipAltProtocol is a sentinel error. When a registered alternate
// RoundTripper returns it from RoundTrip, the Transport handles the
// request itself instead of returning the alternate's result.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
867
868
869
870
871
872
873
874
875
876
877
878 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
879 t.altMu.Lock()
880 defer t.altMu.Unlock()
881 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
882 if _, exists := oldMap[scheme]; exists {
883 panic("protocol " + scheme + " already registered")
884 }
885 newMap := maps.Clone(oldMap)
886 if newMap == nil {
887 newMap = make(map[string]RoundTripper)
888 }
889 newMap[scheme] = rt
890 t.altProto.Store(newMap)
891 }
892
893
894
895
896
// CloseIdleConnections closes every connection currently in the idle
// pool. It also cancels in-progress dials whose requester is no
// longer waiting, and forwards the call to the HTTP/2 transport when
// one is set. Connections that are in use are not touched by this
// method.
func (t *Transport) CloseIdleConnections() {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	// Detach the whole pool under the lock, then close the
	// connections after releasing it.
	t.idleMu.Lock()
	m := t.idleConn
	t.idleConn = nil
	t.closeIdle = true // refuse newly idle conns until the next request
	t.idleLRU = connLRU{}
	t.idleMu.Unlock()
	for _, conns := range m {
		for _, pconn := range conns {
			pconn.close(errCloseIdleConns)
		}
	}
	// Cancel dials that are still running although nobody is waiting
	// for their result any more.
	t.connsPerHostMu.Lock()
	t.dialsInProgress.all(func(w *wantConn) {
		if w.cancelCtx != nil && !w.waiting() {
			w.cancelCtx()
		}
	})
	t.connsPerHostMu.Unlock()
	if t2 := t.h2transport; t2 != nil {
		t2.CloseIdleConnections()
	}
}
921
922
923 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
924
925
926
927
928
929
930 cancel := func(err error) {
931 origCancel(err)
932 t.reqMu.Lock()
933 delete(t.reqCanceler, req)
934 t.reqMu.Unlock()
935 }
936 t.reqMu.Lock()
937 if t.reqCanceler == nil {
938 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
939 }
940 t.reqCanceler[req] = cancel
941 t.reqMu.Unlock()
942 return cancel
943 }
944
945
946
947
948
949
950
951 func (t *Transport) CancelRequest(req *Request) {
952 t.reqMu.Lock()
953 cancel := t.reqCanceler[req]
954 t.reqMu.Unlock()
955 if cancel != nil {
956 cancel(errRequestCanceled)
957 }
958 }
959
960
961
962
963
var (
	// envProxyOnce guards the one-time initialization of
	// envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the cached proxy function built from the
	// environment by envProxyFunc.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
968
969
970
971 func envProxyFunc() func(*url.URL) (*url.URL, error) {
972 envProxyOnce.Do(func() {
973 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
974 })
975 return envProxyFuncValue
976 }
977
978
979 func resetProxyConfig() {
980 envProxyOnce = sync.Once{}
981 envProxyFuncValue = nil
982 }
983
984 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
985 cm.targetScheme = treq.URL.Scheme
986 cm.targetAddr = canonicalAddr(treq.URL)
987 if t.Proxy != nil {
988 cm.proxyURL, err = t.Proxy(treq.Request)
989 }
990 cm.onlyH1 = treq.requiresHTTP1()
991 return cm, err
992 }
993
994
995
996 func (cm *connectMethod) proxyAuth() string {
997 if cm.proxyURL == nil {
998 return ""
999 }
1000 if u := cm.proxyURL.User; u != nil {
1001 username := u.Username()
1002 password, _ := u.Password()
1003 return "Basic " + basicAuth(username, password)
1004 }
1005 return ""
1006 }
1007
1008
// Errors used when a connection cannot be pooled or is being closed.
// The errKeepAlivesDisabled through errTooManyIdleHost values are
// returned by tryPutIdleConn.
var (
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
	errCloseIdle          = errors.New("http: putIdleConn: CloseIdleConnections was called")
	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
	errTooManyIdleHost    = errors.New("http: putIdleConn: too many idle connections for host")
	errCloseIdleConns     = errors.New("http: CloseIdleConnections called")
	errReadLoopExiting    = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout    = errors.New("http: idle connection timeout")

	// errServerClosedIdle is treated as retryable by
	// shouldRetryRequest when the request is replayable and the
	// connection was reused.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1025
1026
1027
1028
1029
1030
1031
1032
1033
// transportReadFromServerError wraps an error that occurred while
// reading from the server. shouldRetryRequest treats it as retryable
// for replayable requests on reused connections, and roundTrip
// unwraps it before returning to the caller.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the wrapped error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error describes the failure, including the wrapped error's text.
func (e transportReadFromServerError) Error() string {
	const format = "net/http: Transport failed to read from server: %v"
	return fmt.Sprintf(format, e.err)
}
1043
1044 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1045 if err := t.tryPutIdleConn(pconn); err != nil {
1046 pconn.close(err)
1047 }
1048 }
1049
1050 func (t *Transport) maxIdleConnsPerHost() int {
1051 if v := t.MaxIdleConnsPerHost; v != 0 {
1052 return v
1053 }
1054 return DefaultMaxIdleConnsPerHost
1055 }
1056
1057
1058
1059
1060
1061
// tryPutIdleConn makes pconn available for reuse. It hands the
// connection to a request waiting for an idle connection if there is
// one, and otherwise adds it to the idle pool. A nil return means the
// transport has taken the connection; a non-nil error says why it
// could not be kept, and the caller is expected to close it (see
// putOrCloseIdleConn).
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()
	if pconn.isClientConn {
		// Connections flagged isClientConn are not pooled; they are
		// marked available through availch instead. The state hook
		// is deferred first, so it runs after pconn.mu is unlocked.
		defer pconn.internalStateHook()
		pconn.mu.Lock()
		defer pconn.mu.Unlock()
		if !pconn.inFlight {
			panic("pconn is not in flight")
		}
		pconn.inFlight = false
		select {
		case pconn.availch <- struct{}{}:
		default:
			panic("unable to make pconn available")
		}
		return nil
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A connection with an alternate round tripper (HTTP/2) stays in
	// the idle list while in use, so if it is already tracked there
	// is nothing to do.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Deliver pconn to requests waiting for an idle connection with
	// this key, if any.
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// HTTP/1: hand pconn to the first waiter that still
			// accepts it, then stop.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil, time.Time{}) {
					done = true
					break
				}
			}
		} else {
			// Alternate protocol: the connection can be shared, so
			// offer it to every waiter and still fall through to add
			// it to the idle list below.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil, time.Time{})
			}
		}
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			// q is a copy of the map value; store the updated queue.
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// Adding the same connection twice would be an internal bug.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the global idle limit by evicting the oldest entry.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Arm the idle timer. This is done only for connections without
	// an alternate round tripper.
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
1170
1171
1172
1173
// queueForIdleConn tries to satisfy w from the idle pool. It reports
// whether an idle connection was delivered to w. When none is
// available, w is queued so that tryPutIdleConn can deliver a
// connection that becomes idle later. A nil w only clears the
// closeIdle flag.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A new request means pooling is wanted again, so undo the effect
	// of an earlier CloseIdleConnections call.
	t.closeIdle = false

	if w == nil {
		// Nothing to deliver to.
		return false
	}

	// With an idle timeout set, compute the oldest idleAt time that
	// is still acceptable for reuse.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Look at idle connections for this key, most recently used
	// first (the end of the list).
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// Round(0) strips the monotonic clock reading so that
			// the comparison uses wall-clock time.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Clean up in a separate goroutine rather than while
				// holding idleMu.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// Skip connections that are broken or expired and
				// look at the next one.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
			if delivered {
				if pconn.alt != nil {
					// Alternate protocol: the connection can be
					// shared, so it stays in the list.
				} else {
					// HTTP/1: the connection now belongs to w, so
					// remove it from the pool.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// No usable idle connection: register w to be handed one later.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
1259
1260
1261 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1262 if pconn.isClientConn {
1263 return true
1264 }
1265 t.idleMu.Lock()
1266 defer t.idleMu.Unlock()
1267 return t.removeIdleConnLocked(pconn)
1268 }
1269
1270
1271 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1272 if pconn.idleTimer != nil {
1273 pconn.idleTimer.Stop()
1274 }
1275 t.idleLRU.remove(pconn)
1276 key := pconn.cacheKey
1277 pconns := t.idleConn[key]
1278 var removed bool
1279 switch len(pconns) {
1280 case 0:
1281
1282 case 1:
1283 if pconns[0] == pconn {
1284 delete(t.idleConn, key)
1285 removed = true
1286 }
1287 default:
1288 for i, v := range pconns {
1289 if v != pconn {
1290 continue
1291 }
1292
1293
1294 copy(pconns[i:], pconns[i+1:])
1295 t.idleConn[key] = pconns[:len(pconns)-1]
1296 removed = true
1297 break
1298 }
1299 }
1300 return removed
1301 }
1302
// zeroDialer is the net.Dialer used by Transport.dial when neither
// DialContext nor Dial is set.
var zeroDialer net.Dialer
1304
1305 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1306 if t.DialContext != nil {
1307 c, err := t.DialContext(ctx, network, addr)
1308 if c == nil && err == nil {
1309 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1310 }
1311 return c, err
1312 }
1313 if t.Dial != nil {
1314 c, err := t.Dial(network, addr)
1315 if c == nil && err == nil {
1316 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1317 }
1318 return c, err
1319 }
1320 return zeroDialer.DialContext(ctx, network, addr)
1321 }
1322
1323
1324
1325
1326
1327
1328
// wantConn records a request's wish for a connection. It is queued
// for an idle connection and/or a dial; whichever produces a result
// first delivers it through the result channel.
type wantConn struct {
	cm  connectMethod    // how to reach the target
	key connectMethodKey // cm.key(), used to index the pools and queues

	// Hooks called around dialing; getConn sets them to the test
	// hooks.
	beforeDial func()
	afterDial  func()

	mu        sync.Mutex         // guards ctx, done and delivery on result
	ctx       context.Context    // context for the dial; nil once delivered or canceled
	cancelCtx context.CancelFunc // cancels ctx; set to nil after the dial goroutine finishes
	done      bool               // true once a result was delivered or w was canceled
	result    chan connOrError   // receives at most one value, then is closed
}
1345
// connOrError is the value delivered to a wantConn: either a
// connection or an error, never both (tryDeliver enforces this).
type connOrError struct {
	pc     *persistConn
	err    error
	idleAt time.Time // when pc became idle; zero if it did not come from the idle pool
}
1351
1352
1353 func (w *wantConn) waiting() bool {
1354 w.mu.Lock()
1355 defer w.mu.Unlock()
1356
1357 return !w.done
1358 }
1359
1360
1361 func (w *wantConn) getCtxForDial() context.Context {
1362 w.mu.Lock()
1363 defer w.mu.Unlock()
1364
1365 return w.ctx
1366 }
1367
1368
// tryDeliver attempts to hand either pc or err to w and reports
// whether it did. It returns false when w already has a result or
// was canceled. Exactly one of pc and err must be non-nil; otherwise
// it panics.
func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.done {
		return false
	}
	if (pc == nil) == (err == nil) {
		panic("net/http: internal error: misuse of tryDeliver")
	}
	// Drop the dial context and mark w as finished so that no second
	// delivery can happen.
	w.ctx = nil
	w.done = true

	// getConn creates result with a buffer of one, so this send does
	// not block even if nobody is receiving yet.
	w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
	close(w.result)

	return true
}
1387
1388
1389
// cancel marks w as no longer waiting for a connection. If a
// connection had already been delivered but not yet taken from the
// result channel, cancel retrieves it and returns it to t's idle pool
// (or closes it), unless it uses an alternate round tripper.
func (w *wantConn) cancel(t *Transport) {
	w.mu.Lock()
	var pc *persistConn
	if w.done {
		// A result was delivered or w was canceled before. The
		// channel is already closed, so this receive yields either
		// the unread result or ok == false.
		if r, ok := <-w.result; ok {
			pc = r.pc
		}
	} else {
		close(w.result)
	}
	w.ctx = nil
	w.done = true
	w.mu.Unlock()

	// Alternate-protocol connections are skipped here.
	// NOTE(review): presumably because they are shared and already in
	// the idle pool — confirm.
	if pc != nil && pc.alt == nil {
		t.putOrCloseIdleConn(pc)
	}
}
1411
1412
1413 type wantConnQueue struct {
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424 head []*wantConn
1425 headPos int
1426 tail []*wantConn
1427 }
1428
1429
1430 func (q *wantConnQueue) len() int {
1431 return len(q.head) - q.headPos + len(q.tail)
1432 }
1433
1434
1435 func (q *wantConnQueue) pushBack(w *wantConn) {
1436 q.tail = append(q.tail, w)
1437 }
1438
1439
1440 func (q *wantConnQueue) popFront() *wantConn {
1441 if q.headPos >= len(q.head) {
1442 if len(q.tail) == 0 {
1443 return nil
1444 }
1445
1446 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1447 }
1448 w := q.head[q.headPos]
1449 q.head[q.headPos] = nil
1450 q.headPos++
1451 return w
1452 }
1453
1454
1455 func (q *wantConnQueue) peekFront() *wantConn {
1456 if q.headPos < len(q.head) {
1457 return q.head[q.headPos]
1458 }
1459 if len(q.tail) > 0 {
1460 return q.tail[0]
1461 }
1462 return nil
1463 }
1464
1465
1466
1467 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1468 for {
1469 w := q.peekFront()
1470 if w == nil || w.waiting() {
1471 return cleaned
1472 }
1473 q.popFront()
1474 cleaned = true
1475 }
1476 }
1477
1478
1479 func (q *wantConnQueue) cleanFrontCanceled() {
1480 for {
1481 w := q.peekFront()
1482 if w == nil || w.cancelCtx != nil {
1483 return
1484 }
1485 q.popFront()
1486 }
1487 }
1488
1489
1490
1491 func (q *wantConnQueue) all(f func(*wantConn)) {
1492 for _, w := range q.head[q.headPos:] {
1493 f(w)
1494 }
1495 for _, w := range q.tail {
1496 f(w)
1497 }
1498 }
1499
1500 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1501 if t.DialTLSContext != nil {
1502 conn, err = t.DialTLSContext(ctx, network, addr)
1503 } else {
1504 conn, err = t.DialTLS(network, addr)
1505 }
1506 if conn == nil && err == nil {
1507 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1508 }
1509 return
1510 }
1511
1512
1513
1514
1515
// getConn returns a connection for treq using connect method cm. It
// first tries the idle pool and otherwise queues a dial, then waits
// for whichever delivers first or for the request's context to end.
// On error any connection that was delivered but not used is given
// back through w.cancel.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	// The dial gets its own cancel function and is detached from the
	// request's cancellation (WithoutCancel), so a dial can continue
	// after the request that started it has gone away. It still sees
	// the request context's values.
	dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        dialCtx,
		cancelCtx:  dialCancel,
		result:     make(chan connOrError, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	defer func() {
		if err != nil {
			w.cancel(t)
		}
	}()

	// Prefer an idle connection; only start (or queue) a dial when
	// none could be delivered right away.
	if delivered := t.queueForIdleConn(w); !delivered {
		t.queueForDial(w)
	}

	// Wait for a result or for the request to be canceled.
	select {
	case r := <-w.result:
		// Report the connection to the trace. This is done here only
		// for connections without an alternate round tripper.
		if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
			info := httptrace.GotConnInfo{
				Conn:   r.pc.conn,
				Reused: r.pc.isReused(),
			}
			if !r.idleAt.IsZero() {
				info.WasIdle = true
				info.IdleTime = time.Since(r.idleAt)
			}
			trace.GotConn(info)
		}
		if r.err != nil {
			// If the request was canceled in the meantime, report
			// the cancellation cause rather than the dial error.
			select {
			case <-treq.ctx.Done():
				err := context.Cause(treq.ctx)
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// Not canceled: return the dial error below.
			}
		}
		return r.pc, r.err
	case <-treq.ctx.Done():
		err := context.Cause(treq.ctx)
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1591
1592
1593
1594 func (t *Transport) queueForDial(w *wantConn) {
1595 w.beforeDial()
1596
1597 t.connsPerHostMu.Lock()
1598 defer t.connsPerHostMu.Unlock()
1599
1600 if t.MaxConnsPerHost <= 0 {
1601 t.startDialConnForLocked(w)
1602 return
1603 }
1604
1605 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1606 if t.connsPerHost == nil {
1607 t.connsPerHost = make(map[connectMethodKey]int)
1608 }
1609 t.connsPerHost[w.key] = n + 1
1610 t.startDialConnForLocked(w)
1611 return
1612 }
1613
1614 if t.connsPerHostWait == nil {
1615 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1616 }
1617 q := t.connsPerHostWait[w.key]
1618 q.cleanFrontNotWaiting()
1619 q.pushBack(w)
1620 t.connsPerHostWait[w.key] = q
1621 }
1622
1623
1624
// startDialConnForLocked records w in t.dialsInProgress and starts a
// goroutine that dials for it. The caller must hold
// t.connsPerHostMu.
func (t *Transport) startDialConnForLocked(w *wantConn) {
	// Drop finished dials from the front before adding the new one.
	t.dialsInProgress.cleanFrontCanceled()
	t.dialsInProgress.pushBack(w)
	go func() {
		t.dialConnFor(w)
		// Mark the dial as finished: entries with a nil cancelCtx
		// are skipped by CloseIdleConnections and removed by
		// cleanFrontCanceled.
		t.connsPerHostMu.Lock()
		defer t.connsPerHostMu.Unlock()
		w.cancelCtx = nil
	}()
}
1635
1636
1637
1638
// dialConnFor dials on behalf of w and delivers the result to it. If
// w no longer wants the connection, the new connection goes to the
// idle pool instead. A failed or skipped dial releases the per-host
// slot that was taken for it.
func (t *Transport) dialConnFor(w *wantConn) {
	defer w.afterDial()
	ctx := w.getCtxForDial()
	if ctx == nil {
		// w already has a result or was canceled; skip the dial and
		// release the slot.
		t.decConnsPerHost(w.key)
		return
	}

	const isClientConn = false
	pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
	delivered := w.tryDeliver(pc, err, time.Time{})
	if err == nil && (!delivered || pc.alt != nil) {
		// Either w did not take the connection, or the connection
		// uses an alternate round tripper; in both cases offer it
		// to the idle pool.
		t.putOrCloseIdleConn(pc)
	}
	if err != nil {
		t.decConnsPerHost(w.key)
	}
}
1660
1661
1662
// decConnsPerHost releases one per-host connection slot for key. If a
// request is waiting for a slot, the slot passes straight to it and a
// dial is started; otherwise the count is decremented. It does
// nothing when no per-host limit is configured.
func (t *Transport) decConnsPerHost(key connectMethodKey) {
	if t.MaxConnsPerHost <= 0 {
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	n := t.connsPerHost[key]
	if n == 0 {
		// Releasing a slot that was never taken is an internal bug.
		panic("net/http: internal error: connCount underflow")
	}

	// Give the slot to the first queued request that is still
	// waiting. The count stays the same because the slot changes
	// hands instead of being freed.
	if q := t.connsPerHostWait[key]; q.len() > 0 {
		done := false
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				t.startDialConnForLocked(w)
				done = true
				break
			}
		}
		if q.len() == 0 {
			delete(t.connsPerHostWait, key)
		} else {
			// q is a copy of the map value; store the updated
			// queue.
			t.connsPerHostWait[key] = q
		}
		if done {
			return
		}
	}

	// Nobody is waiting: actually free the slot.
	if n--; n == 0 {
		delete(t.connsPerHost, key)
	} else {
		t.connsPerHost[key] = n
	}
}
1710
1711
1712
1713
// addTLS wraps pconn.conn in a TLS client connection and performs the
// handshake, honoring the transport's TLSHandshakeTimeout. name is
// used as the TLS ServerName when the configuration has none. On
// success pconn.conn is replaced by the TLS connection and
// pconn.tlsState is set. On failure the underlying connection is
// closed and the error returned.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// Work on a copy of the transport's TLS config so that it can be
	// adjusted for this connection.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		// Do not offer any ALPN protocols when only HTTP/1 is
		// allowed.
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// The buffer of two lets both the timer and the handshake
	// goroutine send without blocking.
	errc := make(chan error, 2)
	var timer *time.Timer
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// The connection is closed now; wait for
			// HandshakeContext to return before going on.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1762
// erringRoundTripper is implemented by RoundTrippers that carry an error
// to report. dialConn checks the RoundTripper returned by a TLSNextProto
// function for this interface and, if it matches, returns RoundTripErr()
// as the dial error instead of using the RoundTripper.
type erringRoundTripper interface {
	RoundTripErr() error
}
1766
// testHookProxyConnectTimeout creates the context that bounds the proxy
// CONNECT exchange in dialConn. It is a variable rather than a direct call
// to context.WithTimeout, presumably so tests can substitute another
// implementation.
var testHookProxyConnectTimeout = context.WithTimeout
1768
// dialConn establishes a new connection described by cm and returns a
// persistConn for it.
//
// The steps are, in order:
//  1. Dial the first hop (the proxy if cm.proxyURL is set, else the target),
//     using the custom TLS dialer when one is configured for an https hop,
//     or a plain dial followed by addTLS otherwise.
//  2. Perform proxy setup: SOCKS5 negotiation, marking the connection as an
//     HTTP proxy connection, or issuing a CONNECT request for https targets.
//  3. For https targets reached through a proxy, add TLS to the tunnel.
//  4. Hand the connection to an HTTP/2 (or other negotiated protocol)
//     implementation when applicable; otherwise set up buffered I/O and
//     start the HTTP/1 read and write loops.
//
// isClientConn and internalStateHook are stored on the persistConn and
// select the NewClientConn HTTP/2 path when HTTP/2 is in use.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
	pconn = &persistConn{
		t:                 t,
		cacheKey:          cm.key(),
		reqch:             make(chan requestAndChan, 1),
		writech:           make(chan writeRequest, 1),
		closech:           make(chan struct{}),
		writeErrCh:        make(chan error, 1),
		writeLoopDone:     make(chan struct{}),
		isClientConn:      isClientConn,
		internalStateHook: internalStateHook,
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr tags errors as proxy connection failures when a proxy is in use.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error so callers can tell a proxy failure apart.
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// The custom dialer returned a *tls.Conn; run the handshake here
			// so the trace hooks fire and the connection state is recorded.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.HandshakeContext(ctx); err != nil {
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// No proxy: nothing to do.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: requests are sent to the
		// proxy itself, optionally with a Proxy-Authorization header.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through a proxy: open a tunnel with CONNECT.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone before mutating so the caller-supplied header is untouched.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange: it is given at most one minute on top
		// of whatever deadline ctx already carries.
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
		var (
			resp *Response
			err  error // write or read error
		)
		// Write the CONNECT request and read the response in a goroutine so
		// the select below can abandon the exchange when connectCtx ends.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}
			// Limit how much of the proxy's response header will be read.
			br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing the connection unblocks the goroutine; wait for it so
			// resp and err are no longer being written.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err is now set.
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the status text (everything after the first space of
			// the status line) as the error.
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	if cm.proxyURL != nil && cm.targetScheme == "https" {
		// TLS to the target, layered over the proxy connection.
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2 is used only when there is no TLS and the
	// configured protocols allow unencrypted HTTP/2 but not HTTP/1.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()

	if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
		altProto, _ := t.altProto.Load().(map[string]RoundTripper)
		h2, ok := altProto["https"].(newClientConner)
		if !ok {
			return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
		}
		alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
		if err != nil {
			pconn.conn.Close()
			return nil, err
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
	}

	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// The protocol implementation reported a setup error.
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// The protocol implementation reported a setup error.
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// HTTP/1: reads go through pconn.Read (which enforces readLimit) and
	// writes through persistConnWriter (which counts bytes written).
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
1998
1999
2000
2001
2002
2003
2004
2005 type persistConnWriter struct {
2006 pc *persistConn
2007 }
2008
2009 func (w persistConnWriter) Write(p []byte) (n int, err error) {
2010 n, err = w.pc.conn.Write(p)
2011 w.pc.nwrite += int64(n)
2012 return
2013 }
2014
2015
2016
2017
2018 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
2019 n, err = io.Copy(w.pc.conn, r)
2020 w.pc.nwrite += n
2021 return
2022 }
2023
2024 var _ io.ReaderFrom = (*persistConnWriter)(nil)
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
// connectMethod describes how to reach a target: directly or through a
// proxy, and with which scheme. Its key method derives the
// connectMethodKey that is stored as a persistConn's cacheKey.
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil when no proxy is used
	targetScheme string   // scheme of the target; dialConn distinguishes "http" and "https"

	// targetAddr is the target's address. When a port is present it is
	// separated by a colon (see tlsHost).
	targetAddr string
	onlyH1     bool // when set, addTLS offers no ALPN protocols
}
2052
2053 func (cm *connectMethod) key() connectMethodKey {
2054 proxyStr := ""
2055 targetAddr := cm.targetAddr
2056 if cm.proxyURL != nil {
2057 proxyStr = cm.proxyURL.String()
2058 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2059 targetAddr = ""
2060 }
2061 }
2062 return connectMethodKey{
2063 proxy: proxyStr,
2064 scheme: cm.targetScheme,
2065 addr: targetAddr,
2066 onlyH1: cm.onlyH1,
2067 }
2068 }
2069
2070
2071 func (cm *connectMethod) scheme() string {
2072 if cm.proxyURL != nil {
2073 return cm.proxyURL.Scheme
2074 }
2075 return cm.targetScheme
2076 }
2077
2078
2079 func (cm *connectMethod) addr() string {
2080 if cm.proxyURL != nil {
2081 return canonicalAddr(cm.proxyURL)
2082 }
2083 return cm.targetAddr
2084 }
2085
2086
2087
2088 func (cm *connectMethod) tlsHost() string {
2089 h := cm.targetAddr
2090 if hasPort(h) {
2091 h = h[:strings.LastIndex(h, ":")]
2092 }
2093 return h
2094 }
2095
2096
2097
2098
// connectMethodKey is the comparable, map-key form of a connectMethod
// (see connectMethod.key). The Transport uses it to key its idle-connection
// and per-host bookkeeping maps.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}
2103
2104 func (k connectMethodKey) String() string {
2105
2106 var h1 string
2107 if k.onlyH1 {
2108 h1 = ",h1"
2109 }
2110 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
2111 }
2112
2113
2114
// persistConn wraps a single connection to a server or proxy. For HTTP/1
// connections it owns the buffered reader/writer and the channels used to
// talk to its readLoop and writeLoop goroutines.
type persistConn struct {
	// alt is set when the connection has been handed to another protocol
	// implementation (see dialConn). closeLocked treats alt == nil as the
	// case where this persistConn itself owns and closes conn.
	alt RoundTripper

	t            *Transport
	cacheKey     connectMethodKey
	conn         net.Conn
	tlsState     *tls.ConnectionState // nil when TLS is not in use
	br           *bufio.Reader        // reads via persistConn.Read
	bw           *bufio.Writer        // writes via persistConnWriter
	nwrite       int64                // bytes written, maintained by persistConnWriter
	reqch        chan requestAndChan  // sent to by roundTrip; received by readLoop
	writech      chan writeRequest    // sent to by roundTrip; received by writeLoop
	closech      chan struct{}        // closed by closeLocked for HTTP/1 connections
	availch      chan struct{}        // waited on by waitForAvailability
	isProxy      bool                 // set by dialConn for "http" targets reached through a proxy
	sawEOF       bool                 // set by Read once conn.Read has returned io.EOF
	isClientConn bool                 // copied from dialConn's isClientConn argument
	readLimit    int64                // bytes Read may still return; decremented by Read

	// writeErrCh carries the result of each request write from writeLoop
	// to the read side (see wroteRequest), which uses it to decide whether
	// the connection can be reused.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop exits

	// Idle bookkeeping. These are not used within this part of the file;
	// presumably they are managed by the Transport's idle-connection code.
	idleAt    time.Time
	idleTimer *time.Timer

	// mu protects the mutable state below that the methods in this file
	// access under lock (numExpectedResponses, closed, canceledErr, reused,
	// reserved, inFlight, mutateHeaderFunc).
	mu                   sync.Mutex
	numExpectedResponses int
	closed               error // first error passed to closeLocked; nil while open
	canceledErr          error // set by cancelRequest
	reused               bool  // set by markReused
	reserved             bool  // consulted and cleared by roundTrip for client conns
	inFlight             bool  // set by roundTrip for client conns
	internalStateHook    func() // if non-nil, called when readLoop exits

	// mutateHeaderFunc, if non-nil, is applied by roundTrip to the
	// request's extra headers (dialConn uses it to add Proxy-Authorization).
	// closeLocked clears it.
	mutateHeaderFunc func(Header)
}
2162
// maxHeaderResponseSize returns the owning Transport's limit on response
// header size. readLoop uses it as the read limit while headers are read.
func (pc *persistConn) maxHeaderResponseSize() int64 {
	return pc.t.maxHeaderResponseSize()
}
2166
2167 func (pc *persistConn) Read(p []byte) (n int, err error) {
2168 if pc.readLimit <= 0 {
2169 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2170 }
2171 if int64(len(p)) > pc.readLimit {
2172 p = p[:pc.readLimit]
2173 }
2174 n, err = pc.conn.Read(p)
2175 if err == io.EOF {
2176 pc.sawEOF = true
2177 }
2178 pc.readLimit -= int64(n)
2179 return
2180 }
2181
2182
2183 func (pc *persistConn) isBroken() bool {
2184 pc.mu.Lock()
2185 b := pc.closed != nil
2186 pc.mu.Unlock()
2187 return b
2188 }
2189
2190
2191
2192 func (pc *persistConn) canceled() error {
2193 pc.mu.Lock()
2194 defer pc.mu.Unlock()
2195 return pc.canceledErr
2196 }
2197
2198
2199 func (pc *persistConn) isReused() bool {
2200 pc.mu.Lock()
2201 r := pc.reused
2202 pc.mu.Unlock()
2203 return r
2204 }
2205
// cancelRequest records err as the cancellation cause (later returned by
// canceled) and closes the connection with errRequestCanceled.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2212
2213
2214
2215
2216 func (pc *persistConn) closeConnIfStillIdle() {
2217 t := pc.t
2218 t.idleMu.Lock()
2219 defer t.idleMu.Unlock()
2220 if _, ok := t.idleLRU.m[pc]; !ok {
2221
2222 return
2223 }
2224 t.removeIdleConnLocked(pc)
2225 pc.close(errIdleConnTimeout)
2226 }
2227
2228
2229
2230
2231
2232
2233
2234
2235
// mapRoundTripError converts an internal round-trip error into the error
// that roundTrip returns to its caller. A nil err yields nil.
//
// startBytesWritten is the value of pc.nwrite before the request was
// written; when pc.nwrite still equals it, nothing was sent and selected
// errors are wrapped in nothingWrittenError.
//
// Precedence, highest first: the connection's cancellation cause, an error
// recorded on the request itself, errServerClosedIdle (returned as is),
// read-from-server errors, errors on a connection that is now closed, and
// finally err unchanged.
func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
	if err == nil {
		return nil
	}

	// Wait for the write loop to exit so that pc.nwrite is no longer being
	// updated when it is compared against startBytesWritten below.
	<-pc.writeLoopDone

	// A recorded cancellation takes priority over whatever error surfaced.
	if cerr := pc.canceled(); cerr != nil {
		return cerr
	}

	// Next, prefer an error stored on the request (see writeLoop, which
	// records request body read errors there).
	req.mu.Lock()
	reqErr := req.err
	req.mu.Unlock()
	if reqErr != nil {
		return reqErr
	}

	if err == errServerClosedIdle {
		// Returned unchanged so callers can recognize it.
		return err
	}

	if _, ok := err.(transportReadFromServerError); ok {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		// Some of the request was written; return the error unchanged.
		return err
	}
	if pc.isBroken() {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
	}
	return err
}
2285
2286
2287
2288
// errCallerOwnsConn is the close error readLoop uses when the response body
// is writable and ownership of the connection passes to the caller.
// closeLocked does not close the underlying net.Conn for this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2290
// readLoop is the HTTP/1 read side of a persistConn; dialConn starts it in
// its own goroutine. Each iteration waits for data, pairs it with the next
// request received from pc.reqch, reads the response, and delivers it on
// the request's channel. The loop ends when the connection can no longer
// be reused. On exit the connection is closed with closeErr, removed from
// the Transport's idle set, and internalStateHook (if any) is called.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default close reason, refined below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
		if pc.internalStateHook != nil {
			pc.internalStateHook()
		}
	}()

	// tryPutIdleConn returns the connection to the idle pool, reporting the
	// outcome to the request's PutIdleConn trace hook. On failure the error
	// becomes the reason the connection is closed.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is used to hold the body's final Read/Close call until this loop
	// has finished its post-body bookkeeping. Closing it on exit unblocks
	// any body callback still waiting.
	eofc := make(chan struct{})
	defer close(eofc)

	// Take a local copy of the test hook under its lock.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no request was outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			// Deliver the error unless the caller has already gone away.
			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // headers are done; lift the limit for the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// The connection must not be reused after this response.
			alive = false
		}

		if !hasBody || bodyWritable {
			// No body to wait for (or the caller takes over the connection):
			// decide about reuse now, then deliver the response.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			testHookReadLoopBeforeNextRead()
			continue
		}

		// The response has a body. Wrap it so this loop learns when the
		// caller has finished with it: the value sent on waitForBodyRead is
		// true only when the body was read to io.EOF.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // closed when readLoop returns
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // wait for readLoop to finish its reuse bookkeeping
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The transport requested gzip itself, so decompress
			// transparently and hide the encoding from the caller.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Wait until the body is finished, the request's context is done,
		// or the connection is closed, before reading the next response.
		select {
		case bodyEOF := <-waitForBodyRead:
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if bodyEOF {
				// Release the body callback blocked on eofc.
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2471
2472 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2473 if pc.closed != nil {
2474 return
2475 }
2476 if n := pc.br.Buffered(); n > 0 {
2477 buf, _ := pc.br.Peek(n)
2478 if is408Message(buf) {
2479 pc.closeLocked(errServerClosedIdle)
2480 return
2481 } else {
2482 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2483 }
2484 }
2485 if peekErr == io.EOF {
2486
2487 pc.closeLocked(errServerClosedIdle)
2488 } else {
2489 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2490 }
2491 }
2492
2493
2494
2495
// is408Message reports whether buf begins with an HTTP/1.x status line
// carrying status code 408: the prefix "HTTP/1.", any single byte for the
// minor version, then " 408".
func is408Message(buf []byte) bool {
	const shape = "HTTP/1.x 408"
	if len(buf) < len(shape) {
		return false
	}
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2505
2506
2507
2508
// readResponse reads the next final response for rc's request from pc.br.
// Non-terminal 1xx responses (every 1xx except 101 Switching Protocols)
// are consumed and skipped, with the trace hooks notified. trace may be
// nil. The returned response has its TLS field set to the connection's TLS
// state.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Signal the writer (see waitForContinue) to send the body.
			// Clearing continueCh ensures it is signalled at most once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols is a final response; other 1xx are not.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// A hook is handling the 1xx responses, so give the next
				// response a fresh header-size budget.
				pc.readLimit = pc.maxHeaderResponseSize()
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		// After a protocol switch the body gives the caller read/write
		// access to the connection, starting with any buffered bytes.
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// A final response arrived without a preceding 100 Continue.
		// Closing continueCh makes the writer's wait return false, while a
		// send makes it return true (see waitForContinue). The channel is
		// closed when the connection will be closed after this response.
		if resp.Close || rc.treq.Request.Close {
			close(continueCh)
		} else {
			continueCh <- struct{}{}
		}
	}

	resp.TLS = pc.tlsState
	return
}
2577
2578
2579
2580
2581 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2582 if continueCh == nil {
2583 return nil
2584 }
2585 return func() bool {
2586 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2587 defer timer.Stop()
2588
2589 select {
2590 case _, ok := <-continueCh:
2591 return ok
2592 case <-timer.C:
2593 return true
2594 case <-pc.closech:
2595 return false
2596 }
2597 }
2598 }
2599
2600 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2601 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2602 if br.Buffered() != 0 {
2603 body.br = br
2604 }
2605 return body
2606 }
2607
2608
2609
2610
2611
2612
// readWriteCloserBody is the response body used after a protocol switch
// (see readResponse). It exposes the embedded io.ReadWriteCloser, but
// serves reads from br first while br still has buffered data.
type readWriteCloserBody struct {
	_  incomparable
	br *bufio.Reader // used until empty; nil once drained
	io.ReadWriteCloser
}
2618
2619 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2620 if b.br != nil {
2621 if n := b.br.Buffered(); len(p) > n {
2622 p = p[:n]
2623 }
2624 n, err = b.br.Read(p)
2625 if b.br.Buffered() == 0 {
2626 b.br = nil
2627 }
2628 return n, err
2629 }
2630 return b.ReadWriteCloser.Read(p)
2631 }
2632
2633 func (b *readWriteCloserBody) CloseWrite() error {
2634 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2635 return cw.CloseWrite()
2636 }
2637 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2638 }
2639
2640
// nothingWrittenError wraps an error that occurred before any bytes of the
// request were written to the connection (see writeLoop and
// mapRoundTripError, which compare pc.nwrite before and after).
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error, so errors.Is and errors.As see
// through nothingWrittenError.
func (e nothingWrittenError) Unwrap() error {
	return e.error
}
2648
// writeLoop is the HTTP/1 write side of a persistConn; dialConn starts it
// in its own goroutine. It writes each request received on pc.writech and
// reports the outcome both to the read side (pc.writeErrCh) and to the
// round-tripper (wr.ch). It exits after the first write error or when the
// connection is closed, and closes pc.writeLoopDone on exit.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the request body. Record
				// the unwrapped error on the request; mapRoundTripError
				// returns the request's recorded error in preference to
				// connection-level errors.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Nothing reached the connection for this request.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // to the read side, for reuse decisions
			wr.ch <- err         // to the roundTrip call
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2686
2687
2688
2689
2690
2691
2692
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the write
// loop to report the result of a request write before giving up and
// reporting that the connection should not be reused.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2694
2695
2696
2697 func (pc *persistConn) wroteRequest() bool {
2698 select {
2699 case err := <-pc.writeErrCh:
2700
2701
2702 return err == nil
2703 default:
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2715 defer t.Stop()
2716 select {
2717 case err := <-pc.writeErrCh:
2718 return err == nil
2719 case <-t.C:
2720 return false
2721 }
2722 }
2723 }
2724
2725
2726
// responseAndError is how readLoop hands a result back to roundTrip.
// Exactly one of res and err is expected to be set; roundTrip panics
// otherwise.
type responseAndError struct {
	_   incomparable
	res *Response
	err error
}
2732
// requestAndChan is the message roundTrip sends to readLoop over pc.reqch
// to announce a request whose response should be read.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // unbuffered; readLoop always selects on callerGone as well

	// addedGzip records whether the transport itself added
	// "Accept-Encoding: gzip" to the request. When set, readLoop
	// transparently decompresses a gzip-encoded response body.
	addedGzip bool

	// continueCh, if non-nil, is signalled (or closed) by readResponse to
	// tell the write side how to proceed with a request that expects a
	// 100 Continue.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2751
2752
2753
2754
2755
// writeRequest is the message roundTrip sends to writeLoop over pc.writech
// to ask for a request to be written to the connection.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the result of the write from writeLoop

	// continueCh, if non-nil, is what the writer waits on before sending
	// the request body (see waitForContinue).
	continueCh <-chan struct{}
}
2765
2766
2767
// timeoutError is an error that identifies itself as a timeout: it
// provides Timeout and Temporary methods returning true and matches
// context.DeadlineExceeded under errors.Is.
type timeoutError struct {
	err string
}

// Error returns the stored message.
func (e *timeoutError) Error() string {
	return e.err
}

// Timeout always reports true.
func (e *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (e *timeoutError) Temporary() bool {
	return true
}

// Is reports whether err is context.DeadlineExceeded.
func (e *timeoutError) Is(err error) bool {
	return err == context.DeadlineExceeded
}

// errTimeout is returned by roundTrip when the response header timer
// expires before a response arrives.
var errTimeout error = &timeoutError{err: "net/http: timeout awaiting response headers"}
2778
2779
2780
// errRequestCanceled is the close error used when a request is canceled
// (see cancelRequest). It is the same value as the HTTP/2 code's
// http2errRequestCanceled.
var errRequestCanceled = http2errRequestCanceled

// errRequestCanceledConn is not used in this part of the file; judging by
// its message it reports cancellation while still waiting for a connection.
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")

// errRequestDone is the cause readLoop passes to the request's cancel
// function once the request has completed.
var errRequestDone = errors.New("net/http: request completed")
2787
// nop does nothing. It is the default value of the test hooks below.
func nop() {}

// Test hooks. Each defaults to a no-op (or a no-op locker) and is invoked
// at the point in the transport that its name describes.
var (
	testHookEnterRoundTrip   = nop // called at the start of persistConn.roundTrip
	testHookWaitResLoop      = nop // called on each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is held by readLoop while it copies
	// testHookReadLoopBeforeNextRead; the default locker does nothing.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop // called by readLoop after each response is handled
)
2801
// waitForAvailability blocks until the connection signals availability on
// pc.availch (returning nil), the connection is closed (returning
// pc.closed), or ctx is done (returning ctx.Err()).
func (pc *persistConn) waitForAvailability(ctx context.Context) error {
	select {
	case <-pc.availch:
		return nil
	case <-pc.closech:
		return pc.closed
	case <-ctx.Done():
		return ctx.Err()
	}
}
2812
// roundTrip sends req on this HTTP/1 connection and waits for the response
// headers. It hands the request to the write loop and the read loop, then
// waits for whichever comes first: a write error, the connection closing,
// the response-header timeout, a response (or read error), or the request
// context being done.
//
// Errors are passed through mapRoundTripError before being returned,
// except for the response-header timeout, which returns errTimeout as is.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()

	pc.mu.Lock()
	if pc.isClientConn {
		if !pc.reserved {
			// Not reserved for this caller: wait, without holding the
			// lock, until the connection becomes available.
			pc.mu.Unlock()
			if err := pc.waitForAvailability(req.ctx); err != nil {
				return nil, err
			}
			pc.mu.Lock()
		}
		pc.reserved = false
		pc.inFlight = true
	}
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when the caller has not asked for any particular
	// encoding or a byte range, and the method is not HEAD. The read loop
	// then decompresses transparently (see requestAndChan.addedGzip).
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh coordinates "Expect: 100-continue" handling between the
	// read and write loops; it is only needed for HTTP/1.1+ requests that
	// have a body.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone tells the read loop that this call has returned, so it does not
	// block delivering a response nobody will receive.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Hand the request to the write loop first, then announce it to the
	// read loop. startBytesWritten lets error mapping detect whether any
	// bytes of this request reached the connection.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse validates a result from the read loop and maps its
	// error, if any.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	var respHeaderTimer <-chan time.Time // nil (never ready) until the write succeeds
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written; start the response-header timer if
			// one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// A result is ready as well; prefer it over the bare
				// connection-closed error.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// A result is ready as well; prefer it over reporting
				// cancellation.
				return handleResponse(re)
			default:
			}
			// Cancel, then keep looping: cancelRequest closes the
			// connection, so a later iteration returns via pcClosed or resc.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
2962
2963
2964
// tLogKey is the context key under which transportRequest.logf looks up a
// logging function of type func(string, ...any).
type tLogKey struct{}
2966
2967 func (tr *transportRequest) logf(format string, args ...any) {
2968 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2969 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2970 }
2971 }
2972
2973
2974
2975 func (pc *persistConn) markReused() {
2976 pc.mu.Lock()
2977 pc.reused = true
2978 pc.mu.Unlock()
2979 }
2980
2981
2982
2983
2984
2985
// close closes the connection with err as the reason. It acquires pc.mu
// and delegates to closeLocked, so err must be non-nil and only the first
// close has any effect on the recorded reason.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
2991
// closeLocked marks the connection closed with err as the reason. Both
// callers in this file (close and cancelRequest) hold pc.mu. It panics if
// err is nil.
//
// Only the first call records the reason, releases the per-host connection
// slot, and (for connections with no alternate protocol) closes closech.
// The underlying net.Conn is closed as well unless err is
// errCallerOwnsConn. Every call clears mutateHeaderFunc.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Only connections without an alternate protocol (pc.alt == nil)
		// are closed here.
		if pc.alt == nil {
			if err != errCallerOwnsConn {
				pc.conn.Close()
			}
			close(pc.closech)
		}
	}
	pc.mutateHeaderFunc = nil
}
3010
// schemePort returns the default port for scheme: "80" for http, "443" for
// https, "1080" for socks5 and socks5h, and "" for anything else.
func schemePort(scheme string) string {
	if scheme == "http" {
		return "80"
	}
	if scheme == "https" {
		return "443"
	}
	if scheme == "socks5" || scheme == "socks5h" {
		return "1080"
	}
	return ""
}
3023
3024 func idnaASCIIFromURL(url *url.URL) string {
3025 addr := url.Hostname()
3026 if v, err := idnaASCII(addr); err == nil {
3027 addr = v
3028 }
3029 return addr
3030 }
3031
3032
3033 func canonicalAddr(url *url.URL) string {
3034 port := url.Port()
3035 if port == "" {
3036 port = schemePort(url.Scheme)
3037 }
3038 return net.JoinHostPort(idnaASCIIFromURL(url), port)
3039 }
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
// bodyEOFSignal wraps a response body and notifies its owner when the body
// is done with.
//
// fn is called at most once, with the first non-nil error returned by the
// underlying Read (including io.EOF) or with the result of closing the
// underlying body. earlyCloseFn, if set, is called instead when Close
// happens before io.EOF was seen. Whatever the callback returns is what
// Read or Close returns to the caller.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // whether Close has been called
	rerr         error             // first non-nil error from body.Read
	fn           func(error) error // called once, then cleared
	earlyCloseFn func() error      // optional; called by Close before io.EOF
}

var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")

// Read reads from the wrapped body. After Close it returns
// errReadOnClosedResBody, and after a read error it keeps returning that
// error. The first read error is recorded and passed through fn.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, prevErr := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case prevErr != nil:
		return 0, prevErr
	}

	// The underlying read happens without holding the lock.
	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed. Repeated calls return nil. If io.EOF has
// not been seen and earlyCloseFn is set, its result is returned and the
// underlying body is left unclosed here. Otherwise the underlying body is
// closed and the result is passed through fn.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true

	if early := es.earlyCloseFn; early != nil && es.rerr != io.EOF {
		return early()
	}
	return es.condfn(es.body.Close())
}

// condfn runs es.fn on err if fn has not yet run, clearing fn afterwards
// so that it runs only once. The caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
	notify := es.fn
	if notify == nil {
		return err
	}
	result := notify(err)
	es.fn = nil
	return result
}
3110
3111
3112
3113
3114
// gzipReader wraps a response body so it can be transparently
// decompressed. The gzip.Reader is obtained from a pool on first use (see
// acquire).
type gzipReader struct {
	_    incomparable
	body *bodyEOFSignal // underlying HTTP/1 response body framing
	mu   sync.Mutex     // guards zr and zerr
	zr   *gzip.Reader   // available reader, if any; nil while a Read holds it
	zerr error          // non-nil when no reader can be handed out (see acquire, release, close)
}
3122
// eofReader is a reader that is always at EOF. It implements both
// io.Reader and io.ByteReader.
type eofReader struct{}

// Read always returns 0, io.EOF.
func (eofReader) Read([]byte) (int, error) {
	return 0, io.EOF
}

// ReadByte always returns 0, io.EOF.
func (eofReader) ReadByte() (byte, error) {
	return 0, io.EOF
}

// gzipPool holds *gzip.Reader values for reuse.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}

// gzipPoolGet takes a gzip.Reader from the pool and resets it to read from
// r. If the reset fails, the reader goes back to the pool and the error is
// returned.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
	zr := gzipPool.Get().(*gzip.Reader)
	err := zr.Reset(r)
	if err == nil {
		return zr, nil
	}
	gzipPoolPut(zr)
	return nil, err
}

// gzipPoolPut returns zr to the pool. It first resets zr onto an eofReader
// so the pooled reader no longer refers to its previous source. The result
// of that reset is deliberately ignored.
func gzipPoolPut(zr *gzip.Reader) {
	// The conversion keeps the compile-time check that eofReader
	// satisfies flate.Reader.
	_ = zr.Reset(flate.Reader(eofReader{}))
	gzipPool.Put(zr)
}
3148
3149
3150
3151 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3152 gz.mu.Lock()
3153 defer gz.mu.Unlock()
3154 if gz.zerr != nil {
3155 return nil, gz.zerr
3156 }
3157 if gz.zr == nil {
3158 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3159 if gz.zerr != nil {
3160 return nil, gz.zerr
3161 }
3162 }
3163 ret := gz.zr
3164 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3165 return ret, nil
3166 }
3167
3168
3169 func (gz *gzipReader) release(zr *gzip.Reader) {
3170 gz.mu.Lock()
3171 defer gz.mu.Unlock()
3172 if gz.zerr == errConcurrentReadOnResBody {
3173 gz.zr, gz.zerr = zr, nil
3174 } else {
3175 gzipPoolPut(zr)
3176 }
3177 }
3178
3179
3180
3181 func (gz *gzipReader) close() {
3182 gz.mu.Lock()
3183 defer gz.mu.Unlock()
3184 if gz.zerr == nil && gz.zr != nil {
3185 gzipPoolPut(gz.zr)
3186 gz.zr = nil
3187 }
3188 gz.zerr = errReadOnClosedResBody
3189 }
3190
// Read decompresses data from the underlying body into p. It acquires the
// gzip.Reader for the duration of the call and releases it afterwards. If
// the reader cannot be acquired (after Close, during another Read, or
// after a gzip setup error), that error is returned.
func (gz *gzipReader) Read(p []byte) (n int, err error) {
	zr, err := gz.acquire()
	if err != nil {
		return 0, err
	}
	defer gz.release(zr)

	return zr.Read(p)
}
3200
// Close releases the gzip.Reader (see close) and then closes the
// underlying body, returning the body's close result.
func (gz *gzipReader) Close() error {
	gz.close()

	return gz.body.Close()
}
3206
// tlsHandshakeTimeoutError is the error addTLS reports when the TLS
// handshake does not complete within the Transport's TLSHandshakeTimeout.
// It is a comparable empty struct, so addTLS can detect it with ==.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
3212
3213
3214
3215
// fakeLocker is a sync.Locker whose Lock and Unlock do nothing. It is the
// default value of testHookMu.
type fakeLocker struct{}

func (fakeLocker) Lock()   {}
func (fakeLocker) Unlock() {}
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
// cloneTLSConfig returns a copy of cfg that the caller may modify freely.
// A nil cfg yields a new, empty tls.Config.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return &tls.Config{}
}
3240
// connLRU tracks persistConns in least-recently-used order. New entries
// are added at the front of the list, so the back holds the oldest one.
// Both fields are created lazily by add.
type connLRU struct {
	ll *list.List // list.Element.Value type is of *persistConn type
	m  map[*persistConn]*list.Element
}
3245
3246
3247 func (cl *connLRU) add(pc *persistConn) {
3248 if cl.ll == nil {
3249 cl.ll = list.New()
3250 cl.m = make(map[*persistConn]*list.Element)
3251 }
3252 ele := cl.ll.PushFront(pc)
3253 if _, ok := cl.m[pc]; ok {
3254 panic("persistConn was already in LRU")
3255 }
3256 cl.m[pc] = ele
3257 }
3258
3259 func (cl *connLRU) removeOldest() *persistConn {
3260 ele := cl.ll.Back()
3261 pc := ele.Value.(*persistConn)
3262 cl.ll.Remove(ele)
3263 delete(cl.m, pc)
3264 return pc
3265 }
3266
3267
3268 func (cl *connLRU) remove(pc *persistConn) {
3269 if ele, ok := cl.m[pc]; ok {
3270 cl.ll.Remove(ele)
3271 delete(cl.m, pc)
3272 }
3273 }
3274
3275
// len returns the number of connections currently tracked by the LRU.
func (cl *connLRU) len() int {
	return len(cl.m)
}
3279
View as plain text