Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal/ascii"
28 "net/textproto"
29 "net/url"
30 "reflect"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35 _ "unsafe"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http/httpproxy"
39 )
40
41
42
43
44
45
// DefaultTransport is a package-level RoundTripper backed by a *Transport.
// It selects proxies with ProxyFromEnvironment, dials through a net.Dialer
// configured with a 30-second Timeout and 30-second KeepAlive, sets
// ForceAttemptHTTP2, caps idle connections at 100 with a 90-second idle
// timeout, and uses a 10-second TLSHandshakeTimeout and a 1-second
// ExpectContinueTimeout.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
58
59
60
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that
// Transport.maxIdleConnsPerHost falls back to when
// Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Transport manages a pool of persistent connections and performs
// round trips over them. It keeps idle connections for reuse, optionally
// limits the number of connections per host, and can delegate whole schemes
// to alternate RoundTrippers registered with RegisterProtocol.
type Transport struct {
	// idleMu guards the idle-connection bookkeeping below.
	idleMu sync.Mutex
	// closeIdle is set by CloseIdleConnections and cleared by
	// queueForIdleConn; while set, tryPutIdleConn refuses to pool
	// connections (errCloseIdle).
	closeIdle bool
	// idleConn holds idle connections per connect-method key. New idle
	// connections are appended and reuse takes from the end.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnWait holds, per key, the wantConns waiting for an idle
	// connection.
	idleConnWait map[connectMethodKey]wantConnQueue
	// idleLRU tracks idle connections so the oldest can be evicted when
	// MaxIdleConns is exceeded.
	idleLRU connLRU

	// reqMu guards reqCanceler, which maps an in-flight request to the
	// function CancelRequest uses to cancel it.
	reqMu       sync.Mutex
	reqCanceler map[*Request]context.CancelCauseFunc

	// altMu serializes writers of altProto (see RegisterProtocol).
	altMu sync.Mutex
	// altProto holds either nothing or a map[string]RoundTripper keyed by
	// URL scheme; the map is replaced, never mutated in place.
	altProto atomic.Value

	// connsPerHostMu guards the per-host connection accounting below.
	connsPerHostMu sync.Mutex
	// connsPerHost counts connections per key; it is only maintained when
	// MaxConnsPerHost is positive.
	connsPerHost map[connectMethodKey]int
	// connsPerHostWait holds wantConns waiting for a per-host slot.
	connsPerHostWait map[connectMethodKey]wantConnQueue
	// dialsInProgress holds wantConns for which a dial has been started.
	dialsInProgress wantConnQueue

	// Proxy, if non-nil, is called by connectMethodForRequest to choose
	// the proxy URL for a request; an error it returns aborts the request.
	// If Proxy is nil no proxy lookup is performed.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is copied by Clone; it is not called in the
	// portion of the file reviewed here.
	// NOTE(review): presumably invoked with the proxy's CONNECT response —
	// confirm against dialConn.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, if non-nil, is used by dial to create connections.
	// A (nil, nil) result is converted into an error.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is used by dial only when DialContext is nil.
	// A (nil, nil) result is converted into an error.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext, if non-nil, is used by customDialTLS in preference
	// to DialTLS. A (nil, nil) result is converted into an error.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is used by customDialTLS when DialTLSContext is nil.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration cloned by addTLS for each
	// handshake. Unless ForceAttemptHTTP2 is set, a non-nil value keeps
	// protocols from enabling HTTP/2 by default.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds the TLS handshake in addTLS.
	// Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, prevents connections from being pooled
	// (tryPutIdleConn) or reused (queueForIdleConn).
	DisableKeepAlives bool

	// DisableCompression is copied by Clone; it is not consulted in the
	// portion of the file reviewed here.
	DisableCompression bool

	// MaxIdleConns caps the total number of pooled idle connections.
	// Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost caps pooled idle connections per key. Zero
	// selects DefaultMaxIdleConnsPerHost; a negative value disables
	// pooling (tryPutIdleConn returns errKeepAlivesDisabled).
	MaxIdleConnsPerHost int

	// MaxConnsPerHost caps connections per key; further dials wait in
	// connsPerHostWait. Zero or negative means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout is how long a pooled connection may stay idle
	// before it is closed. Zero or negative means no limit.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is copied by Clone; it is not consulted in the
	// portion of the file reviewed here.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is copied by Clone; it is not consulted in the
	// portion of the file reviewed here.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a negotiated protocol name to a constructor for
	// the RoundTripper that takes over the connection. When the map is
	// non-nil, protocols enables HTTP/2 only if it has a non-nil "h2"
	// entry; an existing "h2" entry also stops onceSetNextProtoDefaults
	// from installing its own.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader is deep-copied by Clone; it is not consulted in
	// the portion of the file reviewed here.
	ProxyConnectHeader Header

	// GetProxyConnectHeader is copied by Clone; it is not called in the
	// portion of the file reviewed here.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes limits the response header size. A value
	// that is not positive selects the default in maxHeaderResponseSize.
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the write buffer size; a value that is not
	// positive selects the default in writeBufferSize.
	WriteBufferSize int

	// ReadBufferSize is the read buffer size; a value that is not
	// positive selects the default in readBufferSize.
	ReadBufferSize int

	// nextProtoOnce guards onceSetNextProtoDefaults, which initializes
	// h2transport and tlsNextProtoWasNil.
	nextProtoOnce sync.Once
	// h2transport, if non-nil, receives CloseIdleConnections calls.
	h2transport h2Transport
	// tlsNextProtoWasNil records whether TLSNextProto was nil when
	// onceSetNextProtoDefaults ran; Clone uses it to decide whether to
	// copy TLSNextProto.
	tlsNextProtoWasNil bool

	// ForceAttemptHTTP2 keeps HTTP/2 enabled by default in protocols even
	// when TLSClientConfig, Dial, DialContext, DialTLS or DialTLSContext
	// is set.
	ForceAttemptHTTP2 bool

	// HTTP2 is copied (by value) by Clone; it is not consulted in the
	// portion of the file reviewed here.
	HTTP2 *HTTP2Config

	// Protocols, if non-nil, is the protocol set returned by protocols,
	// overriding the defaults derived from the other fields.
	Protocols *Protocols
}
314
315 func (t *Transport) writeBufferSize() int {
316 if t.WriteBufferSize > 0 {
317 return t.WriteBufferSize
318 }
319 return 4 << 10
320 }
321
322 func (t *Transport) readBufferSize() int {
323 if t.ReadBufferSize > 0 {
324 return t.ReadBufferSize
325 }
326 return 4 << 10
327 }
328
329 func (t *Transport) maxHeaderResponseSize() int64 {
330 if t.MaxResponseHeaderBytes > 0 {
331 return t.MaxResponseHeaderBytes
332 }
333 return 10 << 20
334 }
335
336
337 func (t *Transport) Clone() *Transport {
338 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
339 t2 := &Transport{
340 Proxy: t.Proxy,
341 OnProxyConnectResponse: t.OnProxyConnectResponse,
342 DialContext: t.DialContext,
343 Dial: t.Dial,
344 DialTLS: t.DialTLS,
345 DialTLSContext: t.DialTLSContext,
346 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
347 DisableKeepAlives: t.DisableKeepAlives,
348 DisableCompression: t.DisableCompression,
349 MaxIdleConns: t.MaxIdleConns,
350 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
351 MaxConnsPerHost: t.MaxConnsPerHost,
352 IdleConnTimeout: t.IdleConnTimeout,
353 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
354 ExpectContinueTimeout: t.ExpectContinueTimeout,
355 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
356 GetProxyConnectHeader: t.GetProxyConnectHeader,
357 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
358 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
359 WriteBufferSize: t.WriteBufferSize,
360 ReadBufferSize: t.ReadBufferSize,
361 }
362 if t.TLSClientConfig != nil {
363 t2.TLSClientConfig = t.TLSClientConfig.Clone()
364 }
365 if t.HTTP2 != nil {
366 t2.HTTP2 = &HTTP2Config{}
367 *t2.HTTP2 = *t.HTTP2
368 }
369 if t.Protocols != nil {
370 t2.Protocols = &Protocols{}
371 *t2.Protocols = *t.Protocols
372 }
373 if !t.tlsNextProtoWasNil {
374 npm := maps.Clone(t.TLSNextProto)
375 if npm == nil {
376 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
377 }
378 t2.TLSNextProto = npm
379 }
380 return t2
381 }
382
383
384
385
386
387
388
// h2Transport is what the Transport requires of the value it stores in
// t.h2transport: Transport.CloseIdleConnections forwards to its
// CloseIdleConnections method.
type h2Transport interface {
	CloseIdleConnections()
}
392
393 func (t *Transport) hasCustomTLSDialer() bool {
394 return t.DialTLS != nil || t.DialTLSContext != nil
395 }
396
// http2client is the "http2client" GODEBUG setting. When its value is "0",
// onceSetNextProtoDefaults skips HTTP/2 setup and protocols does not enable
// HTTP/2 by default.
var http2client = godebug.New("http2client")
398
399
400
401 func (t *Transport) onceSetNextProtoDefaults() {
402 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
403 if http2client.Value() == "0" {
404 http2client.IncNonDefault()
405 return
406 }
407
408
409
410
411
412
413 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
414 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
415 if v := rv.Field(0); v.CanInterface() {
416 if h2i, ok := v.Interface().(h2Transport); ok {
417 t.h2transport = h2i
418 return
419 }
420 }
421 }
422
423 if _, ok := t.TLSNextProto["h2"]; ok {
424
425 return
426 }
427 protocols := t.protocols()
428 if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
429 return
430 }
431 if omitBundledHTTP2 {
432 return
433 }
434 t2, err := http2configureTransports(t)
435 if err != nil {
436 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
437 return
438 }
439 t.h2transport = t2
440
441
442
443
444
445
446
447 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
448 const h2max = 1<<32 - 1
449 if limit1 >= h2max {
450 t2.MaxHeaderListSize = h2max
451 } else {
452 t2.MaxHeaderListSize = uint32(limit1)
453 }
454 }
455
456
457
458
459
460
461 t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
462 }
463
464 func (t *Transport) protocols() Protocols {
465 if t.Protocols != nil {
466 return *t.Protocols
467 }
468 var p Protocols
469 p.SetHTTP1(true)
470 switch {
471 case t.TLSNextProto != nil:
472
473
474 if t.TLSNextProto["h2"] != nil {
475 p.SetHTTP2(true)
476 }
477 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
478
479
480
481
482
483
484 case http2client.Value() == "0":
485 default:
486 p.SetHTTP2(true)
487 }
488 return p
489 }
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
508 return envProxyFunc()(req.URL)
509 }
510
511
512
513 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
514 return func(*Request) (*url.URL, error) {
515 return fixedURL, nil
516 }
517 }
518
519
520
521
// transportRequest wraps a *Request with the per-attempt state the
// Transport needs. roundTrip creates a fresh one for every attempt.
type transportRequest struct {
	*Request                        // the request being sent
	extra    Header                 // additional headers, allocated lazily by extraHeaders
	trace    *httptrace.ClientTrace // from the request's context; may be nil

	ctx    context.Context         // cancelable context created by roundTrip
	cancel context.CancelCauseFunc // cancels ctx

	mu  sync.Mutex // guards err
	err error      // first error recorded by setError
}
533
534 func (tr *transportRequest) extraHeaders() Header {
535 if tr.extra == nil {
536 tr.extra = make(Header)
537 }
538 return tr.extra
539 }
540
541 func (tr *transportRequest) setError(err error) {
542 tr.mu.Lock()
543 if tr.err == nil {
544 tr.err = err
545 }
546 tr.mu.Unlock()
547 }
548
549
550
551 func (t *Transport) useRegisteredProtocol(req *Request) bool {
552 if req.URL.Scheme == "https" && req.requiresHTTP1() {
553
554
555
556
557 return false
558 }
559 return true
560 }
561
562
563
564
565 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
566 if !t.useRegisteredProtocol(req) {
567 return nil
568 }
569 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
570 return altProto[req.URL.Scheme]
571 }
572
573 func validateHeaders(hdrs Header) string {
574 for k, vv := range hdrs {
575 if !httpguts.ValidHeaderFieldName(k) {
576 return fmt.Sprintf("field name %q", k)
577 }
578 for _, v := range vv {
579 if !httpguts.ValidHeaderFieldValue(v) {
580
581
582 return fmt.Sprintf("field value for %q", k)
583 }
584 }
585 }
586 return ""
587 }
588
589
590 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
591 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
592 ctx := req.Context()
593 trace := httptrace.ContextClientTrace(ctx)
594
595 if req.URL == nil {
596 req.closeBody()
597 return nil, errors.New("http: nil Request.URL")
598 }
599 if req.Header == nil {
600 req.closeBody()
601 return nil, errors.New("http: nil Request.Header")
602 }
603 scheme := req.URL.Scheme
604 isHTTP := scheme == "http" || scheme == "https"
605 if isHTTP {
606
607 if err := validateHeaders(req.Header); err != "" {
608 req.closeBody()
609 return nil, fmt.Errorf("net/http: invalid header %s", err)
610 }
611
612
613 if err := validateHeaders(req.Trailer); err != "" {
614 req.closeBody()
615 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
616 }
617 }
618
619 origReq := req
620 req = setupRewindBody(req)
621
622 if altRT := t.alternateRoundTripper(req); altRT != nil {
623 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
624 return resp, err
625 }
626 var err error
627 req, err = rewindBody(req)
628 if err != nil {
629 return nil, err
630 }
631 }
632 if !isHTTP {
633 req.closeBody()
634 return nil, badStringError("unsupported protocol scheme", scheme)
635 }
636 if req.Method != "" && !validMethod(req.Method) {
637 req.closeBody()
638 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
639 }
640 if req.URL.Host == "" {
641 req.closeBody()
642 return nil, errors.New("http: no Host in request URL")
643 }
644
645
646
647
648
649
650
651
652
653
654 ctx, cancel := context.WithCancelCause(req.Context())
655
656
657 if origReq.Cancel != nil {
658 go awaitLegacyCancel(ctx, cancel, origReq)
659 }
660
661
662
663
664
665 cancel = t.prepareTransportCancel(origReq, cancel)
666
667 defer func() {
668 if err != nil {
669 cancel(err)
670 }
671 }()
672
673 for {
674 select {
675 case <-ctx.Done():
676 req.closeBody()
677 return nil, context.Cause(ctx)
678 default:
679 }
680
681
682 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
683 cm, err := t.connectMethodForRequest(treq)
684 if err != nil {
685 req.closeBody()
686 return nil, err
687 }
688
689
690
691
692
693 pconn, err := t.getConn(treq, cm)
694 if err != nil {
695 req.closeBody()
696 return nil, err
697 }
698
699 var resp *Response
700 if pconn.alt != nil {
701
702 resp, err = pconn.alt.RoundTrip(req)
703 } else {
704 resp, err = pconn.roundTrip(treq)
705 }
706 if err == nil {
707 if pconn.alt != nil {
708
709
710
711
712
713 cancel(errRequestDone)
714 }
715 resp.Request = origReq
716 return resp, nil
717 }
718
719
720 if http2isNoCachedConnError(err) {
721 if t.removeIdleConn(pconn) {
722 t.decConnsPerHost(pconn.cacheKey)
723 }
724 } else if !pconn.shouldRetryRequest(req, err) {
725
726
727 if e, ok := err.(nothingWrittenError); ok {
728 err = e.error
729 }
730 if e, ok := err.(transportReadFromServerError); ok {
731 err = e.err
732 }
733 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
734
735
736
737 req.closeBody()
738 }
739 return nil, err
740 }
741 testHookRoundTripRetried()
742
743
744 req, err = rewindBody(req)
745 if err != nil {
746 return nil, err
747 }
748 }
749 }
750
751 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
752 select {
753 case <-req.Cancel:
754 cancel(errRequestCanceled)
755 case <-ctx.Done():
756 }
757 }
758
// errCannotRewind is returned by rewindBody when the request body has been
// read or closed and the request has no GetBody function to recreate it.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
760
// readTrackingBody wraps a request body and remembers whether it has been
// read from and whether it has been closed, which is what rewindBody needs
// to decide if the body must be recreated before a retry.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool        // set by the first call to Read
	didClose atomic.Bool // set by the first call to Close
}

