Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal"
28 "net/http/internal/ascii"
29 "net/textproto"
30 "net/url"
31 "reflect"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "time"
36 _ "unsafe"
37
38 "golang.org/x/net/http/httpguts"
39 "golang.org/x/net/http/httpproxy"
40 )
41
42
43
44
45
46
// DefaultTransport is a ready-to-use RoundTripper backed by a *Transport.
// It takes its proxy from the environment (ProxyFromEnvironment), dials
// through a net.Dialer with a 30 second Timeout and KeepAlive, sets
// ForceAttemptHTTP2, and applies the idle-connection and timeout limits
// listed below.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
59
60
61
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that a
// Transport uses when its MaxIdleConnsPerHost field is zero.
const DefaultMaxIdleConnsPerHost = 2
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Transport holds the idle-connection pool, the per-host dial accounting and
// the user-settable configuration used by roundTrip to send requests.
//
// It contains mutexes, so it must be used through a pointer. Clone returns a
// new Transport carrying a copy of the exported configuration.
type Transport struct {
	// idleMu guards closeIdle, idleConn, idleConnWait and idleLRU.
	idleMu sync.Mutex
	// closeIdle is set by CloseIdleConnections and cleared by the next
	// queueForIdleConn; while set, tryPutIdleConn refuses to pool connections.
	closeIdle bool
	// idleConn holds idle connections per key; the most recently idled
	// connection is at the end of each slice.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnWait holds requests waiting for an idle connection.
	idleConnWait map[connectMethodKey]wantConnQueue
	// idleLRU tracks idle connections so MaxIdleConns can evict the oldest.
	idleLRU connLRU

	// reqMu guards reqCanceler.
	reqMu sync.Mutex
	// reqCanceler maps in-flight requests to their cancel functions
	// (see prepareTransportCancel and CancelRequest).
	reqCanceler map[*Request]context.CancelCauseFunc

	// altMu serializes writers of altProto (registerProtocol).
	altMu sync.Mutex
	// altProto holds a map[string]RoundTripper keyed by scheme, or nothing.
	altProto atomic.Value

	// connsPerHostMu guards connsPerHost, connsPerHostWait and dialsInProgress.
	connsPerHostMu sync.Mutex
	// connsPerHost counts connections per key; it is only maintained when
	// MaxConnsPerHost is positive.
	connsPerHost map[connectMethodKey]int
	// connsPerHostWait holds requests waiting for a dial slot.
	connsPerHostWait map[connectMethodKey]wantConnQueue
	// dialsInProgress holds dials that were started and are not yet known
	// to have finished.
	dialsInProgress wantConnQueue

	// Proxy, if non-nil, is called for each request to obtain the proxy URL
	// to use. An error it returns aborts the request.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is not referenced in this part of the file.
	// NOTE(review): by its signature it is a hook receiving the proxy's
	// response to a CONNECT request; confirm at the call site.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, if non-nil, is used by dial to create connections.
	// It takes precedence over Dial. If both are nil, a zero net.Dialer
	// is used.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial, if non-nil and DialContext is nil, is used by dial to create
	// connections. It receives no context.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext, if non-nil, is used by customDialTLS to create TLS
	// connections. It takes precedence over DialTLS.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is used by customDialTLS when DialTLSContext is nil.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is cloned as the base configuration for TLS
	// handshakes performed by the transport.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds how long a TLS handshake may take.
	// Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, stops connections from being placed in
	// or taken from the idle pool.
	DisableKeepAlives bool

	// DisableCompression is not referenced in this part of the file.
	// NOTE(review): presumably disables transparent response
	// decompression; confirm at the use site.
	DisableCompression bool

	// MaxIdleConns limits the number of idle connections across all keys.
	// Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost limits the idle connections kept per key.
	// Zero selects DefaultMaxIdleConnsPerHost; a negative value disables
	// pooling in the same way as DisableKeepAlives.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost limits the connections counted per key. When the
	// limit is reached, further dials wait in connsPerHostWait.
	// Zero or negative means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout, if positive, is how long a pooled connection
	// without an alternate RoundTripper may stay idle before it is closed.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is not referenced in this part of the file.
	// NOTE(review): presumably bounds the wait for response headers; confirm.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is not referenced in this part of the file.
	// NOTE(review): presumably bounds the wait for a "100 Continue"
	// response; confirm.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a protocol name to a function that produces the
	// RoundTripper for a TLS connection that negotiated it. An "h2" entry
	// stops the transport from configuring its own HTTP/2 support. Whether
	// the map was nil at first use is recorded in tlsNextProtoWasNil.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader is deep-copied by Clone.
	// NOTE(review): presumably the headers sent on CONNECT requests to a
	// proxy; not referenced elsewhere in this part of the file.
	ProxyConnectHeader Header

	// GetProxyConnectHeader is not referenced in this part of the file.
	// NOTE(review): by its signature it computes the CONNECT headers for a
	// given proxy and target; confirm at the call site.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes limits the size of response headers.
	// Zero or negative selects the default of 10 << 20 bytes.
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the size of the write buffer.
	// Zero or negative selects the default of 4 << 10 bytes.
	WriteBufferSize int

	// ReadBufferSize is the size of the read buffer.
	// Zero or negative selects the default of 4 << 10 bytes.
	ReadBufferSize int

	// nextProtoOnce guards the one-time call to onceSetNextProtoDefaults.
	nextProtoOnce sync.Once
	// closeIdleFunc, if set by onceSetNextProtoDefaults, is also called by
	// CloseIdleConnections.
	closeIdleFunc closeIdleConnectionser
	// h2Transport, if non-nil, has its idle connections closed by
	// CloseIdleConnections.
	h2Transport *http2Transport
	// h2Config is set when a RoundTripper is registered under "http/2".
	h2Config http2ExternalTransportConfig
	// h3Transport is set when a RoundTripper is registered under "http/3".
	h3Transport dialClientConner
	// tlsNextProtoWasNil records whether TLSNextProto was nil when
	// onceSetNextProtoDefaults ran; Clone uses it to keep a nil map nil.
	tlsNextProtoWasNil bool

	// ForceAttemptHTTP2 makes protocols enable HTTP/2 even when a custom
	// dial function or TLS configuration is set, provided TLSNextProto is
	// nil and the http2client GODEBUG setting is not "0".
	ForceAttemptHTTP2 bool

	// HTTP2 is copied by value in Clone.
	// NOTE(review): presumably HTTP/2-specific settings; not otherwise
	// referenced in this part of the file.
	HTTP2 *HTTP2Config

	// Protocols, if non-nil, is the exact protocol set returned by
	// protocols, bypassing the inference from the other fields.
	Protocols *Protocols
}
321
322 func (t *Transport) writeBufferSize() int {
323 if t.WriteBufferSize > 0 {
324 return t.WriteBufferSize
325 }
326 return 4 << 10
327 }
328
329 func (t *Transport) readBufferSize() int {
330 if t.ReadBufferSize > 0 {
331 return t.ReadBufferSize
332 }
333 return 4 << 10
334 }
335
336 func (t *Transport) maxHeaderResponseSize() int64 {
337 if t.MaxResponseHeaderBytes > 0 {
338 return t.MaxResponseHeaderBytes
339 }
340 return 10 << 20
341 }
342
343
344 func (t *Transport) Clone() *Transport {
345 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
346 t2 := &Transport{
347 Proxy: t.Proxy,
348 OnProxyConnectResponse: t.OnProxyConnectResponse,
349 DialContext: t.DialContext,
350 Dial: t.Dial,
351 DialTLS: t.DialTLS,
352 DialTLSContext: t.DialTLSContext,
353 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
354 DisableKeepAlives: t.DisableKeepAlives,
355 DisableCompression: t.DisableCompression,
356 MaxIdleConns: t.MaxIdleConns,
357 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
358 MaxConnsPerHost: t.MaxConnsPerHost,
359 IdleConnTimeout: t.IdleConnTimeout,
360 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
361 ExpectContinueTimeout: t.ExpectContinueTimeout,
362 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
363 GetProxyConnectHeader: t.GetProxyConnectHeader,
364 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
365 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
366 WriteBufferSize: t.WriteBufferSize,
367 ReadBufferSize: t.ReadBufferSize,
368 }
369 if t.TLSClientConfig != nil {
370 t2.TLSClientConfig = t.TLSClientConfig.Clone()
371 }
372 if t.HTTP2 != nil {
373 t2.HTTP2 = &HTTP2Config{}
374 *t2.HTTP2 = *t.HTTP2
375 }
376 if t.Protocols != nil {
377 t2.Protocols = &Protocols{}
378 *t2.Protocols = *t.Protocols
379 }
380 if !t.tlsNextProtoWasNil {
381 npm := maps.Clone(t.TLSNextProto)
382 if npm == nil {
383 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
384 }
385 t2.TLSNextProto = npm
386 }
387 return t2
388 }
389
// dialClientConner is the interface a RoundTripper must implement to be
// registered under the "http/3" scheme (see registerProtocol).
type dialClientConner interface {
	// DialClientConn returns a RoundTripper for the given address.
	//
	// NOTE(review): the exact contract of the proxy and internalStateHook
	// arguments is not visible in this part of the file; confirm against
	// the callers and implementations.
	DialClientConn(ctx context.Context, address string, proxy *url.URL, internalStateHook func()) (RoundTripper, error)
}
418
// closeIdleConnectionser is implemented by values whose idle connections
// can be closed on request. Transport.CloseIdleConnections calls it on
// closeIdleFunc, and on h3Transport when that value implements it.
type closeIdleConnectionser interface {
	// CloseIdleConnections closes the connections the implementation
	// considers idle.
	CloseIdleConnections()
}
430
431 func (t *Transport) hasCustomTLSDialer() bool {
432 return t.DialTLS != nil || t.DialTLSContext != nil
433 }
434
// http2client is the "http2client" GODEBUG setting. When its value is "0",
// the transport skips its automatic HTTP/2 setup (see
// onceSetNextProtoDefaults and protocols).
var http2client = godebug.New("http2client")
436
437
438
// onceSetNextProtoDefaults performs the one-time protocol setup for t.
// It runs through t.nextProtoOnce (from roundTrip, Clone and
// CloseIdleConnections) and decides whether t.configureHTTP2 is called.
func (t *Transport) onceSetNextProtoDefaults() {
	// Remember whether the user left TLSNextProto unset; Clone consults this.
	t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
	if http2client.Value() == "0" {
		// GODEBUG http2client=0: count the non-default setting and skip
		// HTTP/2 setup entirely.
		http2client.IncNonDefault()
		return
	}

	// If the RoundTripper registered for "https" is a struct with exactly
	// one field, and that field's value can close idle connections,
	// remember it for CloseIdleConnections and do no further setup.
	// NOTE(review): presumably this detects an externally configured
	// HTTP/2 transport wrapped in a single-field struct; confirm.
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
		if v := rv.Field(0); v.CanInterface() {
			if h2i, ok := v.Interface().(closeIdleConnectionser); ok {
				t.closeIdleFunc = h2i
				return
			}
		}
	}

	if _, ok := t.TLSNextProto["h2"]; ok {
		// The user already supplied an "h2" entry; leave it alone.
		return
	}
	protocols := t.protocols()
	if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
		// Neither form of HTTP/2 is enabled for this transport.
		return
	}
	if omitBundledHTTP2 {
		// omitBundledHTTP2 is declared elsewhere; when set, the HTTP/2
		// configuration step below is skipped.
		return
	}

	t.configureHTTP2(protocols)
}
475
476 func (t *Transport) protocols() Protocols {
477 if t.Protocols != nil {
478 return *t.Protocols
479 }
480 var p Protocols
481 p.SetHTTP1(true)
482 switch {
483 case t.TLSNextProto != nil:
484
485
486 if t.TLSNextProto["h2"] != nil {
487 p.SetHTTP2(true)
488 }
489 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
490
491
492
493
494
495
496 case http2client.Value() == "0":
497 default:
498 p.SetHTTP2(true)
499 }
500 return p
501 }
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
520 return envProxyFunc()(req.URL)
521 }
522
523
524
525 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
526 return func(*Request) (*url.URL, error) {
527 return fixedURL, nil
528 }
529 }
530
531
532
533
// transportRequest wraps a *Request with the per-attempt state the
// transport needs. roundTrip creates a fresh one for every attempt.
type transportRequest struct {
	*Request                         // the request being sent
	extra    Header                  // additional headers, allocated lazily by extraHeaders
	trace    *httptrace.ClientTrace  // trace hooks taken from the request context; may be nil

	ctx    context.Context           // cancelable context derived from the request's context
	cancel context.CancelCauseFunc   // cancels ctx with a cause

	mu  sync.Mutex // guards err
	err error      // first error recorded through setError
}
545
546 func (tr *transportRequest) extraHeaders() Header {
547 if tr.extra == nil {
548 tr.extra = make(Header)
549 }
550 return tr.extra
551 }
552
553 func (tr *transportRequest) setError(err error) {
554 tr.mu.Lock()
555 if tr.err == nil {
556 tr.err = err
557 }
558 tr.mu.Unlock()
559 }
560
561
562
563 func (t *Transport) useRegisteredProtocol(req *Request) bool {
564 if req.URL.Scheme == "https" && req.requiresHTTP1() {
565
566
567
568
569 return false
570 }
571 return true
572 }
573
574
575
576
577 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
578 if !t.useRegisteredProtocol(req) {
579 return nil
580 }
581 if req.URL.Scheme == "https" && t.h2Config != nil && t.h2Config.ExternalRoundTrip() {
582
583
584
585
586
587
588 return t.h2Config
589 }
590 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
591 return altProto[req.URL.Scheme]
592 }
593
594 func validateHeaders(hdrs Header) string {
595 for k, vv := range hdrs {
596 if !httpguts.ValidHeaderFieldName(k) {
597 return fmt.Sprintf("field name %q", k)
598 }
599 for _, v := range vv {
600 if !httpguts.ValidHeaderFieldValue(v) {
601
602
603 return fmt.Sprintf("field value for %q", k)
604 }
605 }
606 }
607 return ""
608 }
609
610
// roundTrip sends req and returns its response.
//
// It validates the request, gives a registered alternate RoundTripper the
// first chance to handle it, and otherwise obtains a connection and sends
// the request on it. When an attempt fails in a way shouldRetryRequest
// accepts (or with a "no cached connection" error), the body is rewound
// and the request is tried again on another connection.
//
// On every error return after the body wrapper is installed, the request
// body is closed either here or by the code that failed.
func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		// Header and trailer validation applies only to http and https;
		// other schemes are left to their registered RoundTripper.
		if err := validateHeaders(req.Header); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid header %s", err)
		}

		if err := validateHeaders(req.Trailer); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid trailer %s", err)
		}
	}

	// Keep the caller's request for Response.Request and for legacy
	// cancellation; work on a copy whose body tracks reads and closes.
	origReq := req
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate RoundTripper declined the request; restore the
		// body before falling through to the built-in implementation.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	// Derive a context that the transport itself can cancel with a cause.
	ctx, cancel := context.WithCancelCause(req.Context())

	// Bridge the legacy Request.Cancel channel onto the new context.
	if origReq.Cancel != nil {
		go awaitLegacyCancel(ctx, cancel, origReq)
	}

	// Register the cancel function so CancelRequest(origReq) can find it;
	// the returned wrapper also unregisters it when invoked.
	cancel = t.prepareTransportCancel(origReq, cancel)

	// err is the named result, so every error return below cancels the
	// context with that error as the cause.
	defer func() {
		if err != nil {
			cancel(err)
		}
	}()

	for {
		// Stop before each attempt if the request was already canceled.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, context.Cause(ctx)
		default:
		}

		// A fresh transportRequest is built for each attempt.
		treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Obtain a connection, either an idle one from the pool or a
		// newly dialed one.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// The connection is served by an alternate RoundTripper.
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			if pconn.alt != nil {
				// The alternate RoundTripper does not use the
				// transport's cancel function, so invoke it here to
				// unregister the request from reqCanceler.
				cancel(errRequestDone)
			}
			resp.Request = origReq
			return resp, nil
		}

		// The attempt failed; clean up and decide whether to retry.
		if http2isNoCachedConnError(err) {
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Not retryable: unwrap the internal error wrappers so the
			// caller sees the underlying error.
			if e, ok := err.(nothingWrittenError); ok {
				err = e.error
			}
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
				// Close the request body if the failed attempt did not
				// already do so.
				req.closeBody()
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Rewind the body for the next attempt, if that is possible.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
771
// http2isNoCachedConnError reports whether err's dynamic type has an
// IsHTTP2NoCachedConnError method. A nil err yields false.
func http2isNoCachedConnError(err error) bool {
	type noCachedConnMarker interface {
		IsHTTP2NoCachedConnError()
	}
	if _, marked := err.(noCachedConnMarker); marked {
		return true
	}
	return false
}
776
777 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
778 select {
779 case <-req.Cancel:
780 cancel(errRequestCanceled)
781 case <-ctx.Done():
782 }
783 }
784
// errCannotRewind is returned by rewindBody when the request body has been
// read or closed and the request has no GetBody function to recreate it.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
786
// readTrackingBody wraps a request body and records whether it has been
// read from or closed, so rewindBody can tell if a retry needs a new body.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool        // set by the first call to Read
	didClose atomic.Bool // set by the first call to Close
}

