Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "internal/godebug"
21 "io"
22 "log"
23 "net"
24 "net/http/httptrace"
25 "net/http/internal/ascii"
26 "net/textproto"
27 "net/url"
28 "reflect"
29 "strings"
30 "sync"
31 "sync/atomic"
32 "time"
33 _ "unsafe"
34
35 "golang.org/x/net/http/httpguts"
36 "golang.org/x/net/http/httpproxy"
37 )
38
39
40
41
42
43
44 var DefaultTransport RoundTripper = &Transport{
45 Proxy: ProxyFromEnvironment,
46 DialContext: defaultTransportDialContext(&net.Dialer{
47 Timeout: 30 * time.Second,
48 KeepAlive: 30 * time.Second,
49 }),
50 ForceAttemptHTTP2: true,
51 MaxIdleConns: 100,
52 IdleConnTimeout: 90 * time.Second,
53 TLSHandshakeTimeout: 10 * time.Second,
54 ExpectContinueTimeout: 1 * time.Second,
55 }
56
57
58
59 const DefaultMaxIdleConnsPerHost = 2
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 type Transport struct {
97 idleMu sync.Mutex
98 closeIdle bool
99 idleConn map[connectMethodKey][]*persistConn
100 idleConnWait map[connectMethodKey]wantConnQueue
101 idleLRU connLRU
102
103 reqMu sync.Mutex
104 reqCanceler map[*Request]context.CancelCauseFunc
105
106 altMu sync.Mutex
107 altProto atomic.Value
108
109 connsPerHostMu sync.Mutex
110 connsPerHost map[connectMethodKey]int
111 connsPerHostWait map[connectMethodKey]wantConnQueue
112 dialsInProgress wantConnQueue
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 Proxy func(*Request) (*url.URL, error)
129
130
131
132
133 OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error
134
135
136
137
138
139
140
141
142
143 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
144
145
146
147
148
149
150
151
152
153
154
155 Dial func(network, addr string) (net.Conn, error)
156
157
158
159
160
161
162
163
164
165
166
167 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
168
169
170
171
172
173
174
175 DialTLS func(network, addr string) (net.Conn, error)
176
177
178
179
180
181 TLSClientConfig *tls.Config
182
183
184
185 TLSHandshakeTimeout time.Duration
186
187
188
189
190
191
192 DisableKeepAlives bool
193
194
195
196
197
198
199
200
201
202 DisableCompression bool
203
204
205
206 MaxIdleConns int
207
208
209
210
211 MaxIdleConnsPerHost int
212
213
214
215
216
217
218 MaxConnsPerHost int
219
220
221
222
223
224 IdleConnTimeout time.Duration
225
226
227
228
229
230 ResponseHeaderTimeout time.Duration
231
232
233
234
235
236
237
238
239 ExpectContinueTimeout time.Duration
240
241
242
243
244
245
246
247
248
249
250
251 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
252
253
254
255
256 ProxyConnectHeader Header
257
258
259
260
261
262
263
264
265 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
266
267
268
269
270
271
272 MaxResponseHeaderBytes int64
273
274
275
276
277 WriteBufferSize int
278
279
280
281
282 ReadBufferSize int
283
284
285
286 nextProtoOnce sync.Once
287 h2transport h2Transport
288 tlsNextProtoWasNil bool
289
290
291
292
293
294
295 ForceAttemptHTTP2 bool
296 }
297
298 func (t *Transport) writeBufferSize() int {
299 if t.WriteBufferSize > 0 {
300 return t.WriteBufferSize
301 }
302 return 4 << 10
303 }
304
305 func (t *Transport) readBufferSize() int {
306 if t.ReadBufferSize > 0 {
307 return t.ReadBufferSize
308 }
309 return 4 << 10
310 }
311
312
313 func (t *Transport) Clone() *Transport {
314 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
315 t2 := &Transport{
316 Proxy: t.Proxy,
317 OnProxyConnectResponse: t.OnProxyConnectResponse,
318 DialContext: t.DialContext,
319 Dial: t.Dial,
320 DialTLS: t.DialTLS,
321 DialTLSContext: t.DialTLSContext,
322 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
323 DisableKeepAlives: t.DisableKeepAlives,
324 DisableCompression: t.DisableCompression,
325 MaxIdleConns: t.MaxIdleConns,
326 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
327 MaxConnsPerHost: t.MaxConnsPerHost,
328 IdleConnTimeout: t.IdleConnTimeout,
329 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
330 ExpectContinueTimeout: t.ExpectContinueTimeout,
331 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
332 GetProxyConnectHeader: t.GetProxyConnectHeader,
333 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
334 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
335 WriteBufferSize: t.WriteBufferSize,
336 ReadBufferSize: t.ReadBufferSize,
337 }
338 if t.TLSClientConfig != nil {
339 t2.TLSClientConfig = t.TLSClientConfig.Clone()
340 }
341 if !t.tlsNextProtoWasNil {
342 npm := map[string]func(authority string, c *tls.Conn) RoundTripper{}
343 for k, v := range t.TLSNextProto {
344 npm[k] = v
345 }
346 t2.TLSNextProto = npm
347 }
348 return t2
349 }
350
351
352
353
354
355
356
357 type h2Transport interface {
358 CloseIdleConnections()
359 }
360
361 func (t *Transport) hasCustomTLSDialer() bool {
362 return t.DialTLS != nil || t.DialTLSContext != nil
363 }
364
365 var http2client = godebug.New("http2client")
366
367
368
369 func (t *Transport) onceSetNextProtoDefaults() {
370 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
371 if http2client.Value() == "0" {
372 http2client.IncNonDefault()
373 return
374 }
375
376
377
378
379
380
381 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
382 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
383 if v := rv.Field(0); v.CanInterface() {
384 if h2i, ok := v.Interface().(h2Transport); ok {
385 t.h2transport = h2i
386 return
387 }
388 }
389 }
390
391 if t.TLSNextProto != nil {
392
393
394 return
395 }
396 if !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()) {
397
398
399
400
401
402
403 return
404 }
405 if omitBundledHTTP2 {
406 return
407 }
408 t2, err := http2configureTransports(t)
409 if err != nil {
410 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
411 return
412 }
413 t.h2transport = t2
414
415
416
417
418
419
420
421 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
422 const h2max = 1<<32 - 1
423 if limit1 >= h2max {
424 t2.MaxHeaderListSize = h2max
425 } else {
426 t2.MaxHeaderListSize = uint32(limit1)
427 }
428 }
429 }
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
448 return envProxyFunc()(req.URL)
449 }
450
451
452
453 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
454 return func(*Request) (*url.URL, error) {
455 return fixedURL, nil
456 }
457 }
458
459
460
461
462 type transportRequest struct {
463 *Request
464 extra Header
465 trace *httptrace.ClientTrace
466
467 ctx context.Context
468 cancel context.CancelCauseFunc
469
470 mu sync.Mutex
471 err error
472 }
473
474 func (tr *transportRequest) extraHeaders() Header {
475 if tr.extra == nil {
476 tr.extra = make(Header)
477 }
478 return tr.extra
479 }
480
481 func (tr *transportRequest) setError(err error) {
482 tr.mu.Lock()
483 if tr.err == nil {
484 tr.err = err
485 }
486 tr.mu.Unlock()
487 }
488
489
490
491 func (t *Transport) useRegisteredProtocol(req *Request) bool {
492 if req.URL.Scheme == "https" && req.requiresHTTP1() {
493
494
495
496
497 return false
498 }
499 return true
500 }
501
502
503
504
505 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
506 if !t.useRegisteredProtocol(req) {
507 return nil
508 }
509 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
510 return altProto[req.URL.Scheme]
511 }
512
513 func validateHeaders(hdrs Header) string {
514 for k, vv := range hdrs {
515 if !httpguts.ValidHeaderFieldName(k) {
516 return fmt.Sprintf("field name %q", k)
517 }
518 for _, v := range vv {
519 if !httpguts.ValidHeaderFieldValue(v) {
520
521
522 return fmt.Sprintf("field value for %q", k)
523 }
524 }
525 }
526 return ""
527 }
528
529
530 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
531 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
532 ctx := req.Context()
533 trace := httptrace.ContextClientTrace(ctx)
534
535 if req.URL == nil {
536 req.closeBody()
537 return nil, errors.New("http: nil Request.URL")
538 }
539 if req.Header == nil {
540 req.closeBody()
541 return nil, errors.New("http: nil Request.Header")
542 }
543 scheme := req.URL.Scheme
544 isHTTP := scheme == "http" || scheme == "https"
545 if isHTTP {
546
547 if err := validateHeaders(req.Header); err != "" {
548 req.closeBody()
549 return nil, fmt.Errorf("net/http: invalid header %s", err)
550 }
551
552
553 if err := validateHeaders(req.Trailer); err != "" {
554 req.closeBody()
555 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
556 }
557 }
558
559 origReq := req
560 req = setupRewindBody(req)
561
562 if altRT := t.alternateRoundTripper(req); altRT != nil {
563 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
564 return resp, err
565 }
566 var err error
567 req, err = rewindBody(req)
568 if err != nil {
569 return nil, err
570 }
571 }
572 if !isHTTP {
573 req.closeBody()
574 return nil, badStringError("unsupported protocol scheme", scheme)
575 }
576 if req.Method != "" && !validMethod(req.Method) {
577 req.closeBody()
578 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
579 }
580 if req.URL.Host == "" {
581 req.closeBody()
582 return nil, errors.New("http: no Host in request URL")
583 }
584
585
586
587
588
589
590
591
592
593
594 ctx, cancel := context.WithCancelCause(req.Context())
595
596
597 if origReq.Cancel != nil {
598 go awaitLegacyCancel(ctx, cancel, origReq)
599 }
600
601
602
603
604
605 cancel = t.prepareTransportCancel(origReq, cancel)
606
607 defer func() {
608 if err != nil {
609 cancel(err)
610 }
611 }()
612
613 for {
614 select {
615 case <-ctx.Done():
616 req.closeBody()
617 return nil, context.Cause(ctx)
618 default:
619 }
620
621
622 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
623 cm, err := t.connectMethodForRequest(treq)
624 if err != nil {
625 req.closeBody()
626 return nil, err
627 }
628
629
630
631
632
633 pconn, err := t.getConn(treq, cm)
634 if err != nil {
635 req.closeBody()
636 return nil, err
637 }
638
639 var resp *Response
640 if pconn.alt != nil {
641
642 resp, err = pconn.alt.RoundTrip(req)
643 } else {
644 resp, err = pconn.roundTrip(treq)
645 }
646 if err == nil {
647 if pconn.alt != nil {
648
649
650
651
652
653 cancel(errRequestDone)
654 }
655 resp.Request = origReq
656 return resp, nil
657 }
658
659
660 if http2isNoCachedConnError(err) {
661 if t.removeIdleConn(pconn) {
662 t.decConnsPerHost(pconn.cacheKey)
663 }
664 } else if !pconn.shouldRetryRequest(req, err) {
665
666
667 if e, ok := err.(nothingWrittenError); ok {
668 err = e.error
669 }
670 if e, ok := err.(transportReadFromServerError); ok {
671 err = e.err
672 }
673 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose {
674
675
676
677 req.closeBody()
678 }
679 return nil, err
680 }
681 testHookRoundTripRetried()
682
683
684 req, err = rewindBody(req)
685 if err != nil {
686 return nil, err
687 }
688 }
689 }
690
691 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
692 select {
693 case <-req.Cancel:
694 cancel(errRequestCanceled)
695 case <-ctx.Done():
696 }
697 }
698
699 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
700
701 type readTrackingBody struct {
702 io.ReadCloser
703 didRead bool
704 didClose bool
705 }
706
707 func (r *readTrackingBody) Read(data []byte) (int, error) {
708 r.didRead = true
709 return r.ReadCloser.Read(data)
710 }
711
712 func (r *readTrackingBody) Close() error {
713 r.didClose = true
714 return r.ReadCloser.Close()
715 }
716
717
718
719
720
721 func setupRewindBody(req *Request) *Request {
722 if req.Body == nil || req.Body == NoBody {
723 return req
724 }
725 newReq := *req
726 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
727 return &newReq
728 }
729
730
731
732
733
734 func rewindBody(req *Request) (rewound *Request, err error) {
735 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
736 return req, nil
737 }
738 if !req.Body.(*readTrackingBody).didClose {
739 req.closeBody()
740 }
741 if req.GetBody == nil {
742 return nil, errCannotRewind
743 }
744 body, err := req.GetBody()
745 if err != nil {
746 return nil, err
747 }
748 newReq := *req
749 newReq.Body = &readTrackingBody{ReadCloser: body}
750 return &newReq, nil
751 }
752
753
754
755
756 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
757 if http2isNoCachedConnError(err) {
758
759
760
761
762
763
764 return true
765 }
766 if err == errMissingHost {
767
768 return false
769 }
770 if !pc.isReused() {
771
772
773
774
775
776
777
778 return false
779 }
780 if _, ok := err.(nothingWrittenError); ok {
781
782
783 return req.outgoingLength() == 0 || req.GetBody != nil
784 }
785 if !req.isReplayable() {
786
787 return false
788 }
789 if _, ok := err.(transportReadFromServerError); ok {
790
791
792 return true
793 }
794 if err == errServerClosedIdle {
795
796
797
798 return true
799 }
800 return false
801 }
802
803
804 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
805
806
807
808
809
810
811
812
813
814
815
816 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
817 t.altMu.Lock()
818 defer t.altMu.Unlock()
819 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
820 if _, exists := oldMap[scheme]; exists {
821 panic("protocol " + scheme + " already registered")
822 }
823 newMap := make(map[string]RoundTripper)
824 for k, v := range oldMap {
825 newMap[k] = v
826 }
827 newMap[scheme] = rt
828 t.altProto.Store(newMap)
829 }
830
831
832
833
834
835 func (t *Transport) CloseIdleConnections() {
836 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
837 t.idleMu.Lock()
838 m := t.idleConn
839 t.idleConn = nil
840 t.closeIdle = true
841 t.idleLRU = connLRU{}
842 t.idleMu.Unlock()
843 for _, conns := range m {
844 for _, pconn := range conns {
845 pconn.close(errCloseIdleConns)
846 }
847 }
848 t.connsPerHostMu.Lock()
849 t.dialsInProgress.all(func(w *wantConn) {
850 if w.cancelCtx != nil && !w.waiting() {
851 w.cancelCtx()
852 }
853 })
854 t.connsPerHostMu.Unlock()
855 if t2 := t.h2transport; t2 != nil {
856 t2.CloseIdleConnections()
857 }
858 }
859
860
861 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
862
863
864
865
866
867
868 cancel := func(err error) {
869 origCancel(err)
870 t.reqMu.Lock()
871 delete(t.reqCanceler, req)
872 t.reqMu.Unlock()
873 }
874 t.reqMu.Lock()
875 if t.reqCanceler == nil {
876 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
877 }
878 t.reqCanceler[req] = cancel
879 t.reqMu.Unlock()
880 return cancel
881 }
882
883
884
885
886
887
888
889 func (t *Transport) CancelRequest(req *Request) {
890 t.reqMu.Lock()
891 cancel := t.reqCanceler[req]
892 t.reqMu.Unlock()
893 if cancel != nil {
894 cancel(errRequestCanceled)
895 }
896 }
897
898
899
900
901
902 var (
903 envProxyOnce sync.Once
904 envProxyFuncValue func(*url.URL) (*url.URL, error)
905 )
906
907
908
909 func envProxyFunc() func(*url.URL) (*url.URL, error) {
910 envProxyOnce.Do(func() {
911 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
912 })
913 return envProxyFuncValue
914 }
915
916
917 func resetProxyConfig() {
918 envProxyOnce = sync.Once{}
919 envProxyFuncValue = nil
920 }
921
922 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
923 cm.targetScheme = treq.URL.Scheme
924 cm.targetAddr = canonicalAddr(treq.URL)
925 if t.Proxy != nil {
926 cm.proxyURL, err = t.Proxy(treq.Request)
927 }
928 cm.onlyH1 = treq.requiresHTTP1()
929 return cm, err
930 }
931
932
933
934 func (cm *connectMethod) proxyAuth() string {
935 if cm.proxyURL == nil {
936 return ""
937 }
938 if u := cm.proxyURL.User; u != nil {
939 username := u.Username()
940 password, _ := u.Password()
941 return "Basic " + basicAuth(username, password)
942 }
943 return ""
944 }
945
946
947 var (
948 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
949 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
950 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
951 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
952 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
953 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
954 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
955 errIdleConnTimeout = errors.New("http: idle connection timeout")
956
957
958
959
960
961 errServerClosedIdle = errors.New("http: server closed idle connection")
962 )
963
964
965
966
967
968
969
970
971
972 type transportReadFromServerError struct {
973 err error
974 }
975
976 func (e transportReadFromServerError) Unwrap() error { return e.err }
977
978 func (e transportReadFromServerError) Error() string {
979 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
980 }
981
982 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
983 if err := t.tryPutIdleConn(pconn); err != nil {
984 pconn.close(err)
985 }
986 }
987
988 func (t *Transport) maxIdleConnsPerHost() int {
989 if v := t.MaxIdleConnsPerHost; v != 0 {
990 return v
991 }
992 return DefaultMaxIdleConnsPerHost
993 }
994
995
996
997
998
999
1000 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
1001 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
1002 return errKeepAlivesDisabled
1003 }
1004 if pconn.isBroken() {
1005 return errConnBroken
1006 }
1007 pconn.markReused()
1008
1009 t.idleMu.Lock()
1010 defer t.idleMu.Unlock()
1011
1012
1013
1014
1015 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
1016 return nil
1017 }
1018
1019
1020
1021
1022
1023 key := pconn.cacheKey
1024 if q, ok := t.idleConnWait[key]; ok {
1025 done := false
1026 if pconn.alt == nil {
1027
1028
1029 for q.len() > 0 {
1030 w := q.popFront()
1031 if w.tryDeliver(pconn, nil, time.Time{}) {
1032 done = true
1033 break
1034 }
1035 }
1036 } else {
1037
1038
1039
1040
1041 for q.len() > 0 {
1042 w := q.popFront()
1043 w.tryDeliver(pconn, nil, time.Time{})
1044 }
1045 }
1046 if q.len() == 0 {
1047 delete(t.idleConnWait, key)
1048 } else {
1049 t.idleConnWait[key] = q
1050 }
1051 if done {
1052 return nil
1053 }
1054 }
1055
1056 if t.closeIdle {
1057 return errCloseIdle
1058 }
1059 if t.idleConn == nil {
1060 t.idleConn = make(map[connectMethodKey][]*persistConn)
1061 }
1062 idles := t.idleConn[key]
1063 if len(idles) >= t.maxIdleConnsPerHost() {
1064 return errTooManyIdleHost
1065 }
1066 for _, exist := range idles {
1067 if exist == pconn {
1068 log.Fatalf("dup idle pconn %p in freelist", pconn)
1069 }
1070 }
1071 t.idleConn[key] = append(idles, pconn)
1072 t.idleLRU.add(pconn)
1073 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1074 oldest := t.idleLRU.removeOldest()
1075 oldest.close(errTooManyIdle)
1076 t.removeIdleConnLocked(oldest)
1077 }
1078
1079
1080
1081
1082 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1083 if pconn.idleTimer != nil {
1084 pconn.idleTimer.Reset(t.IdleConnTimeout)
1085 } else {
1086 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1087 }
1088 }
1089 pconn.idleAt = time.Now()
1090 return nil
1091 }
1092
1093
1094
1095
1096 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1097 if t.DisableKeepAlives {
1098 return false
1099 }
1100
1101 t.idleMu.Lock()
1102 defer t.idleMu.Unlock()
1103
1104
1105
1106 t.closeIdle = false
1107
1108 if w == nil {
1109
1110 return false
1111 }
1112
1113
1114
1115
1116 var oldTime time.Time
1117 if t.IdleConnTimeout > 0 {
1118 oldTime = time.Now().Add(-t.IdleConnTimeout)
1119 }
1120
1121
1122 if list, ok := t.idleConn[w.key]; ok {
1123 stop := false
1124 delivered := false
1125 for len(list) > 0 && !stop {
1126 pconn := list[len(list)-1]
1127
1128
1129
1130
1131 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1132 if tooOld {
1133
1134
1135
1136 go pconn.closeConnIfStillIdle()
1137 }
1138 if pconn.isBroken() || tooOld {
1139
1140
1141
1142
1143
1144 list = list[:len(list)-1]
1145 continue
1146 }
1147 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1148 if delivered {
1149 if pconn.alt != nil {
1150
1151
1152 } else {
1153
1154
1155 t.idleLRU.remove(pconn)
1156 list = list[:len(list)-1]
1157 }
1158 }
1159 stop = true
1160 }
1161 if len(list) > 0 {
1162 t.idleConn[w.key] = list
1163 } else {
1164 delete(t.idleConn, w.key)
1165 }
1166 if stop {
1167 return delivered
1168 }
1169 }
1170
1171
1172 if t.idleConnWait == nil {
1173 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1174 }
1175 q := t.idleConnWait[w.key]
1176 q.cleanFrontNotWaiting()
1177 q.pushBack(w)
1178 t.idleConnWait[w.key] = q
1179 return false
1180 }
1181
1182
1183 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1184 t.idleMu.Lock()
1185 defer t.idleMu.Unlock()
1186 return t.removeIdleConnLocked(pconn)
1187 }
1188
1189
1190 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1191 if pconn.idleTimer != nil {
1192 pconn.idleTimer.Stop()
1193 }
1194 t.idleLRU.remove(pconn)
1195 key := pconn.cacheKey
1196 pconns := t.idleConn[key]
1197 var removed bool
1198 switch len(pconns) {
1199 case 0:
1200
1201 case 1:
1202 if pconns[0] == pconn {
1203 delete(t.idleConn, key)
1204 removed = true
1205 }
1206 default:
1207 for i, v := range pconns {
1208 if v != pconn {
1209 continue
1210 }
1211
1212
1213 copy(pconns[i:], pconns[i+1:])
1214 t.idleConn[key] = pconns[:len(pconns)-1]
1215 removed = true
1216 break
1217 }
1218 }
1219 return removed
1220 }
1221
1222 var zeroDialer net.Dialer
1223
1224 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1225 if t.DialContext != nil {
1226 c, err := t.DialContext(ctx, network, addr)
1227 if c == nil && err == nil {
1228 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1229 }
1230 return c, err
1231 }
1232 if t.Dial != nil {
1233 c, err := t.Dial(network, addr)
1234 if c == nil && err == nil {
1235 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1236 }
1237 return c, err
1238 }
1239 return zeroDialer.DialContext(ctx, network, addr)
1240 }
1241
1242
1243
1244
1245
1246
1247
1248 type wantConn struct {
1249 cm connectMethod
1250 key connectMethodKey
1251
1252
1253
1254
1255 beforeDial func()
1256 afterDial func()
1257
1258 mu sync.Mutex
1259 ctx context.Context
1260 cancelCtx context.CancelFunc
1261 done bool
1262 result chan connOrError
1263 }
1264
1265 type connOrError struct {
1266 pc *persistConn
1267 err error
1268 idleAt time.Time
1269 }
1270
1271
1272 func (w *wantConn) waiting() bool {
1273 w.mu.Lock()
1274 defer w.mu.Unlock()
1275
1276 return !w.done
1277 }
1278
1279
1280 func (w *wantConn) getCtxForDial() context.Context {
1281 w.mu.Lock()
1282 defer w.mu.Unlock()
1283
1284 return w.ctx
1285 }
1286
1287
1288 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1289 w.mu.Lock()
1290 defer w.mu.Unlock()
1291
1292 if w.done {
1293 return false
1294 }
1295 if (pc == nil) == (err == nil) {
1296 panic("net/http: internal error: misuse of tryDeliver")
1297 }
1298 w.ctx = nil
1299 w.done = true
1300
1301 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1302 close(w.result)
1303
1304 return true
1305 }
1306
1307
1308
1309 func (w *wantConn) cancel(t *Transport, err error) {
1310 w.mu.Lock()
1311 var pc *persistConn
1312 if w.done {
1313 if r, ok := <-w.result; ok {
1314 pc = r.pc
1315 }
1316 } else {
1317 close(w.result)
1318 }
1319 w.ctx = nil
1320 w.done = true
1321 w.mu.Unlock()
1322
1323 if pc != nil {
1324 t.putOrCloseIdleConn(pc)
1325 }
1326 }
1327
1328
1329 type wantConnQueue struct {
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340 head []*wantConn
1341 headPos int
1342 tail []*wantConn
1343 }
1344
1345
1346 func (q *wantConnQueue) len() int {
1347 return len(q.head) - q.headPos + len(q.tail)
1348 }
1349
1350
1351 func (q *wantConnQueue) pushBack(w *wantConn) {
1352 q.tail = append(q.tail, w)
1353 }
1354
1355
1356 func (q *wantConnQueue) popFront() *wantConn {
1357 if q.headPos >= len(q.head) {
1358 if len(q.tail) == 0 {
1359 return nil
1360 }
1361
1362 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1363 }
1364 w := q.head[q.headPos]
1365 q.head[q.headPos] = nil
1366 q.headPos++
1367 return w
1368 }
1369
1370
1371 func (q *wantConnQueue) peekFront() *wantConn {
1372 if q.headPos < len(q.head) {
1373 return q.head[q.headPos]
1374 }
1375 if len(q.tail) > 0 {
1376 return q.tail[0]
1377 }
1378 return nil
1379 }
1380
1381
1382
1383 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1384 for {
1385 w := q.peekFront()
1386 if w == nil || w.waiting() {
1387 return cleaned
1388 }
1389 q.popFront()
1390 cleaned = true
1391 }
1392 }
1393
1394
1395 func (q *wantConnQueue) cleanFrontCanceled() {
1396 for {
1397 w := q.peekFront()
1398 if w == nil || w.cancelCtx != nil {
1399 return
1400 }
1401 q.popFront()
1402 }
1403 }
1404
1405
1406
1407 func (q *wantConnQueue) all(f func(*wantConn)) {
1408 for _, w := range q.head[q.headPos:] {
1409 f(w)
1410 }
1411 for _, w := range q.tail {
1412 f(w)
1413 }
1414 }
1415
1416 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1417 if t.DialTLSContext != nil {
1418 conn, err = t.DialTLSContext(ctx, network, addr)
1419 } else {
1420 conn, err = t.DialTLS(network, addr)
1421 }
1422 if conn == nil && err == nil {
1423 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1424 }
1425 return
1426 }
1427
1428
1429
1430
1431
1432 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1433 req := treq.Request
1434 trace := treq.trace
1435 ctx := req.Context()
1436 if trace != nil && trace.GetConn != nil {
1437 trace.GetConn(cm.addr())
1438 }
1439
1440
1441
1442
1443
1444
1445 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1446
1447 w := &wantConn{
1448 cm: cm,
1449 key: cm.key(),
1450 ctx: dialCtx,
1451 cancelCtx: dialCancel,
1452 result: make(chan connOrError, 1),
1453 beforeDial: testHookPrePendingDial,
1454 afterDial: testHookPostPendingDial,
1455 }
1456 defer func() {
1457 if err != nil {
1458 w.cancel(t, err)
1459 }
1460 }()
1461
1462
1463 if delivered := t.queueForIdleConn(w); !delivered {
1464 t.queueForDial(w)
1465 }
1466
1467
1468 select {
1469 case r := <-w.result:
1470
1471
1472 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1473 info := httptrace.GotConnInfo{
1474 Conn: r.pc.conn,
1475 Reused: r.pc.isReused(),
1476 }
1477 if !r.idleAt.IsZero() {
1478 info.WasIdle = true
1479 info.IdleTime = time.Since(r.idleAt)
1480 }
1481 trace.GotConn(info)
1482 }
1483 if r.err != nil {
1484
1485
1486
1487 select {
1488 case <-treq.ctx.Done():
1489 err := context.Cause(treq.ctx)
1490 if err == errRequestCanceled {
1491 err = errRequestCanceledConn
1492 }
1493 return nil, err
1494 default:
1495
1496 }
1497 }
1498 return r.pc, r.err
1499 case <-treq.ctx.Done():
1500 err := context.Cause(treq.ctx)
1501 if err == errRequestCanceled {
1502 err = errRequestCanceledConn
1503 }
1504 return nil, err
1505 }
1506 }
1507
1508
1509
1510 func (t *Transport) queueForDial(w *wantConn) {
1511 w.beforeDial()
1512
1513 t.connsPerHostMu.Lock()
1514 defer t.connsPerHostMu.Unlock()
1515
1516 if t.MaxConnsPerHost <= 0 {
1517 t.startDialConnForLocked(w)
1518 return
1519 }
1520
1521 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1522 if t.connsPerHost == nil {
1523 t.connsPerHost = make(map[connectMethodKey]int)
1524 }
1525 t.connsPerHost[w.key] = n + 1
1526 t.startDialConnForLocked(w)
1527 return
1528 }
1529
1530 if t.connsPerHostWait == nil {
1531 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1532 }
1533 q := t.connsPerHostWait[w.key]
1534 q.cleanFrontNotWaiting()
1535 q.pushBack(w)
1536 t.connsPerHostWait[w.key] = q
1537 }
1538
1539
1540
1541 func (t *Transport) startDialConnForLocked(w *wantConn) {
1542 t.dialsInProgress.cleanFrontCanceled()
1543 t.dialsInProgress.pushBack(w)
1544 go func() {
1545 t.dialConnFor(w)
1546 t.connsPerHostMu.Lock()
1547 defer t.connsPerHostMu.Unlock()
1548 w.cancelCtx = nil
1549 }()
1550 }
1551
1552
1553
1554
1555 func (t *Transport) dialConnFor(w *wantConn) {
1556 defer w.afterDial()
1557 ctx := w.getCtxForDial()
1558 if ctx == nil {
1559 t.decConnsPerHost(w.key)
1560 return
1561 }
1562
1563 pc, err := t.dialConn(ctx, w.cm)
1564 delivered := w.tryDeliver(pc, err, time.Time{})
1565 if err == nil && (!delivered || pc.alt != nil) {
1566
1567
1568
1569 t.putOrCloseIdleConn(pc)
1570 }
1571 if err != nil {
1572 t.decConnsPerHost(w.key)
1573 }
1574 }
1575
1576
1577
1578 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1579 if t.MaxConnsPerHost <= 0 {
1580 return
1581 }
1582
1583 t.connsPerHostMu.Lock()
1584 defer t.connsPerHostMu.Unlock()
1585 n := t.connsPerHost[key]
1586 if n == 0 {
1587
1588
1589 panic("net/http: internal error: connCount underflow")
1590 }
1591
1592
1593
1594
1595
1596 if q := t.connsPerHostWait[key]; q.len() > 0 {
1597 done := false
1598 for q.len() > 0 {
1599 w := q.popFront()
1600 if w.waiting() {
1601 t.startDialConnForLocked(w)
1602 done = true
1603 break
1604 }
1605 }
1606 if q.len() == 0 {
1607 delete(t.connsPerHostWait, key)
1608 } else {
1609
1610
1611 t.connsPerHostWait[key] = q
1612 }
1613 if done {
1614 return
1615 }
1616 }
1617
1618
1619 if n--; n == 0 {
1620 delete(t.connsPerHost, key)
1621 } else {
1622 t.connsPerHost[key] = n
1623 }
1624 }
1625
1626
1627
1628
1629 func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
1630
1631 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1632 if cfg.ServerName == "" {
1633 cfg.ServerName = name
1634 }
1635 if pconn.cacheKey.onlyH1 {
1636 cfg.NextProtos = nil
1637 }
1638 plainConn := pconn.conn
1639 tlsConn := tls.Client(plainConn, cfg)
1640 errc := make(chan error, 2)
1641 var timer *time.Timer
1642 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1643 timer = time.AfterFunc(d, func() {
1644 errc <- tlsHandshakeTimeoutError{}
1645 })
1646 }
1647 go func() {
1648 if trace != nil && trace.TLSHandshakeStart != nil {
1649 trace.TLSHandshakeStart()
1650 }
1651 err := tlsConn.HandshakeContext(ctx)
1652 if timer != nil {
1653 timer.Stop()
1654 }
1655 errc <- err
1656 }()
1657 if err := <-errc; err != nil {
1658 plainConn.Close()
1659 if err == (tlsHandshakeTimeoutError{}) {
1660
1661
1662 <-errc
1663 }
1664 if trace != nil && trace.TLSHandshakeDone != nil {
1665 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1666 }
1667 return err
1668 }
1669 cs := tlsConn.ConnectionState()
1670 if trace != nil && trace.TLSHandshakeDone != nil {
1671 trace.TLSHandshakeDone(cs, nil)
1672 }
1673 pconn.tlsState = &cs
1674 pconn.conn = tlsConn
1675 return nil
1676 }
1677
1678 type erringRoundTripper interface {
1679 RoundTripErr() error
1680 }
1681
1682 var testHookProxyConnectTimeout = context.WithTimeout
1683
1684 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1685 pconn = &persistConn{
1686 t: t,
1687 cacheKey: cm.key(),
1688 reqch: make(chan requestAndChan, 1),
1689 writech: make(chan writeRequest, 1),
1690 closech: make(chan struct{}),
1691 writeErrCh: make(chan error, 1),
1692 writeLoopDone: make(chan struct{}),
1693 }
1694 trace := httptrace.ContextClientTrace(ctx)
1695 wrapErr := func(err error) error {
1696 if cm.proxyURL != nil {
1697
1698 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1699 }
1700 return err
1701 }
1702 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1703 var err error
1704 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1705 if err != nil {
1706 return nil, wrapErr(err)
1707 }
1708 if tc, ok := pconn.conn.(*tls.Conn); ok {
1709
1710
1711 if trace != nil && trace.TLSHandshakeStart != nil {
1712 trace.TLSHandshakeStart()
1713 }
1714 if err := tc.HandshakeContext(ctx); err != nil {
1715 go pconn.conn.Close()
1716 if trace != nil && trace.TLSHandshakeDone != nil {
1717 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1718 }
1719 return nil, err
1720 }
1721 cs := tc.ConnectionState()
1722 if trace != nil && trace.TLSHandshakeDone != nil {
1723 trace.TLSHandshakeDone(cs, nil)
1724 }
1725 pconn.tlsState = &cs
1726 }
1727 } else {
1728 conn, err := t.dial(ctx, "tcp", cm.addr())
1729 if err != nil {
1730 return nil, wrapErr(err)
1731 }
1732 pconn.conn = conn
1733 if cm.scheme() == "https" {
1734 var firstTLSHost string
1735 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1736 return nil, wrapErr(err)
1737 }
1738 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
1739 return nil, wrapErr(err)
1740 }
1741 }
1742 }
1743
1744
1745 switch {
1746 case cm.proxyURL == nil:
1747
1748 case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
1749 conn := pconn.conn
1750 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1751 if u := cm.proxyURL.User; u != nil {
1752 auth := &socksUsernamePassword{
1753 Username: u.Username(),
1754 }
1755 auth.Password, _ = u.Password()
1756 d.AuthMethods = []socksAuthMethod{
1757 socksAuthMethodNotRequired,
1758 socksAuthMethodUsernamePassword,
1759 }
1760 d.Authenticate = auth.Authenticate
1761 }
1762 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1763 conn.Close()
1764 return nil, err
1765 }
1766 case cm.targetScheme == "http":
1767 pconn.isProxy = true
1768 if pa := cm.proxyAuth(); pa != "" {
1769 pconn.mutateHeaderFunc = func(h Header) {
1770 h.Set("Proxy-Authorization", pa)
1771 }
1772 }
1773 case cm.targetScheme == "https":
1774 conn := pconn.conn
1775 var hdr Header
1776 if t.GetProxyConnectHeader != nil {
1777 var err error
1778 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1779 if err != nil {
1780 conn.Close()
1781 return nil, err
1782 }
1783 } else {
1784 hdr = t.ProxyConnectHeader
1785 }
1786 if hdr == nil {
1787 hdr = make(Header)
1788 }
1789 if pa := cm.proxyAuth(); pa != "" {
1790 hdr = hdr.Clone()
1791 hdr.Set("Proxy-Authorization", pa)
1792 }
1793 connectReq := &Request{
1794 Method: "CONNECT",
1795 URL: &url.URL{Opaque: cm.targetAddr},
1796 Host: cm.targetAddr,
1797 Header: hdr,
1798 }
1799
1800
1801
1802
1803 connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
1804 defer cancel()
1805
1806 didReadResponse := make(chan struct{})
1807 var (
1808 resp *Response
1809 err error
1810 )
1811
1812 go func() {
1813 defer close(didReadResponse)
1814 err = connectReq.Write(conn)
1815 if err != nil {
1816 return
1817 }
1818
1819
1820 br := bufio.NewReader(conn)
1821 resp, err = ReadResponse(br, connectReq)
1822 }()
1823 select {
1824 case <-connectCtx.Done():
1825 conn.Close()
1826 <-didReadResponse
1827 return nil, connectCtx.Err()
1828 case <-didReadResponse:
1829
1830 }
1831 if err != nil {
1832 conn.Close()
1833 return nil, err
1834 }
1835
1836 if t.OnProxyConnectResponse != nil {
1837 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
1838 if err != nil {
1839 conn.Close()
1840 return nil, err
1841 }
1842 }
1843
1844 if resp.StatusCode != 200 {
1845 _, text, ok := strings.Cut(resp.Status, " ")
1846 conn.Close()
1847 if !ok {
1848 return nil, errors.New("unknown status code")
1849 }
1850 return nil, errors.New(text)
1851 }
1852 }
1853
1854 if cm.proxyURL != nil && cm.targetScheme == "https" {
1855 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
1856 return nil, err
1857 }
1858 }
1859
1860 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1861 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1862 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1863 if e, ok := alt.(erringRoundTripper); ok {
1864
1865 return nil, e.RoundTripErr()
1866 }
1867 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1868 }
1869 }
1870
1871 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1872 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1873
1874 go pconn.readLoop()
1875 go pconn.writeLoop()
1876 return pconn, nil
1877 }
1878
1879
1880
1881
1882
1883
1884
1885 type persistConnWriter struct {
1886 pc *persistConn
1887 }
1888
1889 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1890 n, err = w.pc.conn.Write(p)
1891 w.pc.nwrite += int64(n)
1892 return
1893 }
1894
1895
1896
1897
1898 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1899 n, err = io.Copy(w.pc.conn, r)
1900 w.pc.nwrite += n
1901 return
1902 }
1903
1904 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922 type connectMethod struct {
1923 _ incomparable
1924 proxyURL *url.URL
1925 targetScheme string
1926
1927
1928
1929 targetAddr string
1930 onlyH1 bool
1931 }
1932
1933 func (cm *connectMethod) key() connectMethodKey {
1934 proxyStr := ""
1935 targetAddr := cm.targetAddr
1936 if cm.proxyURL != nil {
1937 proxyStr = cm.proxyURL.String()
1938 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1939 targetAddr = ""
1940 }
1941 }
1942 return connectMethodKey{
1943 proxy: proxyStr,
1944 scheme: cm.targetScheme,
1945 addr: targetAddr,
1946 onlyH1: cm.onlyH1,
1947 }
1948 }
1949
1950
1951 func (cm *connectMethod) scheme() string {
1952 if cm.proxyURL != nil {
1953 return cm.proxyURL.Scheme
1954 }
1955 return cm.targetScheme
1956 }
1957
1958
1959 func (cm *connectMethod) addr() string {
1960 if cm.proxyURL != nil {
1961 return canonicalAddr(cm.proxyURL)
1962 }
1963 return cm.targetAddr
1964 }
1965
1966
1967
1968 func (cm *connectMethod) tlsHost() string {
1969 h := cm.targetAddr
1970 if hasPort(h) {
1971 h = h[:strings.LastIndex(h, ":")]
1972 }
1973 return h
1974 }
1975
1976
1977
1978
1979 type connectMethodKey struct {
1980 proxy, scheme, addr string
1981 onlyH1 bool
1982 }
1983
1984 func (k connectMethodKey) String() string {
1985
1986 var h1 string
1987 if k.onlyH1 {
1988 h1 = ",h1"
1989 }
1990 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
1991 }
1992
1993
1994
1995 type persistConn struct {
1996
1997
1998
1999 alt RoundTripper
2000
2001 t *Transport
2002 cacheKey connectMethodKey
2003 conn net.Conn
2004 tlsState *tls.ConnectionState
2005 br *bufio.Reader
2006 bw *bufio.Writer
2007 nwrite int64
2008 reqch chan requestAndChan
2009 writech chan writeRequest
2010 closech chan struct{}
2011 isProxy bool
2012 sawEOF bool
2013 readLimit int64
2014
2015
2016
2017
2018 writeErrCh chan error
2019
2020 writeLoopDone chan struct{}
2021
2022
2023 idleAt time.Time
2024 idleTimer *time.Timer
2025
2026 mu sync.Mutex
2027 numExpectedResponses int
2028 closed error
2029 canceledErr error
2030 broken bool
2031 reused bool
2032
2033
2034
2035 mutateHeaderFunc func(Header)
2036 }
2037
2038 func (pc *persistConn) maxHeaderResponseSize() int64 {
2039 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
2040 return v
2041 }
2042 return 10 << 20
2043 }
2044
2045 func (pc *persistConn) Read(p []byte) (n int, err error) {
2046 if pc.readLimit <= 0 {
2047 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2048 }
2049 if int64(len(p)) > pc.readLimit {
2050 p = p[:pc.readLimit]
2051 }
2052 n, err = pc.conn.Read(p)
2053 if err == io.EOF {
2054 pc.sawEOF = true
2055 }
2056 pc.readLimit -= int64(n)
2057 return
2058 }
2059
2060
2061 func (pc *persistConn) isBroken() bool {
2062 pc.mu.Lock()
2063 b := pc.closed != nil
2064 pc.mu.Unlock()
2065 return b
2066 }
2067
2068
2069
2070 func (pc *persistConn) canceled() error {
2071 pc.mu.Lock()
2072 defer pc.mu.Unlock()
2073 return pc.canceledErr
2074 }
2075
2076
2077 func (pc *persistConn) isReused() bool {
2078 pc.mu.Lock()
2079 r := pc.reused
2080 pc.mu.Unlock()
2081 return r
2082 }
2083
2084 func (pc *persistConn) cancelRequest(err error) {
2085 pc.mu.Lock()
2086 defer pc.mu.Unlock()
2087 pc.canceledErr = err
2088 pc.closeLocked(errRequestCanceled)
2089 }
2090
2091
2092
2093
2094 func (pc *persistConn) closeConnIfStillIdle() {
2095 t := pc.t
2096 t.idleMu.Lock()
2097 defer t.idleMu.Unlock()
2098 if _, ok := t.idleLRU.m[pc]; !ok {
2099
2100 return
2101 }
2102 t.removeIdleConnLocked(pc)
2103 pc.close(errIdleConnTimeout)
2104 }
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2115 if err == nil {
2116 return nil
2117 }
2118
2119
2120
2121
2122
2123
2124
2125
2126 <-pc.writeLoopDone
2127
2128
2129
2130
2131 if cerr := pc.canceled(); cerr != nil {
2132 return cerr
2133 }
2134
2135
2136 req.mu.Lock()
2137 reqErr := req.err
2138 req.mu.Unlock()
2139 if reqErr != nil {
2140 return reqErr
2141 }
2142
2143 if err == errServerClosedIdle {
2144
2145 return err
2146 }
2147
2148 if _, ok := err.(transportReadFromServerError); ok {
2149 if pc.nwrite == startBytesWritten {
2150 return nothingWrittenError{err}
2151 }
2152
2153 return err
2154 }
2155 if pc.isBroken() {
2156 if pc.nwrite == startBytesWritten {
2157 return nothingWrittenError{err}
2158 }
2159 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2160 }
2161 return err
2162 }
2163
2164
2165
2166
2167 var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2168
2169 func (pc *persistConn) readLoop() {
2170 closeErr := errReadLoopExiting
2171 defer func() {
2172 pc.close(closeErr)
2173 pc.t.removeIdleConn(pc)
2174 }()
2175
2176 tryPutIdleConn := func(treq *transportRequest) bool {
2177 trace := treq.trace
2178 if err := pc.t.tryPutIdleConn(pc); err != nil {
2179 closeErr = err
2180 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
2181 trace.PutIdleConn(err)
2182 }
2183 return false
2184 }
2185 if trace != nil && trace.PutIdleConn != nil {
2186 trace.PutIdleConn(nil)
2187 }
2188 return true
2189 }
2190
2191
2192
2193
2194 eofc := make(chan struct{})
2195 defer close(eofc)
2196
2197
2198 testHookMu.Lock()
2199 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
2200 testHookMu.Unlock()
2201
2202 alive := true
2203 for alive {
2204 pc.readLimit = pc.maxHeaderResponseSize()
2205 _, err := pc.br.Peek(1)
2206
2207 pc.mu.Lock()
2208 if pc.numExpectedResponses == 0 {
2209 pc.readLoopPeekFailLocked(err)
2210 pc.mu.Unlock()
2211 return
2212 }
2213 pc.mu.Unlock()
2214
2215 rc := <-pc.reqch
2216 trace := rc.treq.trace
2217
2218 var resp *Response
2219 if err == nil {
2220 resp, err = pc.readResponse(rc, trace)
2221 } else {
2222 err = transportReadFromServerError{err}
2223 closeErr = err
2224 }
2225
2226 if err != nil {
2227 if pc.readLimit <= 0 {
2228 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
2229 }
2230
2231 select {
2232 case rc.ch <- responseAndError{err: err}:
2233 case <-rc.callerGone:
2234 return
2235 }
2236 return
2237 }
2238 pc.readLimit = maxInt64
2239
2240 pc.mu.Lock()
2241 pc.numExpectedResponses--
2242 pc.mu.Unlock()
2243
2244 bodyWritable := resp.bodyIsWritable()
2245 hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0
2246
2247 if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
2248
2249
2250
2251 alive = false
2252 }
2253
2254 if !hasBody || bodyWritable {
2255
2256
2257
2258
2259
2260 alive = alive &&
2261 !pc.sawEOF &&
2262 pc.wroteRequest() &&
2263 tryPutIdleConn(rc.treq)
2264
2265 if bodyWritable {
2266 closeErr = errCallerOwnsConn
2267 }
2268
2269 select {
2270 case rc.ch <- responseAndError{res: resp}:
2271 case <-rc.callerGone:
2272 return
2273 }
2274
2275 rc.treq.cancel(errRequestDone)
2276
2277
2278
2279
2280 testHookReadLoopBeforeNextRead()
2281 continue
2282 }
2283
2284 waitForBodyRead := make(chan bool, 2)
2285 body := &bodyEOFSignal{
2286 body: resp.Body,
2287 earlyCloseFn: func() error {
2288 waitForBodyRead <- false
2289 <-eofc
2290 return nil
2291
2292 },
2293 fn: func(err error) error {
2294 isEOF := err == io.EOF
2295 waitForBodyRead <- isEOF
2296 if isEOF {
2297 <-eofc
2298 } else if err != nil {
2299 if cerr := pc.canceled(); cerr != nil {
2300 return cerr
2301 }
2302 }
2303 return err
2304 },
2305 }
2306
2307 resp.Body = body
2308 if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
2309 resp.Body = &gzipReader{body: body}
2310 resp.Header.Del("Content-Encoding")
2311 resp.Header.Del("Content-Length")
2312 resp.ContentLength = -1
2313 resp.Uncompressed = true
2314 }
2315
2316 select {
2317 case rc.ch <- responseAndError{res: resp}:
2318 case <-rc.callerGone:
2319 return
2320 }
2321
2322
2323
2324
2325 select {
2326 case bodyEOF := <-waitForBodyRead:
2327 alive = alive &&
2328 bodyEOF &&
2329 !pc.sawEOF &&
2330 pc.wroteRequest() &&
2331 tryPutIdleConn(rc.treq)
2332 if bodyEOF {
2333 eofc <- struct{}{}
2334 }
2335 case <-rc.treq.ctx.Done():
2336 alive = false
2337 pc.cancelRequest(context.Cause(rc.treq.ctx))
2338 case <-pc.closech:
2339 alive = false
2340 }
2341
2342 rc.treq.cancel(errRequestDone)
2343 testHookReadLoopBeforeNextRead()
2344 }
2345 }
2346
2347 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2348 if pc.closed != nil {
2349 return
2350 }
2351 if n := pc.br.Buffered(); n > 0 {
2352 buf, _ := pc.br.Peek(n)
2353 if is408Message(buf) {
2354 pc.closeLocked(errServerClosedIdle)
2355 return
2356 } else {
2357 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2358 }
2359 }
2360 if peekErr == io.EOF {
2361
2362 pc.closeLocked(errServerClosedIdle)
2363 } else {
2364 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2365 }
2366 }
2367
2368
2369
2370
2371 func is408Message(buf []byte) bool {
2372 if len(buf) < len("HTTP/1.x 408") {
2373 return false
2374 }
2375 if string(buf[:7]) != "HTTP/1." {
2376 return false
2377 }
2378 return string(buf[8:12]) == " 408"
2379 }
2380
2381
2382
2383
2384 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2385 if trace != nil && trace.GotFirstResponseByte != nil {
2386 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2387 trace.GotFirstResponseByte()
2388 }
2389 }
2390 num1xx := 0
2391 const max1xxResponses = 5
2392
2393 continueCh := rc.continueCh
2394 for {
2395 resp, err = ReadResponse(pc.br, rc.treq.Request)
2396 if err != nil {
2397 return
2398 }
2399 resCode := resp.StatusCode
2400 if continueCh != nil && resCode == StatusContinue {
2401 if trace != nil && trace.Got100Continue != nil {
2402 trace.Got100Continue()
2403 }
2404 continueCh <- struct{}{}
2405 continueCh = nil
2406 }
2407 is1xx := 100 <= resCode && resCode <= 199
2408
2409 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2410 if is1xxNonTerminal {
2411 num1xx++
2412 if num1xx > max1xxResponses {
2413 return nil, errors.New("net/http: too many 1xx informational responses")
2414 }
2415 pc.readLimit = pc.maxHeaderResponseSize()
2416 if trace != nil && trace.Got1xxResponse != nil {
2417 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2418 return nil, err
2419 }
2420 }
2421 continue
2422 }
2423 break
2424 }
2425 if resp.isProtocolSwitch() {
2426 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2427 }
2428 if continueCh != nil {
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441 if resp.Close || rc.treq.Request.Close {
2442 close(continueCh)
2443 } else {
2444 continueCh <- struct{}{}
2445 }
2446 }
2447
2448 resp.TLS = pc.tlsState
2449 return
2450 }
2451
2452
2453
2454
2455 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2456 if continueCh == nil {
2457 return nil
2458 }
2459 return func() bool {
2460 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2461 defer timer.Stop()
2462
2463 select {
2464 case _, ok := <-continueCh:
2465 return ok
2466 case <-timer.C:
2467 return true
2468 case <-pc.closech:
2469 return false
2470 }
2471 }
2472 }
2473
2474 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2475 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2476 if br.Buffered() != 0 {
2477 body.br = br
2478 }
2479 return body
2480 }
2481
2482
2483
2484
2485
2486
2487 type readWriteCloserBody struct {
2488 _ incomparable
2489 br *bufio.Reader
2490 io.ReadWriteCloser
2491 }
2492
2493 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2494 if b.br != nil {
2495 if n := b.br.Buffered(); len(p) > n {
2496 p = p[:n]
2497 }
2498 n, err = b.br.Read(p)
2499 if b.br.Buffered() == 0 {
2500 b.br = nil
2501 }
2502 return n, err
2503 }
2504 return b.ReadWriteCloser.Read(p)
2505 }
2506
2507
2508 type nothingWrittenError struct {
2509 error
2510 }
2511
2512 func (nwe nothingWrittenError) Unwrap() error {
2513 return nwe.error
2514 }
2515
2516 func (pc *persistConn) writeLoop() {
2517 defer close(pc.writeLoopDone)
2518 for {
2519 select {
2520 case wr := <-pc.writech:
2521 startBytesWritten := pc.nwrite
2522 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
2523 if bre, ok := err.(requestBodyReadError); ok {
2524 err = bre.error
2525
2526
2527
2528
2529
2530
2531
2532 wr.req.setError(err)
2533 }
2534 if err == nil {
2535 err = pc.bw.Flush()
2536 }
2537 if err != nil {
2538 if pc.nwrite == startBytesWritten {
2539 err = nothingWrittenError{err}
2540 }
2541 }
2542 pc.writeErrCh <- err
2543 wr.ch <- err
2544 if err != nil {
2545 pc.close(err)
2546 return
2547 }
2548 case <-pc.closech:
2549 return
2550 }
2551 }
2552 }
2553
2554
2555
2556
2557
2558
2559
2560 var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2561
2562
2563
2564 func (pc *persistConn) wroteRequest() bool {
2565 select {
2566 case err := <-pc.writeErrCh:
2567
2568
2569 return err == nil
2570 default:
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2582 defer t.Stop()
2583 select {
2584 case err := <-pc.writeErrCh:
2585 return err == nil
2586 case <-t.C:
2587 return false
2588 }
2589 }
2590 }
2591
2592
2593
2594 type responseAndError struct {
2595 _ incomparable
2596 res *Response
2597 err error
2598 }
2599
2600 type requestAndChan struct {
2601 _ incomparable
2602 treq *transportRequest
2603 ch chan responseAndError
2604
2605
2606
2607
2608 addedGzip bool
2609
2610
2611
2612
2613
2614 continueCh chan<- struct{}
2615
2616 callerGone <-chan struct{}
2617 }
2618
2619
2620
2621
2622
2623 type writeRequest struct {
2624 req *transportRequest
2625 ch chan<- error
2626
2627
2628
2629
2630 continueCh <-chan struct{}
2631 }
2632
2633
2634
2635 type timeoutError struct {
2636 err string
2637 }
2638
2639 func (e *timeoutError) Error() string { return e.err }
2640 func (e *timeoutError) Timeout() bool { return true }
2641 func (e *timeoutError) Temporary() bool { return true }
2642 func (e *timeoutError) Is(err error) bool { return err == context.DeadlineExceeded }
2643
2644 var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}
2645
2646
2647
2648 var errRequestCanceled = http2errRequestCanceled
2649 var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2650
2651
2652
2653 var errRequestDone = errors.New("net/http: request completed")
2654
2655 func nop() {}
2656
2657
2658 var (
2659 testHookEnterRoundTrip = nop
2660 testHookWaitResLoop = nop
2661 testHookRoundTripRetried = nop
2662 testHookPrePendingDial = nop
2663 testHookPostPendingDial = nop
2664
2665 testHookMu sync.Locker = fakeLocker{}
2666 testHookReadLoopBeforeNextRead = nop
2667 )
2668
2669 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2670 testHookEnterRoundTrip()
2671 pc.mu.Lock()
2672 pc.numExpectedResponses++
2673 headerFn := pc.mutateHeaderFunc
2674 pc.mu.Unlock()
2675
2676 if headerFn != nil {
2677 headerFn(req.extraHeaders())
2678 }
2679
2680
2681
2682
2683
2684 requestedGzip := false
2685 if !pc.t.DisableCompression &&
2686 req.Header.Get("Accept-Encoding") == "" &&
2687 req.Header.Get("Range") == "" &&
2688 req.Method != "HEAD" {
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701 requestedGzip = true
2702 req.extraHeaders().Set("Accept-Encoding", "gzip")
2703 }
2704
2705 var continueCh chan struct{}
2706 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
2707 continueCh = make(chan struct{}, 1)
2708 }
2709
2710 if pc.t.DisableKeepAlives &&
2711 !req.wantsClose() &&
2712 !isProtocolSwitchHeader(req.Header) {
2713 req.extraHeaders().Set("Connection", "close")
2714 }
2715
2716 gone := make(chan struct{})
2717 defer close(gone)
2718
2719 const debugRoundTrip = false
2720
2721
2722
2723
2724 startBytesWritten := pc.nwrite
2725 writeErrCh := make(chan error, 1)
2726 pc.writech <- writeRequest{req, writeErrCh, continueCh}
2727
2728 resc := make(chan responseAndError)
2729 pc.reqch <- requestAndChan{
2730 treq: req,
2731 ch: resc,
2732 addedGzip: requestedGzip,
2733 continueCh: continueCh,
2734 callerGone: gone,
2735 }
2736
2737 handleResponse := func(re responseAndError) (*Response, error) {
2738 if (re.res == nil) == (re.err == nil) {
2739 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
2740 }
2741 if debugRoundTrip {
2742 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
2743 }
2744 if re.err != nil {
2745 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
2746 }
2747 return re.res, nil
2748 }
2749
2750 var respHeaderTimer <-chan time.Time
2751 ctxDoneChan := req.ctx.Done()
2752 pcClosed := pc.closech
2753 for {
2754 testHookWaitResLoop()
2755 select {
2756 case err := <-writeErrCh:
2757 if debugRoundTrip {
2758 req.logf("writeErrCh recv: %T/%#v", err, err)
2759 }
2760 if err != nil {
2761 pc.close(fmt.Errorf("write error: %w", err))
2762 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
2763 }
2764 if d := pc.t.ResponseHeaderTimeout; d > 0 {
2765 if debugRoundTrip {
2766 req.logf("starting timer for %v", d)
2767 }
2768 timer := time.NewTimer(d)
2769 defer timer.Stop()
2770 respHeaderTimer = timer.C
2771 }
2772 case <-pcClosed:
2773 select {
2774 case re := <-resc:
2775
2776
2777
2778 return handleResponse(re)
2779 default:
2780 }
2781 if debugRoundTrip {
2782 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
2783 }
2784 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
2785 case <-respHeaderTimer:
2786 if debugRoundTrip {
2787 req.logf("timeout waiting for response headers.")
2788 }
2789 pc.close(errTimeout)
2790 return nil, errTimeout
2791 case re := <-resc:
2792 return handleResponse(re)
2793 case <-ctxDoneChan:
2794 select {
2795 case re := <-resc:
2796
2797
2798
2799 return handleResponse(re)
2800 default:
2801 }
2802 pc.cancelRequest(context.Cause(req.ctx))
2803 }
2804 }
2805 }
2806
2807
2808
2809 type tLogKey struct{}
2810
2811 func (tr *transportRequest) logf(format string, args ...any) {
2812 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2813 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2814 }
2815 }
2816
2817
2818
2819 func (pc *persistConn) markReused() {
2820 pc.mu.Lock()
2821 pc.reused = true
2822 pc.mu.Unlock()
2823 }
2824
2825
2826
2827
2828
2829
2830 func (pc *persistConn) close(err error) {
2831 pc.mu.Lock()
2832 defer pc.mu.Unlock()
2833 pc.closeLocked(err)
2834 }
2835
2836 func (pc *persistConn) closeLocked(err error) {
2837 if err == nil {
2838 panic("nil error")
2839 }
2840 pc.broken = true
2841 if pc.closed == nil {
2842 pc.closed = err
2843 pc.t.decConnsPerHost(pc.cacheKey)
2844
2845
2846 if pc.alt == nil {
2847 if err != errCallerOwnsConn {
2848 pc.conn.Close()
2849 }
2850 close(pc.closech)
2851 }
2852 }
2853 pc.mutateHeaderFunc = nil
2854 }
2855
2856 var portMap = map[string]string{
2857 "http": "80",
2858 "https": "443",
2859 "socks5": "1080",
2860 "socks5h": "1080",
2861 }
2862
2863 func idnaASCIIFromURL(url *url.URL) string {
2864 addr := url.Hostname()
2865 if v, err := idnaASCII(addr); err == nil {
2866 addr = v
2867 }
2868 return addr
2869 }
2870
2871
2872 func canonicalAddr(url *url.URL) string {
2873 port := url.Port()
2874 if port == "" {
2875 port = portMap[url.Scheme]
2876 }
2877 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2878 }
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891 type bodyEOFSignal struct {
2892 body io.ReadCloser
2893 mu sync.Mutex
2894 closed bool
2895 rerr error
2896 fn func(error) error
2897 earlyCloseFn func() error
2898 }
2899
2900 var errReadOnClosedResBody = errors.New("http: read on closed response body")
2901
2902 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
2903 es.mu.Lock()
2904 closed, rerr := es.closed, es.rerr
2905 es.mu.Unlock()
2906 if closed {
2907 return 0, errReadOnClosedResBody
2908 }
2909 if rerr != nil {
2910 return 0, rerr
2911 }
2912
2913 n, err = es.body.Read(p)
2914 if err != nil {
2915 es.mu.Lock()
2916 defer es.mu.Unlock()
2917 if es.rerr == nil {
2918 es.rerr = err
2919 }
2920 err = es.condfn(err)
2921 }
2922 return
2923 }
2924
2925 func (es *bodyEOFSignal) Close() error {
2926 es.mu.Lock()
2927 defer es.mu.Unlock()
2928 if es.closed {
2929 return nil
2930 }
2931 es.closed = true
2932 if es.earlyCloseFn != nil && es.rerr != io.EOF {
2933 return es.earlyCloseFn()
2934 }
2935 err := es.body.Close()
2936 return es.condfn(err)
2937 }
2938
2939
2940 func (es *bodyEOFSignal) condfn(err error) error {
2941 if es.fn == nil {
2942 return err
2943 }
2944 err = es.fn(err)
2945 es.fn = nil
2946 return err
2947 }
2948
2949
2950
2951 type gzipReader struct {
2952 _ incomparable
2953 body *bodyEOFSignal
2954 zr *gzip.Reader
2955 zerr error
2956 }
2957
2958 func (gz *gzipReader) Read(p []byte) (n int, err error) {
2959 if gz.zr == nil {
2960 if gz.zerr == nil {
2961 gz.zr, gz.zerr = gzip.NewReader(gz.body)
2962 }
2963 if gz.zerr != nil {
2964 return 0, gz.zerr
2965 }
2966 }
2967
2968 gz.body.mu.Lock()
2969 if gz.body.closed {
2970 err = errReadOnClosedResBody
2971 }
2972 gz.body.mu.Unlock()
2973
2974 if err != nil {
2975 return 0, err
2976 }
2977 return gz.zr.Read(p)
2978 }
2979
2980 func (gz *gzipReader) Close() error {
2981 return gz.body.Close()
2982 }
2983
2984 type tlsHandshakeTimeoutError struct{}
2985
2986 func (tlsHandshakeTimeoutError) Timeout() bool { return true }
2987 func (tlsHandshakeTimeoutError) Temporary() bool { return true }
2988 func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" }
2989
2990
2991
2992
2993 type fakeLocker struct{}
2994
2995 func (fakeLocker) Lock() {}
2996 func (fakeLocker) Unlock() {}
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
3012 if cfg == nil {
3013 return &tls.Config{}
3014 }
3015 return cfg.Clone()
3016 }
3017
3018 type connLRU struct {
3019 ll *list.List
3020 m map[*persistConn]*list.Element
3021 }
3022
3023
3024 func (cl *connLRU) add(pc *persistConn) {
3025 if cl.ll == nil {
3026 cl.ll = list.New()
3027 cl.m = make(map[*persistConn]*list.Element)
3028 }
3029 ele := cl.ll.PushFront(pc)
3030 if _, ok := cl.m[pc]; ok {
3031 panic("persistConn was already in LRU")
3032 }
3033 cl.m[pc] = ele
3034 }
3035
3036 func (cl *connLRU) removeOldest() *persistConn {
3037 ele := cl.ll.Back()
3038 pc := ele.Value.(*persistConn)
3039 cl.ll.Remove(ele)
3040 delete(cl.m, pc)
3041 return pc
3042 }
3043
3044
3045 func (cl *connLRU) remove(pc *persistConn) {
3046 if ele, ok := cl.m[pc]; ok {
3047 cl.ll.Remove(ele)
3048 delete(cl.m, pc)
3049 }
3050 }
3051
3052
3053 func (cl *connLRU) len() int {
3054 return len(cl.m)
3055 }
3056
View as plain text