// Read marks the body as read and forwards to the wrapped body.
func (r *readTrackingBody) Read(data []byte) (int, error) {
	r.didRead = true
	n, err := r.ReadCloser.Read(data)
	return n, err
}

// Close closes the wrapped body the first time it is called. Subsequent
// calls do nothing and return nil.
func (r *readTrackingBody) Close() error {
	if r.didClose.CompareAndSwap(false, true) {
		return r.ReadCloser.Close()
	}
	return nil
}
778
779
780
781
782
783 func setupRewindBody(req *Request) *Request {
784 if req.Body == nil || req.Body == NoBody {
785 return req
786 }
787 newReq := *req
788 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
789 return &newReq
790 }
791
792
793
794
795
796 func rewindBody(req *Request) (rewound *Request, err error) {
797 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
798 return req, nil
799 }
800 if !req.Body.(*readTrackingBody).didClose.Load() {
801 req.closeBody()
802 }
803 if req.GetBody == nil {
804 return nil, errCannotRewind
805 }
806 body, err := req.GetBody()
807 if err != nil {
808 return nil, err
809 }
810 newReq := *req
811 newReq.Body = &readTrackingBody{ReadCloser: body}
812 return &newReq, nil
813 }
814
815
816
817
818 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
819 if http2isNoCachedConnError(err) {
820
821
822
823
824
825
826 return true
827 }
828 if err == errMissingHost {
829
830 return false
831 }
832 if !pc.isReused() {
833
834
835
836
837
838
839
840 return false
841 }
842 if _, ok := err.(nothingWrittenError); ok {
843
844
845 return req.outgoingLength() == 0 || req.GetBody != nil
846 }
847 if !req.isReplayable() {
848
849 return false
850 }
851 if _, ok := err.(transportReadFromServerError); ok {
852
853
854 return true
855 }
856 if err == errServerClosedIdle {
857
858
859
860 return true
861 }
862 return false
863 }
864
865
// ErrSkipAltProtocol is a sentinel error. When a RoundTripper registered
// with Transport.RegisterProtocol returns it, Transport.roundTrip rewinds
// the request body and handles the request itself instead of returning the
// alternate's result.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
867
868
869
870
871
872
873
874
875
876
877
878 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
879 t.altMu.Lock()
880 defer t.altMu.Unlock()
881 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
882 if _, exists := oldMap[scheme]; exists {
883 panic("protocol " + scheme + " already registered")
884 }
885 newMap := maps.Clone(oldMap)
886 if newMap == nil {
887 newMap = make(map[string]RoundTripper)
888 }
889 newMap[scheme] = rt
890 t.altProto.Store(newMap)
891 }
892
893
894
895
896
897 func (t *Transport) CloseIdleConnections() {
898 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
899 t.idleMu.Lock()
900 m := t.idleConn
901 t.idleConn = nil
902 t.closeIdle = true
903 t.idleLRU = connLRU{}
904 t.idleMu.Unlock()
905 for _, conns := range m {
906 for _, pconn := range conns {
907 pconn.close(errCloseIdleConns)
908 }
909 }
910 t.connsPerHostMu.Lock()
911 t.dialsInProgress.all(func(w *wantConn) {
912 if w.cancelCtx != nil && !w.waiting() {
913 w.cancelCtx()
914 }
915 })
916 t.connsPerHostMu.Unlock()
917 if t2 := t.h2transport; t2 != nil {
918 t2.CloseIdleConnections()
919 }
920 }
921
922
923 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
924
925
926
927
928
929
930 cancel := func(err error) {
931 origCancel(err)
932 t.reqMu.Lock()
933 delete(t.reqCanceler, req)
934 t.reqMu.Unlock()
935 }
936 t.reqMu.Lock()
937 if t.reqCanceler == nil {
938 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
939 }
940 t.reqCanceler[req] = cancel
941 t.reqMu.Unlock()
942 return cancel
943 }
944
945
946
947
948
949
950
951 func (t *Transport) CancelRequest(req *Request) {
952 t.reqMu.Lock()
953 cancel := t.reqCanceler[req]
954 t.reqMu.Unlock()
955 if cancel != nil {
956 cancel(errRequestCanceled)
957 }
958 }
959
960
961
962
963
var (
	// envProxyOnce guards the one-time initialization of
	// envProxyFuncValue performed by envProxyFunc.
	envProxyOnce sync.Once
	// envProxyFuncValue is the cached proxy function built from the
	// environment; it is nil until envProxyFunc has run.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
968
969
970
971 func envProxyFunc() func(*url.URL) (*url.URL, error) {
972 envProxyOnce.Do(func() {
973 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
974 })
975 return envProxyFuncValue
976 }
977
978
979 func resetProxyConfig() {
980 envProxyOnce = sync.Once{}
981 envProxyFuncValue = nil
982 }
983
984 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
985 cm.targetScheme = treq.URL.Scheme
986 cm.targetAddr = canonicalAddr(treq.URL)
987 if t.Proxy != nil {
988 cm.proxyURL, err = t.Proxy(treq.Request)
989 }
990 cm.onlyH1 = treq.requiresHTTP1()
991 return cm, err
992 }
993
994
995
996 func (cm *connectMethod) proxyAuth() string {
997 if cm.proxyURL == nil {
998 return ""
999 }
1000 if u := cm.proxyURL.User; u != nil {
1001 username := u.Username()
1002 password, _ := u.Password()
1003 return "Basic " + basicAuth(username, password)
1004 }
1005 return ""
1006 }
1007
1008
// Errors describing why a connection was closed or could not be pooled.
var (
	// Returned by tryPutIdleConn when DisableKeepAlives is set or
	// MaxIdleConnsPerHost is negative.
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	// Returned by tryPutIdleConn when the connection reports isBroken.
	errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
	// Returned by tryPutIdleConn while t.closeIdle is set.
	errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
	// Used to close the oldest idle connection when MaxIdleConns is
	// exceeded.
	errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
	// Returned by tryPutIdleConn when the per-host idle limit is reached.
	errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
	// Used by CloseIdleConnections to close pooled connections.
	errCloseIdleConns = errors.New("http: CloseIdleConnections called")
	// Not referenced in the portion of the file reviewed here.
	errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
	// Not referenced in the portion of the file reviewed here.
	errIdleConnTimeout = errors.New("http: idle connection timeout")

	// errServerClosedIdle is one of the errors shouldRetryRequest treats
	// as retryable for a replayable request on a reused connection.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1025
1026
1027
1028
1029
1030
1031
1032
1033
// transportReadFromServerError marks an error as having come from reading
// the server's response. shouldRetryRequest treats it as retryable, and
// roundTrip unwraps it before returning a non-retryable failure.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the underlying read error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error describes the failure, including the underlying error.
func (e transportReadFromServerError) Error() string {
	const format = "net/http: Transport failed to read from server: %v"
	return fmt.Sprintf(format, e.err)
}
1043
1044 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1045 if err := t.tryPutIdleConn(pconn); err != nil {
1046 pconn.close(err)
1047 }
1048 }
1049
1050 func (t *Transport) maxIdleConnsPerHost() int {
1051 if v := t.MaxIdleConnsPerHost; v != 0 {
1052 return v
1053 }
1054 return DefaultMaxIdleConnsPerHost
1055 }
1056
1057
1058
1059
1060
1061
1062 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
1063 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
1064 return errKeepAlivesDisabled
1065 }
1066 if pconn.isBroken() {
1067 return errConnBroken
1068 }
1069 pconn.markReused()
1070 if pconn.isClientConn {
1071
1072 defer pconn.internalStateHook()
1073 pconn.mu.Lock()
1074 defer pconn.mu.Unlock()
1075 if !pconn.inFlight {
1076 panic("pconn is not in flight")
1077 }
1078 pconn.inFlight = false
1079 select {
1080 case pconn.availch <- struct{}{}:
1081 default:
1082 panic("unable to make pconn available")
1083 }
1084 return nil
1085 }
1086
1087 t.idleMu.Lock()
1088 defer t.idleMu.Unlock()
1089
1090
1091
1092
1093 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
1094 return nil
1095 }
1096
1097
1098
1099
1100
1101 key := pconn.cacheKey
1102 if q, ok := t.idleConnWait[key]; ok {
1103 done := false
1104 if pconn.alt == nil {
1105
1106
1107 for q.len() > 0 {
1108 w := q.popFront()
1109 if w.tryDeliver(pconn, nil, time.Time{}) {
1110 done = true
1111 break
1112 }
1113 }
1114 } else {
1115
1116
1117
1118
1119 for q.len() > 0 {
1120 w := q.popFront()
1121 w.tryDeliver(pconn, nil, time.Time{})
1122 }
1123 }
1124 if q.len() == 0 {
1125 delete(t.idleConnWait, key)
1126 } else {
1127 t.idleConnWait[key] = q
1128 }
1129 if done {
1130 return nil
1131 }
1132 }
1133
1134 if t.closeIdle {
1135 return errCloseIdle
1136 }
1137 if t.idleConn == nil {
1138 t.idleConn = make(map[connectMethodKey][]*persistConn)
1139 }
1140 idles := t.idleConn[key]
1141 if len(idles) >= t.maxIdleConnsPerHost() {
1142 return errTooManyIdleHost
1143 }
1144 for _, exist := range idles {
1145 if exist == pconn {
1146 log.Fatalf("dup idle pconn %p in freelist", pconn)
1147 }
1148 }
1149 t.idleConn[key] = append(idles, pconn)
1150 t.idleLRU.add(pconn)
1151 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1152 oldest := t.idleLRU.removeOldest()
1153 oldest.close(errTooManyIdle)
1154 t.removeIdleConnLocked(oldest)
1155 }
1156
1157
1158
1159
1160 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1161 if pconn.idleTimer != nil {
1162 pconn.idleTimer.Reset(t.IdleConnTimeout)
1163 } else {
1164 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1165 }
1166 }
1167 pconn.idleAt = time.Now()
1168 return nil
1169 }
1170
1171
1172
1173
1174 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1175 if t.DisableKeepAlives {
1176 return false
1177 }
1178
1179 t.idleMu.Lock()
1180 defer t.idleMu.Unlock()
1181
1182
1183
1184 t.closeIdle = false
1185
1186 if w == nil {
1187
1188 return false
1189 }
1190
1191
1192
1193
1194 var oldTime time.Time
1195 if t.IdleConnTimeout > 0 {
1196 oldTime = time.Now().Add(-t.IdleConnTimeout)
1197 }
1198
1199
1200 if list, ok := t.idleConn[w.key]; ok {
1201 stop := false
1202 delivered := false
1203 for len(list) > 0 && !stop {
1204 pconn := list[len(list)-1]
1205
1206
1207
1208
1209 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1210 if tooOld {
1211
1212
1213
1214 go pconn.closeConnIfStillIdle()
1215 }
1216 if pconn.isBroken() || tooOld {
1217
1218
1219
1220
1221
1222 list = list[:len(list)-1]
1223 continue
1224 }
1225 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1226 if delivered {
1227 if pconn.alt != nil {
1228
1229
1230 } else {
1231
1232
1233 t.idleLRU.remove(pconn)
1234 list = list[:len(list)-1]
1235 }
1236 }
1237 stop = true
1238 }
1239 if len(list) > 0 {
1240 t.idleConn[w.key] = list
1241 } else {
1242 delete(t.idleConn, w.key)
1243 }
1244 if stop {
1245 return delivered
1246 }
1247 }
1248
1249
1250 if t.idleConnWait == nil {
1251 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1252 }
1253 q := t.idleConnWait[w.key]
1254 q.cleanFrontNotWaiting()
1255 q.pushBack(w)
1256 t.idleConnWait[w.key] = q
1257 return false
1258 }
1259
1260
1261 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1262 if pconn.isClientConn {
1263 return true
1264 }
1265 t.idleMu.Lock()
1266 defer t.idleMu.Unlock()
1267 return t.removeIdleConnLocked(pconn)
1268 }
1269
1270
1271 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1272 if pconn.idleTimer != nil {
1273 pconn.idleTimer.Stop()
1274 }
1275 t.idleLRU.remove(pconn)
1276 key := pconn.cacheKey
1277 pconns := t.idleConn[key]
1278 var removed bool
1279 switch len(pconns) {
1280 case 0:
1281
1282 case 1:
1283 if pconns[0] == pconn {
1284 delete(t.idleConn, key)
1285 removed = true
1286 }
1287 default:
1288 for i, v := range pconns {
1289 if v != pconn {
1290 continue
1291 }
1292
1293
1294 copy(pconns[i:], pconns[i+1:])
1295 t.idleConn[key] = pconns[:len(pconns)-1]
1296 removed = true
1297 break
1298 }
1299 }
1300 return removed
1301 }
1302
// zeroDialer is the zero-valued net.Dialer that Transport.dial uses when
// neither DialContext nor Dial is set.
var zeroDialer net.Dialer
1304
1305 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1306 if t.DialContext != nil {
1307 c, err := t.DialContext(ctx, network, addr)
1308 if c == nil && err == nil {
1309 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1310 }
1311 return c, err
1312 }
1313 if t.Dial != nil {
1314 c, err := t.Dial(network, addr)
1315 if c == nil && err == nil {
1316 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1317 }
1318 return c, err
1319 }
1320 return zeroDialer.DialContext(ctx, network, addr)
1321 }
1322
1323
1324
1325
1326
1327
1328
// A wantConn records one request for a connection. The connection (or the
// error explaining why none could be obtained) is delivered on result,
// either from the idle pool or from a dial started on its behalf.
type wantConn struct {
	cm  connectMethod    // how to reach the target
	key connectMethodKey // cm.key(), the pool key

	// Hooks called by queueForDial and dialConnFor; getConn sets them to
	// testHookPrePendingDial and testHookPostPendingDial.
	beforeDial func()
	afterDial  func()

	mu  sync.Mutex      // guards ctx, done and deliveries on result
	ctx context.Context // context for the dial; set to nil once delivered or canceled
	// cancelCtx cancels ctx. The code that reads or clears it
	// (CloseIdleConnections, cleanFrontCanceled, startDialConnForLocked)
	// does so with Transport.connsPerHostMu held rather than mu.
	cancelCtx context.CancelFunc
	done      bool             // true once delivered or canceled
	result    chan connOrError // receives at most one value, then is closed
}
1345
// connOrError is the value delivered on wantConn.result. Exactly one of pc
// and err is set (tryDeliver enforces this).
type connOrError struct {
	pc     *persistConn
	err    error
	idleAt time.Time // when pc became idle; zero if it did not come from the idle pool
}
1351
1352
1353 func (w *wantConn) waiting() bool {
1354 w.mu.Lock()
1355 defer w.mu.Unlock()
1356
1357 return !w.done
1358 }
1359
1360
1361 func (w *wantConn) getCtxForDial() context.Context {
1362 w.mu.Lock()
1363 defer w.mu.Unlock()
1364
1365 return w.ctx
1366 }
1367
1368
1369 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1370 w.mu.Lock()
1371 defer w.mu.Unlock()
1372
1373 if w.done {
1374 return false
1375 }
1376 if (pc == nil) == (err == nil) {
1377 panic("net/http: internal error: misuse of tryDeliver")
1378 }
1379 w.ctx = nil
1380 w.done = true
1381
1382 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1383 close(w.result)
1384
1385 return true
1386 }
1387
1388
1389
1390 func (w *wantConn) cancel(t *Transport) {
1391 w.mu.Lock()
1392 var pc *persistConn
1393 if w.done {
1394 if r, ok := <-w.result; ok {
1395 pc = r.pc
1396 }
1397 } else {
1398 close(w.result)
1399 }
1400 w.ctx = nil
1401 w.done = true
1402 w.mu.Unlock()
1403
1404
1405
1406
1407 if pc != nil && pc.alt == nil {
1408 t.putOrCloseIdleConn(pc)
1409 }
1410 }
1411
1412
// A wantConnQueue is a FIFO queue of *wantConn built from two slices.
//
// pushBack appends to tail. popFront takes from head[headPos:], clearing
// each slot as it goes. When head is used up, popFront makes tail the new
// head and reuses the old head's storage as the new, empty tail.
type wantConnQueue struct {
	head    []*wantConn // entries being consumed; head[:headPos] are already popped
	headPos int         // index of the next entry to pop from head
	tail    []*wantConn // entries pushed since tail last became head
}
1428
1429
1430 func (q *wantConnQueue) len() int {
1431 return len(q.head) - q.headPos + len(q.tail)
1432 }
1433
1434
1435 func (q *wantConnQueue) pushBack(w *wantConn) {
1436 q.tail = append(q.tail, w)
1437 }
1438
1439
1440 func (q *wantConnQueue) popFront() *wantConn {
1441 if q.headPos >= len(q.head) {
1442 if len(q.tail) == 0 {
1443 return nil
1444 }
1445
1446 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1447 }
1448 w := q.head[q.headPos]
1449 q.head[q.headPos] = nil
1450 q.headPos++
1451 return w
1452 }
1453
1454
1455 func (q *wantConnQueue) peekFront() *wantConn {
1456 if q.headPos < len(q.head) {
1457 return q.head[q.headPos]
1458 }
1459 if len(q.tail) > 0 {
1460 return q.tail[0]
1461 }
1462 return nil
1463 }
1464
1465
1466
1467 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1468 for {
1469 w := q.peekFront()
1470 if w == nil || w.waiting() {
1471 return cleaned
1472 }
1473 q.popFront()
1474 cleaned = true
1475 }
1476 }
1477
1478
1479 func (q *wantConnQueue) cleanFrontCanceled() {
1480 for {
1481 w := q.peekFront()
1482 if w == nil || w.cancelCtx != nil {
1483 return
1484 }
1485 q.popFront()
1486 }
1487 }
1488
1489
1490
1491 func (q *wantConnQueue) all(f func(*wantConn)) {
1492 for _, w := range q.head[q.headPos:] {
1493 f(w)
1494 }
1495 for _, w := range q.tail {
1496 f(w)
1497 }
1498 }
1499
1500 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1501 if t.DialTLSContext != nil {
1502 conn, err = t.DialTLSContext(ctx, network, addr)
1503 } else {
1504 conn, err = t.DialTLS(network, addr)
1505 }
1506 if conn == nil && err == nil {
1507 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1508 }
1509 return
1510 }
1511
1512
1513
1514
1515
1516 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1517 req := treq.Request
1518 trace := treq.trace
1519 ctx := req.Context()
1520 if trace != nil && trace.GetConn != nil {
1521 trace.GetConn(cm.addr())
1522 }
1523
1524
1525
1526
1527
1528
1529 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1530
1531 w := &wantConn{
1532 cm: cm,
1533 key: cm.key(),
1534 ctx: dialCtx,
1535 cancelCtx: dialCancel,
1536 result: make(chan connOrError, 1),
1537 beforeDial: testHookPrePendingDial,
1538 afterDial: testHookPostPendingDial,
1539 }
1540 defer func() {
1541 if err != nil {
1542 w.cancel(t)
1543 }
1544 }()
1545
1546
1547 if delivered := t.queueForIdleConn(w); !delivered {
1548 t.queueForDial(w)
1549 }
1550
1551
1552 select {
1553 case r := <-w.result:
1554
1555
1556 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1557 info := httptrace.GotConnInfo{
1558 Conn: r.pc.conn,
1559 Reused: r.pc.isReused(),
1560 }
1561 if !r.idleAt.IsZero() {
1562 info.WasIdle = true
1563 info.IdleTime = time.Since(r.idleAt)
1564 }
1565 trace.GotConn(info)
1566 }
1567 if r.err != nil {
1568
1569
1570
1571 select {
1572 case <-treq.ctx.Done():
1573 err := context.Cause(treq.ctx)
1574 if err == errRequestCanceled {
1575 err = errRequestCanceledConn
1576 }
1577 return nil, err
1578 default:
1579
1580 }
1581 }
1582 return r.pc, r.err
1583 case <-treq.ctx.Done():
1584 err := context.Cause(treq.ctx)
1585 if err == errRequestCanceled {
1586 err = errRequestCanceledConn
1587 }
1588 return nil, err
1589 }
1590 }
1591
1592
1593
1594 func (t *Transport) queueForDial(w *wantConn) {
1595 w.beforeDial()
1596
1597 t.connsPerHostMu.Lock()
1598 defer t.connsPerHostMu.Unlock()
1599
1600 if t.MaxConnsPerHost <= 0 {
1601 t.startDialConnForLocked(w)
1602 return
1603 }
1604
1605 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1606 if t.connsPerHost == nil {
1607 t.connsPerHost = make(map[connectMethodKey]int)
1608 }
1609 t.connsPerHost[w.key] = n + 1
1610 t.startDialConnForLocked(w)
1611 return
1612 }
1613
1614 if t.connsPerHostWait == nil {
1615 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1616 }
1617 q := t.connsPerHostWait[w.key]
1618 q.cleanFrontNotWaiting()
1619 q.pushBack(w)
1620 t.connsPerHostWait[w.key] = q
1621 }
1622
1623
1624
1625 func (t *Transport) startDialConnForLocked(w *wantConn) {
1626 t.dialsInProgress.cleanFrontCanceled()
1627 t.dialsInProgress.pushBack(w)
1628 go func() {
1629 t.dialConnFor(w)
1630 t.connsPerHostMu.Lock()
1631 defer t.connsPerHostMu.Unlock()
1632 w.cancelCtx = nil
1633 }()
1634 }
1635
1636
1637
1638
1639 func (t *Transport) dialConnFor(w *wantConn) {
1640 defer w.afterDial()
1641 ctx := w.getCtxForDial()
1642 if ctx == nil {
1643 t.decConnsPerHost(w.key)
1644 return
1645 }
1646
1647 const isClientConn = false
1648 pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
1649 delivered := w.tryDeliver(pc, err, time.Time{})
1650 if err == nil && (!delivered || pc.alt != nil) {
1651
1652
1653
1654 t.putOrCloseIdleConn(pc)
1655 }
1656 if err != nil {
1657 t.decConnsPerHost(w.key)
1658 }
1659 }
1660
1661
1662
1663 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1664 if t.MaxConnsPerHost <= 0 {
1665 return
1666 }
1667
1668 t.connsPerHostMu.Lock()
1669 defer t.connsPerHostMu.Unlock()
1670 n := t.connsPerHost[key]
1671 if n == 0 {
1672
1673
1674 panic("net/http: internal error: connCount underflow")
1675 }
1676
1677
1678
1679
1680
1681 if q := t.connsPerHostWait[key]; q.len() > 0 {
1682 done := false
1683 for q.len() > 0 {
1684 w := q.popFront()
1685 if w.waiting() {
1686 t.startDialConnForLocked(w)
1687 done = true
1688 break
1689 }
1690 }
1691 if q.len() == 0 {
1692 delete(t.connsPerHostWait, key)
1693 } else {
1694
1695
1696 t.connsPerHostWait[key] = q
1697 }
1698 if done {
1699 return
1700 }
1701 }
1702
1703
1704 if n--; n == 0 {
1705 delete(t.connsPerHost, key)
1706 } else {
1707 t.connsPerHost[key] = n
1708 }
1709 }
1710
1711
1712
1713
// addTLS layers a TLS client connection on top of pconn.conn and performs
// the handshake.
//
// name is used as the TLS ServerName when the Transport's TLSClientConfig
// does not set one. trace, if non-nil, receives the TLSHandshakeStart and
// TLSHandshakeDone callbacks. On success pconn.conn is replaced by the TLS
// connection and pconn.tlsState is set to its connection state. On failure
// the underlying connection is closed and the error is returned; when the
// Transport's TLSHandshakeTimeout fires first, that error is a
// tlsHandshakeTimeoutError.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// Work on a copy so the Transport's own TLS config is not modified.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		// Offer no ALPN protocols for connections restricted to HTTP/1.
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2: both the timer callback and the handshake goroutine may
	// send, and neither must block if the other's value was the one read.
	errc := make(chan error, 2)
	var timer *time.Timer // reports a handshake timeout on errc
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// The timer fired first. The connection is closed now, so
			// wait for the handshake goroutine to finish and report.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1762