// Read marks the body as read and forwards to the wrapped reader.
func (r *readTrackingBody) Read(data []byte) (int, error) {
	r.didRead = true
	n, err := r.ReadCloser.Read(data)
	return n, err
}

// Close closes the wrapped body on the first call only. Later calls return
// nil without touching the wrapped body again.
func (r *readTrackingBody) Close() error {
	if firstClose := r.didClose.CompareAndSwap(false, true); firstClose {
		return r.ReadCloser.Close()
	}
	return nil
}
804
805
806
807
808
809 func setupRewindBody(req *Request) *Request {
810 if req.Body == nil || req.Body == NoBody {
811 return req
812 }
813 newReq := *req
814 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
815 return &newReq
816 }
817
818
819
820
821
822 func rewindBody(req *Request) (rewound *Request, err error) {
823 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
824 return req, nil
825 }
826 if !req.Body.(*readTrackingBody).didClose.Load() {
827 req.closeBody()
828 }
829 if req.GetBody == nil {
830 return nil, errCannotRewind
831 }
832 body, err := req.GetBody()
833 if err != nil {
834 return nil, err
835 }
836 newReq := *req
837 newReq.Body = &readTrackingBody{ReadCloser: body}
838 return &newReq, nil
839 }
840
841
842
843
844 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
845 if http2isNoCachedConnError(err) {
846
847
848
849
850
851
852 return true
853 }
854 if err == errMissingHost {
855
856 return false
857 }
858 if !pc.isReused() {
859
860
861
862
863
864
865
866 return false
867 }
868 if _, ok := err.(nothingWrittenError); ok {
869
870
871 return req.outgoingLength() == 0 || req.GetBody != nil
872 }
873 if !req.isReplayable() {
874
875 return false
876 }
877 if _, ok := err.(transportReadFromServerError); ok {
878
879
880 return true
881 }
882 if err == errServerClosedIdle {
883
884
885
886 return true
887 }
888 return false
889 }
890
891
// ErrSkipAltProtocol is a sentinel error. When a RoundTripper registered
// through RegisterProtocol returns it, roundTrip falls back to the
// transport's own handling of the request.
var ErrSkipAltProtocol = internal.ErrSkipAltProtocol
893
894
895
896
897
898
899
900
901
902
903
904 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
905 if err := t.registerProtocol(scheme, rt); err != nil {
906 panic(err)
907 }
908 }
909
910 func (t *Transport) registerProtocol(scheme string, rt RoundTripper) error {
911 t.altMu.Lock()
912 defer t.altMu.Unlock()
913
914 if scheme == "http/2" {
915 if t.h2Config != nil {
916 panic("http: HTTP/2 Transport already registered")
917 }
918 var ok bool
919 if t.h2Config, ok = rt.(http2ExternalTransportConfig); !ok {
920 panic("http: HTTP/2 configuration does not implement ExternalTransportConfig")
921 }
922 t.h2Config.Registered(t)
923 }
924
925 if scheme == "http/3" {
926 var ok bool
927 if t.h3Transport, ok = rt.(dialClientConner); !ok {
928 panic("http: HTTP/3 RoundTripper does not implement DialClientConn")
929 }
930 }
931
932 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
933 if _, exists := oldMap[scheme]; exists {
934 return errors.New("protocol " + scheme + " already registered")
935 }
936 newMap := maps.Clone(oldMap)
937 if newMap == nil {
938 newMap = make(map[string]RoundTripper)
939 }
940 newMap[scheme] = rt
941 t.altProto.Store(newMap)
942 return nil
943 }
944
945
946
947
948
// CloseIdleConnections closes every connection currently in the idle pool
// and makes connections that become idle afterwards get closed instead of
// pooled, until the next request asks for a connection. It also cancels
// in-progress dials that no request is waiting on, and forwards the call
// to the HTTP/2 and HTTP/3 transports known to t.
func (t *Transport) CloseIdleConnections() {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	// Detach the pool under the lock, then close the connections after
	// releasing it.
	t.idleMu.Lock()
	m := t.idleConn
	t.idleConn = nil
	t.closeIdle = true
	t.idleLRU = connLRU{}
	t.idleMu.Unlock()
	for _, conns := range m {
		for _, pconn := range conns {
			pconn.close(errCloseIdleConns)
		}
	}
	// Cancel dials that are still running but whose requester is no
	// longer waiting for the result.
	t.connsPerHostMu.Lock()
	t.dialsInProgress.all(func(w *wantConn) {
		if w.cancelCtx != nil && !w.waiting() {
			w.cancelCtx()
		}
	})
	t.connsPerHostMu.Unlock()

	// Forward to the bundled HTTP/2 transport, if one is configured.
	if tr2 := t.h2Transport; tr2 != nil {
		tr2.CloseIdleConnections()
	}

	// Forward to the value recorded by onceSetNextProtoDefaults, if any.
	if t2 := t.closeIdleFunc; t2 != nil {
		t2.CloseIdleConnections()
	}

	// Forward to the HTTP/3 transport when it supports the operation.
	if cc, ok := t.h3Transport.(closeIdleConnectionser); ok {
		cc.CloseIdleConnections()
	}
}
988
989
990 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
991
992
993
994
995
996
997 cancel := func(err error) {
998 origCancel(err)
999 t.reqMu.Lock()
1000 delete(t.reqCanceler, req)
1001 t.reqMu.Unlock()
1002 }
1003 t.reqMu.Lock()
1004 if t.reqCanceler == nil {
1005 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
1006 }
1007 t.reqCanceler[req] = cancel
1008 t.reqMu.Unlock()
1009 return cancel
1010 }
1011
1012
1013
1014
1015
1016
1017
1018 func (t *Transport) CancelRequest(req *Request) {
1019 t.reqMu.Lock()
1020 cancel := t.reqCanceler[req]
1021 t.reqMu.Unlock()
1022 if cancel != nil {
1023 cancel(errRequestCanceled)
1024 }
1025 }
1026
1027
1028
1029
1030
var (
	// envProxyOnce guards the one-time initialization of envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the proxy function built from the environment;
	// it is set by envProxyFunc and cleared by resetProxyConfig.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
1035
1036
1037
1038 func envProxyFunc() func(*url.URL) (*url.URL, error) {
1039 envProxyOnce.Do(func() {
1040 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
1041 })
1042 return envProxyFuncValue
1043 }
1044
1045
1046 func resetProxyConfig() {
1047 envProxyOnce = sync.Once{}
1048 envProxyFuncValue = nil
1049 }
1050
1051 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
1052 cm.targetScheme = treq.URL.Scheme
1053 cm.targetAddr = canonicalAddr(treq.URL)
1054 if t.Proxy != nil {
1055 cm.proxyURL, err = t.Proxy(treq.Request)
1056 }
1057 cm.onlyH1 = treq.requiresHTTP1()
1058 return cm, err
1059 }
1060
1061
1062
1063 func (cm *connectMethod) proxyAuth() string {
1064 if cm.proxyURL == nil {
1065 return ""
1066 }
1067 if u := cm.proxyURL.User; u != nil {
1068 username := u.Username()
1069 password, _ := u.Password()
1070 return "Basic " + basicAuth(username, password)
1071 }
1072 return ""
1073 }
1074
1075
// Errors describing why a connection was closed or was not placed in the
// idle pool.
var (
	// errKeepAlivesDisabled: pooling is disabled (DisableKeepAlives or a
	// negative MaxIdleConnsPerHost).
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	// errConnBroken: the connection reported itself as broken.
	errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
	// errCloseIdle: CloseIdleConnections ran and no request has asked for
	// a connection since.
	errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
	// errTooManyIdle: the MaxIdleConns limit evicted the oldest idle connection.
	errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
	// errTooManyIdleHost: the per-host idle limit is already reached.
	errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
	// errCloseIdleConns: the connection was closed by CloseIdleConnections.
	errCloseIdleConns = errors.New("http: CloseIdleConnections called")
	// errReadLoopExiting and errIdleConnTimeout are used outside the part
	// of the file shown here.
	errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout = errors.New("http: idle connection timeout")

	// errServerClosedIdle is treated as retryable by shouldRetryRequest
	// for replayable requests on reused connections.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1092
1093
1094
1095
1096
1097
1098
1099
1100
// transportReadFromServerError wraps an error that occurred while reading
// from the server. shouldRetryRequest uses the wrapper type to recognise
// such failures, and roundTrip unwraps it before returning to the caller.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the wrapped error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error describes the failure, including the wrapped error's text.
func (e transportReadFromServerError) Error() string {
	msg := fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
	return msg
}
1110
1111 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1112 if err := t.tryPutIdleConn(pconn); err != nil {
1113 pconn.close(err)
1114 }
1115 }
1116
1117 func (t *Transport) maxIdleConnsPerHost() int {
1118 if v := t.MaxIdleConnsPerHost; v != 0 {
1119 return v
1120 }
1121 return DefaultMaxIdleConnsPerHost
1122 }
1123
1124
1125
1126
1127
1128
// tryPutIdleConn makes pconn available for reuse.
//
// If a request is waiting for a connection with pconn's key, pconn is
// handed to it directly; otherwise pconn is added to the idle pool. A nil
// return means the transport has taken responsibility for pconn. A non-nil
// return explains why pconn was not accepted, and the caller is expected to
// close it (see putOrCloseIdleConn).
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()
	if pconn.isClientConn {
		// Client connections are not pooled here: mark the connection
		// as no longer in flight and signal availability on availch.
		// The state hook runs last, after pconn.mu is released.
		defer pconn.internalStateHook()
		pconn.mu.Lock()
		defer pconn.mu.Unlock()
		if !pconn.inFlight {
			panic("pconn is not in flight")
		}
		pconn.inFlight = false
		select {
		case pconn.availch <- struct{}{}:
		default:
			panic("unable to make pconn available")
		}
		return nil
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A connection with an alternate RoundTripper (pconn.alt != nil) is
	// not removed from the pool when handed out (see queueForIdleConn),
	// so if it is already tracked there is nothing to do.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Deliver pconn to requests waiting for an idle connection, if any.
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// Exclusive connection: hand it to the first waiter that
			// is still waiting, then stop.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil, time.Time{}) {
					done = true
					break
				}
			}
		} else {
			// Alternate-RoundTripper connection: offer it to every
			// waiter. It is still added to the pool below, because
			// done stays false.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil, time.Time{})
			}
		}
		// q is a copy of the map value; store the updated queue back.
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// Adding the same connection twice would be an internal bug.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the global idle limit by evicting the oldest idle connection.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Arm the idle timer, but only for connections without an alternate
	// RoundTripper (pconn.alt == nil).
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
1237
1238
1239
1240
// queueForIdleConn tries to satisfy w from the idle pool.
//
// It reports whether an idle connection was delivered to w. If none was
// available, w is queued so a connection that becomes idle later can be
// handed to it, and false is returned. As a side effect the call clears
// t.closeIdle, re-enabling pooling after CloseIdleConnections. A nil w is
// accepted and only triggers that side effect.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A request is asking for a connection, so stop closing connections
	// that become idle.
	t.closeIdle = false

	if w == nil {
		// Nothing to deliver to; the call was only for the side effect.
		return false
	}

	// Compute the cutoff before which an idle connection is too old.
	// A zero oldTime means no idle timeout is configured.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Look for the most recently used idle connection.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// Round(0) strips the monotonic clock reading from idleAt,
			// so the comparison with oldTime uses wall-clock time.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Close it asynchronously: closeConnIfStillIdle is
				// run in its own goroutine because idleMu is held here.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// Unusable connection: drop it from the list and
				// try the next one.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
			if delivered {
				if pconn.alt != nil {
					// Connections with an alternate RoundTripper
					// stay in the pool when handed out.
				} else {
					// Exclusive connection: remove it from the
					// pool now that w owns it.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// No usable idle connection: register w to receive the next one.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
1326
1327
1328 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1329 if pconn.isClientConn {
1330 return true
1331 }
1332 t.idleMu.Lock()
1333 defer t.idleMu.Unlock()
1334 return t.removeIdleConnLocked(pconn)
1335 }
1336
1337
1338 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1339 if pconn.idleTimer != nil {
1340 pconn.idleTimer.Stop()
1341 }
1342 t.idleLRU.remove(pconn)
1343 key := pconn.cacheKey
1344 pconns := t.idleConn[key]
1345 var removed bool
1346 switch len(pconns) {
1347 case 0:
1348
1349 case 1:
1350 if pconns[0] == pconn {
1351 delete(t.idleConn, key)
1352 removed = true
1353 }
1354 default:
1355 for i, v := range pconns {
1356 if v != pconn {
1357 continue
1358 }
1359
1360
1361 copy(pconns[i:], pconns[i+1:])
1362 t.idleConn[key] = pconns[:len(pconns)-1]
1363 removed = true
1364 break
1365 }
1366 }
1367 return removed
1368 }
1369
// zeroDialer is the net.Dialer used by Transport.dial when neither
// DialContext nor Dial is set.
var zeroDialer net.Dialer
1371
1372 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1373 if t.DialContext != nil {
1374 c, err := t.DialContext(ctx, network, addr)
1375 if c == nil && err == nil {
1376 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1377 }
1378 return c, err
1379 }
1380 if t.Dial != nil {
1381 c, err := t.Dial(network, addr)
1382 if c == nil && err == nil {
1383 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1384 }
1385 return c, err
1386 }
1387 return zeroDialer.DialContext(ctx, network, addr)
1388 }
1389
1390
1391
1392
1393
1394
1395
// wantConn records one request's wish for a connection. It is created by
// getConn and satisfied at most once, through tryDeliver, with either an
// idle connection, a newly dialed one, or an error.
type wantConn struct {
	cm  connectMethod    // how to reach the target
	key connectMethodKey // cm.key(), used to index the transport's maps

	// beforeDial and afterDial are hooks called by queueForDial and
	// dialConnFor; getConn sets them to test hooks.
	beforeDial func()
	afterDial  func()

	mu  sync.Mutex      // guards ctx, done and the sending/closing of result
	ctx context.Context // context for the dial; set to nil once the wantConn is done
	// cancelCtx cancels ctx. It is set to nil, under the transport's
	// connsPerHostMu, when the dial goroutine has finished.
	cancelCtx context.CancelFunc
	done      bool             // true once a result was delivered or the wantConn was canceled
	result    chan connOrError // buffered with capacity 1; closed after delivery or cancellation
}
1412
// connOrError is the value delivered on wantConn.result. Exactly one of pc
// and err is set (tryDeliver enforces this).
type connOrError struct {
	pc     *persistConn
	err    error
	idleAt time.Time // when pc became idle; zero if pc did not come from the idle pool
}
1418
1419
1420 func (w *wantConn) waiting() bool {
1421 w.mu.Lock()
1422 defer w.mu.Unlock()
1423
1424 return !w.done
1425 }
1426
1427
1428 func (w *wantConn) getCtxForDial() context.Context {
1429 w.mu.Lock()
1430 defer w.mu.Unlock()
1431
1432 return w.ctx
1433 }
1434
1435
// tryDeliver attempts to hand the result (pc, err) to w and reports whether
// it succeeded. It fails only when w is already done, that is, already
// satisfied or canceled. Exactly one of pc and err must be non-nil;
// anything else is an internal error and panics.
func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.done {
		return false
	}
	if (pc == nil) == (err == nil) {
		panic("net/http: internal error: misuse of tryDeliver")
	}
	// Drop the dial context and mark w finished before publishing.
	w.ctx = nil
	w.done = true

	// result is created with capacity 1 and written at most once, so this
	// send does not block. Closing it afterwards lets cancel tell a
	// delivered-and-consumed result from a pending one.
	w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
	close(w.result)

	return true
}
1454
1455
1456
// cancel marks w as no longer wanted. If a connection had already been
// delivered to w but not yet received by the requester, that connection is
// returned to t's idle pool (or closed if the pool rejects it).
func (w *wantConn) cancel(t *Transport) {
	w.mu.Lock()
	var pc *persistConn
	if w.done {
		// A result was delivered or w was canceled before. Receive from
		// the channel: ok is true only if the delivered value was still
		// sitting in the buffer.
		if r, ok := <-w.result; ok {
			pc = r.pc
		}
	} else {
		// Nothing delivered yet: close the channel so receivers are
		// released.
		close(w.result)
	}
	w.ctx = nil
	w.done = true
	w.mu.Unlock()

	// Connections with an alternate RoundTripper (pc.alt != nil) are not
	// removed from the idle pool when handed out, so they must not be
	// added back here.
	if pc != nil && pc.alt == nil {
		t.putOrCloseIdleConn(pc)
	}
}
1478
1479
1480 type wantConnQueue struct {
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491 head []*wantConn
1492 headPos int
1493 tail []*wantConn
1494 }
1495
1496
1497 func (q *wantConnQueue) len() int {
1498 return len(q.head) - q.headPos + len(q.tail)
1499 }
1500
1501
1502 func (q *wantConnQueue) pushBack(w *wantConn) {
1503 q.tail = append(q.tail, w)
1504 }
1505
1506
1507 func (q *wantConnQueue) popFront() *wantConn {
1508 if q.headPos >= len(q.head) {
1509 if len(q.tail) == 0 {
1510 return nil
1511 }
1512
1513 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1514 }
1515 w := q.head[q.headPos]
1516 q.head[q.headPos] = nil
1517 q.headPos++
1518 return w
1519 }
1520
1521
1522 func (q *wantConnQueue) peekFront() *wantConn {
1523 if q.headPos < len(q.head) {
1524 return q.head[q.headPos]
1525 }
1526 if len(q.tail) > 0 {
1527 return q.tail[0]
1528 }
1529 return nil
1530 }
1531
1532
1533
1534 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1535 for {
1536 w := q.peekFront()
1537 if w == nil || w.waiting() {
1538 return cleaned
1539 }
1540 q.popFront()
1541 cleaned = true
1542 }
1543 }
1544
1545
1546 func (q *wantConnQueue) cleanFrontCanceled() {
1547 for {
1548 w := q.peekFront()
1549 if w == nil || w.cancelCtx != nil {
1550 return
1551 }
1552 q.popFront()
1553 }
1554 }
1555
1556
1557
1558 func (q *wantConnQueue) all(f func(*wantConn)) {
1559 for _, w := range q.head[q.headPos:] {
1560 f(w)
1561 }
1562 for _, w := range q.tail {
1563 f(w)
1564 }
1565 }
1566
1567 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1568 if t.DialTLSContext != nil {
1569 conn, err = t.DialTLSContext(ctx, network, addr)
1570 } else {
1571 conn, err = t.DialTLS(network, addr)
1572 }
1573 if conn == nil && err == nil {
1574 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1575 }
1576 return
1577 }
1578
1579
1580
1581
1582
// getConn returns a connection for treq to the target described by cm.
// It first asks the idle pool; if nothing is delivered from there it also
// queues a dial. It then waits for whichever comes first: a delivered
// result or cancellation of the request.
//
// On any error return the wantConn is canceled, which returns a connection
// that was delivered but not consumed to the pool.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	// The dial context keeps the values of the request context but is
	// detached from its cancellation: the dial is canceled only through
	// dialCancel, so it can outlive the request that started it.
	dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        dialCtx,
		cancelCtx:  dialCancel,
		result:     make(chan connOrError, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	// err is the named result, so this runs for every error return below.
	defer func() {
		if err != nil {
			w.cancel(t)
		}
	}()

	// Try the idle pool first; otherwise queue a new dial. In the second
	// case w stays registered as a waiter for idle connections too.
	if delivered := t.queueForIdleConn(w); !delivered {
		t.queueForDial(w)
	}

	// Wait for a result or for the request to be canceled.
	select {
	case r := <-w.result:
		// Report the connection to the trace hook. Connections with an
		// alternate RoundTripper are not reported here.
		if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
			info := httptrace.GotConnInfo{
				Conn:   r.pc.conn,
				Reused: r.pc.isReused(),
			}
			if !r.idleAt.IsZero() {
				info.WasIdle = true
				info.IdleTime = time.Since(r.idleAt)
			}
			trace.GotConn(info)
		}
		if r.err != nil {
			// If the request has been canceled in the meantime, report
			// the cancellation cause in preference to r.err.
			select {
			case <-treq.ctx.Done():
				err := context.Cause(treq.ctx)
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// Not canceled: fall through and return r.err below.
			}
		}
		return r.pc, r.err
	case <-treq.ctx.Done():
		err := context.Cause(treq.ctx)
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1658
1659
1660
1661 func (t *Transport) queueForDial(w *wantConn) {
1662 w.beforeDial()
1663
1664 t.connsPerHostMu.Lock()
1665 defer t.connsPerHostMu.Unlock()
1666
1667 if t.MaxConnsPerHost <= 0 {
1668 t.startDialConnForLocked(w)
1669 return
1670 }
1671
1672 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1673 if t.connsPerHost == nil {
1674 t.connsPerHost = make(map[connectMethodKey]int)
1675 }
1676 t.connsPerHost[w.key] = n + 1
1677 t.startDialConnForLocked(w)
1678 return
1679 }
1680
1681 if t.connsPerHostWait == nil {
1682 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1683 }
1684 q := t.connsPerHostWait[w.key]
1685 q.cleanFrontNotWaiting()
1686 q.pushBack(w)
1687 t.connsPerHostWait[w.key] = q
1688 }
1689
1690
1691
// startDialConnForLocked records w as a dial in progress and starts the
// dial in a new goroutine. t.connsPerHostMu must be held by the caller.
func (t *Transport) startDialConnForLocked(w *wantConn) {
	// Drop finished dials from the front before adding the new one, so
	// the queue does not grow without bound.
	t.dialsInProgress.cleanFrontCanceled()
	t.dialsInProgress.pushBack(w)
	go func() {
		t.dialConnFor(w)
		// Clearing cancelCtx marks the dial as finished:
		// cleanFrontCanceled will drop w and CloseIdleConnections will
		// skip it.
		t.connsPerHostMu.Lock()
		defer t.connsPerHostMu.Unlock()
		w.cancelCtx = nil
	}()
}
1702
1703
1704
1705
1706 func (t *Transport) dialConnFor(w *wantConn) {
1707 defer w.afterDial()
1708 ctx := w.getCtxForDial()
1709 if ctx == nil {
1710 t.decConnsPerHost(w.key)
1711 return
1712 }
1713
1714 const isClientConn = false
1715 pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
1716 delivered := w.tryDeliver(pc, err, time.Time{})
1717 if err == nil && (!delivered || pc.alt != nil) {
1718
1719
1720
1721 t.putOrCloseIdleConn(pc)
1722 }
1723 if err != nil {
1724 t.decConnsPerHost(w.key)
1725 }
1726 }
1727
1728
1729
// decConnsPerHost releases one per-host connection slot for key.
//
// It is a no-op when MaxConnsPerHost is not positive. When a request is
// still waiting for a slot on this key, the slot is handed to it by
// starting a dial, and the count is left unchanged. Otherwise the count
// is decremented, and the map entry is removed once it reaches zero.
func (t *Transport) decConnsPerHost(key connectMethodKey) {
	if t.MaxConnsPerHost <= 0 {
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	n := t.connsPerHost[key]
	if n == 0 {
		// A zero count here means the accounting is broken; fail loudly
		// instead of continuing with a corrupted counter.
		panic("net/http: internal error: connCount underflow")
	}

	// Try to transfer this slot to a queued waiter. Entries that are no
	// longer waiting are discarded so that no needless dial is started.
	if q := t.connsPerHostWait[key]; q.len() > 0 {
		done := false
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				t.startDialConnForLocked(w)
				done = true
				break
			}
		}
		if q.len() == 0 {
			delete(t.connsPerHostWait, key)
		} else {
			// q is a copy of the map value, so the modified queue has
			// to be written back.
			t.connsPerHostWait[key] = q
		}
		if done {
			// The slot now belongs to the waiter; keep the count as is.
			return
		}
	}

	// Nobody took the slot, so give it back.
	if n--; n == 0 {
		delete(t.connsPerHost, key)
	} else {
		t.connsPerHost[key] = n
	}
}
1777
1778
1779
1780
// addTLS wraps pconn.conn in a TLS client connection and performs the
// handshake.
//
// name is used as the TLS ServerName when the transport's TLS config
// does not set one. When the connection key is restricted to HTTP/1
// (cacheKey.onlyH1), no ALPN protocols are offered. The handshake is
// bounded by Transport.TLSHandshakeTimeout when that is non-zero.
//
// On success pconn.conn is replaced by the TLS connection and
// pconn.tlsState is set. On failure the underlying connection is closed
// and the error is returned. The TLSHandshakeStart and TLSHandshakeDone
// trace hooks are invoked when present.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// Work on a clone so the caller's TLS config is never modified.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2: both the timeout callback and the handshake goroutine
	// may send without blocking.
	errc := make(chan error, 2)
	var timer *time.Timer // fires when the handshake takes too long
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// The connection is closed now; wait for the handshake
			// goroutine to report back before returning.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1829
