Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/flate"
15 "compress/gzip"
16 "container/list"
17 "context"
18 "crypto/tls"
19 "errors"
20 "fmt"
21 "internal/godebug"
22 "io"
23 "log"
24 "maps"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal/ascii"
28 "net/textproto"
29 "net/url"
30 "reflect"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35 _ "unsafe"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http/httpproxy"
39 )
40
41
42
43
44
45
46 var DefaultTransport RoundTripper = &Transport{
47 Proxy: ProxyFromEnvironment,
48 DialContext: defaultTransportDialContext(&net.Dialer{
49 Timeout: 30 * time.Second,
50 KeepAlive: 30 * time.Second,
51 }),
52 ForceAttemptHTTP2: true,
53 MaxIdleConns: 100,
54 IdleConnTimeout: 90 * time.Second,
55 TLSHandshakeTimeout: 10 * time.Second,
56 ExpectContinueTimeout: 1 * time.Second,
57 }
58
59
60
61 const DefaultMaxIdleConnsPerHost = 2
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 type Transport struct {
98 idleMu sync.Mutex
99 closeIdle bool
100 idleConn map[connectMethodKey][]*persistConn
101 idleConnWait map[connectMethodKey]wantConnQueue
102 idleLRU connLRU
103
104 reqMu sync.Mutex
105 reqCanceler map[*Request]context.CancelCauseFunc
106
107 altMu sync.Mutex
108 altProto atomic.Value
109
110 connsPerHostMu sync.Mutex
111 connsPerHost map[connectMethodKey]int
112 connsPerHostWait map[connectMethodKey]wantConnQueue
113 dialsInProgress wantConnQueue
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 Proxy func(*Request) (*url.URL, error)
130
131
132
133
134 OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error
135
136
137
138
139
140
141
142
143
144 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
145
146
147
148
149
150
151
152
153
154
155
156 Dial func(network, addr string) (net.Conn, error)
157
158
159
160
161
162
163
164
165
166
167
168 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
169
170
171
172
173
174
175
176 DialTLS func(network, addr string) (net.Conn, error)
177
178
179
180
181
182 TLSClientConfig *tls.Config
183
184
185
186 TLSHandshakeTimeout time.Duration
187
188
189
190
191
192
193 DisableKeepAlives bool
194
195
196
197
198
199
200
201
202
203 DisableCompression bool
204
205
206
207 MaxIdleConns int
208
209
210
211
212 MaxIdleConnsPerHost int
213
214
215
216
217
218
219 MaxConnsPerHost int
220
221
222
223
224
225 IdleConnTimeout time.Duration
226
227
228
229
230
231 ResponseHeaderTimeout time.Duration
232
233
234
235
236
237
238
239
240 ExpectContinueTimeout time.Duration
241
242
243
244
245
246
247
248
249
250
251
252 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
253
254
255
256
257 ProxyConnectHeader Header
258
259
260
261
262
263
264
265
266 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
267
268
269
270
271
272
273 MaxResponseHeaderBytes int64
274
275
276
277
278 WriteBufferSize int
279
280
281
282
283 ReadBufferSize int
284
285
286
287 nextProtoOnce sync.Once
288 h2transport h2Transport
289 tlsNextProtoWasNil bool
290
291
292
293
294
295
296 ForceAttemptHTTP2 bool
297
298
299
300
301
302 HTTP2 *HTTP2Config
303
304
305
306
307
308
309
310
311
312 Protocols *Protocols
313 }
314
315 func (t *Transport) writeBufferSize() int {
316 if t.WriteBufferSize > 0 {
317 return t.WriteBufferSize
318 }
319 return 4 << 10
320 }
321
322 func (t *Transport) readBufferSize() int {
323 if t.ReadBufferSize > 0 {
324 return t.ReadBufferSize
325 }
326 return 4 << 10
327 }
328
329 func (t *Transport) maxHeaderResponseSize() int64 {
330 if t.MaxResponseHeaderBytes > 0 {
331 return t.MaxResponseHeaderBytes
332 }
333 return 10 << 20
334 }
335
336
337 func (t *Transport) Clone() *Transport {
338 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
339 t2 := &Transport{
340 Proxy: t.Proxy,
341 OnProxyConnectResponse: t.OnProxyConnectResponse,
342 DialContext: t.DialContext,
343 Dial: t.Dial,
344 DialTLS: t.DialTLS,
345 DialTLSContext: t.DialTLSContext,
346 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
347 DisableKeepAlives: t.DisableKeepAlives,
348 DisableCompression: t.DisableCompression,
349 MaxIdleConns: t.MaxIdleConns,
350 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
351 MaxConnsPerHost: t.MaxConnsPerHost,
352 IdleConnTimeout: t.IdleConnTimeout,
353 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
354 ExpectContinueTimeout: t.ExpectContinueTimeout,
355 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
356 GetProxyConnectHeader: t.GetProxyConnectHeader,
357 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
358 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
359 WriteBufferSize: t.WriteBufferSize,
360 ReadBufferSize: t.ReadBufferSize,
361 }
362 if t.TLSClientConfig != nil {
363 t2.TLSClientConfig = t.TLSClientConfig.Clone()
364 }
365 if t.HTTP2 != nil {
366 t2.HTTP2 = &HTTP2Config{}
367 *t2.HTTP2 = *t.HTTP2
368 }
369 if t.Protocols != nil {
370 t2.Protocols = &Protocols{}
371 *t2.Protocols = *t.Protocols
372 }
373 if !t.tlsNextProtoWasNil {
374 npm := maps.Clone(t.TLSNextProto)
375 if npm == nil {
376 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
377 }
378 t2.TLSNextProto = npm
379 }
380 return t2
381 }
382
383
384
385
386
387
388
389 type h2Transport interface {
390 CloseIdleConnections()
391 }
392
393 func (t *Transport) hasCustomTLSDialer() bool {
394 return t.DialTLS != nil || t.DialTLSContext != nil
395 }
396
397 var http2client = godebug.New("http2client")
398
399
400
401 func (t *Transport) onceSetNextProtoDefaults() {
402 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
403 if http2client.Value() == "0" {
404 http2client.IncNonDefault()
405 return
406 }
407
408
409
410
411
412
413 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
414 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
415 if v := rv.Field(0); v.CanInterface() {
416 if h2i, ok := v.Interface().(h2Transport); ok {
417 t.h2transport = h2i
418 return
419 }
420 }
421 }
422
423 if _, ok := t.TLSNextProto["h2"]; ok {
424
425 return
426 }
427 protocols := t.protocols()
428 if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
429 return
430 }
431 if omitBundledHTTP2 {
432 return
433 }
434 t2, err := http2configureTransports(t)
435 if err != nil {
436 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
437 return
438 }
439 t.h2transport = t2
440
441
442
443
444
445
446
447 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
448 const h2max = 1<<32 - 1
449 if limit1 >= h2max {
450 t2.MaxHeaderListSize = h2max
451 } else {
452 t2.MaxHeaderListSize = uint32(limit1)
453 }
454 }
455
456
457
458
459
460
461 t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
462 }
463
464 func (t *Transport) protocols() Protocols {
465 if t.Protocols != nil {
466 return *t.Protocols
467 }
468 var p Protocols
469 p.SetHTTP1(true)
470 switch {
471 case t.TLSNextProto != nil:
472
473
474 if t.TLSNextProto["h2"] != nil {
475 p.SetHTTP2(true)
476 }
477 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
478
479
480
481
482
483
484 case http2client.Value() == "0":
485 default:
486 p.SetHTTP2(true)
487 }
488 return p
489 }
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
508 return envProxyFunc()(req.URL)
509 }
510
511
512
513 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
514 return func(*Request) (*url.URL, error) {
515 return fixedURL, nil
516 }
517 }
518
519
520
521
522 type transportRequest struct {
523 *Request
524 extra Header
525 trace *httptrace.ClientTrace
526
527 ctx context.Context
528 cancel context.CancelCauseFunc
529
530 mu sync.Mutex
531 err error
532 }
533
534 func (tr *transportRequest) extraHeaders() Header {
535 if tr.extra == nil {
536 tr.extra = make(Header)
537 }
538 return tr.extra
539 }
540
541 func (tr *transportRequest) setError(err error) {
542 tr.mu.Lock()
543 if tr.err == nil {
544 tr.err = err
545 }
546 tr.mu.Unlock()
547 }
548
549
550
551 func (t *Transport) useRegisteredProtocol(req *Request) bool {
552 if req.URL.Scheme == "https" && req.requiresHTTP1() {
553
554
555
556
557 return false
558 }
559 return true
560 }
561
562
563
564
565 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
566 if !t.useRegisteredProtocol(req) {
567 return nil
568 }
569 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
570 return altProto[req.URL.Scheme]
571 }
572
573 func validateHeaders(hdrs Header) string {
574 for k, vv := range hdrs {
575 if !httpguts.ValidHeaderFieldName(k) {
576 return fmt.Sprintf("field name %q", k)
577 }
578 for _, v := range vv {
579 if !httpguts.ValidHeaderFieldValue(v) {
580
581
582 return fmt.Sprintf("field value for %q", k)
583 }
584 }
585 }
586 return ""
587 }
588
589
590 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
591 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
592 ctx := req.Context()
593 trace := httptrace.ContextClientTrace(ctx)
594
595 if req.URL == nil {
596 req.closeBody()
597 return nil, errors.New("http: nil Request.URL")
598 }
599 if req.Header == nil {
600 req.closeBody()
601 return nil, errors.New("http: nil Request.Header")
602 }
603 scheme := req.URL.Scheme
604 isHTTP := scheme == "http" || scheme == "https"
605 if isHTTP {
606
607 if err := validateHeaders(req.Header); err != "" {
608 req.closeBody()
609 return nil, fmt.Errorf("net/http: invalid header %s", err)
610 }
611
612
613 if err := validateHeaders(req.Trailer); err != "" {
614 req.closeBody()
615 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
616 }
617 }
618
619 origReq := req
620 req = setupRewindBody(req)
621
622 if altRT := t.alternateRoundTripper(req); altRT != nil {
623 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
624 return resp, err
625 }
626 var err error
627 req, err = rewindBody(req)
628 if err != nil {
629 return nil, err
630 }
631 }
632 if !isHTTP {
633 req.closeBody()
634 return nil, badStringError("unsupported protocol scheme", scheme)
635 }
636 if req.Method != "" && !validMethod(req.Method) {
637 req.closeBody()
638 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
639 }
640 if req.URL.Host == "" {
641 req.closeBody()
642 return nil, errors.New("http: no Host in request URL")
643 }
644
645
646
647
648
649
650
651
652
653
654 ctx, cancel := context.WithCancelCause(req.Context())
655
656
657 if origReq.Cancel != nil {
658 go awaitLegacyCancel(ctx, cancel, origReq)
659 }
660
661
662
663
664
665 cancel = t.prepareTransportCancel(origReq, cancel)
666
667 defer func() {
668 if err != nil {
669 cancel(err)
670 }
671 }()
672
673 for {
674 select {
675 case <-ctx.Done():
676 req.closeBody()
677 return nil, context.Cause(ctx)
678 default:
679 }
680
681
682 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
683 cm, err := t.connectMethodForRequest(treq)
684 if err != nil {
685 req.closeBody()
686 return nil, err
687 }
688
689
690
691
692
693 pconn, err := t.getConn(treq, cm)
694 if err != nil {
695 req.closeBody()
696 return nil, err
697 }
698
699 var resp *Response
700 if pconn.alt != nil {
701
702 resp, err = pconn.alt.RoundTrip(req)
703 } else {
704 resp, err = pconn.roundTrip(treq)
705 }
706 if err == nil {
707 if pconn.alt != nil {
708
709
710
711
712
713 cancel(errRequestDone)
714 }
715 resp.Request = origReq
716 return resp, nil
717 }
718
719
720 if http2isNoCachedConnError(err) {
721 if t.removeIdleConn(pconn) {
722 t.decConnsPerHost(pconn.cacheKey)
723 }
724 } else if !pconn.shouldRetryRequest(req, err) {
725
726
727 if e, ok := err.(nothingWrittenError); ok {
728 err = e.error
729 }
730 if e, ok := err.(transportReadFromServerError); ok {
731 err = e.err
732 }
733 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose.Load() {
734
735
736
737 req.closeBody()
738 }
739 return nil, err
740 }
741 testHookRoundTripRetried()
742
743
744 req, err = rewindBody(req)
745 if err != nil {
746 return nil, err
747 }
748 }
749 }
750
751 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
752 select {
753 case <-req.Cancel:
754 cancel(errRequestCanceled)
755 case <-ctx.Done():
756 }
757 }
758
759 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
760
761 type readTrackingBody struct {
762 io.ReadCloser
763 didRead bool
764 didClose atomic.Bool
765 }
766
767 func (r *readTrackingBody) Read(data []byte) (int, error) {
768 r.didRead = true
769 return r.ReadCloser.Read(data)
770 }
771
772 func (r *readTrackingBody) Close() error {
773 if !r.didClose.CompareAndSwap(false, true) {
774 return nil
775 }
776 return r.ReadCloser.Close()
777 }
778
779
780
781
782
783 func setupRewindBody(req *Request) *Request {
784 if req.Body == nil || req.Body == NoBody {
785 return req
786 }
787 newReq := *req
788 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
789 return &newReq
790 }
791
792
793
794
795
796 func rewindBody(req *Request) (rewound *Request, err error) {
797 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose.Load()) {
798 return req, nil
799 }
800 if !req.Body.(*readTrackingBody).didClose.Load() {
801 req.closeBody()
802 }
803 if req.GetBody == nil {
804 return nil, errCannotRewind
805 }
806 body, err := req.GetBody()
807 if err != nil {
808 return nil, err
809 }
810 newReq := *req
811 newReq.Body = &readTrackingBody{ReadCloser: body}
812 return &newReq, nil
813 }
814
815
816
817
818 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
819 if http2isNoCachedConnError(err) {
820
821
822
823
824
825
826 return true
827 }
828 if err == errMissingHost {
829
830 return false
831 }
832 if !pc.isReused() {
833
834
835
836
837
838
839
840 return false
841 }
842 if _, ok := err.(nothingWrittenError); ok {
843
844
845 return req.outgoingLength() == 0 || req.GetBody != nil
846 }
847 if !req.isReplayable() {
848
849 return false
850 }
851 if _, ok := err.(transportReadFromServerError); ok {
852
853
854 return true
855 }
856 if err == errServerClosedIdle {
857
858
859
860 return true
861 }
862 return false
863 }
864
865
866 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
867
868
869
870
871
872
873
874
875
876
877
878 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
879 t.altMu.Lock()
880 defer t.altMu.Unlock()
881 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
882 if _, exists := oldMap[scheme]; exists {
883 panic("protocol " + scheme + " already registered")
884 }
885 newMap := maps.Clone(oldMap)
886 if newMap == nil {
887 newMap = make(map[string]RoundTripper)
888 }
889 newMap[scheme] = rt
890 t.altProto.Store(newMap)
891 }
892
893
894
895
896
897 func (t *Transport) CloseIdleConnections() {
898 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
899 t.idleMu.Lock()
900 m := t.idleConn
901 t.idleConn = nil
902 t.closeIdle = true
903 t.idleLRU = connLRU{}
904 t.idleMu.Unlock()
905 for _, conns := range m {
906 for _, pconn := range conns {
907 pconn.close(errCloseIdleConns)
908 }
909 }
910 t.connsPerHostMu.Lock()
911 t.dialsInProgress.all(func(w *wantConn) {
912 if w.cancelCtx != nil && !w.waiting() {
913 w.cancelCtx()
914 }
915 })
916 t.connsPerHostMu.Unlock()
917 if t2 := t.h2transport; t2 != nil {
918 t2.CloseIdleConnections()
919 }
920 }
921
922
923 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
924
925
926
927
928
929
930 cancel := func(err error) {
931 origCancel(err)
932 t.reqMu.Lock()
933 delete(t.reqCanceler, req)
934 t.reqMu.Unlock()
935 }
936 t.reqMu.Lock()
937 if t.reqCanceler == nil {
938 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
939 }
940 t.reqCanceler[req] = cancel
941 t.reqMu.Unlock()
942 return cancel
943 }
944
945
946
947
948
949
950
951 func (t *Transport) CancelRequest(req *Request) {
952 t.reqMu.Lock()
953 cancel := t.reqCanceler[req]
954 t.reqMu.Unlock()
955 if cancel != nil {
956 cancel(errRequestCanceled)
957 }
958 }
959
960
961
962
963
964 var (
965 envProxyOnce sync.Once
966 envProxyFuncValue func(*url.URL) (*url.URL, error)
967 )
968
969
970
971 func envProxyFunc() func(*url.URL) (*url.URL, error) {
972 envProxyOnce.Do(func() {
973 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
974 })
975 return envProxyFuncValue
976 }
977
978
979 func resetProxyConfig() {
980 envProxyOnce = sync.Once{}
981 envProxyFuncValue = nil
982 }
983
984 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
985 cm.targetScheme = treq.URL.Scheme
986 cm.targetAddr = canonicalAddr(treq.URL)
987 if t.Proxy != nil {
988 cm.proxyURL, err = t.Proxy(treq.Request)
989 }
990 cm.onlyH1 = treq.requiresHTTP1()
991 return cm, err
992 }
993
994
995
996 func (cm *connectMethod) proxyAuth() string {
997 if cm.proxyURL == nil {
998 return ""
999 }
1000 if u := cm.proxyURL.User; u != nil {
1001 username := u.Username()
1002 password, _ := u.Password()
1003 return "Basic " + basicAuth(username, password)
1004 }
1005 return ""
1006 }
1007
1008
1009 var (
1010 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
1011 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
1012 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
1013 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
1014 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
1015 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
1016 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
1017 errIdleConnTimeout = errors.New("http: idle connection timeout")
1018
1019
1020
1021
1022
1023 errServerClosedIdle = errors.New("http: server closed idle connection")
1024 )
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034 type transportReadFromServerError struct {
1035 err error
1036 }
1037
1038 func (e transportReadFromServerError) Unwrap() error { return e.err }
1039
1040 func (e transportReadFromServerError) Error() string {
1041 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
1042 }
1043
1044 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1045 if err := t.tryPutIdleConn(pconn); err != nil {
1046 pconn.close(err)
1047 }
1048 }
1049
1050 func (t *Transport) maxIdleConnsPerHost() int {
1051 if v := t.MaxIdleConnsPerHost; v != 0 {
1052 return v
1053 }
1054 return DefaultMaxIdleConnsPerHost
1055 }
1056
1057
1058
1059
1060
1061
1062 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
1063 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
1064 return errKeepAlivesDisabled
1065 }
1066 if pconn.isBroken() {
1067 return errConnBroken
1068 }
1069 pconn.markReused()
1070
1071 t.idleMu.Lock()
1072 defer t.idleMu.Unlock()
1073
1074
1075
1076
1077 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
1078 return nil
1079 }
1080
1081
1082
1083
1084
1085 key := pconn.cacheKey
1086 if q, ok := t.idleConnWait[key]; ok {
1087 done := false
1088 if pconn.alt == nil {
1089
1090
1091 for q.len() > 0 {
1092 w := q.popFront()
1093 if w.tryDeliver(pconn, nil, time.Time{}) {
1094 done = true
1095 break
1096 }
1097 }
1098 } else {
1099
1100
1101
1102
1103 for q.len() > 0 {
1104 w := q.popFront()
1105 w.tryDeliver(pconn, nil, time.Time{})
1106 }
1107 }
1108 if q.len() == 0 {
1109 delete(t.idleConnWait, key)
1110 } else {
1111 t.idleConnWait[key] = q
1112 }
1113 if done {
1114 return nil
1115 }
1116 }
1117
1118 if t.closeIdle {
1119 return errCloseIdle
1120 }
1121 if t.idleConn == nil {
1122 t.idleConn = make(map[connectMethodKey][]*persistConn)
1123 }
1124 idles := t.idleConn[key]
1125 if len(idles) >= t.maxIdleConnsPerHost() {
1126 return errTooManyIdleHost
1127 }
1128 for _, exist := range idles {
1129 if exist == pconn {
1130 log.Fatalf("dup idle pconn %p in freelist", pconn)
1131 }
1132 }
1133 t.idleConn[key] = append(idles, pconn)
1134 t.idleLRU.add(pconn)
1135 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1136 oldest := t.idleLRU.removeOldest()
1137 oldest.close(errTooManyIdle)
1138 t.removeIdleConnLocked(oldest)
1139 }
1140
1141
1142
1143
1144 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1145 if pconn.idleTimer != nil {
1146 pconn.idleTimer.Reset(t.IdleConnTimeout)
1147 } else {
1148 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1149 }
1150 }
1151 pconn.idleAt = time.Now()
1152 return nil
1153 }
1154
1155
1156
1157
1158 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1159 if t.DisableKeepAlives {
1160 return false
1161 }
1162
1163 t.idleMu.Lock()
1164 defer t.idleMu.Unlock()
1165
1166
1167
1168 t.closeIdle = false
1169
1170 if w == nil {
1171
1172 return false
1173 }
1174
1175
1176
1177
1178 var oldTime time.Time
1179 if t.IdleConnTimeout > 0 {
1180 oldTime = time.Now().Add(-t.IdleConnTimeout)
1181 }
1182
1183
1184 if list, ok := t.idleConn[w.key]; ok {
1185 stop := false
1186 delivered := false
1187 for len(list) > 0 && !stop {
1188 pconn := list[len(list)-1]
1189
1190
1191
1192
1193 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1194 if tooOld {
1195
1196
1197
1198 go pconn.closeConnIfStillIdle()
1199 }
1200 if pconn.isBroken() || tooOld {
1201
1202
1203
1204
1205
1206 list = list[:len(list)-1]
1207 continue
1208 }
1209 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1210 if delivered {
1211 if pconn.alt != nil {
1212
1213
1214 } else {
1215
1216
1217 t.idleLRU.remove(pconn)
1218 list = list[:len(list)-1]
1219 }
1220 }
1221 stop = true
1222 }
1223 if len(list) > 0 {
1224 t.idleConn[w.key] = list
1225 } else {
1226 delete(t.idleConn, w.key)
1227 }
1228 if stop {
1229 return delivered
1230 }
1231 }
1232
1233
1234 if t.idleConnWait == nil {
1235 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1236 }
1237 q := t.idleConnWait[w.key]
1238 q.cleanFrontNotWaiting()
1239 q.pushBack(w)
1240 t.idleConnWait[w.key] = q
1241 return false
1242 }
1243
1244
1245 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1246 t.idleMu.Lock()
1247 defer t.idleMu.Unlock()
1248 return t.removeIdleConnLocked(pconn)
1249 }
1250
1251
1252 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1253 if pconn.idleTimer != nil {
1254 pconn.idleTimer.Stop()
1255 }
1256 t.idleLRU.remove(pconn)
1257 key := pconn.cacheKey
1258 pconns := t.idleConn[key]
1259 var removed bool
1260 switch len(pconns) {
1261 case 0:
1262
1263 case 1:
1264 if pconns[0] == pconn {
1265 delete(t.idleConn, key)
1266 removed = true
1267 }
1268 default:
1269 for i, v := range pconns {
1270 if v != pconn {
1271 continue
1272 }
1273
1274
1275 copy(pconns[i:], pconns[i+1:])
1276 t.idleConn[key] = pconns[:len(pconns)-1]
1277 removed = true
1278 break
1279 }
1280 }
1281 return removed
1282 }
1283
1284 var zeroDialer net.Dialer
1285
1286 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1287 if t.DialContext != nil {
1288 c, err := t.DialContext(ctx, network, addr)
1289 if c == nil && err == nil {
1290 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1291 }
1292 return c, err
1293 }
1294 if t.Dial != nil {
1295 c, err := t.Dial(network, addr)
1296 if c == nil && err == nil {
1297 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1298 }
1299 return c, err
1300 }
1301 return zeroDialer.DialContext(ctx, network, addr)
1302 }
1303
1304
1305
1306
1307
1308
1309
1310 type wantConn struct {
1311 cm connectMethod
1312 key connectMethodKey
1313
1314
1315
1316
1317 beforeDial func()
1318 afterDial func()
1319
1320 mu sync.Mutex
1321 ctx context.Context
1322 cancelCtx context.CancelFunc
1323 done bool
1324 result chan connOrError
1325 }
1326
1327 type connOrError struct {
1328 pc *persistConn
1329 err error
1330 idleAt time.Time
1331 }
1332
1333
1334 func (w *wantConn) waiting() bool {
1335 w.mu.Lock()
1336 defer w.mu.Unlock()
1337
1338 return !w.done
1339 }
1340
1341
1342 func (w *wantConn) getCtxForDial() context.Context {
1343 w.mu.Lock()
1344 defer w.mu.Unlock()
1345
1346 return w.ctx
1347 }
1348
1349
1350 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1351 w.mu.Lock()
1352 defer w.mu.Unlock()
1353
1354 if w.done {
1355 return false
1356 }
1357 if (pc == nil) == (err == nil) {
1358 panic("net/http: internal error: misuse of tryDeliver")
1359 }
1360 w.ctx = nil
1361 w.done = true
1362
1363 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1364 close(w.result)
1365
1366 return true
1367 }
1368
1369
1370
1371 func (w *wantConn) cancel(t *Transport) {
1372 w.mu.Lock()
1373 var pc *persistConn
1374 if w.done {
1375 if r, ok := <-w.result; ok {
1376 pc = r.pc
1377 }
1378 } else {
1379 close(w.result)
1380 }
1381 w.ctx = nil
1382 w.done = true
1383 w.mu.Unlock()
1384
1385 if pc != nil {
1386 t.putOrCloseIdleConn(pc)
1387 }
1388 }
1389
1390
1391 type wantConnQueue struct {
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402 head []*wantConn
1403 headPos int
1404 tail []*wantConn
1405 }
1406
1407
1408 func (q *wantConnQueue) len() int {
1409 return len(q.head) - q.headPos + len(q.tail)
1410 }
1411
1412
1413 func (q *wantConnQueue) pushBack(w *wantConn) {
1414 q.tail = append(q.tail, w)
1415 }
1416
1417
1418 func (q *wantConnQueue) popFront() *wantConn {
1419 if q.headPos >= len(q.head) {
1420 if len(q.tail) == 0 {
1421 return nil
1422 }
1423
1424 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1425 }
1426 w := q.head[q.headPos]
1427 q.head[q.headPos] = nil
1428 q.headPos++
1429 return w
1430 }
1431
1432
1433 func (q *wantConnQueue) peekFront() *wantConn {
1434 if q.headPos < len(q.head) {
1435 return q.head[q.headPos]
1436 }
1437 if len(q.tail) > 0 {
1438 return q.tail[0]
1439 }
1440 return nil
1441 }
1442
1443
1444
1445 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1446 for {
1447 w := q.peekFront()
1448 if w == nil || w.waiting() {
1449 return cleaned
1450 }
1451 q.popFront()
1452 cleaned = true
1453 }
1454 }
1455
1456
1457 func (q *wantConnQueue) cleanFrontCanceled() {
1458 for {
1459 w := q.peekFront()
1460 if w == nil || w.cancelCtx != nil {
1461 return
1462 }
1463 q.popFront()
1464 }
1465 }
1466
1467
1468
1469 func (q *wantConnQueue) all(f func(*wantConn)) {
1470 for _, w := range q.head[q.headPos:] {
1471 f(w)
1472 }
1473 for _, w := range q.tail {
1474 f(w)
1475 }
1476 }
1477
1478 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1479 if t.DialTLSContext != nil {
1480 conn, err = t.DialTLSContext(ctx, network, addr)
1481 } else {
1482 conn, err = t.DialTLS(network, addr)
1483 }
1484 if conn == nil && err == nil {
1485 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1486 }
1487 return
1488 }
1489
1490
1491
1492
1493
1494 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1495 req := treq.Request
1496 trace := treq.trace
1497 ctx := req.Context()
1498 if trace != nil && trace.GetConn != nil {
1499 trace.GetConn(cm.addr())
1500 }
1501
1502
1503
1504
1505
1506
1507 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1508
1509 w := &wantConn{
1510 cm: cm,
1511 key: cm.key(),
1512 ctx: dialCtx,
1513 cancelCtx: dialCancel,
1514 result: make(chan connOrError, 1),
1515 beforeDial: testHookPrePendingDial,
1516 afterDial: testHookPostPendingDial,
1517 }
1518 defer func() {
1519 if err != nil {
1520 w.cancel(t)
1521 }
1522 }()
1523
1524
1525 if delivered := t.queueForIdleConn(w); !delivered {
1526 t.queueForDial(w)
1527 }
1528
1529
1530 select {
1531 case r := <-w.result:
1532
1533
1534 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1535 info := httptrace.GotConnInfo{
1536 Conn: r.pc.conn,
1537 Reused: r.pc.isReused(),
1538 }
1539 if !r.idleAt.IsZero() {
1540 info.WasIdle = true
1541 info.IdleTime = time.Since(r.idleAt)
1542 }
1543 trace.GotConn(info)
1544 }
1545 if r.err != nil {
1546
1547
1548
1549 select {
1550 case <-treq.ctx.Done():
1551 err := context.Cause(treq.ctx)
1552 if err == errRequestCanceled {
1553 err = errRequestCanceledConn
1554 }
1555 return nil, err
1556 default:
1557
1558 }
1559 }
1560 return r.pc, r.err
1561 case <-treq.ctx.Done():
1562 err := context.Cause(treq.ctx)
1563 if err == errRequestCanceled {
1564 err = errRequestCanceledConn
1565 }
1566 return nil, err
1567 }
1568 }
1569
1570
1571
1572 func (t *Transport) queueForDial(w *wantConn) {
1573 w.beforeDial()
1574
1575 t.connsPerHostMu.Lock()
1576 defer t.connsPerHostMu.Unlock()
1577
1578 if t.MaxConnsPerHost <= 0 {
1579 t.startDialConnForLocked(w)
1580 return
1581 }
1582
1583 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1584 if t.connsPerHost == nil {
1585 t.connsPerHost = make(map[connectMethodKey]int)
1586 }
1587 t.connsPerHost[w.key] = n + 1
1588 t.startDialConnForLocked(w)
1589 return
1590 }
1591
1592 if t.connsPerHostWait == nil {
1593 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1594 }
1595 q := t.connsPerHostWait[w.key]
1596 q.cleanFrontNotWaiting()
1597 q.pushBack(w)
1598 t.connsPerHostWait[w.key] = q
1599 }
1600
1601
1602
1603 func (t *Transport) startDialConnForLocked(w *wantConn) {
1604 t.dialsInProgress.cleanFrontCanceled()
1605 t.dialsInProgress.pushBack(w)
1606 go func() {
1607 t.dialConnFor(w)
1608 t.connsPerHostMu.Lock()
1609 defer t.connsPerHostMu.Unlock()
1610 w.cancelCtx = nil
1611 }()
1612 }
1613
1614
1615
1616
1617 func (t *Transport) dialConnFor(w *wantConn) {
1618 defer w.afterDial()
1619 ctx := w.getCtxForDial()
1620 if ctx == nil {
1621 t.decConnsPerHost(w.key)
1622 return
1623 }
1624
1625 pc, err := t.dialConn(ctx, w.cm)
1626 delivered := w.tryDeliver(pc, err, time.Time{})
1627 if err == nil && (!delivered || pc.alt != nil) {
1628
1629
1630
1631 t.putOrCloseIdleConn(pc)
1632 }
1633 if err != nil {
1634 t.decConnsPerHost(w.key)
1635 }
1636 }
1637
1638
1639
1640 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1641 if t.MaxConnsPerHost <= 0 {
1642 return
1643 }
1644
1645 t.connsPerHostMu.Lock()
1646 defer t.connsPerHostMu.Unlock()
1647 n := t.connsPerHost[key]
1648 if n == 0 {
1649
1650
1651 panic("net/http: internal error: connCount underflow")
1652 }
1653
1654
1655
1656
1657
1658 if q := t.connsPerHostWait[key]; q.len() > 0 {
1659 done := false
1660 for q.len() > 0 {
1661 w := q.popFront()
1662 if w.waiting() {
1663 t.startDialConnForLocked(w)
1664 done = true
1665 break
1666 }
1667 }
1668 if q.len() == 0 {
1669 delete(t.connsPerHostWait, key)
1670 } else {
1671
1672
1673 t.connsPerHostWait[key] = q
1674 }
1675 if done {
1676 return
1677 }
1678 }
1679
1680
1681 if n--; n == 0 {
1682 delete(t.connsPerHost, key)
1683 } else {
1684 t.connsPerHost[key] = n
1685 }
1686 }
1687
1688
1689
1690
1691 func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
1692
1693 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1694 if cfg.ServerName == "" {
1695 cfg.ServerName = name
1696 }
1697 if pconn.cacheKey.onlyH1 {
1698 cfg.NextProtos = nil
1699 }
1700 plainConn := pconn.conn
1701 tlsConn := tls.Client(plainConn, cfg)
1702 errc := make(chan error, 2)
1703 var timer *time.Timer
1704 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1705 timer = time.AfterFunc(d, func() {
1706 errc <- tlsHandshakeTimeoutError{}
1707 })
1708 }
1709 go func() {
1710 if trace != nil && trace.TLSHandshakeStart != nil {
1711 trace.TLSHandshakeStart()
1712 }
1713 err := tlsConn.HandshakeContext(ctx)
1714 if timer != nil {
1715 timer.Stop()
1716 }
1717 errc <- err
1718 }()
1719 if err := <-errc; err != nil {
1720 plainConn.Close()
1721 if err == (tlsHandshakeTimeoutError{}) {
1722
1723
1724 <-errc
1725 }
1726 if trace != nil && trace.TLSHandshakeDone != nil {
1727 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1728 }
1729 return err
1730 }
1731 cs := tlsConn.ConnectionState()
1732 if trace != nil && trace.TLSHandshakeDone != nil {
1733 trace.TLSHandshakeDone(cs, nil)
1734 }
1735 pconn.tlsState = &cs
1736 pconn.conn = tlsConn
1737 return nil
1738 }
1739
1740 type erringRoundTripper interface {
1741 RoundTripErr() error
1742 }
1743
1744 var testHookProxyConnectTimeout = context.WithTimeout
1745
1746 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1747 pconn = &persistConn{
1748 t: t,
1749 cacheKey: cm.key(),
1750 reqch: make(chan requestAndChan, 1),
1751 writech: make(chan writeRequest, 1),
1752 closech: make(chan struct{}),
1753 writeErrCh: make(chan error, 1),
1754 writeLoopDone: make(chan struct{}),
1755 }
1756 trace := httptrace.ContextClientTrace(ctx)
1757 wrapErr := func(err error) error {
1758 if cm.proxyURL != nil {
1759
1760 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1761 }
1762 return err
1763 }
1764 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1765 var err error
1766 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1767 if err != nil {
1768 return nil, wrapErr(err)
1769 }
1770 if tc, ok := pconn.conn.(*tls.Conn); ok {
1771
1772
1773 if trace != nil && trace.TLSHandshakeStart != nil {
1774 trace.TLSHandshakeStart()
1775 }
1776 if err := tc.HandshakeContext(ctx); err != nil {
1777 go pconn.conn.Close()
1778 if trace != nil && trace.TLSHandshakeDone != nil {
1779 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1780 }
1781 return nil, err
1782 }
1783 cs := tc.ConnectionState()
1784 if trace != nil && trace.TLSHandshakeDone != nil {
1785 trace.TLSHandshakeDone(cs, nil)
1786 }
1787 pconn.tlsState = &cs
1788 }
1789 } else {
1790 conn, err := t.dial(ctx, "tcp", cm.addr())
1791 if err != nil {
1792 return nil, wrapErr(err)
1793 }
1794 pconn.conn = conn
1795 if cm.scheme() == "https" {
1796 var firstTLSHost string
1797 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1798 return nil, wrapErr(err)
1799 }
1800 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
1801 return nil, wrapErr(err)
1802 }
1803 }
1804 }
1805
1806
1807 switch {
1808 case cm.proxyURL == nil:
1809
1810 case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
1811 conn := pconn.conn
1812 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1813 if u := cm.proxyURL.User; u != nil {
1814 auth := &socksUsernamePassword{
1815 Username: u.Username(),
1816 }
1817 auth.Password, _ = u.Password()
1818 d.AuthMethods = []socksAuthMethod{
1819 socksAuthMethodNotRequired,
1820 socksAuthMethodUsernamePassword,
1821 }
1822 d.Authenticate = auth.Authenticate
1823 }
1824 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1825 conn.Close()
1826 return nil, err
1827 }
1828 case cm.targetScheme == "http":
1829 pconn.isProxy = true
1830 if pa := cm.proxyAuth(); pa != "" {
1831 pconn.mutateHeaderFunc = func(h Header) {
1832 h.Set("Proxy-Authorization", pa)
1833 }
1834 }
1835 case cm.targetScheme == "https":
1836 conn := pconn.conn
1837 var hdr Header
1838 if t.GetProxyConnectHeader != nil {
1839 var err error
1840 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1841 if err != nil {
1842 conn.Close()
1843 return nil, err
1844 }
1845 } else {
1846 hdr = t.ProxyConnectHeader
1847 }
1848 if hdr == nil {
1849 hdr = make(Header)
1850 }
1851 if pa := cm.proxyAuth(); pa != "" {
1852 hdr = hdr.Clone()
1853 hdr.Set("Proxy-Authorization", pa)
1854 }
1855 connectReq := &Request{
1856 Method: "CONNECT",
1857 URL: &url.URL{Opaque: cm.targetAddr},
1858 Host: cm.targetAddr,
1859 Header: hdr,
1860 }
1861
1862
1863
1864
1865 connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
1866 defer cancel()
1867
1868 didReadResponse := make(chan struct{})
1869 var (
1870 resp *Response
1871 err error
1872 )
1873
1874 go func() {
1875 defer close(didReadResponse)
1876 err = connectReq.Write(conn)
1877 if err != nil {
1878 return
1879 }
1880
1881
1882 br := bufio.NewReader(&io.LimitedReader{R: conn, N: t.maxHeaderResponseSize()})
1883 resp, err = ReadResponse(br, connectReq)
1884 }()
1885 select {
1886 case <-connectCtx.Done():
1887 conn.Close()
1888 <-didReadResponse
1889 return nil, connectCtx.Err()
1890 case <-didReadResponse:
1891
1892 }
1893 if err != nil {
1894 conn.Close()
1895 return nil, err
1896 }
1897
1898 if t.OnProxyConnectResponse != nil {
1899 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
1900 if err != nil {
1901 conn.Close()
1902 return nil, err
1903 }
1904 }
1905
1906 if resp.StatusCode != 200 {
1907 _, text, ok := strings.Cut(resp.Status, " ")
1908 conn.Close()
1909 if !ok {
1910 return nil, errors.New("unknown status code")
1911 }
1912 return nil, errors.New(text)
1913 }
1914 }
1915
1916 if cm.proxyURL != nil && cm.targetScheme == "https" {
1917 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
1918 return nil, err
1919 }
1920 }
1921
1922
1923 unencryptedHTTP2 := pconn.tlsState == nil &&
1924 t.Protocols != nil &&
1925 t.Protocols.UnencryptedHTTP2() &&
1926 !t.Protocols.HTTP1()
1927 if unencryptedHTTP2 {
1928 next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
1929 if !ok {
1930 return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
1931 }
1932 alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
1933 if e, ok := alt.(erringRoundTripper); ok {
1934
1935 return nil, e.RoundTripErr()
1936 }
1937 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1938 }
1939
1940 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1941 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1942 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1943 if e, ok := alt.(erringRoundTripper); ok {
1944
1945 return nil, e.RoundTripErr()
1946 }
1947 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1948 }
1949 }
1950
1951 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1952 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1953
1954 go pconn.readLoop()
1955 go pconn.writeLoop()
1956 return pconn, nil
1957 }
1958
1959
1960
1961
1962
1963
1964
1965 type persistConnWriter struct {
1966 pc *persistConn
1967 }
1968
1969 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1970 n, err = w.pc.conn.Write(p)
1971 w.pc.nwrite += int64(n)
1972 return
1973 }
1974
1975
1976
1977
1978 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1979 n, err = io.Copy(w.pc.conn, r)
1980 w.pc.nwrite += n
1981 return
1982 }
1983
1984 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002 type connectMethod struct {
2003 _ incomparable
2004 proxyURL *url.URL
2005 targetScheme string
2006
2007
2008
2009 targetAddr string
2010 onlyH1 bool
2011 }
2012
2013 func (cm *connectMethod) key() connectMethodKey {
2014 proxyStr := ""
2015 targetAddr := cm.targetAddr
2016 if cm.proxyURL != nil {
2017 proxyStr = cm.proxyURL.String()
2018 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2019 targetAddr = ""
2020 }
2021 }
2022 return connectMethodKey{
2023 proxy: proxyStr,
2024 scheme: cm.targetScheme,
2025 addr: targetAddr,
2026 onlyH1: cm.onlyH1,
2027 }
2028 }
2029
2030
2031 func (cm *connectMethod) scheme() string {
2032 if cm.proxyURL != nil {
2033 return cm.proxyURL.Scheme
2034 }
2035 return cm.targetScheme
2036 }
2037
2038
2039 func (cm *connectMethod) addr() string {
2040 if cm.proxyURL != nil {
2041 return canonicalAddr(cm.proxyURL)
2042 }
2043 return cm.targetAddr
2044 }
2045
2046
2047
2048 func (cm *connectMethod) tlsHost() string {
2049 h := cm.targetAddr
2050 if hasPort(h) {
2051 h = h[:strings.LastIndex(h, ":")]
2052 }
2053 return h
2054 }
2055
2056
2057
2058
2059 type connectMethodKey struct {
2060 proxy, scheme, addr string
2061 onlyH1 bool
2062 }
2063
2064 func (k connectMethodKey) String() string {
2065
2066 var h1 string
2067 if k.onlyH1 {
2068 h1 = ",h1"
2069 }
2070 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
2071 }
2072
2073
2074
2075 type persistConn struct {
2076
2077
2078
2079 alt RoundTripper
2080
2081 t *Transport
2082 cacheKey connectMethodKey
2083 conn net.Conn
2084 tlsState *tls.ConnectionState
2085 br *bufio.Reader
2086 bw *bufio.Writer
2087 nwrite int64
2088 reqch chan requestAndChan
2089 writech chan writeRequest
2090 closech chan struct{}
2091 isProxy bool
2092 sawEOF bool
2093 readLimit int64
2094
2095
2096
2097
2098 writeErrCh chan error
2099
2100 writeLoopDone chan struct{}
2101
2102
2103 idleAt time.Time
2104 idleTimer *time.Timer
2105
2106 mu sync.Mutex
2107 numExpectedResponses int
2108 closed error
2109 canceledErr error
2110 broken bool
2111 reused bool
2112
2113
2114
2115 mutateHeaderFunc func(Header)
2116 }
2117
2118 func (pc *persistConn) maxHeaderResponseSize() int64 {
2119 return pc.t.maxHeaderResponseSize()
2120 }
2121
2122 func (pc *persistConn) Read(p []byte) (n int, err error) {
2123 if pc.readLimit <= 0 {
2124 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2125 }
2126 if int64(len(p)) > pc.readLimit {
2127 p = p[:pc.readLimit]
2128 }
2129 n, err = pc.conn.Read(p)
2130 if err == io.EOF {
2131 pc.sawEOF = true
2132 }
2133 pc.readLimit -= int64(n)
2134 return
2135 }
2136
2137
2138 func (pc *persistConn) isBroken() bool {
2139 pc.mu.Lock()
2140 b := pc.closed != nil
2141 pc.mu.Unlock()
2142 return b
2143 }
2144
2145
2146
2147 func (pc *persistConn) canceled() error {
2148 pc.mu.Lock()
2149 defer pc.mu.Unlock()
2150 return pc.canceledErr
2151 }
2152
2153
2154 func (pc *persistConn) isReused() bool {
2155 pc.mu.Lock()
2156 r := pc.reused
2157 pc.mu.Unlock()
2158 return r
2159 }
2160
2161 func (pc *persistConn) cancelRequest(err error) {
2162 pc.mu.Lock()
2163 defer pc.mu.Unlock()
2164 pc.canceledErr = err
2165 pc.closeLocked(errRequestCanceled)
2166 }
2167
2168
2169
2170
2171 func (pc *persistConn) closeConnIfStillIdle() {
2172 t := pc.t
2173 t.idleMu.Lock()
2174 defer t.idleMu.Unlock()
2175 if _, ok := t.idleLRU.m[pc]; !ok {
2176
2177 return
2178 }
2179 t.removeIdleConnLocked(pc)
2180 pc.close(errIdleConnTimeout)
2181 }
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2192 if err == nil {
2193 return nil
2194 }
2195
2196
2197
2198
2199
2200
2201
2202
2203 <-pc.writeLoopDone
2204
2205
2206
2207
2208 if cerr := pc.canceled(); cerr != nil {
2209 return cerr
2210 }
2211
2212
2213 req.mu.Lock()
2214 reqErr := req.err
2215 req.mu.Unlock()
2216 if reqErr != nil {
2217 return reqErr
2218 }
2219
2220 if err == errServerClosedIdle {
2221
2222 return err
2223 }
2224
2225 if _, ok := err.(transportReadFromServerError); ok {
2226 if pc.nwrite == startBytesWritten {
2227 return nothingWrittenError{err}
2228 }
2229
2230 return err
2231 }
2232 if pc.isBroken() {
2233 if pc.nwrite == startBytesWritten {
2234 return nothingWrittenError{err}
2235 }
2236 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2237 }
2238 return err
2239 }
2240
2241
2242
2243
2244 var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2245
2246 func (pc *persistConn) readLoop() {
2247 closeErr := errReadLoopExiting
2248 defer func() {
2249 pc.close(closeErr)
2250 pc.t.removeIdleConn(pc)
2251 }()
2252
2253 tryPutIdleConn := func(treq *transportRequest) bool {
2254 trace := treq.trace
2255 if err := pc.t.tryPutIdleConn(pc); err != nil {
2256 closeErr = err
2257 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
2258 trace.PutIdleConn(err)
2259 }
2260 return false
2261 }
2262 if trace != nil && trace.PutIdleConn != nil {
2263 trace.PutIdleConn(nil)
2264 }
2265 return true
2266 }
2267
2268
2269
2270
2271 eofc := make(chan struct{})
2272 defer close(eofc)
2273
2274
2275 testHookMu.Lock()
2276 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
2277 testHookMu.Unlock()
2278
2279 alive := true
2280 for alive {
2281 pc.readLimit = pc.maxHeaderResponseSize()
2282 _, err := pc.br.Peek(1)
2283
2284 pc.mu.Lock()
2285 if pc.numExpectedResponses == 0 {
2286 pc.readLoopPeekFailLocked(err)
2287 pc.mu.Unlock()
2288 return
2289 }
2290 pc.mu.Unlock()
2291
2292 rc := <-pc.reqch
2293 trace := rc.treq.trace
2294
2295 var resp *Response
2296 if err == nil {
2297 resp, err = pc.readResponse(rc, trace)
2298 } else {
2299 err = transportReadFromServerError{err}
2300 closeErr = err
2301 }
2302
2303 if err != nil {
2304 if pc.readLimit <= 0 {
2305 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
2306 }
2307
2308 select {
2309 case rc.ch <- responseAndError{err: err}:
2310 case <-rc.callerGone:
2311 return
2312 }
2313 return
2314 }
2315 pc.readLimit = maxInt64
2316
2317 pc.mu.Lock()
2318 pc.numExpectedResponses--
2319 pc.mu.Unlock()
2320
2321 bodyWritable := resp.bodyIsWritable()
2322 hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0
2323
2324 if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
2325
2326
2327
2328 alive = false
2329 }
2330
2331 if !hasBody || bodyWritable {
2332
2333
2334
2335
2336
2337 alive = alive &&
2338 !pc.sawEOF &&
2339 pc.wroteRequest() &&
2340 tryPutIdleConn(rc.treq)
2341
2342 if bodyWritable {
2343 closeErr = errCallerOwnsConn
2344 }
2345
2346 select {
2347 case rc.ch <- responseAndError{res: resp}:
2348 case <-rc.callerGone:
2349 return
2350 }
2351
2352 rc.treq.cancel(errRequestDone)
2353
2354
2355
2356
2357 testHookReadLoopBeforeNextRead()
2358 continue
2359 }
2360
2361 waitForBodyRead := make(chan bool, 2)
2362 body := &bodyEOFSignal{
2363 body: resp.Body,
2364 earlyCloseFn: func() error {
2365 waitForBodyRead <- false
2366 <-eofc
2367 return nil
2368
2369 },
2370 fn: func(err error) error {
2371 isEOF := err == io.EOF
2372 waitForBodyRead <- isEOF
2373 if isEOF {
2374 <-eofc
2375 } else if err != nil {
2376 if cerr := pc.canceled(); cerr != nil {
2377 return cerr
2378 }
2379 }
2380 return err
2381 },
2382 }
2383
2384 resp.Body = body
2385 if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
2386 resp.Body = &gzipReader{body: body}
2387 resp.Header.Del("Content-Encoding")
2388 resp.Header.Del("Content-Length")
2389 resp.ContentLength = -1
2390 resp.Uncompressed = true
2391 }
2392
2393 select {
2394 case rc.ch <- responseAndError{res: resp}:
2395 case <-rc.callerGone:
2396 return
2397 }
2398
2399
2400
2401
2402 select {
2403 case bodyEOF := <-waitForBodyRead:
2404 alive = alive &&
2405 bodyEOF &&
2406 !pc.sawEOF &&
2407 pc.wroteRequest() &&
2408 tryPutIdleConn(rc.treq)
2409 if bodyEOF {
2410 eofc <- struct{}{}
2411 }
2412 case <-rc.treq.ctx.Done():
2413 alive = false
2414 pc.cancelRequest(context.Cause(rc.treq.ctx))
2415 case <-pc.closech:
2416 alive = false
2417 }
2418
2419 rc.treq.cancel(errRequestDone)
2420 testHookReadLoopBeforeNextRead()
2421 }
2422 }
2423
2424 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2425 if pc.closed != nil {
2426 return
2427 }
2428 if n := pc.br.Buffered(); n > 0 {
2429 buf, _ := pc.br.Peek(n)
2430 if is408Message(buf) {
2431 pc.closeLocked(errServerClosedIdle)
2432 return
2433 } else {
2434 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2435 }
2436 }
2437 if peekErr == io.EOF {
2438
2439 pc.closeLocked(errServerClosedIdle)
2440 } else {
2441 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2442 }
2443 }
2444
2445
2446
2447
2448 func is408Message(buf []byte) bool {
2449 if len(buf) < len("HTTP/1.x 408") {
2450 return false
2451 }
2452 if string(buf[:7]) != "HTTP/1." {
2453 return false
2454 }
2455 return string(buf[8:12]) == " 408"
2456 }
2457
2458
2459
2460
2461 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2462 if trace != nil && trace.GotFirstResponseByte != nil {
2463 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2464 trace.GotFirstResponseByte()
2465 }
2466 }
2467
2468 continueCh := rc.continueCh
2469 for {
2470 resp, err = ReadResponse(pc.br, rc.treq.Request)
2471 if err != nil {
2472 return
2473 }
2474 resCode := resp.StatusCode
2475 if continueCh != nil && resCode == StatusContinue {
2476 if trace != nil && trace.Got100Continue != nil {
2477 trace.Got100Continue()
2478 }
2479 continueCh <- struct{}{}
2480 continueCh = nil
2481 }
2482 is1xx := 100 <= resCode && resCode <= 199
2483
2484 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2485 if is1xxNonTerminal {
2486 if trace != nil && trace.Got1xxResponse != nil {
2487 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2488 return nil, err
2489 }
2490
2491
2492
2493
2494
2495
2496
2497 pc.readLimit = pc.maxHeaderResponseSize()
2498 }
2499 continue
2500 }
2501 break
2502 }
2503 if resp.isProtocolSwitch() {
2504 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2505 }
2506 if continueCh != nil {
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519 if resp.Close || rc.treq.Request.Close {
2520 close(continueCh)
2521 } else {
2522 continueCh <- struct{}{}
2523 }
2524 }
2525
2526 resp.TLS = pc.tlsState
2527 return
2528 }
2529
2530
2531
2532
2533 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2534 if continueCh == nil {
2535 return nil
2536 }
2537 return func() bool {
2538 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2539 defer timer.Stop()
2540
2541 select {
2542 case _, ok := <-continueCh:
2543 return ok
2544 case <-timer.C:
2545 return true
2546 case <-pc.closech:
2547 return false
2548 }
2549 }
2550 }
2551
2552 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2553 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2554 if br.Buffered() != 0 {
2555 body.br = br
2556 }
2557 return body
2558 }
2559
2560
2561
2562
2563
2564
2565 type readWriteCloserBody struct {
2566 _ incomparable
2567 br *bufio.Reader
2568 io.ReadWriteCloser
2569 }
2570
2571 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2572 if b.br != nil {
2573 if n := b.br.Buffered(); len(p) > n {
2574 p = p[:n]
2575 }
2576 n, err = b.br.Read(p)
2577 if b.br.Buffered() == 0 {
2578 b.br = nil
2579 }
2580 return n, err
2581 }
2582 return b.ReadWriteCloser.Read(p)
2583 }
2584
2585 func (b *readWriteCloserBody) CloseWrite() error {
2586 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2587 return cw.CloseWrite()
2588 }
2589 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2590 }
2591
2592
2593 type nothingWrittenError struct {
2594 error
2595 }
2596
2597 func (nwe nothingWrittenError) Unwrap() error {
2598 return nwe.error
2599 }
2600
2601 func (pc *persistConn) writeLoop() {
2602 defer close(pc.writeLoopDone)
2603 for {
2604 select {
2605 case wr := <-pc.writech:
2606 startBytesWritten := pc.nwrite
2607 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
2608 if bre, ok := err.(requestBodyReadError); ok {
2609 err = bre.error
2610
2611
2612
2613
2614
2615
2616
2617 wr.req.setError(err)
2618 }
2619 if err == nil {
2620 err = pc.bw.Flush()
2621 }
2622 if err != nil {
2623 if pc.nwrite == startBytesWritten {
2624 err = nothingWrittenError{err}
2625 }
2626 }
2627 pc.writeErrCh <- err
2628 wr.ch <- err
2629 if err != nil {
2630 pc.close(err)
2631 return
2632 }
2633 case <-pc.closech:
2634 return
2635 }
2636 }
2637 }
2638
2639
2640
2641
2642
2643
2644
2645 var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2646
2647
2648
2649 func (pc *persistConn) wroteRequest() bool {
2650 select {
2651 case err := <-pc.writeErrCh:
2652
2653
2654 return err == nil
2655 default:
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2667 defer t.Stop()
2668 select {
2669 case err := <-pc.writeErrCh:
2670 return err == nil
2671 case <-t.C:
2672 return false
2673 }
2674 }
2675 }
2676
2677
2678
2679 type responseAndError struct {
2680 _ incomparable
2681 res *Response
2682 err error
2683 }
2684
2685 type requestAndChan struct {
2686 _ incomparable
2687 treq *transportRequest
2688 ch chan responseAndError
2689
2690
2691
2692
2693 addedGzip bool
2694
2695
2696
2697
2698
2699 continueCh chan<- struct{}
2700
2701 callerGone <-chan struct{}
2702 }
2703
2704
2705
2706
2707
2708 type writeRequest struct {
2709 req *transportRequest
2710 ch chan<- error
2711
2712
2713
2714
2715 continueCh <-chan struct{}
2716 }
2717
2718
2719
2720 type timeoutError struct {
2721 err string
2722 }
2723
2724 func (e *timeoutError) Error() string { return e.err }
2725 func (e *timeoutError) Timeout() bool { return true }
2726 func (e *timeoutError) Temporary() bool { return true }
2727 func (e *timeoutError) Is(err error) bool { return err == context.DeadlineExceeded }
2728
2729 var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}
2730
2731
2732
2733 var errRequestCanceled = http2errRequestCanceled
2734 var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2735
2736
2737
2738 var errRequestDone = errors.New("net/http: request completed")
2739
2740 func nop() {}
2741
2742
2743 var (
2744 testHookEnterRoundTrip = nop
2745 testHookWaitResLoop = nop
2746 testHookRoundTripRetried = nop
2747 testHookPrePendingDial = nop
2748 testHookPostPendingDial = nop
2749
2750 testHookMu sync.Locker = fakeLocker{}
2751 testHookReadLoopBeforeNextRead = nop
2752 )
2753
2754 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2755 testHookEnterRoundTrip()
2756 pc.mu.Lock()
2757 pc.numExpectedResponses++
2758 headerFn := pc.mutateHeaderFunc
2759 pc.mu.Unlock()
2760
2761 if headerFn != nil {
2762 headerFn(req.extraHeaders())
2763 }
2764
2765
2766
2767
2768
2769 requestedGzip := false
2770 if !pc.t.DisableCompression &&
2771 req.Header.Get("Accept-Encoding") == "" &&
2772 req.Header.Get("Range") == "" &&
2773 req.Method != "HEAD" {
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786 requestedGzip = true
2787 req.extraHeaders().Set("Accept-Encoding", "gzip")
2788 }
2789
2790 var continueCh chan struct{}
2791 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
2792 continueCh = make(chan struct{}, 1)
2793 }
2794
2795 if pc.t.DisableKeepAlives &&
2796 !req.wantsClose() &&
2797 !isProtocolSwitchHeader(req.Header) {
2798 req.extraHeaders().Set("Connection", "close")
2799 }
2800
2801 gone := make(chan struct{})
2802 defer close(gone)
2803
2804 const debugRoundTrip = false
2805
2806
2807
2808
2809 startBytesWritten := pc.nwrite
2810 writeErrCh := make(chan error, 1)
2811 pc.writech <- writeRequest{req, writeErrCh, continueCh}
2812
2813 resc := make(chan responseAndError)
2814 pc.reqch <- requestAndChan{
2815 treq: req,
2816 ch: resc,
2817 addedGzip: requestedGzip,
2818 continueCh: continueCh,
2819 callerGone: gone,
2820 }
2821
2822 handleResponse := func(re responseAndError) (*Response, error) {
2823 if (re.res == nil) == (re.err == nil) {
2824 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
2825 }
2826 if debugRoundTrip {
2827 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
2828 }
2829 if re.err != nil {
2830 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
2831 }
2832 return re.res, nil
2833 }
2834
2835 var respHeaderTimer <-chan time.Time
2836 ctxDoneChan := req.ctx.Done()
2837 pcClosed := pc.closech
2838 for {
2839 testHookWaitResLoop()
2840 select {
2841 case err := <-writeErrCh:
2842 if debugRoundTrip {
2843 req.logf("writeErrCh recv: %T/%#v", err, err)
2844 }
2845 if err != nil {
2846 pc.close(fmt.Errorf("write error: %w", err))
2847 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
2848 }
2849 if d := pc.t.ResponseHeaderTimeout; d > 0 {
2850 if debugRoundTrip {
2851 req.logf("starting timer for %v", d)
2852 }
2853 timer := time.NewTimer(d)
2854 defer timer.Stop()
2855 respHeaderTimer = timer.C
2856 }
2857 case <-pcClosed:
2858 select {
2859 case re := <-resc:
2860
2861
2862
2863 return handleResponse(re)
2864 default:
2865 }
2866 if debugRoundTrip {
2867 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
2868 }
2869 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
2870 case <-respHeaderTimer:
2871 if debugRoundTrip {
2872 req.logf("timeout waiting for response headers.")
2873 }
2874 pc.close(errTimeout)
2875 return nil, errTimeout
2876 case re := <-resc:
2877 return handleResponse(re)
2878 case <-ctxDoneChan:
2879 select {
2880 case re := <-resc:
2881
2882
2883
2884 return handleResponse(re)
2885 default:
2886 }
2887 pc.cancelRequest(context.Cause(req.ctx))
2888 }
2889 }
2890 }
2891
2892
2893
2894 type tLogKey struct{}
2895
2896 func (tr *transportRequest) logf(format string, args ...any) {
2897 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2898 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2899 }
2900 }
2901
2902
2903
2904 func (pc *persistConn) markReused() {
2905 pc.mu.Lock()
2906 pc.reused = true
2907 pc.mu.Unlock()
2908 }
2909
2910
2911
2912
2913
2914
2915 func (pc *persistConn) close(err error) {
2916 pc.mu.Lock()
2917 defer pc.mu.Unlock()
2918 pc.closeLocked(err)
2919 }
2920
2921 func (pc *persistConn) closeLocked(err error) {
2922 if err == nil {
2923 panic("nil error")
2924 }
2925 pc.broken = true
2926 if pc.closed == nil {
2927 pc.closed = err
2928 pc.t.decConnsPerHost(pc.cacheKey)
2929
2930
2931 if pc.alt == nil {
2932 if err != errCallerOwnsConn {
2933 pc.conn.Close()
2934 }
2935 close(pc.closech)
2936 }
2937 }
2938 pc.mutateHeaderFunc = nil
2939 }
2940
2941 func schemePort(scheme string) string {
2942 switch scheme {
2943 case "http":
2944 return "80"
2945 case "https":
2946 return "443"
2947 case "socks5", "socks5h":
2948 return "1080"
2949 default:
2950 return ""
2951 }
2952 }
2953
2954 func idnaASCIIFromURL(url *url.URL) string {
2955 addr := url.Hostname()
2956 if v, err := idnaASCII(addr); err == nil {
2957 addr = v
2958 }
2959 return addr
2960 }
2961
2962
2963 func canonicalAddr(url *url.URL) string {
2964 port := url.Port()
2965 if port == "" {
2966 port = schemePort(url.Scheme)
2967 }
2968 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2969 }
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982 type bodyEOFSignal struct {
2983 body io.ReadCloser
2984 mu sync.Mutex
2985 closed bool
2986 rerr error
2987 fn func(error) error
2988 earlyCloseFn func() error
2989 }
2990
2991 var errReadOnClosedResBody = errors.New("http: read on closed response body")
2992 var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
2993
2994 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
2995 es.mu.Lock()
2996 closed, rerr := es.closed, es.rerr
2997 es.mu.Unlock()
2998 if closed {
2999 return 0, errReadOnClosedResBody
3000 }
3001 if rerr != nil {
3002 return 0, rerr
3003 }
3004
3005 n, err = es.body.Read(p)
3006 if err != nil {
3007 es.mu.Lock()
3008 defer es.mu.Unlock()
3009 if es.rerr == nil {
3010 es.rerr = err
3011 }
3012 err = es.condfn(err)
3013 }
3014 return
3015 }
3016
3017 func (es *bodyEOFSignal) Close() error {
3018 es.mu.Lock()
3019 defer es.mu.Unlock()
3020 if es.closed {
3021 return nil
3022 }
3023 es.closed = true
3024 if es.earlyCloseFn != nil && es.rerr != io.EOF {
3025 return es.earlyCloseFn()
3026 }
3027 err := es.body.Close()
3028 return es.condfn(err)
3029 }
3030
3031
3032 func (es *bodyEOFSignal) condfn(err error) error {
3033 if es.fn == nil {
3034 return err
3035 }
3036 err = es.fn(err)
3037 es.fn = nil
3038 return err
3039 }
3040
3041
3042
3043
3044
3045 type gzipReader struct {
3046 _ incomparable
3047 body *bodyEOFSignal
3048 mu sync.Mutex
3049 zr *gzip.Reader
3050 zerr error
3051 }
3052
3053 type eofReader struct{}
3054
3055 func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
3056 func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
3057
3058 var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
3059
3060
3061 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
3062 zr := gzipPool.Get().(*gzip.Reader)
3063 if err := zr.Reset(r); err != nil {
3064 gzipPoolPut(zr)
3065 return nil, err
3066 }
3067 return zr, nil
3068 }
3069
3070
3071 func gzipPoolPut(zr *gzip.Reader) {
3072
3073
3074 var r flate.Reader = eofReader{}
3075 zr.Reset(r)
3076 gzipPool.Put(zr)
3077 }
3078
3079
3080
3081 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3082 gz.mu.Lock()
3083 defer gz.mu.Unlock()
3084 if gz.zerr != nil {
3085 return nil, gz.zerr
3086 }
3087 if gz.zr == nil {
3088 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3089 if gz.zerr != nil {
3090 return nil, gz.zerr
3091 }
3092 }
3093 ret := gz.zr
3094 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3095 return ret, nil
3096 }
3097
3098
3099 func (gz *gzipReader) release(zr *gzip.Reader) {
3100 gz.mu.Lock()
3101 defer gz.mu.Unlock()
3102 if gz.zerr == errConcurrentReadOnResBody {
3103 gz.zr, gz.zerr = zr, nil
3104 } else {
3105 gzipPoolPut(zr)
3106 }
3107 }
3108
3109
3110
3111 func (gz *gzipReader) close() {
3112 gz.mu.Lock()
3113 defer gz.mu.Unlock()
3114 if gz.zerr == nil && gz.zr != nil {
3115 gzipPoolPut(gz.zr)
3116 gz.zr = nil
3117 }
3118 gz.zerr = errReadOnClosedResBody
3119 }
3120
3121 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3122 zr, err := gz.acquire()
3123 if err != nil {
3124 return 0, err
3125 }
3126 defer gz.release(zr)
3127
3128 return zr.Read(p)
3129 }
3130
3131 func (gz *gzipReader) Close() error {
3132 gz.close()
3133
3134 return gz.body.Close()
3135 }
3136
3137 type tlsHandshakeTimeoutError struct{}
3138
3139 func (tlsHandshakeTimeoutError) Timeout() bool { return true }
3140 func (tlsHandshakeTimeoutError) Temporary() bool { return true }
3141 func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" }
3142
3143
3144
3145
3146 type fakeLocker struct{}
3147
3148 func (fakeLocker) Lock() {}
3149 func (fakeLocker) Unlock() {}
3150
3151
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
3165 if cfg == nil {
3166 return &tls.Config{}
3167 }
3168 return cfg.Clone()
3169 }
3170
3171 type connLRU struct {
3172 ll *list.List
3173 m map[*persistConn]*list.Element
3174 }
3175
3176
3177 func (cl *connLRU) add(pc *persistConn) {
3178 if cl.ll == nil {
3179 cl.ll = list.New()
3180 cl.m = make(map[*persistConn]*list.Element)
3181 }
3182 ele := cl.ll.PushFront(pc)
3183 if _, ok := cl.m[pc]; ok {
3184 panic("persistConn was already in LRU")
3185 }
3186 cl.m[pc] = ele
3187 }
3188
3189 func (cl *connLRU) removeOldest() *persistConn {
3190 ele := cl.ll.Back()
3191 pc := ele.Value.(*persistConn)
3192 cl.ll.Remove(ele)
3193 delete(cl.m, pc)
3194 return pc
3195 }
3196
3197
3198 func (cl *connLRU) remove(pc *persistConn) {
3199 if ele, ok := cl.m[pc]; ok {
3200 cl.ll.Remove(ele)
3201 delete(cl.m, pc)
3202 }
3203 }
3204
3205
3206 func (cl *connLRU) len() int {
3207 return len(cl.m)
3208 }
3209
View as plain text