// erringRoundTripper is implemented by alternate-protocol RoundTrippers
// that can report a setup error. dialConn checks the RoundTripper returned
// by a TLSNextProto function against this interface and, if it matches,
// fails the dial with RoundTripErr's result.
type erringRoundTripper interface {
	RoundTripErr() error
}
1766
// testHookProxyConnectTimeout is the function dialConn uses to derive the
// context that bounds the proxy CONNECT exchange. It defaults to
// context.WithTimeout; being a variable, it can be replaced (judging by its
// name, by tests).
var testHookProxyConnectTimeout = context.WithTimeout
1768
// dialConn establishes a new connection for the connect method cm and
// returns it wrapped in a persistConn.
//
// It dials the first hop (the proxy when cm.proxyURL is set, otherwise the
// target), runs TLS to that hop when its scheme is "https", performs any
// proxy negotiation (SOCKS5, or an HTTP CONNECT tunnel for HTTPS targets),
// and adds TLS to the target when tunnelling HTTPS through a proxy.
//
// If the connection ends up handled by an alternate protocol implementation
// (NewClientConn for client conns, unencrypted HTTP/2, or a matching
// t.TLSNextProto entry), the returned persistConn carries only that
// RoundTripper in its alt field. Otherwise the buffered reader and writer
// are set up and the HTTP/1 read and write loops are started.
//
// isClientConn and internalStateHook are stored on the persistConn;
// internalStateHook is also forwarded to NewClientConn on that path.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
	pconn = &persistConn{
		t:                 t,
		cacheKey:          cm.key(),
		reqch:             make(chan requestAndChan, 1),
		writech:           make(chan writeRequest, 1),
		closech:           make(chan struct{}),
		writeErrCh:        make(chan error, 1),
		writeLoopDone:     make(chan struct{}),
		isClientConn:      isClientConn,
		internalStateHook: internalStateHook,
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr turns an error from reaching the first hop into a typed
	// "proxyconnect" *net.OpError when that hop is a proxy.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// The custom dialer returned a *tls.Conn: run the handshake
			// here so that tlsState is populated for the protocol
			// selection further down.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.HandshakeContext(ctx); err != nil {
				// The connection is closed from a separate goroutine.
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			// TLS to the first hop, verified against the host part of
			// the address that was dialed.
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// Not using a proxy; nothing to do.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		// SOCKS5: negotiate over the already-dialed connection, offering
		// username/password authentication when the proxy URL has user info.
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: requests go to the proxy
		// itself. Proxy credentials, if any, are added to each request's
		// extra headers via mutateHeaderFunc.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through an HTTP(S) proxy: open a tunnel with CONNECT.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone before adding the credential so the caller-supplied
			// header is not modified.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange (one minute by default) so a proxy
		// that stops responding cannot block this dial forever.
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed once the CONNECT write+read is done or has failed
		var (
			resp *Response
			err  error // write or read error
		)

		// Write the CONNECT request and read the response in a separate
		// goroutine so the select below can give up on connectCtx.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}

			// The response headers are read through a LimitedReader capped
			// at the Transport's maximum response header size.
			br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing the connection unblocks the goroutine; wait for it
			// before returning.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err is now set.
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the reason phrase (the status text after the code).
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	// With the proxy hop in place, add TLS to the target itself.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2 is used when there is no TLS and the configured
	// protocols allow unencrypted HTTP/2 but not HTTP/1.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()

	if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
		// Client conns speaking HTTP/2 are created through the HTTP/2
		// implementation registered for "https".
		altProto, _ := t.altProto.Load().(map[string]RoundTripper)
		h2, ok := altProto["https"].(newClientConner)
		if !ok {
			return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
		}
		alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
		if err != nil {
			pconn.conn.Close()
			return nil, err
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
	}

	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// The alternate protocol failed to set up; surface its error.
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	// A protocol negotiated during the TLS handshake is handed to the
	// matching TLSNextProto function, if one is registered.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// The alternate protocol failed to set up; surface its error.
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// HTTP/1: reads go through pconn.Read (which enforces readLimit) and
	// writes through persistConnWriter (which counts bytes written).
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
1998
1999
2000
2001
2002
2003
2004
2005 type persistConnWriter struct {
2006 pc *persistConn
2007 }
2008
2009 func (w persistConnWriter) Write(p []byte) (n int, err error) {
2010 n, err = w.pc.conn.Write(p)
2011 w.pc.nwrite += int64(n)
2012 return
2013 }
2014
2015
2016
2017
2018 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
2019 n, err = io.Copy(w.pc.conn, r)
2020 w.pc.nwrite += n
2021 return
2022 }
2023
2024 var _ io.ReaderFrom = (*persistConnWriter)(nil)
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
// connectMethod describes how to reach a target: directly, or through a
// proxy. Its key method produces the connectMethodKey that identifies
// connections made this way.
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil when no proxy is used
	targetScheme string   // scheme of the target; dialConn distinguishes "http" and "https"
	// targetAddr is the address of the target. When proxyURL names an http
	// or https proxy and targetScheme is "http", key leaves it out of the
	// connectMethodKey.
	targetAddr string
	onlyH1     bool // copied into the key; addTLS offers no ALPN protocols when the key has it set
}
2052
2053 func (cm *connectMethod) key() connectMethodKey {
2054 proxyStr := ""
2055 targetAddr := cm.targetAddr
2056 if cm.proxyURL != nil {
2057 proxyStr = cm.proxyURL.String()
2058 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2059 targetAddr = ""
2060 }
2061 }
2062 return connectMethodKey{
2063 proxy: proxyStr,
2064 scheme: cm.targetScheme,
2065 addr: targetAddr,
2066 onlyH1: cm.onlyH1,
2067 }
2068 }
2069
2070
2071 func (cm *connectMethod) scheme() string {
2072 if cm.proxyURL != nil {
2073 return cm.proxyURL.Scheme
2074 }
2075 return cm.targetScheme
2076 }
2077
2078
2079 func (cm *connectMethod) addr() string {
2080 if cm.proxyURL != nil {
2081 return canonicalAddr(cm.proxyURL)
2082 }
2083 return cm.targetAddr
2084 }
2085
2086
2087
2088 func (cm *connectMethod) tlsHost() string {
2089 h := cm.targetAddr
2090 return removePort(h)
2091 }
2092
2093
2094
2095
// connectMethodKey is the comparable form of a connectMethod, used as a
// map key (for example in the idle connection and per-host count maps).
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// String renders the key as "proxy|scheme|addr", with ",h1" appended to
// the scheme when onlyH1 is set.
func (k connectMethodKey) String() string {
	h1 := ""
	if k.onlyH1 {
		h1 = ",h1"
	}
	return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
}
2109
2110
2111
// persistConn wraps a connection managed by a Transport, usually a
// persistent HTTP/1 connection served by the readLoop and writeLoop
// goroutines.
type persistConn struct {
	// alt, when non-nil, is the alternate-protocol RoundTripper (such as
	// HTTP/2) that handles this connection. dialConn sets it on the
	// persistConn it returns for such connections, and closeLocked leaves
	// conn and closech alone when it is set.
	alt RoundTripper

	t         *Transport
	cacheKey  connectMethodKey
	conn      net.Conn
	tlsState  *tls.ConnectionState // set by addTLS or dialConn after a TLS handshake; nil otherwise
	br        *bufio.Reader        // reads from pc itself, so Read's readLimit applies
	bw        *bufio.Writer        // writes through persistConnWriter
	nwrite    int64                // bytes written, maintained by persistConnWriter
	reqch     chan requestAndChan  // written by roundTrip; read by readLoop
	writech   chan writeRequest    // written by roundTrip; read by writeLoop
	closech   chan struct{}        // closed by closeLocked for connections without alt
	availch   chan struct{}        // waited on by waitForAvailability
	isProxy   bool                 // set by dialConn for plain HTTP via a proxy; passed to Request.write
	sawEOF    bool                 // set by Read when the connection returns io.EOF
	isClientConn bool              // as passed to dialConn
	readLimit int64                // bytes Read may still return; reset by readLoop and readResponse

	// writeErrCh carries the result of each request write from writeLoop
	// to wroteRequest, which readLoop consults before reusing the
	// connection.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop exits

	// Not used in this part of the file; presumably idle-pool bookkeeping
	// (when the conn became idle, and the timer that expires it) — confirm
	// against the idle connection code.
	idleAt    time.Time
	idleTimer *time.Timer

	// In this part of the file, numExpectedResponses, closed, canceledErr,
	// reused, reserved, inFlight and mutateHeaderFunc are written with mu
	// held.
	mu                   sync.Mutex
	numExpectedResponses int   // incremented by roundTrip, decremented by readLoop
	closed               error // set non-nil by closeLocked, before closech is closed
	canceledErr          error // set non-nil by cancelRequest
	reused               bool  // set by markReused
	reserved             bool  // cleared by roundTrip on client conns
	inFlight             bool  // set by roundTrip on client conns
	internalStateHook    func() // as passed to dialConn; called by readLoop when it exits

	// mutateHeaderFunc, if non-nil, is called by roundTrip with the
	// request's extra headers before the request is written. closeLocked
	// resets it to nil.
	mutateHeaderFunc func(Header)
}
2159
2160 func (pc *persistConn) maxHeaderResponseSize() int64 {
2161 return pc.t.maxHeaderResponseSize()
2162 }
2163
2164 func (pc *persistConn) Read(p []byte) (n int, err error) {
2165 if pc.readLimit <= 0 {
2166 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2167 }
2168 if int64(len(p)) > pc.readLimit {
2169 p = p[:pc.readLimit]
2170 }
2171 n, err = pc.conn.Read(p)
2172 if err == io.EOF {
2173 pc.sawEOF = true
2174 }
2175 pc.readLimit -= int64(n)
2176 return
2177 }
2178
2179
2180 func (pc *persistConn) isBroken() bool {
2181 pc.mu.Lock()
2182 b := pc.closed != nil
2183 pc.mu.Unlock()
2184 return b
2185 }
2186
2187
2188
2189 func (pc *persistConn) canceled() error {
2190 pc.mu.Lock()
2191 defer pc.mu.Unlock()
2192 return pc.canceledErr
2193 }
2194
2195
2196 func (pc *persistConn) isReused() bool {
2197 pc.mu.Lock()
2198 r := pc.reused
2199 pc.mu.Unlock()
2200 return r
2201 }
2202
// cancelRequest records err as the cancellation cause (returned afterwards
// by canceled) and closes the connection with errRequestCanceled.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2209
2210
2211
2212
2213 func (pc *persistConn) closeConnIfStillIdle() {
2214 t := pc.t
2215 t.idleMu.Lock()
2216 defer t.idleMu.Unlock()
2217 if _, ok := t.idleLRU.m[pc]; !ok {
2218
2219 return
2220 }
2221 t.removeIdleConnLocked(pc)
2222 pc.close(errIdleConnTimeout)
2223 }
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2234 if err == nil {
2235 return nil
2236 }
2237
2238
2239
2240
2241
2242
2243
2244
2245 <-pc.writeLoopDone
2246
2247
2248
2249
2250 if cerr := pc.canceled(); cerr != nil {
2251 return cerr
2252 }
2253
2254
2255 req.mu.Lock()
2256 reqErr := req.err
2257 req.mu.Unlock()
2258 if reqErr != nil {
2259 return reqErr
2260 }
2261
2262 if err == errServerClosedIdle {
2263
2264 return err
2265 }
2266
2267 if _, ok := err.(transportReadFromServerError); ok {
2268 if pc.nwrite == startBytesWritten {
2269 return nothingWrittenError{err}
2270 }
2271
2272 return err
2273 }
2274 if pc.isBroken() {
2275 if pc.nwrite == startBytesWritten {
2276 return nothingWrittenError{err}
2277 }
2278 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2279 }
2280 return err
2281 }
2282
2283
2284
2285
// errCallerOwnsConn is the close error readLoop uses when the response
// body is writable and the caller takes over the connection. closeLocked
// does not close the underlying conn when given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2287
2288
2289
2290
// maxPostCloseReadBytes (256 KiB) bounds draining of a response body that
// was closed before EOF: readLoop only attempts a drain when the
// response's ContentLength does not exceed it, and maybeDrainBody reads at
// most one byte more than this.
const maxPostCloseReadBytes = 256 << 10
2292
2293
2294
2295
// maxPostCloseReadTime is how long maybeDrainBody waits for the drain to
// finish before giving up.
const maxPostCloseReadTime = 50 * time.Millisecond
2297
2298 func maybeDrainBody(body io.Reader) bool {
2299 drainedCh := make(chan bool, 1)
2300 go func() {
2301 if _, err := io.CopyN(io.Discard, body, maxPostCloseReadBytes+1); err == io.EOF {
2302 drainedCh <- true
2303 } else {
2304 drainedCh <- false
2305 }
2306 }()
2307 select {
2308 case drained := <-drainedCh:
2309 return drained
2310 case <-time.After(maxPostCloseReadTime):
2311 return false
2312 }
2313 }
2314
// readLoop is the read side of an HTTP/1 persistConn, started by dialConn
// in its own goroutine.
//
// For each request received on pc.reqch it reads the response, hands it to
// the waiting roundTrip over the request's channel, and waits for the
// response body to be consumed before reading the next response. It
// decides after each response whether the connection can return to the
// idle pool. When the loop ends the connection is closed with closeErr and
// removed from the idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default; overwritten below when a more specific cause is known
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
		if pc.internalStateHook != nil {
			pc.internalStateHook()
		}
	}()

	// tryPutIdleConn offers pc to the idle pool and reports whether it was
	// accepted, notifying the request's trace. On failure the pool's error
	// becomes closeErr.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is received from by the body callbacks below. It blocks the
	// goroutine that finished or closed the response body until this loop
	// has dealt with the connection (it sends on eofc), or until readLoop
	// returns and the deferred close releases everyone.
	eofc := make(chan struct{})
	defer close(eofc)

	// Read the hook once, under its lock, before the loop starts.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no response was expected.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				// The failure was caused by running out of header budget.
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // the header limit does not apply to the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// No keep-alive if either side asked to close, if the final
			// response is informational (1xx), or if the caller takes
			// over the connection through a writable body.
			alive = false
		}

		if !hasBody || bodyWritable {
			// Nothing to wait for: decide about reuse before delivering
			// the response, so the connection is already back in the
			// pool if the caller immediately issues another request.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			// The caller has received the response, so the loop may now
			// go on, or exit if alive is false.
			testHookReadLoopBeforeNextRead()
			continue
		}

		// Capacity 2: both earlyCloseFn and fn may send without blocking.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				// The body was closed before EOF was seen.
				waitForBodyRead <- false
				<-eofc // released by this loop, or by the deferred close when readLoop returns
				return nil
			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see the comment on eofc above
				} else if err != nil {
					// Prefer the cancellation cause over the read error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The transport asked for gzip itself, so it decompresses
			// transparently and removes the headers that no longer
			// describe the body.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before reading the next response, wait for the caller to finish
		// with the body, or for cancellation or connection close.
		select {
		case bodyEOF := <-waitForBodyRead:
			// If the body was closed early and its declared length is
			// within the drain budget (or unknown, -1), release the
			// closer and try to drain the rest so the connection can
			// still be reused.
			tryDrain := !bodyEOF && resp.ContentLength <= maxPostCloseReadBytes
			if tryDrain {
				eofc <- struct{}{}
				bodyEOF = maybeDrainBody(body.body)
			}
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if !tryDrain && bodyEOF {
				// Release the reader blocked in fn after hitting EOF.
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2500
2501 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2502 if pc.closed != nil {
2503 return
2504 }
2505 if n := pc.br.Buffered(); n > 0 {
2506 buf, _ := pc.br.Peek(n)
2507 if is408Message(buf) {
2508 pc.closeLocked(errServerClosedIdle)
2509 return
2510 } else {
2511 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2512 }
2513 }
2514 if peekErr == io.EOF {
2515
2516 pc.closeLocked(errServerClosedIdle)
2517 } else {
2518 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2519 }
2520 }
2521
2522
2523
2524
// is408Message reports whether buf starts with something that looks like
// an HTTP/1.x status line carrying status code 408: the prefix "HTTP/1.",
// any single byte for the minor version, then " 408".
func is408Message(buf []byte) bool {
	const shortest = "HTTP/1.x 408"
	if len(buf) < len(shortest) {
		return false
	}
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2534
2535
2536
2537
// readResponse reads the response for the request in rc from the
// connection, skipping non-terminal 1xx responses, and returns the first
// terminal one (101 Switching Protocols counts as terminal).
//
// trace, if non-nil, receives GotFirstResponseByte, Got100Continue and
// Got1xxResponse callbacks; an error from Got1xxResponse aborts the read.
// When the request used "Expect: 100-continue" (rc.continueCh is non-nil),
// the channel is used to tell the writer whether to send the request body.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Tell the writer to go ahead with the body; signal only once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols ends the exchange, unlike other 1xx codes.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// The 1xx response was handed to the trace hook, so the
				// header budget is reset for the next response. Without
				// a hook, all 1xx responses and the final response share
				// a single budget.
				pc.readLimit = pc.maxHeaderResponseSize()
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		// After a protocol switch the body gives the caller direct
		// read/write access to the connection.
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// The request expected a 100 Continue, but a terminal response
		// arrived without one. If the connection is going to be closed,
		// closing continueCh tells the writer not to send the body
		// (waitForContinue reports false on a closed channel); otherwise
		// the writer is told to send it.
		if resp.Close || rc.treq.Request.Close {
			close(continueCh)
		} else {
			continueCh <- struct{}{}
		}
	}

	resp.TLS = pc.tlsState
	return
}
2606
2607
2608
2609
2610 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2611 if continueCh == nil {
2612 return nil
2613 }
2614 return func() bool {
2615 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2616 defer timer.Stop()
2617
2618 select {
2619 case _, ok := <-continueCh:
2620 return ok
2621 case <-timer.C:
2622 return true
2623 case <-pc.closech:
2624 return false
2625 }
2626 }
2627 }
2628
2629 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2630 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2631 if br.Buffered() != 0 {
2632 body.br = br
2633 }
2634 return body
2635 }
2636
2637
2638
2639
2640
2641
// readWriteCloserBody is the response body used after a protocol switch
// (see readResponse). It exposes the underlying io.ReadWriteCloser, but
// serves any data still buffered in br before reading from it directly.
type readWriteCloserBody struct {
	_  incomparable
	br *bufio.Reader // holds leftover buffered data; set to nil by Read once drained
	io.ReadWriteCloser
}
2647
2648 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2649 if b.br != nil {
2650 if n := b.br.Buffered(); len(p) > n {
2651 p = p[:n]
2652 }
2653 n, err = b.br.Read(p)
2654 if b.br.Buffered() == 0 {
2655 b.br = nil
2656 }
2657 return n, err
2658 }
2659 return b.ReadWriteCloser.Read(p)
2660 }
2661
2662 func (b *readWriteCloserBody) CloseWrite() error {
2663 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2664 return cw.CloseWrite()
2665 }
2666 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2667 }
2668
2669
// nothingWrittenError wraps an error that occurred before any bytes of the
// request had been written to the connection (see writeLoop and
// mapRoundTripError).
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error.
func (e nothingWrittenError) Unwrap() error {
	return e.error
}
2677
// writeLoop is the write side of an HTTP/1 persistConn, started by
// dialConn in its own goroutine.
//
// It writes each request received on pc.writech and reports the result on
// both pc.writeErrCh and the request's own channel. It exits after a
// failed write (closing the connection with that error) or when the
// connection is closed; pc.writeLoopDone is closed on exit.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// An error reading the caller's request body is recorded
				// on the request before the sends and the close below, so
				// that it is what mapRoundTripError reports rather than
				// the errors caused by tearing down the connection.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Nothing reached the connection.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // received by wroteRequest, on behalf of readLoop
			wr.ch <- err         // received by roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2715