// erringRoundTripper is implemented by RoundTrippers that exist only to
// carry an error. dialConn checks for it on the value returned by a
// TLSNextProto function and, if present, returns RoundTripErr() instead
// of using the RoundTripper.
type erringRoundTripper interface {
	RoundTripErr() error
}
1833
// testHookProxyConnectTimeout creates the context that bounds the proxy
// CONNECT exchange in dialConn. It is context.WithTimeout by default;
// the name suggests tests may substitute it.
var testHookProxyConnectTimeout = context.WithTimeout
1835
// dialConn establishes a new connection described by cm and returns it
// wrapped in a persistConn.
//
// The steps are, in order:
//   - when HTTP/3 is configured, delegate entirely to t.h3Transport;
//   - give t.http2ExternalDial a chance to provide the connection;
//   - dial the first hop (the proxy if there is one, else the target),
//     using the custom TLS dialer or a plain dial followed by addTLS for
//     "https";
//   - when a proxy is used, perform the SOCKS5 negotiation, or prepare
//     Proxy-Authorization for plain HTTP, or issue a CONNECT request for
//     HTTPS targets and then add TLS towards the target;
//   - when HTTP/2 or another protocol was negotiated, return a
//     persistConn whose alt field carries that protocol's RoundTripper;
//   - otherwise set up buffered I/O and start the HTTP/1 read and write
//     loops.
//
// isClientConn and internalStateHook are stored on the returned
// persistConn or passed to the HTTP/2 and HTTP/3 client-connection
// constructors.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
	// HTTP/3 cannot be combined with the other protocols and is handled
	// completely by the HTTP/3 transport.
	if p := t.protocols(); p.http3() {
		if p.HTTP1() || p.HTTP2() || p.UnencryptedHTTP2() {
			return nil, errors.New("http: when using HTTP3, Transport.Protocols must contain only HTTP3")
		}
		if t.h3Transport == nil {
			return nil, errors.New("http: Transport.Protocols contains HTTP3, but Transport does not support HTTP/3")
		}
		rt, err := t.h3Transport.DialClientConn(ctx, cm.addr(), cm.proxyURL, internalStateHook)
		if err != nil {
			return nil, err
		}
		return &persistConn{
			t:        t,
			cacheKey: cm.key(),
			alt:      rt,
		}, nil
	}

	pconn = &persistConn{
		t:                 t,
		cacheKey:          cm.key(),
		reqch:             make(chan requestAndChan, 1),
		writech:           make(chan writeRequest, 1),
		closech:           make(chan struct{}),
		writeErrCh:        make(chan error, 1),
		writeLoopDone:     make(chan struct{}),
		isClientConn:      isClientConn,
		internalStateHook: internalStateHook,
	}
	trace := httptrace.ContextClientTrace(ctx)
	// wrapErr marks errors from the first hop as proxy connection
	// errors when a proxy is in use.
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error so callers can tell it came from the proxy.
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}

	// errors.ErrUnsupported means there is no external HTTP/2 dialer for
	// this connection; any other result (success or failure) is final.
	if rt, err := t.http2ExternalDial(ctx, cm); err != errors.ErrUnsupported {
		if err != nil {
			return nil, err
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: rt}, nil
	}

	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		type connectionStater interface {
			ConnectionState() tls.ConnectionState
		}
		type handshaker interface {
			HandshakeContext(context.Context) error
		}
		if cstater, ok := pconn.conn.(connectionStater); ok {
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if handshaker, ok := cstater.(handshaker); ok {
				// Run the handshake here in case the custom dialer did
				// not; the protocol selection below reads the
				// connection state.
				if err := handshaker.HandshakeContext(ctx); err != nil {
					go pconn.conn.Close()
					if trace != nil && trace.TLSHandshakeDone != nil {
						trace.TLSHandshakeDone(tls.ConnectionState{}, err)
					}
					return nil, err
				}
			}
			cs := cstater.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// No proxy: nothing to do.
	case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through a proxy: requests go to the proxy itself,
		// with Proxy-Authorization added to each request when
		// credentials are configured.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through a proxy: open a tunnel with CONNECT.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone first so the caller-provided header is not modified.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// Bound the CONNECT exchange to one minute (or less, when ctx
		// ends earlier) so an unresponsive proxy cannot block forever.
		connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
		defer cancel()

		didReadResponse := make(chan struct{}) // closed once the CONNECT write and read finish or fail
		var (
			resp *Response
			err  error // write or read error
		)
		// Write the CONNECT request and read the response in a
		// goroutine so the select below can react to cancellation.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}
			// The buffered reader is discarded afterwards; the read is
			// capped at the transport's maximum response header size.
			br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err is now set.
		}
		if err != nil {
			conn.Close()
			return nil, err
		}

		if t.OnProxyConnectResponse != nil {
			err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
			if err != nil {
				conn.Close()
				return nil, err
			}
		}

		if resp.StatusCode != 200 {
			// Report the status text (everything after the code).
			_, text, ok := strings.Cut(resp.Status, " ")
			conn.Close()
			if !ok {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(text)
		}
	}

	// With the tunnel established, start TLS towards the real target.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// Unencrypted HTTP/2 is used only on non-TLS connections when it is
	// enabled and HTTP/1 is not.
	unencryptedHTTP2 := pconn.tlsState == nil &&
		t.Protocols != nil &&
		t.Protocols.UnencryptedHTTP2() &&
		!t.Protocols.HTTP1()

	http2 := unencryptedHTTP2 ||
		(pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")

	// Prefer the transport's own HTTP/2 implementation when it exists.
	// errors.ErrUnsupported falls through to the mechanisms below.
	if http2 && t.h2Transport != nil {
		if isClientConn {
			cc, err := t.http2NewClientConn(pconn.conn, internalStateHook)
			if err == nil {
				return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: cc, isClientConn: true}, nil
			}
			if err != errors.ErrUnsupported {
				return nil, err
			}
		} else {
			rt, err := t.http2AddConn(cm.targetScheme, cm.targetAddr, pconn.conn)
			if err == nil {
				return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: rt}, nil
			}
			if err != errors.ErrUnsupported {
				return nil, err
			}
		}
	}

	// Client connections on HTTP/2 need an implementation registered
	// under "https" that can create a client connection.
	if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
		altProto, _ := t.altProto.Load().(map[string]RoundTripper)
		h2, ok := altProto["https"].(newClientConner)
		if !ok {
			return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
		}
		alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
		if err != nil {
			pconn.conn.Close()
			return nil, err
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
	}

	if unencryptedHTTP2 {
		next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
		if !ok {
			return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
		}
		alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
		if e, ok := alt.(erringRoundTripper); ok {
			// The returned RoundTripper only carries an error; report it.
			return nil, e.RoundTripErr()
		}
		return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
	}

	// Hand the connection to a registered TLSNextProto handler when a
	// protocol was mutually negotiated.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		tlsConn, tlsConnOK := pconn.conn.(*tls.Conn)
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; tlsConnOK && ok {
			alt := next(cm.targetAddr, tlsConn)
			if e, ok := alt.(erringRoundTripper); ok {
				// The returned RoundTripper only carries an error; report it.
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// HTTP/1: reads go through pconn (which enforces the read limit) and
	// writes through persistConnWriter (which counts bytes written).
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
2127
2128
2129
2130
2131
2132
2133
2134 type persistConnWriter struct {
2135 pc *persistConn
2136 }
2137
2138 func (w persistConnWriter) Write(p []byte) (n int, err error) {
2139 n, err = w.pc.conn.Write(p)
2140 w.pc.nwrite += int64(n)
2141 return
2142 }
2143
2144
2145
2146
2147 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
2148 n, err = io.Copy(w.pc.conn, r)
2149 w.pc.nwrite += n
2150 return
2151 }
2152
2153 var _ io.ReaderFrom = (*persistConnWriter)(nil)
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171 type connectMethod struct {
2172 _ incomparable
2173 proxyURL *url.URL
2174 targetScheme string
2175
2176
2177
2178 targetAddr string
2179 onlyH1 bool
2180 }
2181
2182 func (cm *connectMethod) key() connectMethodKey {
2183 proxyStr := ""
2184 targetAddr := cm.targetAddr
2185 if cm.proxyURL != nil {
2186 proxyStr = cm.proxyURL.String()
2187 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2188 targetAddr = ""
2189 }
2190 }
2191 return connectMethodKey{
2192 proxy: proxyStr,
2193 scheme: cm.targetScheme,
2194 addr: targetAddr,
2195 onlyH1: cm.onlyH1,
2196 }
2197 }
2198
2199
2200 func (cm *connectMethod) scheme() string {
2201 if cm.proxyURL != nil {
2202 return cm.proxyURL.Scheme
2203 }
2204 return cm.targetScheme
2205 }
2206
2207
2208 func (cm *connectMethod) addr() string {
2209 if cm.proxyURL != nil {
2210 return canonicalAddr(cm.proxyURL)
2211 }
2212 return cm.targetAddr
2213 }
2214
2215
2216
2217 func (cm *connectMethod) tlsHost() string {
2218 h := cm.targetAddr
2219 return removePort(h)
2220 }
2221
2222
2223
2224
// connectMethodKey is the comparable form of a connectMethod; it is the
// key type of the transport's per-host and idle-connection maps.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// String renders the key as "proxy|scheme|addr", with ",h1" appended to
// the scheme when onlyH1 is set.
func (k connectMethodKey) String() string {
	suffix := ""
	if k.onlyH1 {
		suffix = ",h1"
	}
	return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, suffix, k.addr)
}
2238
2239
2240
// persistConn wraps one connection managed by the Transport. For HTTP/1
// it owns the net.Conn and the read/write loop goroutines; for other
// protocols it only carries the RoundTripper in alt.
type persistConn struct {
	// alt, when non-nil, is the RoundTripper of an alternate protocol
	// (dialConn sets it for HTTP/2, HTTP/3 and TLSNextProto handlers).
	// Such persistConns are built with only t, cacheKey, alt and
	// sometimes isClientConn set.
	alt RoundTripper

	t            *Transport
	cacheKey     connectMethodKey
	conn         net.Conn
	tlsState     *tls.ConnectionState // nil when the connection does not use TLS
	br           *bufio.Reader        // reads from the persistConn itself (see Read)
	bw           *bufio.Writer        // writes through persistConnWriter
	nwrite       int64                // bytes written, maintained by persistConnWriter
	reqch        chan requestAndChan  // sent by roundTrip, received by readLoop
	writech      chan writeRequest    // sent by roundTrip, received by writeLoop
	closech      chan struct{}        // closed by closeLocked for HTTP/1 connections
	availch      chan struct{}        // received from in waitForAvailability; sender is outside this section
	isProxy      bool                 // set for plain-HTTP requests sent through a proxy
	sawEOF       bool                 // set by Read when the connection returned io.EOF
	isClientConn bool                 // copied from dialConn's isClientConn argument
	readLimit    int64                // bytes Read may still return; reset by readLoop

	// writeErrCh carries the result of each request write from
	// writeLoop to wroteRequest (called by readLoop before reusing the
	// connection).
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop returns

	// NOTE(review): idleAt and idleTimer are not used in this section;
	// presumably they record when the connection became idle and the
	// timer that closes it — confirm against the idle-pool code.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // the methods in this section hold it when accessing the fields below
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // set non-nil by closeLocked
	canceledErr          error      // set non-nil by cancelRequest
	reused               bool       // set by markReused
	reserved             bool       // client connections: cleared by roundTrip when it takes the connection
	inFlight             bool       // client connections: set by roundTrip
	internalStateHook    func()     // when non-nil, called as readLoop exits

	// mutateHeaderFunc, when non-nil, is applied by roundTrip to the
	// request's extra headers; dialConn uses it to add
	// Proxy-Authorization. closeLocked resets it to nil.
	mutateHeaderFunc func(Header)
}
2288
// maxHeaderResponseSize returns the owning transport's limit on the
// size of response headers, in bytes.
func (pc *persistConn) maxHeaderResponseSize() int64 {
	return pc.t.maxHeaderResponseSize()
}
2292
2293 func (pc *persistConn) Read(p []byte) (n int, err error) {
2294 if pc.readLimit <= 0 {
2295 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2296 }
2297 if int64(len(p)) > pc.readLimit {
2298 p = p[:pc.readLimit]
2299 }
2300 n, err = pc.conn.Read(p)
2301 if err == io.EOF {
2302 pc.sawEOF = true
2303 }
2304 pc.readLimit -= int64(n)
2305 return
2306 }
2307
2308
2309 func (pc *persistConn) isBroken() bool {
2310 pc.mu.Lock()
2311 b := pc.closed != nil
2312 pc.mu.Unlock()
2313 return b
2314 }
2315
2316
2317
2318 func (pc *persistConn) canceled() error {
2319 pc.mu.Lock()
2320 defer pc.mu.Unlock()
2321 return pc.canceledErr
2322 }
2323
2324
2325 func (pc *persistConn) isReused() bool {
2326 pc.mu.Lock()
2327 r := pc.reused
2328 pc.mu.Unlock()
2329 return r
2330 }
2331
// cancelRequest records err as the cancellation cause (later returned
// by canceled) and closes the connection with errRequestCanceled.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
2338
2339
2340
2341
2342 func (pc *persistConn) closeConnIfStillIdle() {
2343 t := pc.t
2344 t.idleMu.Lock()
2345 defer t.idleMu.Unlock()
2346 if _, ok := t.idleLRU.m[pc]; !ok {
2347
2348 return
2349 }
2350 t.removeIdleConnLocked(pc)
2351 pc.close(errIdleConnTimeout)
2352 }
2353
2354
2355
2356
2357
2358
2359
2360
2361
// mapRoundTripError converts an error observed during a round trip into
// the error that should be returned to the caller.
//
// After waiting for the write loop to finish, it selects, in order:
// the cancellation cause if the request was canceled; an error set
// explicitly on the request; err unchanged when it is
// errServerClosedIdle; and otherwise err, wrapped in
// nothingWrittenError when no bytes were written since
// startBytesWritten, or annotated as a broken connection when the
// connection has been closed. A nil err yields nil.
func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
	if err == nil {
		return nil
	}

	// Wait for writeLoop to terminate before inspecting state it writes
	// (pc.nwrite, the request error). This blocks until writeLoopDone
	// is closed, so callers must only pass a non-nil err once the
	// connection is closed or closing.
	<-pc.writeLoopDone

	// A cancellation explains the failure better than whatever network
	// error resulted from tearing the connection down.
	if cerr := pc.canceled(); cerr != nil {
		return cerr
	}

	// Next preference: an error recorded explicitly on the request.
	req.mu.Lock()
	reqErr := req.err
	req.mu.Unlock()
	if reqErr != nil {
		return reqErr
	}

	if err == errServerClosedIdle {
		// Returned as is.
		return err
	}

	if _, ok := err.(transportReadFromServerError); ok {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		// Returned as is.
		return err
	}
	if pc.isBroken() {
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
	}
	return err
}
2411
2412
2413
2414
// errCallerOwnsConn is the close reason readLoop uses when the response
// body is writable. closeLocked does not close the underlying
// connection when given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2416
2417
2418
2419
// maxPostCloseReadBytes is the largest number of unread response-body
// bytes that will be drained after an early close so the connection
// can be reused.
const maxPostCloseReadBytes = 256 << 10

