Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "internal/godebug"
21 "io"
22 "log"
23 "maps"
24 "net"
25 "net/http/httptrace"
26 "net/http/internal/ascii"
27 "net/textproto"
28 "net/url"
29 "reflect"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34 _ "unsafe"
35
36 "golang.org/x/net/http/httpguts"
37 "golang.org/x/net/http/httpproxy"
38 )
39
40
41
42
43
44
45 var DefaultTransport RoundTripper = &Transport{
46 Proxy: ProxyFromEnvironment,
47 DialContext: defaultTransportDialContext(&net.Dialer{
48 Timeout: 30 * time.Second,
49 KeepAlive: 30 * time.Second,
50 }),
51 ForceAttemptHTTP2: true,
52 MaxIdleConns: 100,
53 IdleConnTimeout: 90 * time.Second,
54 TLSHandshakeTimeout: 10 * time.Second,
55 ExpectContinueTimeout: 1 * time.Second,
56 }
57
58
59
60 const DefaultMaxIdleConnsPerHost = 2
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 type Transport struct {
97 idleMu sync.Mutex
98 closeIdle bool
99 idleConn map[connectMethodKey][]*persistConn
100 idleConnWait map[connectMethodKey]wantConnQueue
101 idleLRU connLRU
102
103 reqMu sync.Mutex
104 reqCanceler map[*Request]context.CancelCauseFunc
105
106 altMu sync.Mutex
107 altProto atomic.Value
108
109 connsPerHostMu sync.Mutex
110 connsPerHost map[connectMethodKey]int
111 connsPerHostWait map[connectMethodKey]wantConnQueue
112 dialsInProgress wantConnQueue
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 Proxy func(*Request) (*url.URL, error)
129
130
131
132
133 OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error
134
135
136
137
138
139
140
141
142
143 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
144
145
146
147
148
149
150
151
152
153
154
155 Dial func(network, addr string) (net.Conn, error)
156
157
158
159
160
161
162
163
164
165
166
167 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
168
169
170
171
172
173
174
175 DialTLS func(network, addr string) (net.Conn, error)
176
177
178
179
180
181 TLSClientConfig *tls.Config
182
183
184
185 TLSHandshakeTimeout time.Duration
186
187
188
189
190
191
192 DisableKeepAlives bool
193
194
195
196
197
198
199
200
201
202 DisableCompression bool
203
204
205
206 MaxIdleConns int
207
208
209
210
211 MaxIdleConnsPerHost int
212
213
214
215
216
217
218 MaxConnsPerHost int
219
220
221
222
223
224 IdleConnTimeout time.Duration
225
226
227
228
229
230 ResponseHeaderTimeout time.Duration
231
232
233
234
235
236
237
238
239 ExpectContinueTimeout time.Duration
240
241
242
243
244
245
246
247
248
249
250
251 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
252
253
254
255
256 ProxyConnectHeader Header
257
258
259
260
261
262
263
264
265 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
266
267
268
269
270
271
272 MaxResponseHeaderBytes int64
273
274
275
276
277 WriteBufferSize int
278
279
280
281
282 ReadBufferSize int
283
284
285
286 nextProtoOnce sync.Once
287 h2transport h2Transport
288 tlsNextProtoWasNil bool
289
290
291
292
293
294
295 ForceAttemptHTTP2 bool
296
297
298
299
300
301 HTTP2 *HTTP2Config
302
303
304
305
306
307
308 Protocols *Protocols
309 }
310
311 func (t *Transport) writeBufferSize() int {
312 if t.WriteBufferSize > 0 {
313 return t.WriteBufferSize
314 }
315 return 4 << 10
316 }
317
318 func (t *Transport) readBufferSize() int {
319 if t.ReadBufferSize > 0 {
320 return t.ReadBufferSize
321 }
322 return 4 << 10
323 }
324
325
326 func (t *Transport) Clone() *Transport {
327 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
328 t2 := &Transport{
329 Proxy: t.Proxy,
330 OnProxyConnectResponse: t.OnProxyConnectResponse,
331 DialContext: t.DialContext,
332 Dial: t.Dial,
333 DialTLS: t.DialTLS,
334 DialTLSContext: t.DialTLSContext,
335 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
336 DisableKeepAlives: t.DisableKeepAlives,
337 DisableCompression: t.DisableCompression,
338 MaxIdleConns: t.MaxIdleConns,
339 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
340 MaxConnsPerHost: t.MaxConnsPerHost,
341 IdleConnTimeout: t.IdleConnTimeout,
342 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
343 ExpectContinueTimeout: t.ExpectContinueTimeout,
344 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
345 GetProxyConnectHeader: t.GetProxyConnectHeader,
346 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
347 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
348 WriteBufferSize: t.WriteBufferSize,
349 ReadBufferSize: t.ReadBufferSize,
350 }
351 if t.TLSClientConfig != nil {
352 t2.TLSClientConfig = t.TLSClientConfig.Clone()
353 }
354 if t.HTTP2 != nil {
355 t2.HTTP2 = &HTTP2Config{}
356 *t2.HTTP2 = *t.HTTP2
357 }
358 if t.Protocols != nil {
359 t2.Protocols = &Protocols{}
360 *t2.Protocols = *t.Protocols
361 }
362 if !t.tlsNextProtoWasNil {
363 npm := maps.Clone(t.TLSNextProto)
364 if npm == nil {
365 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
366 }
367 t2.TLSNextProto = npm
368 }
369 return t2
370 }
371
372
373
374
375
376
377
378 type h2Transport interface {
379 CloseIdleConnections()
380 }
381
382 func (t *Transport) hasCustomTLSDialer() bool {
383 return t.DialTLS != nil || t.DialTLSContext != nil
384 }
385
386 var http2client = godebug.New("http2client")
387
388
389
390 func (t *Transport) onceSetNextProtoDefaults() {
391 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
392 if http2client.Value() == "0" {
393 http2client.IncNonDefault()
394 return
395 }
396
397
398
399
400
401
402 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
403 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
404 if v := rv.Field(0); v.CanInterface() {
405 if h2i, ok := v.Interface().(h2Transport); ok {
406 t.h2transport = h2i
407 return
408 }
409 }
410 }
411
412 protocols := t.protocols()
413 if !protocols.HTTP2() {
414 return
415 }
416 if omitBundledHTTP2 {
417 return
418 }
419 t2, err := http2configureTransports(t)
420 if err != nil {
421 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
422 return
423 }
424 t.h2transport = t2
425
426
427
428
429
430
431
432 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
433 const h2max = 1<<32 - 1
434 if limit1 >= h2max {
435 t2.MaxHeaderListSize = h2max
436 } else {
437 t2.MaxHeaderListSize = uint32(limit1)
438 }
439 }
440
441
442
443
444
445
446 t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
447 }
448
449 func (t *Transport) protocols() Protocols {
450 if t.Protocols != nil {
451 return *t.Protocols
452 }
453 var p Protocols
454 p.SetHTTP1(true)
455 switch {
456 case t.TLSNextProto != nil:
457
458
459 if t.TLSNextProto["h2"] != nil {
460 p.SetHTTP2(true)
461 }
462 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
463
464
465
466
467
468
469 case http2client.Value() == "0":
470 default:
471 p.SetHTTP2(true)
472 }
473 return p
474 }
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
493 return envProxyFunc()(req.URL)
494 }
495
496
497
498 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
499 return func(*Request) (*url.URL, error) {
500 return fixedURL, nil
501 }
502 }
503
504
505
506
507 type transportRequest struct {
508 *Request
509 extra Header
510 trace *httptrace.ClientTrace
511
512 ctx context.Context
513 cancel context.CancelCauseFunc
514
515 mu sync.Mutex
516 err error
517 }
518
519 func (tr *transportRequest) extraHeaders() Header {
520 if tr.extra == nil {
521 tr.extra = make(Header)
522 }
523 return tr.extra
524 }
525
526 func (tr *transportRequest) setError(err error) {
527 tr.mu.Lock()
528 if tr.err == nil {
529 tr.err = err
530 }
531 tr.mu.Unlock()
532 }
533
534
535
536 func (t *Transport) useRegisteredProtocol(req *Request) bool {
537 if req.URL.Scheme == "https" && req.requiresHTTP1() {
538
539
540
541
542 return false
543 }
544 return true
545 }
546
547
548
549
550 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
551 if !t.useRegisteredProtocol(req) {
552 return nil
553 }
554 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
555 return altProto[req.URL.Scheme]
556 }
557
558 func validateHeaders(hdrs Header) string {
559 for k, vv := range hdrs {
560 if !httpguts.ValidHeaderFieldName(k) {
561 return fmt.Sprintf("field name %q", k)
562 }
563 for _, v := range vv {
564 if !httpguts.ValidHeaderFieldValue(v) {
565
566
567 return fmt.Sprintf("field value for %q", k)
568 }
569 }
570 }
571 return ""
572 }
573
574
575 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
576 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
577 ctx := req.Context()
578 trace := httptrace.ContextClientTrace(ctx)
579
580 if req.URL == nil {
581 req.closeBody()
582 return nil, errors.New("http: nil Request.URL")
583 }
584 if req.Header == nil {
585 req.closeBody()
586 return nil, errors.New("http: nil Request.Header")
587 }
588 scheme := req.URL.Scheme
589 isHTTP := scheme == "http" || scheme == "https"
590 if isHTTP {
591
592 if err := validateHeaders(req.Header); err != "" {
593 req.closeBody()
594 return nil, fmt.Errorf("net/http: invalid header %s", err)
595 }
596
597
598 if err := validateHeaders(req.Trailer); err != "" {
599 req.closeBody()
600 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
601 }
602 }
603
604 origReq := req
605 req = setupRewindBody(req)
606
607 if altRT := t.alternateRoundTripper(req); altRT != nil {
608 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
609 return resp, err
610 }
611 var err error
612 req, err = rewindBody(req)
613 if err != nil {
614 return nil, err
615 }
616 }
617 if !isHTTP {
618 req.closeBody()
619 return nil, badStringError("unsupported protocol scheme", scheme)
620 }
621 if req.Method != "" && !validMethod(req.Method) {
622 req.closeBody()
623 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
624 }
625 if req.URL.Host == "" {
626 req.closeBody()
627 return nil, errors.New("http: no Host in request URL")
628 }
629
630
631
632
633
634
635
636
637
638
639 ctx, cancel := context.WithCancelCause(req.Context())
640
641
642 if origReq.Cancel != nil {
643 go awaitLegacyCancel(ctx, cancel, origReq)
644 }
645
646
647
648
649
650 cancel = t.prepareTransportCancel(origReq, cancel)
651
652 defer func() {
653 if err != nil {
654 cancel(err)
655 }
656 }()
657
658 for {
659 select {
660 case <-ctx.Done():
661 req.closeBody()
662 return nil, context.Cause(ctx)
663 default:
664 }
665
666
667 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
668 cm, err := t.connectMethodForRequest(treq)
669 if err != nil {
670 req.closeBody()
671 return nil, err
672 }
673
674
675
676
677
678 pconn, err := t.getConn(treq, cm)
679 if err != nil {
680 req.closeBody()
681 return nil, err
682 }
683
684 var resp *Response
685 if pconn.alt != nil {
686
687 resp, err = pconn.alt.RoundTrip(req)
688 } else {
689 resp, err = pconn.roundTrip(treq)
690 }
691 if err == nil {
692 if pconn.alt != nil {
693
694
695
696
697
698 cancel(errRequestDone)
699 }
700 resp.Request = origReq
701 return resp, nil
702 }
703
704
705 if http2isNoCachedConnError(err) {
706 if t.removeIdleConn(pconn) {
707 t.decConnsPerHost(pconn.cacheKey)
708 }
709 } else if !pconn.shouldRetryRequest(req, err) {
710
711
712 if e, ok := err.(nothingWrittenError); ok {
713 err = e.error
714 }
715 if e, ok := err.(transportReadFromServerError); ok {
716 err = e.err
717 }
718 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose {
719
720
721
722 req.closeBody()
723 }
724 return nil, err
725 }
726 testHookRoundTripRetried()
727
728
729 req, err = rewindBody(req)
730 if err != nil {
731 return nil, err
732 }
733 }
734 }
735
736 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
737 select {
738 case <-req.Cancel:
739 cancel(errRequestCanceled)
740 case <-ctx.Done():
741 }
742 }
743
744 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
745
746 type readTrackingBody struct {
747 io.ReadCloser
748 didRead bool
749 didClose bool
750 }
751
752 func (r *readTrackingBody) Read(data []byte) (int, error) {
753 r.didRead = true
754 return r.ReadCloser.Read(data)
755 }
756
757 func (r *readTrackingBody) Close() error {
758 r.didClose = true
759 return r.ReadCloser.Close()
760 }
761
762
763
764
765
766 func setupRewindBody(req *Request) *Request {
767 if req.Body == nil || req.Body == NoBody {
768 return req
769 }
770 newReq := *req
771 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
772 return &newReq
773 }
774
775
776
777
778
779 func rewindBody(req *Request) (rewound *Request, err error) {
780 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
781 return req, nil
782 }
783 if !req.Body.(*readTrackingBody).didClose {
784 req.closeBody()
785 }
786 if req.GetBody == nil {
787 return nil, errCannotRewind
788 }
789 body, err := req.GetBody()
790 if err != nil {
791 return nil, err
792 }
793 newReq := *req
794 newReq.Body = &readTrackingBody{ReadCloser: body}
795 return &newReq, nil
796 }
797
798
799
800
801 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
802 if http2isNoCachedConnError(err) {
803
804
805
806
807
808
809 return true
810 }
811 if err == errMissingHost {
812
813 return false
814 }
815 if !pc.isReused() {
816
817
818
819
820
821
822
823 return false
824 }
825 if _, ok := err.(nothingWrittenError); ok {
826
827
828 return req.outgoingLength() == 0 || req.GetBody != nil
829 }
830 if !req.isReplayable() {
831
832 return false
833 }
834 if _, ok := err.(transportReadFromServerError); ok {
835
836
837 return true
838 }
839 if err == errServerClosedIdle {
840
841
842
843 return true
844 }
845 return false
846 }
847
848
849 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
850
851
852
853
854
855
856
857
858
859
860
861 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
862 t.altMu.Lock()
863 defer t.altMu.Unlock()
864 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
865 if _, exists := oldMap[scheme]; exists {
866 panic("protocol " + scheme + " already registered")
867 }
868 newMap := maps.Clone(oldMap)
869 if newMap == nil {
870 newMap = make(map[string]RoundTripper)
871 }
872 newMap[scheme] = rt
873 t.altProto.Store(newMap)
874 }
875
876
877
878
879
880 func (t *Transport) CloseIdleConnections() {
881 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
882 t.idleMu.Lock()
883 m := t.idleConn
884 t.idleConn = nil
885 t.closeIdle = true
886 t.idleLRU = connLRU{}
887 t.idleMu.Unlock()
888 for _, conns := range m {
889 for _, pconn := range conns {
890 pconn.close(errCloseIdleConns)
891 }
892 }
893 t.connsPerHostMu.Lock()
894 t.dialsInProgress.all(func(w *wantConn) {
895 if w.cancelCtx != nil && !w.waiting() {
896 w.cancelCtx()
897 }
898 })
899 t.connsPerHostMu.Unlock()
900 if t2 := t.h2transport; t2 != nil {
901 t2.CloseIdleConnections()
902 }
903 }
904
905
906 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
907
908
909
910
911
912
913 cancel := func(err error) {
914 origCancel(err)
915 t.reqMu.Lock()
916 delete(t.reqCanceler, req)
917 t.reqMu.Unlock()
918 }
919 t.reqMu.Lock()
920 if t.reqCanceler == nil {
921 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
922 }
923 t.reqCanceler[req] = cancel
924 t.reqMu.Unlock()
925 return cancel
926 }
927
928
929
930
931
932
933
934 func (t *Transport) CancelRequest(req *Request) {
935 t.reqMu.Lock()
936 cancel := t.reqCanceler[req]
937 t.reqMu.Unlock()
938 if cancel != nil {
939 cancel(errRequestCanceled)
940 }
941 }
942
943
944
945
946
947 var (
948 envProxyOnce sync.Once
949 envProxyFuncValue func(*url.URL) (*url.URL, error)
950 )
951
952
953
954 func envProxyFunc() func(*url.URL) (*url.URL, error) {
955 envProxyOnce.Do(func() {
956 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
957 })
958 return envProxyFuncValue
959 }
960
961
962 func resetProxyConfig() {
963 envProxyOnce = sync.Once{}
964 envProxyFuncValue = nil
965 }
966
967 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
968 cm.targetScheme = treq.URL.Scheme
969 cm.targetAddr = canonicalAddr(treq.URL)
970 if t.Proxy != nil {
971 cm.proxyURL, err = t.Proxy(treq.Request)
972 }
973 cm.onlyH1 = treq.requiresHTTP1()
974 return cm, err
975 }
976
977
978
979 func (cm *connectMethod) proxyAuth() string {
980 if cm.proxyURL == nil {
981 return ""
982 }
983 if u := cm.proxyURL.User; u != nil {
984 username := u.Username()
985 password, _ := u.Password()
986 return "Basic " + basicAuth(username, password)
987 }
988 return ""
989 }
990
991
992 var (
993 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
994 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
995 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
996 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
997 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
998 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
999 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
1000 errIdleConnTimeout = errors.New("http: idle connection timeout")
1001
1002
1003
1004
1005
1006 errServerClosedIdle = errors.New("http: server closed idle connection")
1007 )
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017 type transportReadFromServerError struct {
1018 err error
1019 }
1020
1021 func (e transportReadFromServerError) Unwrap() error { return e.err }
1022
1023 func (e transportReadFromServerError) Error() string {
1024 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
1025 }
1026
1027 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1028 if err := t.tryPutIdleConn(pconn); err != nil {
1029 pconn.close(err)
1030 }
1031 }
1032
1033 func (t *Transport) maxIdleConnsPerHost() int {
1034 if v := t.MaxIdleConnsPerHost; v != 0 {
1035 return v
1036 }
1037 return DefaultMaxIdleConnsPerHost
1038 }
1039
1040
1041
1042
1043
1044
1045 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
1046 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
1047 return errKeepAlivesDisabled
1048 }
1049 if pconn.isBroken() {
1050 return errConnBroken
1051 }
1052 pconn.markReused()
1053
1054 t.idleMu.Lock()
1055 defer t.idleMu.Unlock()
1056
1057
1058
1059
1060 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
1061 return nil
1062 }
1063
1064
1065
1066
1067
1068 key := pconn.cacheKey
1069 if q, ok := t.idleConnWait[key]; ok {
1070 done := false
1071 if pconn.alt == nil {
1072
1073
1074 for q.len() > 0 {
1075 w := q.popFront()
1076 if w.tryDeliver(pconn, nil, time.Time{}) {
1077 done = true
1078 break
1079 }
1080 }
1081 } else {
1082
1083
1084
1085
1086 for q.len() > 0 {
1087 w := q.popFront()
1088 w.tryDeliver(pconn, nil, time.Time{})
1089 }
1090 }
1091 if q.len() == 0 {
1092 delete(t.idleConnWait, key)
1093 } else {
1094 t.idleConnWait[key] = q
1095 }
1096 if done {
1097 return nil
1098 }
1099 }
1100
1101 if t.closeIdle {
1102 return errCloseIdle
1103 }
1104 if t.idleConn == nil {
1105 t.idleConn = make(map[connectMethodKey][]*persistConn)
1106 }
1107 idles := t.idleConn[key]
1108 if len(idles) >= t.maxIdleConnsPerHost() {
1109 return errTooManyIdleHost
1110 }
1111 for _, exist := range idles {
1112 if exist == pconn {
1113 log.Fatalf("dup idle pconn %p in freelist", pconn)
1114 }
1115 }
1116 t.idleConn[key] = append(idles, pconn)
1117 t.idleLRU.add(pconn)
1118 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1119 oldest := t.idleLRU.removeOldest()
1120 oldest.close(errTooManyIdle)
1121 t.removeIdleConnLocked(oldest)
1122 }
1123
1124
1125
1126
1127 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1128 if pconn.idleTimer != nil {
1129 pconn.idleTimer.Reset(t.IdleConnTimeout)
1130 } else {
1131 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1132 }
1133 }
1134 pconn.idleAt = time.Now()
1135 return nil
1136 }
1137
1138
1139
1140
1141 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1142 if t.DisableKeepAlives {
1143 return false
1144 }
1145
1146 t.idleMu.Lock()
1147 defer t.idleMu.Unlock()
1148
1149
1150
1151 t.closeIdle = false
1152
1153 if w == nil {
1154
1155 return false
1156 }
1157
1158
1159
1160
1161 var oldTime time.Time
1162 if t.IdleConnTimeout > 0 {
1163 oldTime = time.Now().Add(-t.IdleConnTimeout)
1164 }
1165
1166
1167 if list, ok := t.idleConn[w.key]; ok {
1168 stop := false
1169 delivered := false
1170 for len(list) > 0 && !stop {
1171 pconn := list[len(list)-1]
1172
1173
1174
1175
1176 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1177 if tooOld {
1178
1179
1180
1181 go pconn.closeConnIfStillIdle()
1182 }
1183 if pconn.isBroken() || tooOld {
1184
1185
1186
1187
1188
1189 list = list[:len(list)-1]
1190 continue
1191 }
1192 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1193 if delivered {
1194 if pconn.alt != nil {
1195
1196
1197 } else {
1198
1199
1200 t.idleLRU.remove(pconn)
1201 list = list[:len(list)-1]
1202 }
1203 }
1204 stop = true
1205 }
1206 if len(list) > 0 {
1207 t.idleConn[w.key] = list
1208 } else {
1209 delete(t.idleConn, w.key)
1210 }
1211 if stop {
1212 return delivered
1213 }
1214 }
1215
1216
1217 if t.idleConnWait == nil {
1218 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1219 }
1220 q := t.idleConnWait[w.key]
1221 q.cleanFrontNotWaiting()
1222 q.pushBack(w)
1223 t.idleConnWait[w.key] = q
1224 return false
1225 }
1226
1227
1228 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1229 t.idleMu.Lock()
1230 defer t.idleMu.Unlock()
1231 return t.removeIdleConnLocked(pconn)
1232 }
1233
1234
1235 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1236 if pconn.idleTimer != nil {
1237 pconn.idleTimer.Stop()
1238 }
1239 t.idleLRU.remove(pconn)
1240 key := pconn.cacheKey
1241 pconns := t.idleConn[key]
1242 var removed bool
1243 switch len(pconns) {
1244 case 0:
1245
1246 case 1:
1247 if pconns[0] == pconn {
1248 delete(t.idleConn, key)
1249 removed = true
1250 }
1251 default:
1252 for i, v := range pconns {
1253 if v != pconn {
1254 continue
1255 }
1256
1257
1258 copy(pconns[i:], pconns[i+1:])
1259 t.idleConn[key] = pconns[:len(pconns)-1]
1260 removed = true
1261 break
1262 }
1263 }
1264 return removed
1265 }
1266
1267 var zeroDialer net.Dialer
1268
1269 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1270 if t.DialContext != nil {
1271 c, err := t.DialContext(ctx, network, addr)
1272 if c == nil && err == nil {
1273 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1274 }
1275 return c, err
1276 }
1277 if t.Dial != nil {
1278 c, err := t.Dial(network, addr)
1279 if c == nil && err == nil {
1280 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1281 }
1282 return c, err
1283 }
1284 return zeroDialer.DialContext(ctx, network, addr)
1285 }
1286
1287
1288
1289
1290
1291
1292
1293 type wantConn struct {
1294 cm connectMethod
1295 key connectMethodKey
1296
1297
1298
1299
1300 beforeDial func()
1301 afterDial func()
1302
1303 mu sync.Mutex
1304 ctx context.Context
1305 cancelCtx context.CancelFunc
1306 done bool
1307 result chan connOrError
1308 }
1309
1310 type connOrError struct {
1311 pc *persistConn
1312 err error
1313 idleAt time.Time
1314 }
1315
1316
1317 func (w *wantConn) waiting() bool {
1318 w.mu.Lock()
1319 defer w.mu.Unlock()
1320
1321 return !w.done
1322 }
1323
1324
1325 func (w *wantConn) getCtxForDial() context.Context {
1326 w.mu.Lock()
1327 defer w.mu.Unlock()
1328
1329 return w.ctx
1330 }
1331
1332
1333 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1334 w.mu.Lock()
1335 defer w.mu.Unlock()
1336
1337 if w.done {
1338 return false
1339 }
1340 if (pc == nil) == (err == nil) {
1341 panic("net/http: internal error: misuse of tryDeliver")
1342 }
1343 w.ctx = nil
1344 w.done = true
1345
1346 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1347 close(w.result)
1348
1349 return true
1350 }
1351
1352
1353
1354 func (w *wantConn) cancel(t *Transport, err error) {
1355 w.mu.Lock()
1356 var pc *persistConn
1357 if w.done {
1358 if r, ok := <-w.result; ok {
1359 pc = r.pc
1360 }
1361 } else {
1362 close(w.result)
1363 }
1364 w.ctx = nil
1365 w.done = true
1366 w.mu.Unlock()
1367
1368 if pc != nil {
1369 t.putOrCloseIdleConn(pc)
1370 }
1371 }
1372
1373
1374 type wantConnQueue struct {
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385 head []*wantConn
1386 headPos int
1387 tail []*wantConn
1388 }
1389
1390
1391 func (q *wantConnQueue) len() int {
1392 return len(q.head) - q.headPos + len(q.tail)
1393 }
1394
1395
1396 func (q *wantConnQueue) pushBack(w *wantConn) {
1397 q.tail = append(q.tail, w)
1398 }
1399
1400
1401 func (q *wantConnQueue) popFront() *wantConn {
1402 if q.headPos >= len(q.head) {
1403 if len(q.tail) == 0 {
1404 return nil
1405 }
1406
1407 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1408 }
1409 w := q.head[q.headPos]
1410 q.head[q.headPos] = nil
1411 q.headPos++
1412 return w
1413 }
1414
1415
1416 func (q *wantConnQueue) peekFront() *wantConn {
1417 if q.headPos < len(q.head) {
1418 return q.head[q.headPos]
1419 }
1420 if len(q.tail) > 0 {
1421 return q.tail[0]
1422 }
1423 return nil
1424 }
1425
1426
1427
1428 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1429 for {
1430 w := q.peekFront()
1431 if w == nil || w.waiting() {
1432 return cleaned
1433 }
1434 q.popFront()
1435 cleaned = true
1436 }
1437 }
1438
1439
1440 func (q *wantConnQueue) cleanFrontCanceled() {
1441 for {
1442 w := q.peekFront()
1443 if w == nil || w.cancelCtx != nil {
1444 return
1445 }
1446 q.popFront()
1447 }
1448 }
1449
1450
1451
1452 func (q *wantConnQueue) all(f func(*wantConn)) {
1453 for _, w := range q.head[q.headPos:] {
1454 f(w)
1455 }
1456 for _, w := range q.tail {
1457 f(w)
1458 }
1459 }
1460
1461 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1462 if t.DialTLSContext != nil {
1463 conn, err = t.DialTLSContext(ctx, network, addr)
1464 } else {
1465 conn, err = t.DialTLS(network, addr)
1466 }
1467 if conn == nil && err == nil {
1468 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1469 }
1470 return
1471 }
1472
1473
1474
1475
1476
1477 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1478 req := treq.Request
1479 trace := treq.trace
1480 ctx := req.Context()
1481 if trace != nil && trace.GetConn != nil {
1482 trace.GetConn(cm.addr())
1483 }
1484
1485
1486
1487
1488
1489
1490 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1491
1492 w := &wantConn{
1493 cm: cm,
1494 key: cm.key(),
1495 ctx: dialCtx,
1496 cancelCtx: dialCancel,
1497 result: make(chan connOrError, 1),
1498 beforeDial: testHookPrePendingDial,
1499 afterDial: testHookPostPendingDial,
1500 }
1501 defer func() {
1502 if err != nil {
1503 w.cancel(t, err)
1504 }
1505 }()
1506
1507
1508 if delivered := t.queueForIdleConn(w); !delivered {
1509 t.queueForDial(w)
1510 }
1511
1512
1513 select {
1514 case r := <-w.result:
1515
1516
1517 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1518 info := httptrace.GotConnInfo{
1519 Conn: r.pc.conn,
1520 Reused: r.pc.isReused(),
1521 }
1522 if !r.idleAt.IsZero() {
1523 info.WasIdle = true
1524 info.IdleTime = time.Since(r.idleAt)
1525 }
1526 trace.GotConn(info)
1527 }
1528 if r.err != nil {
1529
1530
1531
1532 select {
1533 case <-treq.ctx.Done():
1534 err := context.Cause(treq.ctx)
1535 if err == errRequestCanceled {
1536 err = errRequestCanceledConn
1537 }
1538 return nil, err
1539 default:
1540
1541 }
1542 }
1543 return r.pc, r.err
1544 case <-treq.ctx.Done():
1545 err := context.Cause(treq.ctx)
1546 if err == errRequestCanceled {
1547 err = errRequestCanceledConn
1548 }
1549 return nil, err
1550 }
1551 }
1552
1553
1554
1555 func (t *Transport) queueForDial(w *wantConn) {
1556 w.beforeDial()
1557
1558 t.connsPerHostMu.Lock()
1559 defer t.connsPerHostMu.Unlock()
1560
1561 if t.MaxConnsPerHost <= 0 {
1562 t.startDialConnForLocked(w)
1563 return
1564 }
1565
1566 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1567 if t.connsPerHost == nil {
1568 t.connsPerHost = make(map[connectMethodKey]int)
1569 }
1570 t.connsPerHost[w.key] = n + 1
1571 t.startDialConnForLocked(w)
1572 return
1573 }
1574
1575 if t.connsPerHostWait == nil {
1576 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1577 }
1578 q := t.connsPerHostWait[w.key]
1579 q.cleanFrontNotWaiting()
1580 q.pushBack(w)
1581 t.connsPerHostWait[w.key] = q
1582 }
1583
1584
1585
1586 func (t *Transport) startDialConnForLocked(w *wantConn) {
1587 t.dialsInProgress.cleanFrontCanceled()
1588 t.dialsInProgress.pushBack(w)
1589 go func() {
1590 t.dialConnFor(w)
1591 t.connsPerHostMu.Lock()
1592 defer t.connsPerHostMu.Unlock()
1593 w.cancelCtx = nil
1594 }()
1595 }
1596
1597
1598
1599
1600 func (t *Transport) dialConnFor(w *wantConn) {
1601 defer w.afterDial()
1602 ctx := w.getCtxForDial()
1603 if ctx == nil {
1604 t.decConnsPerHost(w.key)
1605 return
1606 }
1607
1608 pc, err := t.dialConn(ctx, w.cm)
1609 delivered := w.tryDeliver(pc, err, time.Time{})
1610 if err == nil && (!delivered || pc.alt != nil) {
1611
1612
1613
1614 t.putOrCloseIdleConn(pc)
1615 }
1616 if err != nil {
1617 t.decConnsPerHost(w.key)
1618 }
1619 }
1620
1621
1622
1623 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1624 if t.MaxConnsPerHost <= 0 {
1625 return
1626 }
1627
1628 t.connsPerHostMu.Lock()
1629 defer t.connsPerHostMu.Unlock()
1630 n := t.connsPerHost[key]
1631 if n == 0 {
1632
1633
1634 panic("net/http: internal error: connCount underflow")
1635 }
1636
1637
1638
1639
1640
1641 if q := t.connsPerHostWait[key]; q.len() > 0 {
1642 done := false
1643 for q.len() > 0 {
1644 w := q.popFront()
1645 if w.waiting() {
1646 t.startDialConnForLocked(w)
1647 done = true
1648 break
1649 }
1650 }
1651 if q.len() == 0 {
1652 delete(t.connsPerHostWait, key)
1653 } else {
1654
1655
1656 t.connsPerHostWait[key] = q
1657 }
1658 if done {
1659 return
1660 }
1661 }
1662
1663
1664 if n--; n == 0 {
1665 delete(t.connsPerHost, key)
1666 } else {
1667 t.connsPerHost[key] = n
1668 }
1669 }
1670
1671
1672
1673
1674 func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
1675
1676 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1677 if cfg.ServerName == "" {
1678 cfg.ServerName = name
1679 }
1680 if pconn.cacheKey.onlyH1 {
1681 cfg.NextProtos = nil
1682 }
1683 plainConn := pconn.conn
1684 tlsConn := tls.Client(plainConn, cfg)
1685 errc := make(chan error, 2)
1686 var timer *time.Timer
1687 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1688 timer = time.AfterFunc(d, func() {
1689 errc <- tlsHandshakeTimeoutError{}
1690 })
1691 }
1692 go func() {
1693 if trace != nil && trace.TLSHandshakeStart != nil {
1694 trace.TLSHandshakeStart()
1695 }
1696 err := tlsConn.HandshakeContext(ctx)
1697 if timer != nil {
1698 timer.Stop()
1699 }
1700 errc <- err
1701 }()
1702 if err := <-errc; err != nil {
1703 plainConn.Close()
1704 if err == (tlsHandshakeTimeoutError{}) {
1705
1706
1707 <-errc
1708 }
1709 if trace != nil && trace.TLSHandshakeDone != nil {
1710 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1711 }
1712 return err
1713 }
1714 cs := tlsConn.ConnectionState()
1715 if trace != nil && trace.TLSHandshakeDone != nil {
1716 trace.TLSHandshakeDone(cs, nil)
1717 }
1718 pconn.tlsState = &cs
1719 pconn.conn = tlsConn
1720 return nil
1721 }
1722
1723 type erringRoundTripper interface {
1724 RoundTripErr() error
1725 }
1726
1727 var testHookProxyConnectTimeout = context.WithTimeout
1728
1729 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1730 pconn = &persistConn{
1731 t: t,
1732 cacheKey: cm.key(),
1733 reqch: make(chan requestAndChan, 1),
1734 writech: make(chan writeRequest, 1),
1735 closech: make(chan struct{}),
1736 writeErrCh: make(chan error, 1),
1737 writeLoopDone: make(chan struct{}),
1738 }
1739 trace := httptrace.ContextClientTrace(ctx)
1740 wrapErr := func(err error) error {
1741 if cm.proxyURL != nil {
1742
1743 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1744 }
1745 return err
1746 }
1747 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1748 var err error
1749 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1750 if err != nil {
1751 return nil, wrapErr(err)
1752 }
1753 if tc, ok := pconn.conn.(*tls.Conn); ok {
1754
1755
1756 if trace != nil && trace.TLSHandshakeStart != nil {
1757 trace.TLSHandshakeStart()
1758 }
1759 if err := tc.HandshakeContext(ctx); err != nil {
1760 go pconn.conn.Close()
1761 if trace != nil && trace.TLSHandshakeDone != nil {
1762 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1763 }
1764 return nil, err
1765 }
1766 cs := tc.ConnectionState()
1767 if trace != nil && trace.TLSHandshakeDone != nil {
1768 trace.TLSHandshakeDone(cs, nil)
1769 }
1770 pconn.tlsState = &cs
1771 }
1772 } else {
1773 conn, err := t.dial(ctx, "tcp", cm.addr())
1774 if err != nil {
1775 return nil, wrapErr(err)
1776 }
1777 pconn.conn = conn
1778 if cm.scheme() == "https" {
1779 var firstTLSHost string
1780 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1781 return nil, wrapErr(err)
1782 }
1783 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
1784 return nil, wrapErr(err)
1785 }
1786 }
1787 }
1788
1789
1790 switch {
1791 case cm.proxyURL == nil:
1792
1793 case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
1794 conn := pconn.conn
1795 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1796 if u := cm.proxyURL.User; u != nil {
1797 auth := &socksUsernamePassword{
1798 Username: u.Username(),
1799 }
1800 auth.Password, _ = u.Password()
1801 d.AuthMethods = []socksAuthMethod{
1802 socksAuthMethodNotRequired,
1803 socksAuthMethodUsernamePassword,
1804 }
1805 d.Authenticate = auth.Authenticate
1806 }
1807 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1808 conn.Close()
1809 return nil, err
1810 }
1811 case cm.targetScheme == "http":
1812 pconn.isProxy = true
1813 if pa := cm.proxyAuth(); pa != "" {
1814 pconn.mutateHeaderFunc = func(h Header) {
1815 h.Set("Proxy-Authorization", pa)
1816 }
1817 }
1818 case cm.targetScheme == "https":
1819 conn := pconn.conn
1820 var hdr Header
1821 if t.GetProxyConnectHeader != nil {
1822 var err error
1823 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1824 if err != nil {
1825 conn.Close()
1826 return nil, err
1827 }
1828 } else {
1829 hdr = t.ProxyConnectHeader
1830 }
1831 if hdr == nil {
1832 hdr = make(Header)
1833 }
1834 if pa := cm.proxyAuth(); pa != "" {
1835 hdr = hdr.Clone()
1836 hdr.Set("Proxy-Authorization", pa)
1837 }
1838 connectReq := &Request{
1839 Method: "CONNECT",
1840 URL: &url.URL{Opaque: cm.targetAddr},
1841 Host: cm.targetAddr,
1842 Header: hdr,
1843 }
1844
1845
1846
1847
1848 connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
1849 defer cancel()
1850
1851 didReadResponse := make(chan struct{})
1852 var (
1853 resp *Response
1854 err error
1855 )
1856
1857 go func() {
1858 defer close(didReadResponse)
1859 err = connectReq.Write(conn)
1860 if err != nil {
1861 return
1862 }
1863
1864
1865 br := bufio.NewReader(conn)
1866 resp, err = ReadResponse(br, connectReq)
1867 }()
1868 select {
1869 case <-connectCtx.Done():
1870 conn.Close()
1871 <-didReadResponse
1872 return nil, connectCtx.Err()
1873 case <-didReadResponse:
1874
1875 }
1876 if err != nil {
1877 conn.Close()
1878 return nil, err
1879 }
1880
1881 if t.OnProxyConnectResponse != nil {
1882 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
1883 if err != nil {
1884 conn.Close()
1885 return nil, err
1886 }
1887 }
1888
1889 if resp.StatusCode != 200 {
1890 _, text, ok := strings.Cut(resp.Status, " ")
1891 conn.Close()
1892 if !ok {
1893 return nil, errors.New("unknown status code")
1894 }
1895 return nil, errors.New(text)
1896 }
1897 }
1898
1899 if cm.proxyURL != nil && cm.targetScheme == "https" {
1900 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
1901 return nil, err
1902 }
1903 }
1904
1905 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1906 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1907 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1908 if e, ok := alt.(erringRoundTripper); ok {
1909
1910 return nil, e.RoundTripErr()
1911 }
1912 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1913 }
1914 }
1915
1916 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1917 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1918
1919 go pconn.readLoop()
1920 go pconn.writeLoop()
1921 return pconn, nil
1922 }
1923
1924
1925
1926
1927
1928
1929
1930 type persistConnWriter struct {
1931 pc *persistConn
1932 }
1933
1934 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1935 n, err = w.pc.conn.Write(p)
1936 w.pc.nwrite += int64(n)
1937 return
1938 }
1939
1940
1941
1942
1943 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1944 n, err = io.Copy(w.pc.conn, r)
1945 w.pc.nwrite += n
1946 return
1947 }
1948
1949 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967 type connectMethod struct {
1968 _ incomparable
1969 proxyURL *url.URL
1970 targetScheme string
1971
1972
1973
1974 targetAddr string
1975 onlyH1 bool
1976 }
1977
1978 func (cm *connectMethod) key() connectMethodKey {
1979 proxyStr := ""
1980 targetAddr := cm.targetAddr
1981 if cm.proxyURL != nil {
1982 proxyStr = cm.proxyURL.String()
1983 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1984 targetAddr = ""
1985 }
1986 }
1987 return connectMethodKey{
1988 proxy: proxyStr,
1989 scheme: cm.targetScheme,
1990 addr: targetAddr,
1991 onlyH1: cm.onlyH1,
1992 }
1993 }
1994
1995
1996 func (cm *connectMethod) scheme() string {
1997 if cm.proxyURL != nil {
1998 return cm.proxyURL.Scheme
1999 }
2000 return cm.targetScheme
2001 }
2002
2003
2004 func (cm *connectMethod) addr() string {
2005 if cm.proxyURL != nil {
2006 return canonicalAddr(cm.proxyURL)
2007 }
2008 return cm.targetAddr
2009 }
2010
2011
2012
2013 func (cm *connectMethod) tlsHost() string {
2014 h := cm.targetAddr
2015 if hasPort(h) {
2016 h = h[:strings.LastIndex(h, ":")]
2017 }
2018 return h
2019 }
2020
2021
2022
2023
2024 type connectMethodKey struct {
2025 proxy, scheme, addr string
2026 onlyH1 bool
2027 }
2028
2029 func (k connectMethodKey) String() string {
2030
2031 var h1 string
2032 if k.onlyH1 {
2033 h1 = ",h1"
2034 }
2035 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
2036 }
2037
2038
2039
2040 type persistConn struct {
2041
2042
2043
2044 alt RoundTripper
2045
2046 t *Transport
2047 cacheKey connectMethodKey
2048 conn net.Conn
2049 tlsState *tls.ConnectionState
2050 br *bufio.Reader
2051 bw *bufio.Writer
2052 nwrite int64
2053 reqch chan requestAndChan
2054 writech chan writeRequest
2055 closech chan struct{}
2056 isProxy bool
2057 sawEOF bool
2058 readLimit int64
2059
2060
2061
2062
2063 writeErrCh chan error
2064
2065 writeLoopDone chan struct{}
2066
2067
2068 idleAt time.Time
2069 idleTimer *time.Timer
2070
2071 mu sync.Mutex
2072 numExpectedResponses int
2073 closed error
2074 canceledErr error
2075 broken bool
2076 reused bool
2077
2078
2079
2080 mutateHeaderFunc func(Header)
2081 }
2082
2083 func (pc *persistConn) maxHeaderResponseSize() int64 {
2084 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
2085 return v
2086 }
2087 return 10 << 20
2088 }
2089
2090 func (pc *persistConn) Read(p []byte) (n int, err error) {
2091 if pc.readLimit <= 0 {
2092 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2093 }
2094 if int64(len(p)) > pc.readLimit {
2095 p = p[:pc.readLimit]
2096 }
2097 n, err = pc.conn.Read(p)
2098 if err == io.EOF {
2099 pc.sawEOF = true
2100 }
2101 pc.readLimit -= int64(n)
2102 return
2103 }
2104
2105
2106 func (pc *persistConn) isBroken() bool {
2107 pc.mu.Lock()
2108 b := pc.closed != nil
2109 pc.mu.Unlock()
2110 return b
2111 }
2112
2113
2114
2115 func (pc *persistConn) canceled() error {
2116 pc.mu.Lock()
2117 defer pc.mu.Unlock()
2118 return pc.canceledErr
2119 }
2120
2121
2122 func (pc *persistConn) isReused() bool {
2123 pc.mu.Lock()
2124 r := pc.reused
2125 pc.mu.Unlock()
2126 return r
2127 }
2128
2129 func (pc *persistConn) cancelRequest(err error) {
2130 pc.mu.Lock()
2131 defer pc.mu.Unlock()
2132 pc.canceledErr = err
2133 pc.closeLocked(errRequestCanceled)
2134 }
2135
2136
2137
2138
2139 func (pc *persistConn) closeConnIfStillIdle() {
2140 t := pc.t
2141 t.idleMu.Lock()
2142 defer t.idleMu.Unlock()
2143 if _, ok := t.idleLRU.m[pc]; !ok {
2144
2145 return
2146 }
2147 t.removeIdleConnLocked(pc)
2148 pc.close(errIdleConnTimeout)
2149 }
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2160 if err == nil {
2161 return nil
2162 }
2163
2164
2165
2166
2167
2168
2169
2170
2171 <-pc.writeLoopDone
2172
2173
2174
2175
2176 if cerr := pc.canceled(); cerr != nil {
2177 return cerr
2178 }
2179
2180
2181 req.mu.Lock()
2182 reqErr := req.err
2183 req.mu.Unlock()
2184 if reqErr != nil {
2185 return reqErr
2186 }
2187
2188 if err == errServerClosedIdle {
2189
2190 return err
2191 }
2192
2193 if _, ok := err.(transportReadFromServerError); ok {
2194 if pc.nwrite == startBytesWritten {
2195 return nothingWrittenError{err}
2196 }
2197
2198 return err
2199 }
2200 if pc.isBroken() {
2201 if pc.nwrite == startBytesWritten {
2202 return nothingWrittenError{err}
2203 }
2204 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2205 }
2206 return err
2207 }
2208
2209
2210
2211
2212 var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2213
2214 func (pc *persistConn) readLoop() {
2215 closeErr := errReadLoopExiting
2216 defer func() {
2217 pc.close(closeErr)
2218 pc.t.removeIdleConn(pc)
2219 }()
2220
2221 tryPutIdleConn := func(treq *transportRequest) bool {
2222 trace := treq.trace
2223 if err := pc.t.tryPutIdleConn(pc); err != nil {
2224 closeErr = err
2225 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
2226 trace.PutIdleConn(err)
2227 }
2228 return false
2229 }
2230 if trace != nil && trace.PutIdleConn != nil {
2231 trace.PutIdleConn(nil)
2232 }
2233 return true
2234 }
2235
2236
2237
2238
2239 eofc := make(chan struct{})
2240 defer close(eofc)
2241
2242
2243 testHookMu.Lock()
2244 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
2245 testHookMu.Unlock()
2246
2247 alive := true
2248 for alive {
2249 pc.readLimit = pc.maxHeaderResponseSize()
2250 _, err := pc.br.Peek(1)
2251
2252 pc.mu.Lock()
2253 if pc.numExpectedResponses == 0 {
2254 pc.readLoopPeekFailLocked(err)
2255 pc.mu.Unlock()
2256 return
2257 }
2258 pc.mu.Unlock()
2259
2260 rc := <-pc.reqch
2261 trace := rc.treq.trace
2262
2263 var resp *Response
2264 if err == nil {
2265 resp, err = pc.readResponse(rc, trace)
2266 } else {
2267 err = transportReadFromServerError{err}
2268 closeErr = err
2269 }
2270
2271 if err != nil {
2272 if pc.readLimit <= 0 {
2273 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
2274 }
2275
2276 select {
2277 case rc.ch <- responseAndError{err: err}:
2278 case <-rc.callerGone:
2279 return
2280 }
2281 return
2282 }
2283 pc.readLimit = maxInt64
2284
2285 pc.mu.Lock()
2286 pc.numExpectedResponses--
2287 pc.mu.Unlock()
2288
2289 bodyWritable := resp.bodyIsWritable()
2290 hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0
2291
2292 if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
2293
2294
2295
2296 alive = false
2297 }
2298
2299 if !hasBody || bodyWritable {
2300
2301
2302
2303
2304
2305 alive = alive &&
2306 !pc.sawEOF &&
2307 pc.wroteRequest() &&
2308 tryPutIdleConn(rc.treq)
2309
2310 if bodyWritable {
2311 closeErr = errCallerOwnsConn
2312 }
2313
2314 select {
2315 case rc.ch <- responseAndError{res: resp}:
2316 case <-rc.callerGone:
2317 return
2318 }
2319
2320 rc.treq.cancel(errRequestDone)
2321
2322
2323
2324
2325 testHookReadLoopBeforeNextRead()
2326 continue
2327 }
2328
2329 waitForBodyRead := make(chan bool, 2)
2330 body := &bodyEOFSignal{
2331 body: resp.Body,
2332 earlyCloseFn: func() error {
2333 waitForBodyRead <- false
2334 <-eofc
2335 return nil
2336
2337 },
2338 fn: func(err error) error {
2339 isEOF := err == io.EOF
2340 waitForBodyRead <- isEOF
2341 if isEOF {
2342 <-eofc
2343 } else if err != nil {
2344 if cerr := pc.canceled(); cerr != nil {
2345 return cerr
2346 }
2347 }
2348 return err
2349 },
2350 }
2351
2352 resp.Body = body
2353 if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
2354 resp.Body = &gzipReader{body: body}
2355 resp.Header.Del("Content-Encoding")
2356 resp.Header.Del("Content-Length")
2357 resp.ContentLength = -1
2358 resp.Uncompressed = true
2359 }
2360
2361 select {
2362 case rc.ch <- responseAndError{res: resp}:
2363 case <-rc.callerGone:
2364 return
2365 }
2366
2367
2368
2369
2370 select {
2371 case bodyEOF := <-waitForBodyRead:
2372 alive = alive &&
2373 bodyEOF &&
2374 !pc.sawEOF &&
2375 pc.wroteRequest() &&
2376 tryPutIdleConn(rc.treq)
2377 if bodyEOF {
2378 eofc <- struct{}{}
2379 }
2380 case <-rc.treq.ctx.Done():
2381 alive = false
2382 pc.cancelRequest(context.Cause(rc.treq.ctx))
2383 case <-pc.closech:
2384 alive = false
2385 }
2386
2387 rc.treq.cancel(errRequestDone)
2388 testHookReadLoopBeforeNextRead()
2389 }
2390 }
2391
2392 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2393 if pc.closed != nil {
2394 return
2395 }
2396 if n := pc.br.Buffered(); n > 0 {
2397 buf, _ := pc.br.Peek(n)
2398 if is408Message(buf) {
2399 pc.closeLocked(errServerClosedIdle)
2400 return
2401 } else {
2402 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2403 }
2404 }
2405 if peekErr == io.EOF {
2406
2407 pc.closeLocked(errServerClosedIdle)
2408 } else {
2409 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2410 }
2411 }
2412
2413
2414
2415
2416 func is408Message(buf []byte) bool {
2417 if len(buf) < len("HTTP/1.x 408") {
2418 return false
2419 }
2420 if string(buf[:7]) != "HTTP/1." {
2421 return false
2422 }
2423 return string(buf[8:12]) == " 408"
2424 }
2425
2426
2427
2428
2429 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2430 if trace != nil && trace.GotFirstResponseByte != nil {
2431 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2432 trace.GotFirstResponseByte()
2433 }
2434 }
2435
2436 continueCh := rc.continueCh
2437 for {
2438 resp, err = ReadResponse(pc.br, rc.treq.Request)
2439 if err != nil {
2440 return
2441 }
2442 resCode := resp.StatusCode
2443 if continueCh != nil && resCode == StatusContinue {
2444 if trace != nil && trace.Got100Continue != nil {
2445 trace.Got100Continue()
2446 }
2447 continueCh <- struct{}{}
2448 continueCh = nil
2449 }
2450 is1xx := 100 <= resCode && resCode <= 199
2451
2452 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2453 if is1xxNonTerminal {
2454 if trace != nil && trace.Got1xxResponse != nil {
2455 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2456 return nil, err
2457 }
2458
2459
2460
2461
2462
2463
2464
2465 pc.readLimit = pc.maxHeaderResponseSize()
2466 }
2467 continue
2468 }
2469 break
2470 }
2471 if resp.isProtocolSwitch() {
2472 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2473 }
2474 if continueCh != nil {
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487 if resp.Close || rc.treq.Request.Close {
2488 close(continueCh)
2489 } else {
2490 continueCh <- struct{}{}
2491 }
2492 }
2493
2494 resp.TLS = pc.tlsState
2495 return
2496 }
2497
2498
2499
2500
2501 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2502 if continueCh == nil {
2503 return nil
2504 }
2505 return func() bool {
2506 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2507 defer timer.Stop()
2508
2509 select {
2510 case _, ok := <-continueCh:
2511 return ok
2512 case <-timer.C:
2513 return true
2514 case <-pc.closech:
2515 return false
2516 }
2517 }
2518 }
2519
2520 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2521 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2522 if br.Buffered() != 0 {
2523 body.br = br
2524 }
2525 return body
2526 }
2527
2528
2529
2530
2531
2532
2533 type readWriteCloserBody struct {
2534 _ incomparable
2535 br *bufio.Reader
2536 io.ReadWriteCloser
2537 }
2538
2539 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2540 if b.br != nil {
2541 if n := b.br.Buffered(); len(p) > n {
2542 p = p[:n]
2543 }
2544 n, err = b.br.Read(p)
2545 if b.br.Buffered() == 0 {
2546 b.br = nil
2547 }
2548 return n, err
2549 }
2550 return b.ReadWriteCloser.Read(p)
2551 }
2552
2553
2554 type nothingWrittenError struct {
2555 error
2556 }
2557
2558 func (nwe nothingWrittenError) Unwrap() error {
2559 return nwe.error
2560 }
2561
2562 func (pc *persistConn) writeLoop() {
2563 defer close(pc.writeLoopDone)
2564 for {
2565 select {
2566 case wr := <-pc.writech:
2567 startBytesWritten := pc.nwrite
2568 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
2569 if bre, ok := err.(requestBodyReadError); ok {
2570 err = bre.error
2571
2572
2573
2574
2575
2576
2577
2578 wr.req.setError(err)
2579 }
2580 if err == nil {
2581 err = pc.bw.Flush()
2582 }
2583 if err != nil {
2584 if pc.nwrite == startBytesWritten {
2585 err = nothingWrittenError{err}
2586 }
2587 }
2588 pc.writeErrCh <- err
2589 wr.ch <- err
2590 if err != nil {
2591 pc.close(err)
2592 return
2593 }
2594 case <-pc.closech:
2595 return
2596 }
2597 }
2598 }
2599
2600
2601
2602
2603
2604
2605
2606 var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2607
2608
2609
2610 func (pc *persistConn) wroteRequest() bool {
2611 select {
2612 case err := <-pc.writeErrCh:
2613
2614
2615 return err == nil
2616 default:
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2628 defer t.Stop()
2629 select {
2630 case err := <-pc.writeErrCh:
2631 return err == nil
2632 case <-t.C:
2633 return false
2634 }
2635 }
2636 }
2637
2638
2639
2640 type responseAndError struct {
2641 _ incomparable
2642 res *Response
2643 err error
2644 }
2645
2646 type requestAndChan struct {
2647 _ incomparable
2648 treq *transportRequest
2649 ch chan responseAndError
2650
2651
2652
2653
2654 addedGzip bool
2655
2656
2657
2658
2659
2660 continueCh chan<- struct{}
2661
2662 callerGone <-chan struct{}
2663 }
2664
2665
2666
2667
2668
2669 type writeRequest struct {
2670 req *transportRequest
2671 ch chan<- error
2672
2673
2674
2675
2676 continueCh <-chan struct{}
2677 }
2678
2679
2680
2681 type timeoutError struct {
2682 err string
2683 }
2684
2685 func (e *timeoutError) Error() string { return e.err }
2686 func (e *timeoutError) Timeout() bool { return true }
2687 func (e *timeoutError) Temporary() bool { return true }
2688 func (e *timeoutError) Is(err error) bool { return err == context.DeadlineExceeded }
2689
2690 var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}
2691
2692
2693
2694 var errRequestCanceled = http2errRequestCanceled
2695 var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2696
2697
2698
2699 var errRequestDone = errors.New("net/http: request completed")
2700
2701 func nop() {}
2702
2703
2704 var (
2705 testHookEnterRoundTrip = nop
2706 testHookWaitResLoop = nop
2707 testHookRoundTripRetried = nop
2708 testHookPrePendingDial = nop
2709 testHookPostPendingDial = nop
2710
2711 testHookMu sync.Locker = fakeLocker{}
2712 testHookReadLoopBeforeNextRead = nop
2713 )
2714
2715 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2716 testHookEnterRoundTrip()
2717 pc.mu.Lock()
2718 pc.numExpectedResponses++
2719 headerFn := pc.mutateHeaderFunc
2720 pc.mu.Unlock()
2721
2722 if headerFn != nil {
2723 headerFn(req.extraHeaders())
2724 }
2725
2726
2727
2728
2729
2730 requestedGzip := false
2731 if !pc.t.DisableCompression &&
2732 req.Header.Get("Accept-Encoding") == "" &&
2733 req.Header.Get("Range") == "" &&
2734 req.Method != "HEAD" {
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747 requestedGzip = true
2748 req.extraHeaders().Set("Accept-Encoding", "gzip")
2749 }
2750
2751 var continueCh chan struct{}
2752 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
2753 continueCh = make(chan struct{}, 1)
2754 }
2755
2756 if pc.t.DisableKeepAlives &&
2757 !req.wantsClose() &&
2758 !isProtocolSwitchHeader(req.Header) {
2759 req.extraHeaders().Set("Connection", "close")
2760 }
2761
2762 gone := make(chan struct{})
2763 defer close(gone)
2764
2765 const debugRoundTrip = false
2766
2767
2768
2769
2770 startBytesWritten := pc.nwrite
2771 writeErrCh := make(chan error, 1)
2772 pc.writech <- writeRequest{req, writeErrCh, continueCh}
2773
2774 resc := make(chan responseAndError)
2775 pc.reqch <- requestAndChan{
2776 treq: req,
2777 ch: resc,
2778 addedGzip: requestedGzip,
2779 continueCh: continueCh,
2780 callerGone: gone,
2781 }
2782
2783 handleResponse := func(re responseAndError) (*Response, error) {
2784 if (re.res == nil) == (re.err == nil) {
2785 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
2786 }
2787 if debugRoundTrip {
2788 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
2789 }
2790 if re.err != nil {
2791 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
2792 }
2793 return re.res, nil
2794 }
2795
2796 var respHeaderTimer <-chan time.Time
2797 ctxDoneChan := req.ctx.Done()
2798 pcClosed := pc.closech
2799 for {
2800 testHookWaitResLoop()
2801 select {
2802 case err := <-writeErrCh:
2803 if debugRoundTrip {
2804 req.logf("writeErrCh recv: %T/%#v", err, err)
2805 }
2806 if err != nil {
2807 pc.close(fmt.Errorf("write error: %w", err))
2808 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
2809 }
2810 if d := pc.t.ResponseHeaderTimeout; d > 0 {
2811 if debugRoundTrip {
2812 req.logf("starting timer for %v", d)
2813 }
2814 timer := time.NewTimer(d)
2815 defer timer.Stop()
2816 respHeaderTimer = timer.C
2817 }
2818 case <-pcClosed:
2819 select {
2820 case re := <-resc:
2821
2822
2823
2824 return handleResponse(re)
2825 default:
2826 }
2827 if debugRoundTrip {
2828 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
2829 }
2830 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
2831 case <-respHeaderTimer:
2832 if debugRoundTrip {
2833 req.logf("timeout waiting for response headers.")
2834 }
2835 pc.close(errTimeout)
2836 return nil, errTimeout
2837 case re := <-resc:
2838 return handleResponse(re)
2839 case <-ctxDoneChan:
2840 select {
2841 case re := <-resc:
2842
2843
2844
2845 return handleResponse(re)
2846 default:
2847 }
2848 pc.cancelRequest(context.Cause(req.ctx))
2849 }
2850 }
2851 }
2852
2853
2854
2855 type tLogKey struct{}
2856
2857 func (tr *transportRequest) logf(format string, args ...any) {
2858 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2859 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2860 }
2861 }
2862
2863
2864
2865 func (pc *persistConn) markReused() {
2866 pc.mu.Lock()
2867 pc.reused = true
2868 pc.mu.Unlock()
2869 }
2870
2871
2872
2873
2874
2875
2876 func (pc *persistConn) close(err error) {
2877 pc.mu.Lock()
2878 defer pc.mu.Unlock()
2879 pc.closeLocked(err)
2880 }
2881
2882 func (pc *persistConn) closeLocked(err error) {
2883 if err == nil {
2884 panic("nil error")
2885 }
2886 pc.broken = true
2887 if pc.closed == nil {
2888 pc.closed = err
2889 pc.t.decConnsPerHost(pc.cacheKey)
2890
2891
2892 if pc.alt == nil {
2893 if err != errCallerOwnsConn {
2894 pc.conn.Close()
2895 }
2896 close(pc.closech)
2897 }
2898 }
2899 pc.mutateHeaderFunc = nil
2900 }
2901
2902 var portMap = map[string]string{
2903 "http": "80",
2904 "https": "443",
2905 "socks5": "1080",
2906 "socks5h": "1080",
2907 }
2908
2909 func idnaASCIIFromURL(url *url.URL) string {
2910 addr := url.Hostname()
2911 if v, err := idnaASCII(addr); err == nil {
2912 addr = v
2913 }
2914 return addr
2915 }
2916
2917
2918 func canonicalAddr(url *url.URL) string {
2919 port := url.Port()
2920 if port == "" {
2921 port = portMap[url.Scheme]
2922 }
2923 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2924 }
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937 type bodyEOFSignal struct {
2938 body io.ReadCloser
2939 mu sync.Mutex
2940 closed bool
2941 rerr error
2942 fn func(error) error
2943 earlyCloseFn func() error
2944 }
2945
2946 var errReadOnClosedResBody = errors.New("http: read on closed response body")
2947
2948 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
2949 es.mu.Lock()
2950 closed, rerr := es.closed, es.rerr
2951 es.mu.Unlock()
2952 if closed {
2953 return 0, errReadOnClosedResBody
2954 }
2955 if rerr != nil {
2956 return 0, rerr
2957 }
2958
2959 n, err = es.body.Read(p)
2960 if err != nil {
2961 es.mu.Lock()
2962 defer es.mu.Unlock()
2963 if es.rerr == nil {
2964 es.rerr = err
2965 }
2966 err = es.condfn(err)
2967 }
2968 return
2969 }
2970
2971 func (es *bodyEOFSignal) Close() error {
2972 es.mu.Lock()
2973 defer es.mu.Unlock()
2974 if es.closed {
2975 return nil
2976 }
2977 es.closed = true
2978 if es.earlyCloseFn != nil && es.rerr != io.EOF {
2979 return es.earlyCloseFn()
2980 }
2981 err := es.body.Close()
2982 return es.condfn(err)
2983 }
2984
2985
2986 func (es *bodyEOFSignal) condfn(err error) error {
2987 if es.fn == nil {
2988 return err
2989 }
2990 err = es.fn(err)
2991 es.fn = nil
2992 return err
2993 }
2994
2995
2996
2997 type gzipReader struct {
2998 _ incomparable
2999 body *bodyEOFSignal
3000 zr *gzip.Reader
3001 zerr error
3002 }
3003
3004 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3005 if gz.zr == nil {
3006 if gz.zerr == nil {
3007 gz.zr, gz.zerr = gzip.NewReader(gz.body)
3008 }
3009 if gz.zerr != nil {
3010 return 0, gz.zerr
3011 }
3012 }
3013
3014 gz.body.mu.Lock()
3015 if gz.body.closed {
3016 err = errReadOnClosedResBody
3017 }
3018 gz.body.mu.Unlock()
3019
3020 if err != nil {
3021 return 0, err
3022 }
3023 return gz.zr.Read(p)
3024 }
3025
3026 func (gz *gzipReader) Close() error {
3027 return gz.body.Close()
3028 }
3029
3030 type tlsHandshakeTimeoutError struct{}
3031
3032 func (tlsHandshakeTimeoutError) Timeout() bool { return true }
3033 func (tlsHandshakeTimeoutError) Temporary() bool { return true }
3034 func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" }
3035
3036
3037
3038
3039 type fakeLocker struct{}
3040
3041 func (fakeLocker) Lock() {}
3042 func (fakeLocker) Unlock() {}
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
3058 if cfg == nil {
3059 return &tls.Config{}
3060 }
3061 return cfg.Clone()
3062 }
3063
3064 type connLRU struct {
3065 ll *list.List
3066 m map[*persistConn]*list.Element
3067 }
3068
3069
3070 func (cl *connLRU) add(pc *persistConn) {
3071 if cl.ll == nil {
3072 cl.ll = list.New()
3073 cl.m = make(map[*persistConn]*list.Element)
3074 }
3075 ele := cl.ll.PushFront(pc)
3076 if _, ok := cl.m[pc]; ok {
3077 panic("persistConn was already in LRU")
3078 }
3079 cl.m[pc] = ele
3080 }
3081
3082 func (cl *connLRU) removeOldest() *persistConn {
3083 ele := cl.ll.Back()
3084 pc := ele.Value.(*persistConn)
3085 cl.ll.Remove(ele)
3086 delete(cl.m, pc)
3087 return pc
3088 }
3089
3090
3091 func (cl *connLRU) remove(pc *persistConn) {
3092 if ele, ok := cl.m[pc]; ok {
3093 cl.ll.Remove(ele)
3094 delete(cl.m, pc)
3095 }
3096 }
3097
3098
3099 func (cl *connLRU) len() int {
3100 return len(cl.m)
3101 }
3102
View as plain text