2716
2717
2718
2719
2720
2721
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the
// pending request write to report its result before concluding that the
// connection should not be reused.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2723
2724
2725
2726 func (pc *persistConn) wroteRequest() bool {
2727 select {
2728 case err := <-pc.writeErrCh:
2729
2730
2731 return err == nil
2732 default:
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2744 defer t.Stop()
2745 select {
2746 case err := <-pc.writeErrCh:
2747 return err == nil
2748 case <-t.C:
2749 return false
2750 }
2751 }
2752 }
2753
2754
2755
// responseAndError is what readLoop sends to roundTrip for a request.
// Exactly one of res and err is set; roundTrip panics otherwise.
type responseAndError struct {
	_   incomparable
	res *Response
	err error
}
2761
// requestAndChan is what roundTrip sends to readLoop over pc.reqch: the
// request whose response is expected next and the channels used to talk
// back to the caller.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // unbuffered (see roundTrip); readLoop sends on it in a select with callerGone

	// addedGzip records whether the transport itself added
	// "Accept-Encoding: gzip" to the request. Only then does readLoop
	// decompress a gzip-encoded response.
	addedGzip bool

	// continueCh is non-nil only for requests that expect a 100 Continue.
	// readResponse sends on it to let the request body be written, or
	// closes it to have the body skipped.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2780