// maxPostCloseReadTime is the longest time spent draining a response
// body after an early close.
const maxPostCloseReadTime = 50 * time.Millisecond

// maybeDrainBody reads and discards the rest of body. It reports true
// only when the body reaches EOF within maxPostCloseReadBytes bytes and
// within maxPostCloseReadTime; on any other error, on a longer body, or
// on timeout it reports false.
func maybeDrainBody(body io.Reader) bool {
	result := make(chan bool, 1) // buffered so the goroutine never blocks
	go func() {
		// Ask for one byte more than the limit: io.EOF then means the
		// body ended within the limit.
		_, err := io.CopyN(io.Discard, body, maxPostCloseReadBytes+1)
		result <- err == io.EOF
	}()

	deadline := time.NewTimer(maxPostCloseReadTime)
	defer deadline.Stop()
	select {
	case drained := <-result:
		return drained
	case <-deadline.C:
		return false
	}
}
2443
// readLoop is the HTTP/1 read side of a persistConn and runs in its own
// goroutine (started by dialConn).
//
// Each iteration waits for data from the server, receives the matching
// request from pc.reqch, reads the response, delivers it (or an error)
// on the request's channel and then decides whether the connection can
// go back to the idle pool. When the loop ends the connection is
// closed, removed from the idle pool, and internalStateHook is called
// if set.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default close reason; refined below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
		if pc.internalStateHook != nil {
			pc.internalStateHook()
		}
	}()

	// tryPutIdleConn returns the connection to the idle pool, reports
	// the outcome to the PutIdleConn trace hook, and records a failure
	// as the close reason.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc synchronizes the body's Read/Close callbacks with this loop:
	// the callbacks block on it until the loop has finished handling
	// the end of the body. Closing it on exit releases any callback
	// that is still waiting.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read the test hook once, under its lock.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no request was
			// outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit while reading the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// No keep-alive when either side asked to close, when an
			// informational (1xx) response ended up here, or when the
			// caller takes over the connection through a writable body.
			alive = false
		}

		if !hasBody || bodyWritable {
			// Try to return the connection to the pool before the
			// response is delivered, so a caller that immediately
			// issues another request can get this same connection.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			// The response has been delivered; run the test hook
			// before reading from the connection again.
			testHookReadLoopBeforeNextRead()
			continue
		}

		// The response has a body. Wrap it so this loop learns when the
		// caller has finished with it: true is sent when the body was
		// read to EOF, false when it was closed early or failed.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // wait for the loop to process the early close
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // wait for the loop to process the EOF
				} else if err != nil {
					// Prefer the cancellation cause over the raw error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The transport asked for gzip itself, so decode it
			// transparently and hide the encoding from the caller.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Wait until the caller is done with the body, the request is
		// canceled, or the connection is closed, before touching the
		// connection again.
		select {
		case bodyEOF := <-waitForBodyRead:
			// After an early close, try to drain a small remainder so
			// the connection can still be reused.
			tryDrain := !bodyEOF && resp.ContentLength <= maxPostCloseReadBytes
			if tryDrain {
				eofc <- struct{}{} // release the blocked earlyCloseFn
				bodyEOF = maybeDrainBody(body.body)
			}
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if !tryDrain && bodyEOF {
				eofc <- struct{}{} // release the blocked fn
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2629
2630 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2631 if pc.closed != nil {
2632 return
2633 }
2634 if n := pc.br.Buffered(); n > 0 {
2635 buf, _ := pc.br.Peek(n)
2636 if is408Message(buf) {
2637 pc.closeLocked(errServerClosedIdle)
2638 return
2639 } else {
2640 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2641 }
2642 }
2643 if peekErr == io.EOF {
2644
2645 pc.closeLocked(errServerClosedIdle)
2646 } else {
2647 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2648 }
2649 }
2650
2651
2652
2653
// is408Message reports whether buf begins with an HTTP/1.x status line
// carrying status 408, i.e. "HTTP/1." followed by any single byte and
// then " 408". Buffers shorter than that prefix are rejected.
func is408Message(buf []byte) bool {
	const minLen = len("HTTP/1.x 408")
	if len(buf) < minLen {
		return false
	}
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2663
2664
2665
2666
// readResponse reads one final HTTP response for the request in rc from
// pc.br. Non-terminal 1xx responses (every 1xx except 101 Switching
// Protocols) are consumed and skipped; a "100 Continue" is signaled on
// rc.continueCh when the request is waiting for it.
//
// trace, when non-nil, receives the GotFirstResponseByte,
// Got100Continue and Got1xxResponse events. An error from
// Got1xxResponse aborts the read. On success resp.TLS is set to the
// connection's TLS state.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Tell the writer to send the body, then stop tracking the
			// channel so it is signaled only once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols is a final response.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// The 1xx response was handed to the trace hook, so
				// start a fresh header budget for the next response.
				// Without a hook the budget is shared by all 1xx
				// responses and the final response.
				pc.readLimit = pc.maxHeaderResponseSize() // reset the limit
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// The request is waiting for "100 Continue" but the server
		// answered with a final status instead. Closing the channel
		// makes waitForContinue report false and a send makes it report
		// true; presumably that decides whether the writer still sends
		// the request body.
		if resp.Close || rc.treq.Request.Close {
			close(continueCh) // the connection will be closed
		} else {
			continueCh <- struct{}{} // the connection stays in use
		}
	}

	resp.TLS = pc.tlsState
	return
}
2735
2736
2737
2738
// waitForContinue returns a function that blocks until one of three
// things happens and reports the outcome. It returns nil when
// continueCh is nil.
//
// The returned function reports:
//   - the receive's ok flag when continueCh delivers: true for a value,
//     false when the channel was closed;
//   - true when Transport.ExpectContinueTimeout elapses first;
//   - false when the connection is closed first.
func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
	if continueCh == nil {
		return nil
	}
	return func() bool {
		timer := time.NewTimer(pc.t.ExpectContinueTimeout)
		defer timer.Stop()

		select {
		case _, ok := <-continueCh:
			return ok
		case <-timer.C:
			return true
		case <-pc.closech:
			return false
		}
	}
}
2757
2758 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2759 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2760 if br.Buffered() != 0 {
2761 body.br = br
2762 }
2763 return body
2764 }
2765
2766
2767
2768
2769
2770
2771 type readWriteCloserBody struct {
2772 _ incomparable
2773 br *bufio.Reader
2774 io.ReadWriteCloser
2775 }
2776
2777 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2778 if b.br != nil {
2779 if n := b.br.Buffered(); len(p) > n {
2780 p = p[:n]
2781 }
2782 n, err = b.br.Read(p)
2783 if b.br.Buffered() == 0 {
2784 b.br = nil
2785 }
2786 return n, err
2787 }
2788 return b.ReadWriteCloser.Read(p)
2789 }
2790
2791 func (b *readWriteCloserBody) CloseWrite() error {
2792 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2793 return cw.CloseWrite()
2794 }
2795 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2796 }
2797
2798
// nothingWrittenError wraps an error from a write or round trip during
// which no bytes were written to the connection.
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error so errors.Is and errors.As can see
// through the wrapper.
func (e nothingWrittenError) Unwrap() error {
	return e.error
}
2806
// writeLoop is the HTTP/1 write side of a persistConn and runs in its
// own goroutine (started by dialConn). For each request received on
// pc.writech it writes and flushes the request, then reports the result
// on both pc.writeErrCh and the request's own channel. It exits after
// the first write error or once the connection is closed, and closes
// pc.writeLoopDone on the way out.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// Reading the request body failed. Record the error on
				// the request so mapRoundTripError returns it instead
				// of a less specific connection error.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Nothing reached the connection.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // read by wroteRequest, which readLoop calls before reusing the connection
			wr.ch <- err         // read by roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2844