2781
2782
2783
2784
// writeRequest is what roundTrip sends to writeLoop over pc.writech: the
// request to write and the channel on which the result of the write is
// reported.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error

	// continueCh is non-nil only for requests that expect a 100 Continue.
	// It is the receiving end of the channel written by readResponse, and
	// writeLoop passes it to waitForContinue.
	continueCh <-chan struct{}
}
2794
2795
2796
// timeoutError is an error that reports itself as a timeout and as
// temporary, and that compares equal to context.DeadlineExceeded through
// its Is method.
type timeoutError struct {
	err string
}

// Error returns the stored message.
func (te *timeoutError) Error() string {
	return te.err
}

// Timeout always reports true.
func (te *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (te *timeoutError) Temporary() bool {
	return true
}

// Is reports whether err is context.DeadlineExceeded.
func (te *timeoutError) Is(err error) bool {
	return err == context.DeadlineExceeded
}
2805
// errTimeout is returned by roundTrip (which also closes the connection
// with it) when the Transport's ResponseHeaderTimeout expires.
var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}
2807
2808
2809
// errRequestCanceled is the close error used by cancelRequest. It is the
// same value as http2errRequestCanceled.
var errRequestCanceled = http2errRequestCanceled

// errRequestCanceledConn is not used in this part of the file; going by
// its message, it reports a request canceled while waiting for a
// connection.
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2812
2813
2814
// errRequestDone is the cause readLoop passes to the request's cancel
// function once it is done with the request.
var errRequestDone = errors.New("net/http: request completed")
2816
// nop does nothing. It is the default value of the test hooks below.
func nop() {}
2818
2819
// Hooks called at specific points of the transport code. They all default
// to nop and, going by their names, are meant to be replaced by tests.
var (
	testHookEnterRoundTrip   = nop // called on entry to persistConn.roundTrip
	testHookWaitResLoop      = nop // called at the top of each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop // not called in this part of the file
	testHookPrePendingDial   = nop // not called in this part of the file
	testHookPostPendingDial  = nop // not called in this part of the file

	// testHookMu is locked by readLoop while it reads
	// testHookReadLoopBeforeNextRead. By default it is a fakeLocker, whose
	// Lock and Unlock do nothing.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop // called by readLoop after each response has been handled
)
2830
2831 func (pc *persistConn) waitForAvailability(ctx context.Context) error {
2832 select {
2833 case <-pc.availch:
2834 return nil
2835 case <-pc.closech:
2836 return pc.closed
2837 case <-ctx.Done():
2838 return ctx.Err()
2839 }
2840 }
2841
// roundTrip sends req over this HTTP/1 connection and waits for the
// response headers.
//
// It hands the request to writeLoop and readLoop, then waits for whichever
// comes first: a write error, the response (or a read error), the
// connection closing, the Transport's ResponseHeaderTimeout, or
// cancellation of the request's context. Errors are passed through
// mapRoundTripError, except for the header timeout, which returns
// errTimeout directly.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()

	pc.mu.Lock()
	if pc.isClientConn {
		if !pc.reserved {
			// Not reserved for this request: wait (without holding the
			// lock) until the connection becomes available.
			pc.mu.Unlock()
			if err := pc.waitForAvailability(req.ctx); err != nil {
				return nil, err
			}
			pc.mu.Lock()
		}
		pc.reserved = false
		pc.inFlight = true
	}
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when the caller did not choose an Accept-Encoding
	// itself, made no Range request, and the method is not HEAD. readLoop
	// decompresses the response only when this layer asked for gzip.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh links readLoop and writeLoop for "Expect: 100-continue".
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when roundTrip returns, so readLoop never blocks
	// sending a response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Hand the request to writeLoop and readLoop, so the response can be
	// read while the request is still being written.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse turns what readLoop sent into roundTrip's results.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	var respHeaderTimer <-chan time.Time // nil (never ready) until the request has been written
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written; start the response header timer,
			// if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// The connection closing raced with a response that is
				// ready to be received; use the response.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// The context being done raced with a response that is
				// ready to be received; use the response.
				return handleResponse(re)
			default:
			}
			// Cancel the request, which closes the connection; the loop
			// goes around again to pick up the outcome.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
2991
2992
2993
// tLogKey is the context key under which transportRequest.logf looks up a
// logging function of type func(string, ...any).
type tLogKey struct{}
2995
2996 func (tr *transportRequest) logf(format string, args ...any) {
2997 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2998 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2999 }
3000 }
3001
3002
3003
3004 func (pc *persistConn) markReused() {
3005 pc.mu.Lock()
3006 pc.reused = true
3007 pc.mu.Unlock()
3008 }
3009
3010
3011
3012
3013
3014
// close closes the connection with err as the reason, taking pc.mu for the
// duration. err must be non-nil (closeLocked panics otherwise). Only the
// first close has an effect on the connection.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
3020
3021 func (pc *persistConn) closeLocked(err error) {
3022 if err == nil {
3023 panic("nil error")
3024 }
3025 if pc.closed == nil {
3026 pc.closed = err
3027 pc.t.decConnsPerHost(pc.cacheKey)
3028
3029
3030 if pc.alt == nil {
3031 if err != errCallerOwnsConn {
3032 pc.conn.Close()
3033 }
3034 close(pc.closech)
3035 }
3036 }
3037 pc.mutateHeaderFunc = nil
3038 }
3039
// schemePort returns the default port for scheme as a string: "80" for
// http, "443" for https and "1080" for socks5 and socks5h. It returns the
// empty string for any other scheme.
func schemePort(scheme string) string {
	port := ""
	switch scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	case "socks5", "socks5h":
		port = "1080"
	}
	return port
}
3052
3053 func idnaASCIIFromURL(url *url.URL) string {
3054 addr := url.Hostname()
3055 if v, err := idnaASCII(addr); err == nil {
3056 addr = v
3057 }
3058 return addr
3059 }
3060
3061
3062 func canonicalAddr(url *url.URL) string {
3063 port := url.Port()
3064 if port == "" {
3065 port = schemePort(url.Scheme)
3066 }
3067 return net.JoinHostPort(idnaASCIIFromURL(url), port)
3068 }
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
// bodyEOFSignal wraps a response body and notifies its owner, through
// callbacks, when the body has been read to an error (including io.EOF) or
// closed. readLoop uses it to learn when the connection may be reused.
//
// fn is called at most once (condfn clears it): with the first error
// returned by the body's Read, or with the result of closing the body.
// earlyCloseFn, if set, is called instead when Close is called before
// io.EOF was seen.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // held by Read and Close while they access closed, rerr and fn
	closed       bool              // whether Close has been called
	rerr         error             // first error returned by body.Read; returned by all later Reads
	fn           func(error) error // error passed through; the returned error replaces it
	earlyCloseFn func() error      // optional; called on Close when io.EOF has not been seen
}
3089
// errReadOnClosedResBody is returned when a response body is read after
// it has been closed.
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// errConcurrentReadOnResBody is what gzipReader hands to a Read that
// starts while another Read on the same body is still in progress.
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
3092
3093 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
3094 es.mu.Lock()
3095 closed, rerr := es.closed, es.rerr
3096 es.mu.Unlock()
3097 if closed {
3098 return 0, errReadOnClosedResBody
3099 }
3100 if rerr != nil {
3101 return 0, rerr
3102 }
3103
3104 n, err = es.body.Read(p)
3105 if err != nil {
3106 es.mu.Lock()
3107 defer es.mu.Unlock()
3108 if es.rerr == nil {
3109 es.rerr = err
3110 }
3111 err = es.condfn(err)
3112 }
3113 return
3114 }
3115
3116 func (es *bodyEOFSignal) Close() error {
3117 es.mu.Lock()
3118 defer es.mu.Unlock()
3119 if es.closed {
3120 return nil
3121 }
3122 es.closed = true
3123 if es.earlyCloseFn != nil && es.rerr != io.EOF {
3124 return es.earlyCloseFn()
3125 }
3126 err := es.body.Close()
3127 return es.condfn(err)
3128 }
3129
3130
3131 func (es *bodyEOFSignal) condfn(err error) error {
3132 if es.fn == nil {
3133 return err
3134 }
3135 err = es.fn(err)
3136 es.fn = nil
3137 return err
3138 }
3139
3140
3141
3142
3143
// gzipReader decompresses a gzip-encoded response body using gzip.Readers
// taken from gzipPool. readLoop installs it when the transport itself
// requested gzip.
type gzipReader struct {
	_    incomparable
	body *bodyEOFSignal // the compressed body
	mu   sync.Mutex     // held by acquire, release and close while they access zr and zerr
	zr   *gzip.Reader   // the reader kept between Reads; nil while a Read holds it or before the first Read
	// zerr is nil while the reader can be acquired. Otherwise it is the
	// error for the next acquire: a gzip init error,
	// errConcurrentReadOnResBody while a Read is in progress, or
	// errReadOnClosedResBody after close.
	zerr error
}
3151
// eofReader is a reader that is always at EOF. gzipPoolPut resets pooled
// gzip.Readers onto it.
type eofReader struct{}