2845
2846
2847
2848
2849
2850
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the
// write loop to report the outcome of a request write before giving up
// on reusing the connection.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2852
2853
2854
2855 func (pc *persistConn) wroteRequest() bool {
2856 select {
2857 case err := <-pc.writeErrCh:
2858
2859
2860 return err == nil
2861 default:
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2873 defer t.Stop()
2874 select {
2875 case err := <-pc.writeErrCh:
2876 return err == nil
2877 case <-t.C:
2878 return false
2879 }
2880 }
2881 }
2882
2883
2884
// responseAndError is what readLoop sends back to roundTrip: either a
// response or an error. roundTrip panics unless exactly one of the two
// fields is set.
type responseAndError struct {
	_   incomparable
	res *Response // nil when err is set
	err error
}
2890
// requestAndChan is the message roundTrip sends to readLoop for each
// request.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // where readLoop delivers the result

	// addedGzip records that the transport itself added
	// "Accept-Encoding: gzip" to the request; readLoop then decodes a
	// gzip response body transparently.
	addedGzip bool

	// continueCh, when non-nil, is used by readResponse to tell the
	// write side whether a "100 Continue" arrived: it is sent a value,
	// or closed when a final response arrived on a connection that will
	// be closed.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2909
2910
2911
2912
2913
// writeRequest is the message roundTrip sends to writeLoop for each
// request. roundTrip builds it with a positional literal, so the field
// order must not change.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the result of the write

	// continueCh, when non-nil, is waited on (via waitForContinue)
	// while the request is written; it is the receiving end of
	// requestAndChan.continueCh.
	continueCh <-chan struct{}
}
2923
2924
2925
// timeoutError is an error that reports itself as a temporary timeout
// and matches context.DeadlineExceeded under errors.Is.
type timeoutError struct {
	err string
}