// Read reads nothing and returns io.EOF.
func (eofReader) Read([]byte) (int, error) {
	return 0, io.EOF
}

// ReadByte reads nothing and returns io.EOF.
func (eofReader) ReadByte() (byte, error) {
	return 0, io.EOF
}
3156
// gzipPool holds *gzip.Reader values for reuse. Use gzipPoolGet and
// gzipPoolPut rather than accessing it directly.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3158
3159
3160 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3161 zr := gzipPool.Get().(*gzip.Reader)
3162 if err := zr.Reset(r); err != nil {
3163 gzipPoolPut(zr)
3164 return nil, err
3165 }
3166 return zr, nil
3167 }
3168
3169
3170 func gzipPoolPut(zr *gzip.Reader) {
3171
3172
3173 var r flate.Reader = eofReader{}
3174 zr.Reset(r)
3175 gzipPool.Put(zr)
3176 }
3177
3178
3179
3180 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3181 gz.mu.Lock()
3182 defer gz.mu.Unlock()
3183 if gz.zerr != nil {
3184 return nil, gz.zerr
3185 }
3186 if gz.zr == nil {
3187 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3188 if gz.zerr != nil {
3189 return nil, gz.zerr
3190 }
3191 }
3192 ret := gz.zr
3193 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3194 return ret, nil
3195 }
3196
3197
3198 func (gz *gzipReader) release(zr *gzip.Reader) {
3199 gz.mu.Lock()
3200 defer gz.mu.Unlock()
3201 if gz.zerr == errConcurrentReadOnResBody {
3202 gz.zr, gz.zerr = zr, nil
3203 } else {
3204 gzipPoolPut(zr)
3205 }
3206 }
3207
3208
3209
3210 func (gz *gzipReader) close() {
3211 gz.mu.Lock()
3212 defer gz.mu.Unlock()
3213 if gz.zerr == nil && gz.zr != nil {
3214 gzipPoolPut(gz.zr)
3215 gz.zr = nil
3216 }
3217 gz.zerr = errReadOnClosedResBody
3218 }
3219
3220 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3221 zr, err := gz.acquire()
3222 if err != nil {
3223 return 0, err
3224 }
3225 defer gz.release(zr)
3226
3227 return zr.Read(p)
3228 }
3229
3230 func (gz *gzipReader) Close() error {
3231 gz.close()
3232
3233 return gz.body.Close()
3234 }
3235
// tlsHandshakeTimeoutError is the error value for a TLS handshake that ran out
// of time. It reports itself as both a timeout and a temporary condition.
type tlsHandshakeTimeoutError struct{}

// Error returns the fixed handshake-timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}
3241
3242
3243
3244
// fakeLocker has the Lock/Unlock method set of a sync.Locker but performs no
// synchronization at all: both methods return immediately.
type fakeLocker struct{}

// Lock does nothing.
func (fakeLocker) Lock() {
}

// Unlock does nothing.
func (fakeLocker) Unlock() {
}
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
// cloneTLSConfig returns a copy of cfg made with (*tls.Config).Clone, or a new
// zero-value tls.Config when cfg is nil. The result is never nil, so callers
// can modify it without a nil check and without touching the caller's config.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
3269
// connLRU tracks persistConns in recency order. add pushes to the front of ll,
// so the oldest connection is at the back; m maps each connection to its list
// element so it can be unlinked without scanning the list. The zero value is
// usable: add allocates both fields on first use.
type connLRU struct {
	ll *list.List // list.Element.Value is of type *persistConn
	m  map[*persistConn]*list.Element
}
3274
3275
3276 func (cl *connLRU) add(pc *persistConn) {
3277 if cl.ll == nil {
3278 cl.ll = list.New()
3279 cl.m = make(map[*persistConn]*list.Element)
3280 }
3281 ele := cl.ll.PushFront(pc)
3282 if _, ok := cl.m[pc]; ok {
3283 panic("persistConn was already in LRU")
3284 }
3285 cl.m[pc] = ele
3286 }
3287
3288 func (cl *connLRU) removeOldest() *persistConn {
3289 ele := cl.ll.Back()
3290 pc := ele.Value.(*persistConn)
3291 cl.ll.Remove(ele)
3292 delete(cl.m, pc)
3293 return pc
3294 }
3295
3296
3297 func (cl *connLRU) remove(pc *persistConn) {
3298 if ele, ok := cl.m[pc]; ok {
3299 cl.ll.Remove(ele)
3300 delete(cl.m, pc)
3301 }
3302 }
3303
3304
3305 func (cl *connLRU) len() int {
3306 return len(cl.m)
3307 }
3308
View as plain text