// Error returns the error message.
func (e *timeoutError) Error() string {
	return e.err
}

// Timeout always reports true.
func (e *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (e *timeoutError) Temporary() bool {
	return true
}

// Is reports whether err is context.DeadlineExceeded.
func (e *timeoutError) Is(err error) bool {
	return err == context.DeadlineExceeded
}

// errTimeout is returned by roundTrip when the response headers do not
// arrive within Transport.ResponseHeaderTimeout.
var errTimeout error = &timeoutError{err: "net/http: timeout awaiting response headers"}
2936
2937
2938
// errRequestCanceled is the close reason used when a request is
// canceled (see cancelRequest). errRequestCanceledConn replaces it when
// the cancellation happens while the request is still waiting for a
// connection.
var errRequestCanceled = internal.ErrRequestCanceled
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")

// errRequestDone is the cause readLoop passes to the request's cancel
// function once the request has completed.
var errRequestDone = errors.New("net/http: request completed")
2945
// nop does nothing; it is the default value of the test hooks below.
func nop() {}
2947
2948
// Test hooks. All of them default to no-ops; the names suggest they
// are replaced by tests.
var (
	testHookEnterRoundTrip   = nop // called at the start of persistConn.roundTrip
	testHookWaitResLoop      = nop // called on each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is locked by readLoop while it reads
	// testHookReadLoopBeforeNextRead, which it then calls after
	// finishing with each response.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop
)
2959
// waitForAvailability blocks until a receive on pc.availch succeeds,
// the connection is closed, or ctx is done. It returns nil in the first
// case, the connection's close reason in the second, and ctx.Err() in
// the third.
func (pc *persistConn) waitForAvailability(ctx context.Context) error {
	select {
	case <-pc.availch:
		return nil
	case <-pc.closech:
		// closeLocked sets pc.closed before it closes closech.
		return pc.closed
	case <-ctx.Done():
		return ctx.Err()
	}
}
2970
// roundTrip sends req on this HTTP/1 connection and waits for the
// response.
//
// It hands the request to writeLoop and readLoop and then waits for
// whichever comes first: a write error, the connection closing, the
// response-header timeout, the response itself, or cancellation of the
// request context. Errors are passed through mapRoundTripError, except
// for the header timeout, which is returned as errTimeout.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()

	pc.mu.Lock()
	if pc.isClientConn {
		if !pc.reserved {
			// The connection was not reserved for this request; wait
			// (without holding the lock) until it becomes available.
			pc.mu.Unlock()
			if err := pc.waitForAvailability(req.ctx); err != nil {
				return nil, err
			}
			pc.mu.Lock()
		}
		pc.reserved = false
		pc.inFlight = true
	}
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when the caller did not choose an encoding
	// itself. requestedGzip is passed to readLoop, which decodes a gzip
	// response body only in that case. Range requests and HEAD requests
	// are left alone.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh links readLoop to writeLoop for requests that expect
	// a "100 Continue" before the body is sent.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when roundTrip returns, so readLoop does not block
	// trying to deliver a response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Start the write first, then tell the read side which request the
	// next response belongs to.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse turns what readLoop delivered into roundTrip's
	// return values.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	var respHeaderTimer <-chan time.Time // nil (never ready) until the request has been written
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request has been written; start the response-header
			// timer if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // release the timer when roundTrip returns
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			select {
			case re := <-resc:
				// The connection closed, but a response is ready too;
				// prefer returning the response.
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			select {
			case re := <-resc:
				// The request was canceled, but a response is ready
				// too; prefer returning the response.
				return handleResponse(re)
			default:
			}
			// Cancel by closing the connection. The loop then continues
			// and returns through the pcClosed (or resc) case above.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
3120
3121
3122
3123 type tLogKey struct{}
3124
3125 func (tr *transportRequest) logf(format string, args ...any) {
3126 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
3127 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
3128 }
3129 }
3130
3131
3132
3133 func (pc *persistConn) markReused() {
3134 pc.mu.Lock()
3135 pc.reused = true
3136 pc.mu.Unlock()
3137 }
3138
3139
3140
3141
3142
3143
// close closes the connection with err as the reason. err must not be
// nil (closeLocked panics on a nil error). Only the first close takes
// effect; later calls keep the original reason.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
3149
// closeLocked closes the connection with err as the reason. It panics
// when err is nil. The caller must hold pc.mu.
//
// On the first call it records err in pc.closed and releases the
// per-host connection slot. For HTTP/1 connections (pc.alt == nil) it
// then closes the underlying connection, unless err is
// errCallerOwnsConn, and closes pc.closech. For alternate-protocol
// connections it calls Close on pc.alt when that implements io.Closer.
// Every call clears pc.mutateHeaderFunc.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Only HTTP/1 connections have a net.Conn and a closech owned
		// by this persistConn.
		if pc.alt == nil {
			if err != errCallerOwnsConn {
				pc.conn.Close()
			}
			close(pc.closech)
		} else {
			if cc, ok := pc.alt.(io.Closer); ok {
				cc.Close()
			}
		}
	}
	pc.mutateHeaderFunc = nil
}
3173
// schemePort returns the default port for scheme as a decimal string:
// "80" for http, "443" for https and "1080" for socks5 and socks5h.
// Any other scheme yields the empty string. Matching is exact and
// case-sensitive.
func schemePort(scheme string) string {
	var port string
	switch scheme {
	case "socks5", "socks5h":
		port = "1080"
	case "https":
		port = "443"
	case "http":
		port = "80"
	}
	return port
}
3186
3187 func idnaASCIIFromURL(url *url.URL) string {
3188 addr := url.Hostname()
3189 if v, err := idnaASCII(addr); err == nil {
3190 addr = v
3191 }
3192 return addr
3193 }
3194
3195
3196 func canonicalAddr(url *url.URL) string {
3197 port := url.Port()
3198 if port == "" {
3199 port = schemePort(url.Scheme)
3200 }
3201 return net.JoinHostPort(idnaASCIIFromURL(url), port)
3202 }
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
// bodyEOFSignal wraps a body and reports the end of its use to callbacks.
//
// fn, if set, is invoked at most once: with the first error returned by
// body.Read, or with the result of body.Close if Close gets there first.
// Whatever fn returns replaces the error handed back to the caller.
// earlyCloseFn, if set, is invoked by Close in place of closing body when
// the recorded read error is not io.EOF, including when no read error has
// been recorded at all.
//
// mu guards closed, rerr and fn. It is not held during body.Read itself.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex
	closed       bool
	rerr         error
	fn           func(error) error
	earlyCloseFn func() error
}

// Errors reported for misuse of a response body.
var (
	errReadOnClosedResBody     = errors.New("http: read on closed response body")
	errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
)

// Read reads from the wrapped body.
//
// After Close it returns errReadOnClosedResBody. After an earlier read
// error it returns that same recorded error again without touching body.
// When body.Read fails, the first such error is recorded and the error is
// passed through condfn before being returned together with n.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, prevErr := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case prevErr != nil:
		return 0, prevErr
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed; calls after the first return nil.
//
// If earlyCloseFn is set and the recorded read error is not io.EOF, Close
// returns earlyCloseFn's result and does not call body.Close. Otherwise it
// closes body and passes the result through condfn.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true
	if sawEOF := es.rerr == io.EOF; !sawEOF && es.earlyCloseFn != nil {
		return es.earlyCloseFn()
	}
	return es.condfn(es.body.Close())
}

// condfn runs es.fn on err, if fn is still set, and returns its result;
// fn is cleared afterwards so it never runs twice. With no fn, err is
// returned as is. Read and Close call it with es.mu held.
func (es *bodyEOFSignal) condfn(err error) error {
	hook := es.fn
	if hook == nil {
		return err
	}
	mapped := hook(err)
	es.fn = nil
	return mapped
}
3273
3274
3275
3276
3277
3278 type gzipReader struct {
3279 _ incomparable
3280 body *bodyEOFSignal
3281 mu sync.Mutex
3282 zr *gzip.Reader
3283 zerr error
3284 }
3285
3286 type eofReader struct{}
3287
3288 func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
3289 func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
3290
3291 var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3292
3293
3294 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3295 zr := gzipPool.Get().(*gzip.Reader)
3296 if err := zr.Reset(r); err != nil {
3297 gzipPoolPut(zr)
3298 return nil, err
3299 }
3300 return zr, nil
3301 }
3302
3303
3304 func gzipPoolPut(zr *gzip.Reader) {
3305
3306
3307 var r flate.Reader = eofReader{}
3308 zr.Reset(r)
3309 gzipPool.Put(zr)
3310 }
3311
3312
3313
3314 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3315 gz.mu.Lock()
3316 defer gz.mu.Unlock()
3317 if gz.zerr != nil {
3318 return nil, gz.zerr
3319 }
3320 if gz.zr == nil {
3321 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3322 if gz.zerr != nil {
3323 return nil, gz.zerr
3324 }
3325 }
3326 ret := gz.zr
3327 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3328 return ret, nil
3329 }
3330
3331
3332 func (gz *gzipReader) release(zr *gzip.Reader) {
3333 gz.mu.Lock()
3334 defer gz.mu.Unlock()
3335 if gz.zerr == errConcurrentReadOnResBody {
3336 gz.zr, gz.zerr = zr, nil
3337 } else {
3338 gzipPoolPut(zr)
3339 }
3340 }
3341
3342
3343
3344 func (gz *gzipReader) close() {
3345 gz.mu.Lock()
3346 defer gz.mu.Unlock()
3347 if gz.zerr == nil && gz.zr != nil {
3348 gzipPoolPut(gz.zr)
3349 gz.zr = nil
3350 }
3351 gz.zerr = errReadOnClosedResBody
3352 }
3353
3354 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3355 zr, err := gz.acquire()
3356 if err != nil {
3357 return 0, err
3358 }
3359 defer gz.release(zr)
3360
3361 return zr.Read(p)
3362 }
3363
3364 func (gz *gzipReader) Close() error {
3365 gz.close()
3366
3367 return gz.body.Close()
3368 }
3369
// tlsHandshakeTimeoutError is the error value for a TLS handshake that timed
// out. It carries no data; its methods give it the shape of a net.Error.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed message for this error.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
3375
3376
3377
3378
// fakeLocker has Lock and Unlock methods that do nothing, so it can stand in
// for a real lock without providing any mutual exclusion.
//
// NOTE(review): presumably used where a sync.Locker is required but real
// locking is unnecessary; confirm at the use sites.
type fakeLocker struct{}

func (fakeLocker) Lock()   {}
func (fakeLocker) Unlock() {}
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
// cloneTLSConfig returns a copy of cfg made with (*tls.Config).Clone.
// A nil cfg yields a new zero-value tls.Config, so the result is never nil
// and can be modified freely by the caller.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
3403
3404 type connLRU struct {
3405 ll *list.List
3406 m map[*persistConn]*list.Element
3407 }
3408
3409
3410 func (cl *connLRU) add(pc *persistConn) {
3411 if cl.ll == nil {
3412 cl.ll = list.New()
3413 cl.m = make(map[*persistConn]*list.Element)
3414 }
3415 ele := cl.ll.PushFront(pc)
3416 if _, ok := cl.m[pc]; ok {
3417 panic("persistConn was already in LRU")
3418 }
3419 cl.m[pc] = ele
3420 }
3421
3422 func (cl *connLRU) removeOldest() *persistConn {
3423 ele := cl.ll.Back()
3424 pc := ele.Value.(*persistConn)
3425 cl.ll.Remove(ele)
3426 delete(cl.m, pc)
3427 return pc
3428 }
3429
3430
3431 func (cl *connLRU) remove(pc *persistConn) {
3432 if ele, ok := cl.m[pc]; ok {
3433 cl.ll.Remove(ele)
3434 delete(cl.m, pc)
3435 }
3436 }
3437
3438
3439 func (cl *connLRU) len() int {
3440 return len(cl.m)
3441 }
3442
View as plain text