Source file src/net/http/internal/http2/transport.go

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Transport code.
     6  
     7  package http2
     8  
     9  import (
    10  	"bufio"
    11  	"bytes"
    12  	"compress/flate"
    13  	"compress/gzip"
    14  	"context"
    15  	"crypto/rand"
    16  	"crypto/tls"
    17  	"errors"
    18  	"fmt"
    19  	"io"
    20  	"io/fs"
    21  	"log"
    22  	"math"
    23  	"math/bits"
    24  	mathrand "math/rand"
    25  	"net"
    26  	"net/http/httptrace"
    27  	"net/http/internal"
    28  	"net/http/internal/httpcommon"
    29  	"net/textproto"
    30  	"slices"
    31  	"strconv"
    32  	"strings"
    33  	"sync"
    34  	"sync/atomic"
    35  	"time"
    36  
    37  	"golang.org/x/net/http/httpguts"
    38  	"golang.org/x/net/http2/hpack"
    39  	"golang.org/x/net/idna"
    40  )
    41  
const (
	// transportDefaultConnFlow is how many connection-level flow control
	// tokens we give the server at start-up, past the default 64k.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is how many stream-level flow
	// control tokens we announce to the peer, and how many bytes
	// we buffer per stream.
	transportDefaultStreamFlow = 4 << 20

	// defaultUserAgent is the User-Agent value sent when the request
	// does not supply one.
	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is a connection's maxConcurrentStreams until
	// it has received the server's initial SETTINGS frame; it corresponds with
	// the spec's minimum recommended value.
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is a connection's default maxConcurrentStreams
	// if the server doesn't include one in its initial SETTINGS frame.
	defaultMaxConcurrentStreams = 1000
)
    63  
// Transport is an HTTP/2 Transport.
//
// A Transport internally caches connections to servers. It is safe
// for concurrent use by multiple goroutines.
type Transport struct {
	// DialTLSContext specifies an optional dial function with context for
	// creating TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are nil, tls.Dial is used.
	//
	// If the returned net.Conn has a ConnectionState method like tls.Conn,
	// it will be used to set http.Response.TLS.
	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)

	// DialTLS specifies an optional dial function for creating
	// TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are nil, tls.Dial is used.
	//
	// Deprecated: Use DialTLSContext instead, which allows the transport
	// to cancel dials as soon as they are no longer needed.
	// If both are set, DialTLSContext takes priority.
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// ConnPool optionally specifies an alternate connection pool to use.
	// If nil, the default is used.
	ConnPool ClientConnPool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
	// plain-text "http" scheme. Note that this does not enable h2c support.
	AllowHTTP bool

	// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
	// send in the initial settings frame. It is how many bytes
	// of response headers are allowed. Unlike the http2 spec, zero here
	// means to use a default limit (currently 10MB). If you actually
	// want to advertise an unlimited value to the peer, Transport
	// interprets the highest possible value here (0xffffffff or 1<<32-1)
	// to mean no limit.
	MaxHeaderListSize uint32

	// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
	// initial settings frame. It is the size in bytes of the largest frame
	// payload that the sender is willing to receive. If 0, no setting is
	// sent, and the value is provided by the peer, which should be 16384
	// according to the spec:
	// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
	// Values are bounded in the range 16k to 16M.
	MaxReadFrameSize uint32

	// MaxDecoderHeaderTableSize optionally specifies the http2
	// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
	// informs the remote endpoint of the maximum size of the header compression
	// table used to decode header blocks, in octets. If zero, the default value
	// of 4096 is used.
	MaxDecoderHeaderTableSize uint32

	// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
	// header compression table used for encoding request headers. Received
	// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
	// the default value of 4096 is used.
	MaxEncoderHeaderTableSize uint32

	// StrictMaxConcurrentStreams controls whether the server's
	// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
	// globally. If false, new TCP connections are created to the
	// server as needed to keep each under the per-connection
	// SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
	// server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
	// a global limit and callers of RoundTrip block when needed,
	// waiting for their turn.
	StrictMaxConcurrentStreams bool

	// IdleConnTimeout is the maximum amount of time an idle
	// (keep-alive) connection will remain idle before closing
	// itself.
	// Zero means no limit.
	IdleConnTimeout time.Duration

	// ReadIdleTimeout is the timeout after which a health check using ping
	// frame will be carried out if no frame is received on the connection.
	// Note that a ping response is considered a received frame, so if
	// there is no other traffic on the connection, the health check will
	// be performed every ReadIdleTimeout interval.
	// If zero, no health check is performed.
	ReadIdleTimeout time.Duration

	// PingTimeout is the timeout after which the connection will be closed
	// if a response to Ping is not received.
	// Defaults to 15s.
	PingTimeout time.Duration

	// WriteByteTimeout is the timeout after which the connection will be
	// closed if no data can be written to it. The timeout begins when data is
	// available to write, and is extended whenever any bytes are written.
	WriteByteTimeout time.Duration

	// CountError, if non-nil, is called on HTTP/2 transport errors.
	// It's intended to increment a metric for monitoring, such
	// as an expvar or Prometheus metric.
	// The errType consists of only ASCII word characters.
	CountError func(errType string)

	// t1 is the net/http-side configuration this Transport was created from
	// (see NewTransport). It may be nil; accessors guard on t1 != nil.
	t1 TransportConfig

	connPoolOnce  sync.Once      // guards lazy init of connPoolOrDef
	connPoolOrDef ClientConnPool // non-nil version of ConnPool

	// transportTestHooks is non-nil only in tests.
	*transportTestHooks
}
   188  
// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.

type transportTestHooks struct {
	// newclientconn is invoked with each ClientConn created by newClientConn,
	// letting tests substitute or instrument the connection.
	newclientconn func(*ClientConn)
}
   196  
   197  func (t *Transport) maxHeaderListSize() uint32 {
   198  	n := t.t1.MaxResponseHeaderBytes()
   199  	if n > 0 {
   200  		n = adjustHTTP1MaxHeaderSize(n)
   201  	}
   202  	if n <= 0 {
   203  		return 10 << 20
   204  	}
   205  	if n >= 0xffffffff {
   206  		return 0
   207  	}
   208  	return uint32(n)
   209  }
   210  
// disableCompression reports whether transparent gzip is disabled, either
// directly on this Transport or via the underlying net/http configuration.
func (t *Transport) disableCompression() bool {
	return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression())
}
   214  
   215  func NewTransport(t1 TransportConfig) *Transport {
   216  	connPool := new(clientConnPool)
   217  	t2 := &Transport{
   218  		ConnPool: noDialClientConnPool{connPool},
   219  		t1:       t1,
   220  	}
   221  	connPool.t = t2
   222  	return t2
   223  }
   224  
   225  func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
   226  	connPool, ok := t.ConnPool.(noDialClientConnPool)
   227  	if !ok {
   228  		go c.Close()
   229  		return nil
   230  	}
   231  	addr := authorityAddr(scheme, authority)
   232  	used, err := connPool.addConnIfNeeded(addr, t, c)
   233  	if !used {
   234  		go c.Close()
   235  	}
   236  	return err
   237  }
   238  
// unencryptedTransport is a Transport with a RoundTrip method that
// always permits http:// URLs.
type unencryptedTransport Transport

// RoundTrip sends the request via the underlying Transport, forcing the
// allowHTTP option so plain-text "http" scheme requests are accepted.
func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
}
   246  
// connPool returns the connection pool to use, lazily initializing it on
// first use (user-supplied ConnPool if set, otherwise a default pool).
func (t *Transport) connPool() ClientConnPool {
	t.connPoolOnce.Do(t.initConnPool)
	return t.connPoolOrDef
}
   251  
   252  func (t *Transport) initConnPool() {
   253  	if t.ConnPool != nil {
   254  		t.connPoolOrDef = t.ConnPool
   255  	} else {
   256  		t.connPoolOrDef = &clientConnPool{t: t}
   257  	}
   258  }
   259  
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type ClientConn struct {
	t             *Transport
	tconn         net.Conn             // usually *tls.Conn, except specialized impls
	tlsState      *tls.ConnectionState // nil only for specialized impls
	atomicReused  uint32               // whether conn is being reused; atomic
	singleUse     bool                 // whether being used for a single http.Request
	getConnCalled bool                 // used by clientConnPool

	// readLoop goroutine fields:
	readerDone chan struct{} // closed on error
	readerErr  error         // set before readerDone is closed

	idleTimeout time.Duration // or 0 for never
	idleTimer   *time.Timer

	mu               sync.Mutex // guards following
	cond             *sync.Cond // hold mu; broadcast on flow/closed changes
	flow             outflow    // our conn-level flow control quota (cs.outflow is per stream)
	inflow           inflow     // peer's conn-level flow control
	doNotReuse       bool       // whether conn is marked to not be reused for any future requests
	closing          bool
	closed           bool
	closedOnIdle     bool                     // true if conn was closed for idleness
	seenSettings     bool                     // true if we've seen a settings frame, false otherwise
	seenSettingsChan chan struct{}            // closed when seenSettings is true or frame reading fails
	wantSettingsAck  bool                     // we sent a SETTINGS frame and haven't heard back
	goAway           *GoAwayFrame             // if non-nil, the GoAwayFrame we received
	goAwayDebug      string                   // goAway frame's debug data, retained as a string
	streams          map[uint32]*clientStream // client-initiated
	streamsReserved  int                      // incr by ReserveNewRequest; decr on RoundTrip
	nextStreamID     uint32
	pendingRequests  int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
	pings            map[[8]byte]chan struct{} // in flight ping data to notification channel
	br               *bufio.Reader
	lastActive       time.Time
	lastIdle         time.Time // time last idle
	// Settings from peer: (also guarded by wmu)
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	extendedConnectAllowed      bool
	strictMaxConcurrentStreams  bool

	// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
	// gRPC strictly limits the number of PING frames that it will receive.
	// The default is two pings per two hours, but the limit resets every time
	// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
	//
	// rstStreamPingsBlocked is set after receiving a response to a PING frame
	// bundled with an RST_STREAM (see pendingResets below), and cleared after
	// receiving a HEADERS or DATA frame.
	rstStreamPingsBlocked bool

	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
	// without confirming that the peer has received them. When we send a RST_STREAM,
	// we bundle it with a PING frame, unless a PING is already in flight. We count
	// the reset stream against the connection's concurrency limit until we get
	// a PING response. This limits the number of requests we'll try to send to a
	// completely unresponsive connection.
	pendingResets int

	// readBeforeStreamID is the smallest stream ID that has not been followed by
	// a frame read from the peer. We use this to determine when a request may
	// have been sent to a completely unresponsive connection:
	// If the request ID is less than readBeforeStreamID, then we have had some
	// indication of life on the connection since sending the request.
	readBeforeStreamID uint32

	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
	// Write to reqHeaderMu to lock it, read from it to unlock.
	// Lock reqHeaderMu BEFORE mu or wmu.
	reqHeaderMu chan struct{}

	// internalStateHook reports state changes back to the net/http.ClientConn.
	// Note that this is different from the user state hook registered by
	// net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
	// which calls the user hook.
	internalStateHook func()

	// wmu is held while writing.
	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
	// Only acquire both at the same time when changing peer settings.
	wmu  sync.Mutex
	bw   *bufio.Writer
	fr   *Framer
	werr error        // first write error that has occurred
	hbuf bytes.Buffer // HPACK encoder writes into this
	henc *hpack.Encoder
}
   356  
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
	cc *ClientConn

	// Fields of Request that we may access even after the response body is closed.
	ctx       context.Context
	reqCancel <-chan struct{}

	trace         *httptrace.ClientTrace // or nil
	ID            uint32
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
	requestedGzip bool
	isHead        bool

	abortOnce sync.Once
	abort     chan struct{} // closed to signal stream should end immediately
	abortErr  error         // set if abort is closed

	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
	donec      chan struct{} // closed after the stream is in the closed state
	on100      chan struct{} // buffered; written to if a 100 is received

	respHeaderRecv chan struct{}   // closed when headers are received
	res            *ClientResponse // set if respHeaderRecv is closed

	flow        outflow // guarded by cc.mu
	inflow      inflow  // guarded by cc.mu
	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
	readErr     error   // sticky read error; owned by transportResponseBody.Read

	reqBody              io.ReadCloser
	reqBodyContentLength int64         // -1 means unknown
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done

	// owned by writeRequest:
	sentEndStream bool // sent an END_STREAM flag to the peer
	sentHeaders   bool

	// owned by clientConnReadLoop:
	firstByte       bool  // got the first response byte
	pastHeaders     bool  // got first MetaHeadersFrame (actual headers)
	pastTrailers    bool  // got optional second MetaHeadersFrame (trailers)
	readClosed      bool  // peer sent an END_STREAM flag
	readAborted     bool  // read loop reset the stream
	totalHeaderSize int64 // total size of 1xx headers seen

	trailer    Header  // accumulated trailers
	resTrailer *Header // client's Response.Trailer

	// staticResp is backing storage for the response.
	// NOTE(review): presumably reused to avoid an allocation per
	// RoundTrip — confirm against the response-construction code.
	staticResp ClientResponse
}
   409  
// got1xxFuncForTests, if set, overrides the 1xx-response callback for tests.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error

// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
// if any. It returns nil if not set or if the Go version is too old.
// The test override, when present, always wins.
func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
	if fn := got1xxFuncForTests; fn != nil {
		return fn
	}
	return traceGot1xxResponseFunc(cs.trace)
}
   420  
   421  func (cs *clientStream) abortStream(err error) {
   422  	cs.cc.mu.Lock()
   423  	defer cs.cc.mu.Unlock()
   424  	cs.abortStreamLocked(err)
   425  }
   426  
   427  func (cs *clientStream) abortStreamLocked(err error) {
   428  	cs.abortOnce.Do(func() {
   429  		cs.abortErr = err
   430  		close(cs.abort)
   431  	})
   432  	if cs.reqBody != nil {
   433  		cs.closeReqBodyLocked()
   434  	}
   435  	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
   436  	if cs.cc.cond != nil {
   437  		// Wake up writeRequestBody if it is waiting on flow control.
   438  		cs.cc.cond.Broadcast()
   439  	}
   440  }
   441  
   442  func (cs *clientStream) abortRequestBodyWrite() {
   443  	cc := cs.cc
   444  	cc.mu.Lock()
   445  	defer cc.mu.Unlock()
   446  	if cs.reqBody != nil && cs.reqBodyClosed == nil {
   447  		cs.closeReqBodyLocked()
   448  		cc.cond.Broadcast()
   449  	}
   450  }
   451  
   452  func (cs *clientStream) closeReqBodyLocked() {
   453  	if cs.reqBodyClosed != nil {
   454  		return
   455  	}
   456  	cs.reqBodyClosed = make(chan struct{})
   457  	reqBodyClosed := cs.reqBodyClosed
   458  	go func() {
   459  		cs.reqBody.Close()
   460  		close(reqBodyClosed)
   461  	}()
   462  }
   463  
// stickyErrWriter wraps a net.Conn so that once a write fails, every
// subsequent Write returns that same first error without touching the conn.
type stickyErrWriter struct {
	conn    net.Conn
	timeout time.Duration // per-write byte timeout; see writeWithByteTimeout
	err     *error        // shared location for the first write error
}
   469  
   470  func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
   471  	if *sew.err != nil {
   472  		return 0, *sew.err
   473  	}
   474  	n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
   475  	*sew.err = err
   476  	return n, err
   477  }
   478  
// noCachedConnError is the concrete type of ErrNoCachedConn, which
// needs to be detected by net/http regardless of whether it's its
// bundled version (in h2_bundle.go with a rewritten type name) or
// from a user's x/net/http2. As such, as it has a unique method name
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
// isNoCachedConnError.
type noCachedConnError struct{}

// IsHTTP2NoCachedConnError is the marker method sniffed for by net/http.
func (noCachedConnError) IsHTTP2NoCachedConnError() {}
func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }
   489  
   490  // isNoCachedConnError reports whether err is of type noCachedConnError
   491  // or its equivalent renamed type in net/http2's h2_bundle.go. Both types
   492  // may coexist in the same running program.
   493  func isNoCachedConnError(err error) bool {
   494  	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
   495  	return ok
   496  }
   497  
// ErrNoCachedConn is returned by RoundTripOpt when OnlyCachedConn is set
// and no usable cached connection is available.
var ErrNoCachedConn error = noCachedConnError{}
   499  
// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may
	// create a new TCP connection. If set true and
	// no cached connection is available, RoundTripOpt
	// will return ErrNoCachedConn.
	OnlyCachedConn bool

	// allowHTTP permits http:// URLs; set by unencryptedTransport.RoundTrip.
	allowHTTP bool // allow http:// URLs
}
   510  
// RoundTrip sends req over an HTTP/2 connection, using default options.
func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return t.RoundTripOpt(req, RoundTripOpt{})
}
   514  
   515  // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
   516  // and returns a host:port. The port 443 is added if needed.
   517  func authorityAddr(scheme string, authority string) (addr string) {
   518  	host, port, err := net.SplitHostPort(authority)
   519  	if err != nil { // authority didn't have a port
   520  		host = authority
   521  		port = ""
   522  	}
   523  	if port == "" { // authority's port was empty
   524  		port = "443"
   525  		if scheme == "http" {
   526  			port = "80"
   527  		}
   528  	}
   529  	if a, err := idna.ToASCII(host); err == nil {
   530  		host = a
   531  	}
   532  	// IPv6 address literal, without a port:
   533  	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
   534  		return host + ":" + port
   535  	}
   536  	return net.JoinHostPort(host, port)
   537  }
   538  
// RoundTripOpt is like RoundTrip, but takes options.
//
// It validates the scheme, obtains a connection from the pool, and retries
// a failed request up to 6 times (with exponential backoff and jitter after
// the first retry) when shouldRetryRequest deems the failure retryable.
func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
	switch req.URL.Scheme {
	case "https":
		// Always okay.
	case "http":
		if !t.AllowHTTP && !opt.allowHTTP {
			return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
		}
	default:
		return nil, errors.New("http2: unsupported scheme")
	}

	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	for retry := 0; ; retry++ {
		cc, err := t.connPool().GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		// The conn is "reused" if this is not the first RoundTrip to claim it.
		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
		traceGotConn(req, cc, reused)
		res, err := cc.RoundTrip(req)
		if err != nil && retry <= 6 {
			roundTripErr := err
			if req, err = shouldRetryRequest(req, err); err == nil {
				// After the first retry, do exponential backoff with 10% jitter.
				if retry == 0 {
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				}
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				d := time.Second * time.Duration(backoff)
				tm := time.NewTimer(d)
				select {
				case <-tm.C:
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				case <-req.Context.Done():
					// Stop the timer to release it promptly; fall through
					// and report the context's error below.
					tm.Stop()
					err = req.Context.Err()
				}
			}
		}
		if err == errClientConnNotEstablished {
			// This ClientConn was created recently,
			// this is the first request to use it,
			// and the connection is closed and not usable.
			//
			// In this state, cc.idleTimer will remove the conn from the pool
			// when it fires. Stop the timer and remove it here so future requests
			// won't try to use this connection.
			//
			// If the timer has already fired and we're racing it, the redundant
			// call to MarkDead is harmless.
			if cc.idleTimer != nil {
				cc.idleTimer.Stop()
			}
			t.connPool().MarkDead(cc)
		}
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		return res, nil
	}
}
   607  
   608  func (t *Transport) IdleConnStrsForTesting() []string {
   609  	pool, ok := t.connPool().(noDialClientConnPool)
   610  	if !ok {
   611  		return nil
   612  	}
   613  
   614  	var ret []string
   615  	pool.mu.Lock()
   616  	defer pool.mu.Unlock()
   617  	for k, ccs := range pool.conns {
   618  		for _, cc := range ccs {
   619  			if cc.idleState().canTakeNewRequest {
   620  				ret = append(ret, k)
   621  			}
   622  		}
   623  	}
   624  	slices.Sort(ret)
   625  	return ret
   626  }
   627  
// CloseIdleConnections closes any connections which were previously
// connected from previous requests but are now sitting idle.
// It does not interrupt any connections currently in use.
// It is a no-op if the pool does not support idle-closing.
func (t *Transport) CloseIdleConnections() {
	if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
		cp.closeIdleConnections()
	}
}
   636  
// Sentinel errors for connection-level failures. Several of these are
// consulted by shouldRetryRequest/canRetryError to decide retryability.
var (
	errClientConnClosed         = errors.New("http2: client conn is closed")
	errClientConnUnusable       = errors.New("http2: client conn not usable")
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	errClientConnGotGoAway      = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	errClientConnForceClosed    = errors.New("http2: client connection force closed via ClientConn.Close")
)
   644  
   645  // shouldRetryRequest is called by RoundTrip when a request fails to get
   646  // response headers. It is always called with a non-nil error.
   647  // It returns either a request to retry (either the same request, or a
   648  // modified clone), or an error if the request can't be replayed.
   649  func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
   650  	if !canRetryError(err) {
   651  		return nil, err
   652  	}
   653  	// If the Body is nil (or http.NoBody), it's safe to reuse
   654  	// this request and its Body.
   655  	if req.Body == nil || req.Body == NoBody {
   656  		return req, nil
   657  	}
   658  
   659  	// If the request body can be reset back to its original
   660  	// state via the optional req.GetBody, do that.
   661  	if req.GetBody != nil {
   662  		body, err := req.GetBody()
   663  		if err != nil {
   664  			return nil, err
   665  		}
   666  		newReq := req.Clone()
   667  		newReq.Body = body
   668  		return newReq, nil
   669  	}
   670  
   671  	// The Request.Body can't reset back to the beginning, but we
   672  	// don't seem to have started to read from it yet, so reuse
   673  	// the request directly.
   674  	if err == errClientConnUnusable {
   675  		return req, nil
   676  	}
   677  
   678  	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
   679  }
   680  
   681  func canRetryError(err error) bool {
   682  	if err == errClientConnUnusable || err == errClientConnGotGoAway {
   683  		return true
   684  	}
   685  	if se, ok := err.(StreamError); ok {
   686  		return se.Code == ErrCodeRefusedStream
   687  	}
   688  	return false
   689  }
   690  
   691  func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
   692  	if t.transportTestHooks != nil {
   693  		return t.newClientConn(nil, singleUse, nil)
   694  	}
   695  	host, _, err := net.SplitHostPort(addr)
   696  	if err != nil {
   697  		return nil, err
   698  	}
   699  	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
   700  	if err != nil {
   701  		return nil, err
   702  	}
   703  	return t.newClientConn(tconn, singleUse, nil)
   704  }
   705  
   706  func (t *Transport) newTLSConfig(host string) *tls.Config {
   707  	cfg := new(tls.Config)
   708  	if t.TLSClientConfig != nil {
   709  		*cfg = *t.TLSClientConfig.Clone()
   710  	}
   711  	if !slices.Contains(cfg.NextProtos, NextProtoTLS) {
   712  		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
   713  	}
   714  	if cfg.ServerName == "" {
   715  		cfg.ServerName = host
   716  	}
   717  	return cfg
   718  }
   719  
// dialTLS establishes a TLS connection to addr, delegating to the user's
// DialTLSContext or DialTLS hooks when set. For the built-in path it also
// verifies that ALPN negotiated the h2 protocol.
func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
	if t.DialTLSContext != nil {
		return t.DialTLSContext(ctx, network, addr, tlsCfg)
	} else if t.DialTLS != nil {
		return t.DialTLS(network, addr, tlsCfg)
	}

	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
	if err != nil {
		return nil, err
	}
	state := tlsCn.ConnectionState()
	// HTTP/2 requires the "h2" ALPN protocol; anything else is a hard error.
	if p := state.NegotiatedProtocol; p != NextProtoTLS {
		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
	}
	if !state.NegotiatedProtocolIsMutual {
		return nil, errors.New("http2: could not negotiate protocol mutually")
	}
	return tlsCn, nil
}
   740  
// disableKeepAlives reports whether connections should be closed as
// soon as possible after handling the first request.
// False when the Transport has no underlying net/http configuration.
func (t *Transport) disableKeepAlives() bool {
	return t.t1 != nil && t.t1.DisableKeepAlives()
}
   746  
   747  func (t *Transport) expectContinueTimeout() time.Duration {
   748  	if t.t1 == nil {
   749  		return 0
   750  	}
   751  	return t.t1.ExpectContinueTimeout()
   752  }
   753  
// NewClientConn wraps an established net.Conn in a ClientConn for use by
// net/http. The connection is single-use when keep-alives are disabled.
func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
	cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
	if err != nil {
		return NetHTTPClientConn{}, err
	}

	// RoundTrip should block when the conn is at its concurrency limit,
	// not return an error. Setting strictMaxConcurrentStreams enables this.
	cc.strictMaxConcurrentStreams = true

	return NetHTTPClientConn{cc}, nil
}
   766  
// newClientConn allocates and initializes a ClientConn over c, writes the
// client preface and the initial SETTINGS and WINDOW_UPDATE frames, and
// starts the read loop goroutine. singleUse marks the connection as good
// for only one request. internalStateHook, if non-nil, is stored and
// invoked on connection state changes (see maybeCallStateHook).
//
// The initialization order below matters: frames must be written only
// after the framer, buffers, and HPACK encoder are set up, and the read
// loop must not start until the preface has been flushed.
func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10, // spec default
		initialWindowSize:           65535,    // spec default
		initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
		maxConcurrentStreams:        initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
		strictMaxConcurrentStreams:  conf.StrictMaxConcurrentRequests,
		peerMaxHeaderListSize:       0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1),
		lastActive:                  time.Now(),
		internalStateHook:           internalStateHook,
	}
	if t.transportTestHooks != nil {
		// Test hooks may replace cc.tconn; keep c in sync with it.
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))

	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr,
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
	if t.CountError != nil {
		cc.fr.countError = t.CountError
	}
	maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	// Capture the TLS state, if the underlying connection exposes one.
	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	// Build our initial SETTINGS frame. Settings matching the spec
	// defaults are omitted (except the ones we always send).
	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	// Client preface, then SETTINGS, then a connection-level window
	// update to grow the receive window past the 64KiB spec default.
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
	cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		// Any buffered write error is sticky (stickyErrWriter);
		// fail connection setup.
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer after the connection is fully initialized.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}
   859  
   860  func (cc *ClientConn) healthCheck() {
   861  	pingTimeout := cc.pingTimeout
   862  	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
   863  	// trigger the healthCheck again if there is no frame received.
   864  	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
   865  	defer cancel()
   866  	cc.vlogf("http2: Transport sending health check")
   867  	err := cc.Ping(ctx)
   868  	if err != nil {
   869  		cc.vlogf("http2: Transport health check failure: %v", err)
   870  		cc.closeForLostPing()
   871  	} else {
   872  		cc.vlogf("http2: Transport health check success")
   873  	}
   874  }
   875  
   876  // SetDoNotReuse marks cc as not reusable for future HTTP requests.
   877  func (cc *ClientConn) SetDoNotReuse() {
   878  	cc.mu.Lock()
   879  	defer cc.mu.Unlock()
   880  	cc.doNotReuse = true
   881  }
   882  
// setGoAway records a GOAWAY frame received from the server and aborts
// any local streams whose IDs are above the frame's LastStreamID (the
// server has promised not to process those).
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	old := cc.goAway
	cc.goAway = f

	// Merge the previous and current GoAway error frames.
	if cc.goAwayDebug == "" {
		cc.goAwayDebug = string(f.DebugData())
	}
	if old != nil && old.ErrCode != ErrCodeNo {
		// Keep the first non-NO error code we saw.
		cc.goAway.ErrCode = old.ErrCode
	}
	last := f.LastStreamID
	for streamID, cs := range cc.streams {
		if streamID <= last {
			// The server's GOAWAY indicates that it received this stream.
			// It will either finish processing it, or close the connection
			// without doing so. Either way, leave the stream alone for now.
			continue
		}
		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
			// Don't retry the first stream on a connection if we get a non-NO error.
			// If the server is sending an error on a new connection,
			// retrying the request on a new one probably isn't going to work.
			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
		} else {
			// Aborting the stream with errClientConnGotGoAway indicates that
			// the request should be retried on a new connection.
			cs.abortStreamLocked(errClientConnGotGoAway)
		}
	}
}
   917  
   918  // CanTakeNewRequest reports whether the connection can take a new request,
   919  // meaning it has not been closed or received or sent a GOAWAY.
   920  //
   921  // If the caller is going to immediately make a new request on this
   922  // connection, use ReserveNewRequest instead.
   923  func (cc *ClientConn) CanTakeNewRequest() bool {
   924  	cc.mu.Lock()
   925  	defer cc.mu.Unlock()
   926  	return cc.canTakeNewRequestLocked()
   927  }
   928  
   929  // ReserveNewRequest is like CanTakeNewRequest but also reserves a
   930  // concurrent stream in cc. The reservation is decremented on the
   931  // next call to RoundTrip.
   932  func (cc *ClientConn) ReserveNewRequest() bool {
   933  	cc.mu.Lock()
   934  	defer cc.mu.Unlock()
   935  	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
   936  		return false
   937  	}
   938  	cc.streamsReserved++
   939  	return true
   940  }
   941  
// ClientConnState describes the state of a ClientConn.
// It is returned by ClientConn.State and is a point-in-time snapshot.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection is in the process of
	// closing. It may be closing due to shutdown, being a
	// single-use connection, being marked as DoNotReuse, or
	// having received a GOAWAY frame.
	Closing bool

	// StreamsActive is how many streams are active.
	StreamsActive int

	// StreamsReserved is how many streams have been reserved via
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is how many requests have been sent in excess
	// of the peer's advertised MaxConcurrentStreams setting and
	// are waiting for other streams to complete.
	StreamsPending int

	// MaxConcurrentStreams is how many concurrent streams the
	// peer advertised as acceptable. Zero means no SETTINGS
	// frame has been received yet.
	MaxConcurrentStreams uint32

	// LastIdle, if non-zero, is when the connection last
	// transitioned to idle state.
	LastIdle time.Time
}
   974  
// State returns a snapshot of cc's state.
//
// Two locks are taken in sequence, so the snapshot is not fully atomic:
// settings-derived fields are read under wmu, the rest under mu.
func (cc *ClientConn) State() ClientConnState {
	cc.wmu.Lock()
	maxConcurrent := cc.maxConcurrentStreams
	if !cc.seenSettings {
		// No SETTINGS frame received yet; report zero, per the
		// ClientConnState.MaxConcurrentStreams contract.
		maxConcurrent = 0
	}
	cc.wmu.Unlock()

	cc.mu.Lock()
	defer cc.mu.Unlock()
	return ClientConnState{
		Closed:               cc.closed,
		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
		StreamsActive:        len(cc.streams) + cc.pendingResets,
		StreamsReserved:      cc.streamsReserved,
		StreamsPending:       cc.pendingRequests,
		LastIdle:             cc.lastIdle,
		MaxConcurrentStreams: maxConcurrent,
	}
}
   996  
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
	// canTakeNewRequest is whether a new request may be started
	// on this connection (see idleStateLocked for the rules).
	canTakeNewRequest bool
}
  1002  
  1003  func (cc *ClientConn) idleState() clientConnIdleState {
  1004  	cc.mu.Lock()
  1005  	defer cc.mu.Unlock()
  1006  	return cc.idleStateLocked()
  1007  }
  1008  
// idleStateLocked reports whether the connection is suitable for a new
// RoundTrip request. cc.mu must be held.
func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
	// A single-use connection that has already issued its one stream
	// can never take another request.
	if cc.singleUse && cc.nextStreamID > 1 {
		return
	}
	var maxConcurrentOkay bool
	if cc.strictMaxConcurrentStreams {
		// We'll tell the caller we can take a new request to
		// prevent the caller from dialing a new TCP
		// connection, but then we'll block later before
		// writing it.
		maxConcurrentOkay = true
	} else {
		// We can take a new request if the total of
		//   - active streams;
		//   - reservation slots for new streams; and
		//   - streams for which we have sent a RST_STREAM and a PING,
		//     but received no subsequent frame
		// is less than the concurrency limit.
		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
	}

	st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()

	// If this connection has never been used for a request and is closed,
	// then let it take a request (which will fail).
	// If the conn was closed for idleness, we're racing the idle timer;
	// don't try to use the conn. (Issue #70515.)
	//
	// This avoids a situation where an error early in a connection's lifetime
	// goes unreported.
	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
		st.canTakeNewRequest = true
	}

	return
}
  1045  
  1046  func (cc *ClientConn) isUsableLocked() bool {
  1047  	return cc.goAway == nil &&
  1048  		!cc.closed &&
  1049  		!cc.closing &&
  1050  		!cc.doNotReuse &&
  1051  		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
  1052  		!cc.tooIdleLocked()
  1053  }
  1054  
  1055  // canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
  1056  //
  1057  // This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
  1058  // We only permit reservations up to the conn's concurrency limit.
  1059  // This differs from ClientConn.ReserveNewRequest, which permits reservations
  1060  // past the limit when StrictMaxConcurrentStreams is set.
  1061  func (cc *ClientConn) canReserveLocked() bool {
  1062  	if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
  1063  		return false
  1064  	}
  1065  	if !cc.isUsableLocked() {
  1066  		return false
  1067  	}
  1068  	return true
  1069  }
  1070  
  1071  // currentRequestCountLocked reports the number of concurrency slots currently in use,
  1072  // including active streams, reserved slots, and reset streams waiting for acknowledgement.
  1073  func (cc *ClientConn) currentRequestCountLocked() int {
  1074  	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
  1075  }
  1076  
  1077  func (cc *ClientConn) canTakeNewRequestLocked() bool {
  1078  	st := cc.idleStateLocked()
  1079  	return st.canTakeNewRequest
  1080  }
  1081  
  1082  // availableLocked reports the number of concurrency slots available.
  1083  func (cc *ClientConn) availableLocked() int {
  1084  	if !cc.canTakeNewRequestLocked() {
  1085  		return 0
  1086  	}
  1087  	return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
  1088  }
  1089  
// tooIdleLocked reports whether this connection has been sitting idle
// for too much wall time.
func (cc *ClientConn) tooIdleLocked() bool {
	// The Round(0) strips the monotonic clock reading so the
	// times are compared based on their wall time. We don't want
	// to reuse a connection that's been sitting idle during
	// VM/laptop suspend if monotonic time was also frozen.
	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
  1099  
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
// connection. The timer could just call closeIfIdle, but this is more
// clear.
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}
  1109  
// closeConn closes the underlying network connection and notifies the
// internal state hook. If Close has not returned within 250ms,
// forceCloseConn tears down the raw connection as a fallback.
func (cc *ClientConn) closeConn() {
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
	defer t.Stop()
	cc.tconn.Close()
	cc.maybeCallStateHook()
}
  1116  
  1117  // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
  1118  // Try to shut it down more aggressively.
  1119  func (cc *ClientConn) forceCloseConn() {
  1120  	tc, ok := cc.tconn.(*tls.Conn)
  1121  	if !ok {
  1122  		return
  1123  	}
  1124  	if nc := tc.NetConn(); nc != nil {
  1125  		nc.Close()
  1126  	}
  1127  }
  1128  
// closeIfIdle closes the connection if no streams are active or
// reserved; otherwise it is a no-op. Called from the idle timer
// (onIdleTimeout), so it may race with a new request being started.
func (cc *ClientConn) closeIfIdle() {
	cc.mu.Lock()
	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
		// Connection is in use; leave it alone.
		cc.mu.Unlock()
		return
	}
	cc.closed = true
	cc.closedOnIdle = true
	nextID := cc.nextStreamID
	// TODO: do clients send GOAWAY too? maybe? Just Close:
	cc.mu.Unlock()

	if VerboseLogs {
		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
	}
	cc.closeConn()
}
  1146  
  1147  func (cc *ClientConn) isDoNotReuseAndIdle() bool {
  1148  	cc.mu.Lock()
  1149  	defer cc.mu.Unlock()
  1150  	return cc.doNotReuse && len(cc.streams) == 0
  1151  }
  1152  
// shutdownEnterWaitStateHook is a test hook, invoked by Shutdown just
// before it begins waiting for in-flight streams to complete.
var shutdownEnterWaitStateHook = func() {}
  1154  
// Shutdown gracefully closes the client connection, waiting for running streams to complete.
// It first sends a GOAWAY frame, then waits for either all in-flight
// streams to finish (closing the connection) or ctx to be done
// (returning ctx.Err() and leaving the connection to drain).
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	if err := cc.sendGoAway(); err != nil {
		return err
	}
	// Wait for all in-flight streams to complete or connection to close
	done := make(chan struct{})
	cancelled := false // guarded by cc.mu
	go func() {
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				// ctx expired; the caller has given up waiting.
				break
			}
			// Woken by cc.cond.Broadcast on stream completion
			// or cancellation below.
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Free the goroutine above
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
  1192  
// sendGoAway marks the connection as closing (preventing new requests)
// and sends a graceful GOAWAY frame to the server. It is a no-op if a
// GOAWAY was already sent.
func (cc *ClientConn) sendGoAway() error {
	cc.mu.Lock()
	closing := cc.closing
	cc.closing = true
	maxStreamID := cc.nextStreamID
	cc.mu.Unlock()
	if closing {
		// GOAWAY sent already
		return nil
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	// Send a graceful shutdown frame to server
	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
		return err
	}
	if err := cc.bw.Flush(); err != nil {
		return err
	}
	// Prevent new requests
	return nil
}
  1216  
// closeForError closes the client connection immediately. In-flight
// requests are interrupted; err is delivered to each stream's aborter.
func (cc *ClientConn) closeForError(err error) {
	cc.mu.Lock()
	cc.closed = true
	for _, cs := range cc.streams {
		cs.abortStreamLocked(err)
	}
	// Wake anyone blocked on cc.cond (e.g. Shutdown's waiter).
	cc.cond.Broadcast()
	cc.mu.Unlock()
	cc.closeConn()
}
  1229  
// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
// It always returns nil.
func (cc *ClientConn) Close() error {
	cc.closeForError(errClientConnForceClosed)
	return nil
}
  1237  
  1238  // closes the client connection immediately. In-flight requests are interrupted.
  1239  func (cc *ClientConn) closeForLostPing() {
  1240  	err := errors.New("http2: client connection lost")
  1241  	if f := cc.t.CountError; f != nil {
  1242  		f("conn_close_lost_ping")
  1243  	}
  1244  	cc.closeForError(err)
  1245  }
  1246  
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
// exported. At least they'll be DeepEqual for h1-vs-h2 comparison tests.
var errRequestCanceled = internal.ErrRequestCanceled
  1250  
  1251  func (cc *ClientConn) responseHeaderTimeout() time.Duration {
  1252  	if cc.t.t1 != nil {
  1253  		return cc.t.t1.ResponseHeaderTimeout()
  1254  	}
  1255  	// No way to do this (yet?) with just an http2.Transport. Probably
  1256  	// no need. Request.Cancel this is the new way. We only need to support
  1257  	// this for compatibility with the old http.Transport fields when
  1258  	// we're doing transparent http2.
  1259  	return 0
  1260  }
  1261  
  1262  // actualContentLength returns a sanitized version of
  1263  // req.ContentLength, where 0 actually means zero (not unknown) and -1
  1264  // means unknown.
  1265  func actualContentLength(req *ClientRequest) int64 {
  1266  	if req.Body == nil || req.Body == NoBody {
  1267  		return 0
  1268  	}
  1269  	if req.ContentLength != 0 {
  1270  		return req.ContentLength
  1271  	}
  1272  	return -1
  1273  }
  1274  
  1275  func (cc *ClientConn) decrStreamReservations() {
  1276  	cc.mu.Lock()
  1277  	defer cc.mu.Unlock()
  1278  	cc.decrStreamReservationsLocked()
  1279  }
  1280  
// decrStreamReservationsLocked returns one stream reservation (made via
// ReserveNewRequest), if any are outstanding. cc.mu must be held.
func (cc *ClientConn) decrStreamReservationsLocked() {
	if cc.streamsReserved > 0 {
		cc.streamsReserved--
	}
}
  1286  
// RoundTrip sends req on this connection and returns its response.
func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.roundTrip(req, nil)
}
  1290  
// roundTrip sends req on cc and blocks until response headers are
// received or the request is aborted or canceled. The actual sending
// happens on a separate goroutine (doRequest); this function only
// coordinates with it. streamf, if non-nil, is invoked with the new
// stream once created.
func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
	ctx := req.Context
	req.stream = clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
		resTrailer:           req.ResTrailer,
	}
	cs := &req.stream

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	// The stream goroutine sends the request and closes cs.donec
	// when done.
	go cs.doRequest(req, streamf)

	// waitDone blocks until the stream finishes, the context is done,
	// or the request is canceled.
	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	handleResponseHeaders := func() (*ClientResponse, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// On error or status code 3xx, 4xx, 5xx, etc abort any
			// ongoing write, assuming that the server doesn't care
			// about our request body. If the server replied with 1xx or
			// 2xx, however, then assume the server DOES potentially
			// want our body (e.g. full-duplex streaming:
			// golang.org/issue/13444). If it turns out the server
			// doesn't, they'll RST_STREAM us soon enough. This is a
			// heuristic to avoid adding knobs to Transport. Hopefully
			// we can keep it.
			cs.abortRequestBodyWrite()
		}
		res.TLS = cc.tlsState
		if res.Body == NoBody && actualContentLength(req) == 0 {
			// If there isn't a request or response body still being
			// written, then wait for the stream to be closed before
			// RoundTrip returns.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		// Wait for the request body to be closed.
		//
		// If nothing closed the body before now, abortStreamLocked
		// will have started a goroutine to close it.
		//
		// Closing the body before returning avoids a race condition
		// with net/http checking its readTrackingBody to see if the
		// body was read from or closed. See golang/go#60041.
		//
		// The body is closed in a separate goroutine without the
		// connection mutex held, but dropping the mutex before waiting
		// will keep us from holding it indefinitely if the body
		// close is slow for some reason.
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// If both cs.respHeaderRecv and cs.abort are signaling,
				// pick respHeaderRecv. The server probably wrote the
				// response and immediately reset the stream.
				// golang.org/issue/49645
				return handleResponseHeaders()
			default:
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}
  1399  
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
// It runs in its own goroutine, started by roundTrip.
func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
	err := cs.writeRequest(req, streamf)
	cs.cleanupWriteRequest(err)
}
  1407  
// errExtendedConnectNotSupported is returned when a request uses the
// extended CONNECT protocol (":protocol" pseudo-header, RFC 8441) but
// the server's SETTINGS did not allow it.
var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
  1409  
// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
// and the request stream is half-closed by the peer.
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// wait for setting frames to be received, a server can change this value later,
	// but we just wait for the first settings frame
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	// Acquire the new-request lock by writing to reqHeaderMu.
	// This lock guards the critical section covering allocating a new stream ID
	// (requires mu) and creating the stream (requires wmu).
	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn") // for tests
	}
	if isExtendedConnect {
		// Extended CONNECT (RFC 8441) is only valid if the server's
		// first SETTINGS frame enabled it, so wait for that frame.
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		// No slot; release the new-request lock before returning.
		cc.mu.Unlock()
		<-cc.reqHeaderMu
		return err
	}
	cc.addStreamLocked(cs) // assigns stream ID
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	// Honor "Expect: 100-continue" only when the transport has a
	// configured expect-continue timeout.
	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Past this point (where we send request headers), it is possible for
	// RoundTrip to return successfully. Since the RoundTrip contract permits
	// the caller to "mutate or reuse" the Request after closing the Response's Body,
	// we must take care when referencing the Request from here on.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		// Optionally wait for the server's interim 100 response (or
		// the timeout) before sending the body.
		if continueTimeout != 0 {
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
		respHeaderRecv = cs.respHeaderRecv
	}
	// Wait until the peer half-closes its end of the stream,
	// or until the request is aborted (via context, error, or otherwise),
	// whichever comes first.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			respHeaderRecv = nil
			respHeaderTimer = nil // keep waiting for END_STREAM
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}
  1560  
// encodeAndWriteHeaders HPACK-encodes req's headers and writes the
// HEADERS frame (plus any CONTINUATION frames) for cs, holding the
// connection write lock (wmu) for the duration.
func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// If the request was canceled while waiting for cc.mu, just quit.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode headers.
	//
	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
	// sent by writeRequestBody below, along with any Trailers,
	// again in form HEADERS{1}, CONTINUATION{0,})
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// Write the request.
	// END_STREAM is set only when there is neither a body nor trailers
	// to follow.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}
  1600  
// encodeRequestHeaders converts req into the header fields to send,
// invoking headerf for each name/value pair via httpcommon.EncodeHeaders.
// addGzipHeader requests transparent gzip; peerMaxHeaderListSize is the
// server's advertised SETTINGS_MAX_HEADER_LIST_SIZE limit.
func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
	return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
		Request: httpcommon.Request{
			Header:              req.Header,
			Trailer:             req.Trailer,
			URL:                 req.URL,
			Host:                req.Host,
			Method:              req.Method,
			ActualContentLength: actualContentLength(req),
		},
		AddGzipHeader:         addGzipHeader,
		PeerMaxHeaderListSize: peerMaxHeaderListSize,
		DefaultUserAgent:      defaultUserAgent,
	}, headerf)
}
  1616  
// cleanupWriteRequest performs post-request tasks.
//
// If err (the result of writeRequest) is non-nil and the stream is not closed,
// cleanupWriteRequest will send a reset to the peer.
//
// It also closes the request body (exactly once), returns any unused stream
// reservation, forgets the stream on the connection, and signals completion
// via cs.donec.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// We were canceled before creating the stream, so return our reservation.
		cc.decrStreamReservations()
	}

	// TODO: write h12Compare test showing whether
	// Request.Body is closed by the Transport,
	// and in multiple cases: server replies <=299 and >299
	// while still writing request body
	cc.mu.Lock()
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		// Claim responsibility for closing the body under the lock so only
		// one goroutine ever closes it.
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	// Have we read any frames from the connection since sending this request?
	readSinceStream := cc.readBeforeStreamID > cs.ID
	cc.mu.Unlock()
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	if bodyClosed != nil {
		// Wait for whichever goroutine owns the close to finish it.
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// If the connection is closed immediately after the response is read,
		// we may be aborted before finishing up here. If the stream was closed
		// cleanly on both sides, there is no error.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err) // possibly redundant, but harmless
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// We're cancelling an in-flight request.
				//
				// This could be due to the server becoming unresponsive.
				// To avoid sending too many requests on a dead connection,
				// if we haven't read any frames from the connection since
				// sending this request, we let it continue to consume
				// a concurrency slot until we can confirm the server is
				// still responding.
				// We do this by sending a PING frame along with the RST_STREAM
				// (unless a ping is already in flight).
				//
				// For simplicity, we don't bother tracking the PING payload:
				// We reset cc.pendingResets any time we receive a PING ACK.
				//
				// We skip this if the conn is going to be closed on idle,
				// because it's short lived and will probably be closed before
				// we get the ping response.
				ping := false
				if !closeOnIdle && !readSinceStream {
					cc.mu.Lock()
					// rstStreamPingsBlocked works around a gRPC behavior:
					// see comment on the field for details.
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err) // no-op if already closed
	} else {
		// Success path: if we never sent END_STREAM (e.g. response arrived
		// before the body finished), tell the peer we're done with NO_ERROR.
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	// If the connection writer saw an error, the conn is broken; close it.
	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		cc.Close()
	}

	close(cs.donec)
	cc.maybeCallStateHook()
}
  1724  
// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu.
//
// It returns nil once a concurrency slot is available, or an error if the
// connection is (or becomes) unusable, or if the stream is aborted while
// waiting.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
	for {
		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
			// This is the very first request sent to this connection.
			// Return a fatal error which aborts the retry loop.
			return errClientConnNotEstablished
		}
		cc.lastActive = time.Now()
		if cc.closed || !cc.canTakeNewRequestLocked() {
			return errClientConnUnusable
		}
		cc.lastIdle = time.Time{}
		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
			return nil
		}
		cc.pendingRequests++
		cc.cond.Wait() // releases cc.mu while blocked; reacquired on wake
		cc.pendingRequests--
		// The stream may have been aborted while we were waiting.
		select {
		case <-cs.abort:
			return cs.abortErr
		default:
		}
	}
}
  1752  
  1753  // requires cc.wmu be held
  1754  func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
  1755  	first := true // first frame written (HEADERS is first, then CONTINUATION)
  1756  	for len(hdrs) > 0 && cc.werr == nil {
  1757  		chunk := hdrs
  1758  		if len(chunk) > maxFrameSize {
  1759  			chunk = chunk[:maxFrameSize]
  1760  		}
  1761  		hdrs = hdrs[len(chunk):]
  1762  		endHeaders := len(hdrs) == 0
  1763  		if first {
  1764  			cc.fr.WriteHeaders(HeadersFrameParam{
  1765  				StreamID:      streamID,
  1766  				BlockFragment: chunk,
  1767  				EndStream:     endStream,
  1768  				EndHeaders:    endHeaders,
  1769  			})
  1770  			first = false
  1771  		} else {
  1772  			cc.fr.WriteContinuation(streamID, endHeaders, chunk)
  1773  		}
  1774  	}
  1775  	cc.bw.Flush()
  1776  	return cc.werr
  1777  }
  1778  
// internal error values; they don't escape to callers
var (
	// errStopReqBodyWrite aborts the request body write without
	// sending a stream reset to the peer.
	errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel aborts the request body write and
	// additionally sends a CANCEL stream reset to the peer.
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")

	// errReqBodyTooLong reports that the request body produced more
	// bytes than its declared Content-Length.
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
  1789  
  1790  // frameScratchBufferLen returns the length of a buffer to use for
  1791  // outgoing request bodies to read/write to/from.
  1792  //
  1793  // It returns max(1, min(peer's advertised max frame size,
  1794  // Request.ContentLength+1, 512KB)).
  1795  func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
  1796  	const max = 512 << 10
  1797  	n := min(int64(maxFrameSize), max)
  1798  	if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
  1799  		// Add an extra byte past the declared content-length to
  1800  		// give the caller's Request.Body io.Reader a chance to
  1801  		// give us more bytes than they declared, so we can catch it
  1802  		// early.
  1803  		n = cl + 1
  1804  	}
  1805  	if n < 1 {
  1806  		return 1
  1807  	}
  1808  	return int(n) // doesn't truncate; max is 512K
  1809  }
  1810  
  1811  // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
  1812  // streaming requests using small frame sizes occupy large buffers initially allocated for prior
  1813  // requests needing big buffers. The size ranges are as follows:
  1814  // {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
  1815  // {256 KB, 512 KB], {512 KB, infinity}
  1816  // In practice, the maximum scratch buffer size should not exceed 512 KB due to
  1817  // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
  1818  // It exists mainly as a safety measure, for potential future increases in max buffer size.
  1819  var bufPools [7]sync.Pool // of *[]byte
  1820  func bufPoolIndex(size int) int {
  1821  	if size <= 16384 {
  1822  		return 0
  1823  	}
  1824  	size -= 1
  1825  	bits := bits.Len(uint(size))
  1826  	index := bits - 14
  1827  	if index >= len(bufPools) {
  1828  		return len(bufPools) - 1
  1829  	}
  1830  	return index
  1831  }
  1832  
// writeRequestBody streams cs.reqBody to the peer as DATA frames, honoring
// stream- and connection-level flow control, and terminates the stream
// with END_STREAM: on the final DATA frame when possible, otherwise via
// trailers or an empty DATA frame.
//
// It returns errReqBodyTooLong if the body yields more bytes than the
// declared Content-Length, errStopReqBodyWrite if the body was closed
// underneath us, or any write/flow-control error encountered.
func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
	cc := cs.cc
	body := cs.reqBody
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

	hasTrailers := req.Trailer != nil
	remainLen := cs.reqBodyContentLength
	hasContentLen := remainLen != -1

	cc.mu.Lock()
	maxFrameSize := int(cc.maxFrameSize)
	cc.mu.Unlock()

	// Scratch buffer for reading into & writing from.
	// Taken from (and returned to) the size-bucketed pools; a pooled
	// buffer is only reused if it is large enough.
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
	var buf []byte
	index := bufPoolIndex(scratchLen)
	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
		defer bufPools[index].Put(bp)
		buf = *bp
	} else {
		buf = make([]byte, scratchLen)
		defer bufPools[index].Put(&buf)
	}

	var sawEOF bool
	for !sawEOF {
		n, err := body.Read(buf)
		if hasContentLen {
			remainLen -= int64(n)
			if remainLen == 0 && err == nil {
				// The request body's Content-Length was predeclared and
				// we just finished reading it all, but the underlying io.Reader
				// returned the final chunk with a nil error (which is one of
				// the two valid things a Reader can do at EOF). Because we'd prefer
				// to send the END_STREAM bit early, double-check that we're actually
				// at EOF. Subsequent reads should return (0, EOF) at this point.
				// If either value is different, we return an error in one of two ways below.
				var scratch [1]byte
				var n1 int
				n1, err = body.Read(scratch[:])
				remainLen -= int64(n1)
			}
			if remainLen < 0 {
				err = errReqBodyTooLong
				return err
			}
		}
		if err != nil {
			cc.mu.Lock()
			bodyClosed := cs.reqBodyClosed != nil
			cc.mu.Unlock()
			switch {
			case bodyClosed:
				return errStopReqBodyWrite
			case err == io.EOF:
				sawEOF = true
				err = nil
			default:
				return err
			}
		}

		// Write what we read, in chunks gated by flow control.
		remain := buf[:n]
		for len(remain) > 0 && err == nil {
			var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
			if err != nil {
				return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
			if err == nil {
				// TODO(bradfitz): this flush is for latency, not bandwidth.
				// Most requests won't need this. Make this opt-in or
				// opt-out?  Use some heuristic on the body type? Nagel-like
				// timers?  Based on 'n'? Only last chunk of this for loop,
				// unless flow control tokens are low? For now, always.
				// If we change this, see comment below.
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
		if err != nil {
			return err
		}
	}

	if sentEnd {
		// Already sent END_STREAM (which implies we have no
		// trailers) and flushed, because currently all
		// WriteData frames above get a flush. So we're done.
		return nil
	}

	// Since the RoundTrip contract permits the caller to "mutate or reuse"
	// a request after the Response's Body is closed, verify that this hasn't
	// happened before accessing the trailers.
	cc.mu.Lock()
	trailer := req.Trailer
	err = cs.abortErr
	cc.mu.Unlock()
	if err != nil {
		return err
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	var trls []byte
	if len(trailer) > 0 {
		trls, err = cc.encodeTrailers(trailer)
		if err != nil {
			return err
		}
	}

	// Two ways to send END_STREAM: either with trailers, or
	// with an empty DATA frame.
	if len(trls) > 0 {
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
	} else {
		err = cc.fr.WriteData(cs.ID, true, nil)
	}
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
		err = ferr
	}
	return err
}
  1964  
  1965  // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
  1966  // control tokens from the server.
  1967  // It returns either the non-zero number of tokens taken or an error
  1968  // if the stream is dead.
  1969  func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
  1970  	cc := cs.cc
  1971  	ctx := cs.ctx
  1972  	cc.mu.Lock()
  1973  	defer cc.mu.Unlock()
  1974  	for {
  1975  		if cc.closed {
  1976  			return 0, errClientConnClosed
  1977  		}
  1978  		if cs.reqBodyClosed != nil {
  1979  			return 0, errStopReqBodyWrite
  1980  		}
  1981  		select {
  1982  		case <-cs.abort:
  1983  			return 0, cs.abortErr
  1984  		case <-ctx.Done():
  1985  			return 0, ctx.Err()
  1986  		case <-cs.reqCancel:
  1987  			return 0, errRequestCanceled
  1988  		default:
  1989  		}
  1990  		if a := cs.flow.available(); a > 0 {
  1991  			take := a
  1992  			if int(take) > maxBytes {
  1993  
  1994  				take = int32(maxBytes) // can't truncate int; take is int32
  1995  			}
  1996  			if take > int32(cc.maxFrameSize) {
  1997  				take = int32(cc.maxFrameSize)
  1998  			}
  1999  			cs.flow.take(take)
  2000  			return take, nil
  2001  		}
  2002  		cc.cond.Wait()
  2003  	}
  2004  }
  2005  
  2006  // requires cc.wmu be held.
  2007  func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
  2008  	cc.hbuf.Reset()
  2009  
  2010  	hlSize := uint64(0)
  2011  	for k, vv := range trailer {
  2012  		for _, v := range vv {
  2013  			hf := hpack.HeaderField{Name: k, Value: v}
  2014  			hlSize += uint64(hf.Size())
  2015  		}
  2016  	}
  2017  	if hlSize > cc.peerMaxHeaderListSize {
  2018  		return nil, errRequestHeaderListSize
  2019  	}
  2020  
  2021  	for k, vv := range trailer {
  2022  		lowKey, ascii := httpcommon.LowerHeader(k)
  2023  		if !ascii {
  2024  			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  2025  			// field names have to be ASCII characters (just as in HTTP/1.x).
  2026  			continue
  2027  		}
  2028  		// Transfer-Encoding, etc.. have already been filtered at the
  2029  		// start of RoundTrip
  2030  		for _, v := range vv {
  2031  			cc.writeHeader(lowKey, v)
  2032  		}
  2033  	}
  2034  	return cc.hbuf.Bytes(), nil
  2035  }
  2036  
  2037  func (cc *ClientConn) writeHeader(name, value string) {
  2038  	if VerboseLogs {
  2039  		log.Printf("http2: Transport encoding header %q = %q", name, value)
  2040  	}
  2041  	cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  2042  }
  2043  
// resAndError pairs a response with the error (if any) produced while
// obtaining it, for delivery over a channel as a single value.
type resAndError struct {
	_   incomparable
	res *ClientResponse // the response, when err is nil
	err error           // the failure, when res is nil
}
  2049  
// addStreamLocked registers cs on the connection: it seeds the stream's
// send flow-control window from the peer's initial window size, links the
// stream's flow to the connection-level window, initializes the receive
// window, and assigns the next client-initiated stream ID.
//
// requires cc.mu be held.
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
	cs.flow.add(int32(cc.initialWindowSize))
	cs.flow.setConnFlow(&cc.flow)
	cs.inflow.init(cc.initialStreamRecvWindowSize)
	cs.ID = cc.nextStreamID
	cc.nextStreamID += 2 // client-initiated stream IDs are odd; step by 2
	cc.streams[cs.ID] = cs
	if cs.ID == 0 {
		// ID 0 is the connection itself; reaching here would mean
		// nextStreamID was never initialized or wrapped.
		panic("assigned stream ID 0")
	}
}
  2062  
// forgetStreamID removes the stream from the connection's stream map,
// restarts the idle timer if this was the last open stream, wakes any
// goroutines blocked on cc.cond, and closes the connection if it is
// marked close-on-idle and is now fully idle.
func (cc *ClientConn) forgetStreamID(id uint32) {
	cc.mu.Lock()
	slen := len(cc.streams)
	delete(cc.streams, id)
	if len(cc.streams) != slen-1 {
		panic("forgetting unknown stream id")
	}
	cc.lastActive = time.Now()
	if len(cc.streams) == 0 && cc.idleTimer != nil {
		cc.idleTimer.Reset(cc.idleTimeout)
		cc.lastIdle = time.Now()
	}
	// Wake up writeRequestBody via clientStream.awaitFlowControl and
	// wake up RoundTrip if there is a pending request.
	cc.cond.Broadcast()

	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
		if VerboseLogs {
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
		}
		cc.closed = true
		// Defer so the conn is closed only after cc.mu is released below.
		defer cc.closeConn()
	}

	cc.mu.Unlock()
}
  2090  
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
	_  incomparable
	cc *ClientConn // the connection whose frames this loop reads
}
  2096  
  2097  // readLoop runs in its own goroutine and reads and dispatches frames.
  2098  func (cc *ClientConn) readLoop() {
  2099  	rl := &clientConnReadLoop{cc: cc}
  2100  	defer rl.cleanup()
  2101  	cc.readerErr = rl.run()
  2102  	if ce, ok := cc.readerErr.(ConnectionError); ok {
  2103  		cc.wmu.Lock()
  2104  		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  2105  		cc.wmu.Unlock()
  2106  	}
  2107  }
  2108  
// GoAwayError is returned by the Transport when the server closes the
// TCP connection after sending a GOAWAY frame.
type GoAwayError struct {
	LastStreamID uint32  // highest stream ID the server reported processing
	ErrCode      ErrCode // error code carried by the GOAWAY frame
	DebugData    string  // opaque debug data from the GOAWAY frame, if any
}

// Error implements the error interface.
func (e GoAwayError) Error() string {
	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
		e.LastStreamID, e.ErrCode, e.DebugData)
}
  2121  
  2122  func isEOFOrNetReadError(err error) bool {
  2123  	if err == io.EOF {
  2124  		return true
  2125  	}
  2126  	ne, ok := err.(*net.OpError)
  2127  	return ok && ne.Op == "read"
  2128  }
  2129  
  2130  func (rl *clientConnReadLoop) cleanup() {
  2131  	cc := rl.cc
  2132  	defer cc.closeConn()
  2133  	defer close(cc.readerDone)
  2134  
  2135  	if cc.idleTimer != nil {
  2136  		cc.idleTimer.Stop()
  2137  	}
  2138  
  2139  	// Close any response bodies if the server closes prematurely.
  2140  	// TODO: also do this if we've written the headers but not
  2141  	// gotten a response yet.
  2142  	err := cc.readerErr
  2143  	cc.mu.Lock()
  2144  	if cc.goAway != nil && isEOFOrNetReadError(err) {
  2145  		err = GoAwayError{
  2146  			LastStreamID: cc.goAway.LastStreamID,
  2147  			ErrCode:      cc.goAway.ErrCode,
  2148  			DebugData:    cc.goAwayDebug,
  2149  		}
  2150  	} else if err == io.EOF {
  2151  		err = io.ErrUnexpectedEOF
  2152  	}
  2153  	cc.closed = true
  2154  
  2155  	// If the connection has never been used, and has been open for only a short time,
  2156  	// leave it in the connection pool for a little while.
  2157  	//
  2158  	// This avoids a situation where new connections are constantly created,
  2159  	// added to the pool, fail, and are removed from the pool, without any error
  2160  	// being surfaced to the user.
  2161  	unusedWaitTime := 5 * time.Second
  2162  	if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
  2163  		unusedWaitTime = cc.idleTimeout
  2164  	}
  2165  	idleTime := time.Now().Sub(cc.lastActive)
  2166  	if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
  2167  		cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
  2168  			cc.t.connPool().MarkDead(cc)
  2169  		})
  2170  	} else {
  2171  		cc.mu.Unlock() // avoid any deadlocks in MarkDead
  2172  		cc.t.connPool().MarkDead(cc)
  2173  		cc.mu.Lock()
  2174  	}
  2175  
  2176  	for _, cs := range cc.streams {
  2177  		select {
  2178  		case <-cs.peerClosed:
  2179  			// The server closed the stream before closing the conn,
  2180  			// so no need to interrupt it.
  2181  		default:
  2182  			cs.abortStreamLocked(err)
  2183  		}
  2184  	}
  2185  	cc.cond.Broadcast()
  2186  	cc.mu.Unlock()
  2187  
  2188  	if !cc.seenSettings {
  2189  		// If we have a pending request that wants extended CONNECT,
  2190  		// let it continue and fail with the connection error.
  2191  		cc.extendedConnectAllowed = true
  2192  		close(cc.seenSettingsChan)
  2193  	}
  2194  }
  2195  
  2196  // countReadFrameError calls Transport.CountError with a string
  2197  // representing err.
  2198  func (cc *ClientConn) countReadFrameError(err error) {
  2199  	f := cc.t.CountError
  2200  	if f == nil || err == nil {
  2201  		return
  2202  	}
  2203  	if ce, ok := err.(ConnectionError); ok {
  2204  		errCode := ErrCode(ce)
  2205  		f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
  2206  		return
  2207  	}
  2208  	if errors.Is(err, io.EOF) {
  2209  		f("read_frame_eof")
  2210  		return
  2211  	}
  2212  	if errors.Is(err, io.ErrUnexpectedEOF) {
  2213  		f("read_frame_unexpected_eof")
  2214  		return
  2215  	}
  2216  	if errors.Is(err, ErrFrameTooLarge) {
  2217  		f("read_frame_too_large")
  2218  		return
  2219  	}
  2220  	f("read_frame_other")
  2221  }
  2222  
// run reads frames from the connection until a fatal error occurs,
// dispatching each frame to its process* handler. Stream-level read
// errors abort only the affected stream; any other error ends the loop
// and is returned (becoming cc.readerErr).
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	gotSettings := false
	readIdleTimeout := cc.readIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		// Trigger a health check if no frame arrives within readIdleTimeout.
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
	}
	for {
		f, err := cc.fr.ReadFrame()
		if t != nil {
			t.Reset(readIdleTimeout) // any received frame counts as activity
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		if se, ok := err.(StreamError); ok {
			// Stream-level error: reset just that stream, keep the conn.
			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		} else if err != nil {
			cc.countReadFrameError(err)
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			// The server's first frame must be SETTINGS (its connection preface).
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}

		switch f := f.(type) {
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
		case *DataFrame:
			err = rl.processData(f)
		case *GoAwayFrame:
			err = rl.processGoAway(f)
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
		case *SettingsFrame:
			err = rl.processSettings(f)
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		case *PingFrame:
			err = rl.processPing(f)
		default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
	}
}
  2290  
// processHeaders handles a HEADERS frame for a stream: the first block is
// turned into a response (delivered to RoundTrip via cs.respHeaderRecv);
// a second block is routed to processTrailers. It returns nil for
// stream-level problems (reported via endStreamError) so the connection
// stays alive, and a non-nil error only for connection-level failures.
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	if cs == nil {
		// We'd get here if we canceled a request while the
		// server had its response still in flight. So if this
		// was just something we canceled, ignore it.
		return nil
	}
	if cs.readClosed {
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    errors.New("protocol error: headers after END_STREAM"),
		})
		return nil
	}
	if !cs.firstByte {
		if cs.trace != nil {
			// TODO(bradfitz): move first response byte earlier,
			// when we first read the 9 byte header, not waiting
			// until all the HEADERS+CONTINUATION frames have been
			// merged. This works for now.
			traceFirstResponseByte(cs.trace)
		}
		cs.firstByte = true
	}
	if !cs.pastHeaders {
		cs.pastHeaders = true
	} else {
		// A second HEADERS block on this stream carries trailers.
		return rl.processTrailers(cs, f)
	}

	res, err := rl.handleResponse(cs, f)
	if err != nil {
		if _, ok := err.(ConnectionError); ok {
			return err
		}
		// Any other error type is a stream error.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    err,
		})
		return nil // return nil from process* funcs to keep conn alive
	}
	if res == nil {
		// (nil, nil) special case. See handleResponse docs.
		return nil
	}
	cs.res = res
	close(cs.respHeaderRecv) // wake RoundTrip waiting on the response
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2347  
// may return error types nil, or ConnectionError. Any other error value
// is a StreamError of type ErrCodeProtocol. The returned error in that case
// is the detail.
//
// As a special case, handleResponse may return (nil, nil) to skip the
// frame (currently only used for 1xx responses).
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
	if f.Truncated {
		return nil, errResponseHeaderListSize
	}

	status := f.PseudoValue("status")
	if status == "" {
		return nil, errors.New("malformed response from server: missing status pseudo header")
	}
	statusCode, err := strconv.Atoi(status)
	if err != nil {
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
	}

	regularFields := f.RegularFields()
	// strs is a single backing array handed out one element at a time
	// for the common single-valued-header case, reducing allocations.
	strs := make([]string, len(regularFields))
	header := make(Header, len(regularFields))
	res := &cs.staticResp
	cs.staticResp = ClientResponse{
		Header:     header,
		StatusCode: statusCode,
		Status:     status,
	}
	for _, hf := range regularFields {
		key := httpcommon.CanonicalHeader(hf.Name)
		if key == "Trailer" {
			// Record the announced trailer names; their values arrive
			// later in the trailers HEADERS block.
			t := res.Trailer
			if t == nil {
				t = make(Header)
				res.Trailer = t
			}
			foreachHeaderElement(hf.Value, func(v string) {
				t[httpcommon.CanonicalHeader(v)] = nil
			})
		} else {
			vv := header[key]
			if vv == nil && len(strs) > 0 {
				// More than likely this will be a single-element key.
				// Most headers aren't multi-valued.
				// Set the capacity on strs[0] to 1, so any future append
				// won't extend the slice into the other strings.
				vv, strs = strs[:1:1], strs[1:]
				vv[0] = hf.Value
				header[key] = vv
			} else {
				header[key] = append(vv, hf.Value)
			}
		}
	}

	if statusCode >= 100 && statusCode <= 199 {
		if f.StreamEnded() {
			return nil, errors.New("1xx informational response with END_STREAM flag")
		}
		if fn := cs.get1xxTraceFunc(); fn != nil {
			// If the 1xx response is being delivered to the user,
			// then they're responsible for limiting the number
			// of responses.
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
				return nil, err
			}
		} else {
			// If the user didn't examine the 1xx response, then we
			// limit the size of all 1xx headers.
			//
			// This differs a bit from the HTTP/1 implementation, which
			// limits the size of all 1xx headers plus the final response.
			// Use the larger limit of MaxHeaderListSize and
			// net/http.Transport.MaxResponseHeaderBytes.
			limit := int64(cs.cc.t.maxHeaderListSize())
			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
				limit = t1.MaxResponseHeaderBytes()
			}
			for _, h := range f.Fields {
				cs.totalHeaderSize += int64(h.Size())
			}
			if cs.totalHeaderSize > limit {
				if VerboseLogs {
					log.Printf("http2: 1xx informational responses too large")
				}
				return nil, errors.New("header list too large")
			}
		}
		if statusCode == 100 {
			// Non-blocking notify: unblock a body writer waiting on
			// 100-continue, if any.
			traceGot100Continue(cs.trace)
			select {
			case cs.on100 <- struct{}{}:
			default:
			}
		}
		cs.pastHeaders = false // do it all again
		return nil, nil
	}

	res.ContentLength = -1
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
			res.ContentLength = int64(cl)
		} else {
			// TODO: care? unlike http/1, it won't mess up our framing, so it's
			// more safe smuggling-wise to ignore.
		}
	} else if len(clens) > 1 {
		// TODO: care? unlike http/1, it won't mess up our framing, so it's
		// more safe smuggling-wise to ignore.
	} else if f.StreamEnded() && !cs.isHead {
		res.ContentLength = 0
	}

	if cs.isHead {
		// HEAD responses never carry a body, regardless of Content-Length.
		res.Body = NoBody
		return res, nil
	}

	if f.StreamEnded() {
		if res.ContentLength > 0 {
			// A body was declared but the stream ended without one.
			res.Body = missingBody{}
		} else {
			res.Body = NoBody
		}
		return res, nil
	}

	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
	cs.bytesRemain = res.ContentLength
	res.Body = transportResponseBody{cs}

	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		// Transparent gzip: we added the Accept-Encoding ourselves, so
		// decompress and hide the encoding headers from the caller.
		res.Header.Del("Content-Encoding")
		res.Header.Del("Content-Length")
		res.ContentLength = -1
		res.Body = &gzipReader{body: res.Body}
		res.Uncompressed = true
	}
	return res, nil
}
  2490  
  2491  func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
  2492  	if cs.pastTrailers {
  2493  		// Too many HEADERS frames for this stream.
  2494  		return ConnectionError(ErrCodeProtocol)
  2495  	}
  2496  	cs.pastTrailers = true
  2497  	if !f.StreamEnded() {
  2498  		// We expect that any headers for trailers also
  2499  		// has END_STREAM.
  2500  		return ConnectionError(ErrCodeProtocol)
  2501  	}
  2502  	if len(f.PseudoFields()) > 0 {
  2503  		// No pseudo header fields are defined for trailers.
  2504  		// TODO: ConnectionError might be overly harsh? Check.
  2505  		return ConnectionError(ErrCodeProtocol)
  2506  	}
  2507  
  2508  	trailer := make(Header)
  2509  	for _, hf := range f.RegularFields() {
  2510  		key := httpcommon.CanonicalHeader(hf.Name)
  2511  		trailer[key] = append(trailer[key], hf.Value)
  2512  	}
  2513  	cs.trailer = trailer
  2514  
  2515  	rl.endStream(cs)
  2516  	return nil
  2517  }
  2518  
// transportResponseBody is the concrete type of Transport.RoundTrip's
// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
	cs *clientStream // stream whose buffered DATA bytes back this body
}
  2524  
// Read copies response body bytes out of the stream's buffer pipe and
// returns the consumed flow-control tokens to the peer as WINDOW_UPDATE
// frames (connection-level always; stream-level only while the stream is
// still healthy).
//
// If the server sends more bytes than its declared Content-Length, the
// stream is aborted and the surplus is truncated; if it ends the body
// early, io.ErrUnexpectedEOF is reported. Either error is made sticky
// via cs.readErr so later reads fail the same way.
func (b transportResponseBody) Read(p []byte) (n int, err error) {
	cs := b.cs
	cc := cs.cc

	if cs.readErr != nil {
		return 0, cs.readErr
	}
	n, err = b.cs.bufPipe.Read(p)
	// cs.bytesRemain is -1 when no Content-Length was declared.
	if cs.bytesRemain != -1 {
		if int64(n) > cs.bytesRemain {
			n = int(cs.bytesRemain)
			if err == nil {
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
				cs.abortStream(err)
			}
			cs.readErr = err
			return int(cs.bytesRemain), err
		}
		cs.bytesRemain -= int64(n)
		if err == io.EOF && cs.bytesRemain > 0 {
			err = io.ErrUnexpectedEOF
			cs.readErr = err
			return n, err
		}
	}
	if n == 0 {
		// No flow control tokens to send back.
		return
	}

	// Account the consumed bytes under cc.mu, then (if the inflow
	// accounting says an update is due) write WINDOW_UPDATEs under wmu.
	cc.mu.Lock()
	connAdd := cc.inflow.add(n)
	var streamAdd int32
	if err == nil { // No need to refresh if the stream is over or failed.
		streamAdd = cs.inflow.add(n)
	}
	cc.mu.Unlock()

	if connAdd != 0 || streamAdd != 0 {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if connAdd != 0 {
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
		}
		if streamAdd != 0 {
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
		}
		cc.bw.Flush()
	}
	return
}
  2576  
  2577  var errClosedResponseBody = errors.New("http2: response body closed")
  2578  
// Close aborts the stream, returns any unread buffered bytes to the
// connection-level flow-control window, and waits for the stream's
// cleanup to finish (or for request cancellation).
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Return connection-level flow control.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// TODO(dneil): Acquiring this mutex can block indefinitely.
		// Move flow control return to a goroutine?
		cc.wmu.Lock()
		// Return connection-level flow control.
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// See golang/go#49366: The net/http package can cancel the
		// request context after the response body is fully read.
		// Don't treat this as an error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}
  2616  
// processData handles a DATA frame from the read loop: it validates the
// frame against the stream's state, charges the frame length against
// both connection- and stream-level inflow, delivers the payload to the
// stream's buffer pipe, and refunds flow control for padding (and, if
// the stream is already reset, for the payload itself).
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	data := f.Data()
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		if f.StreamID >= neverSent {
			// We never asked for this.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}
		// We probably did ask for this, but canceled. Just ignore it.
		// TODO: be stricter here? only silently ignore things which
		// we canceled, but not things which were closed normally
		// by the peer? Tough without accumulating too much state.

		// But at least return their flow control:
		if f.Length > 0 {
			cc.mu.Lock()
			ok := cc.inflow.take(f.Length)
			connAdd := cc.inflow.add(int(f.Length))
			cc.mu.Unlock()
			if !ok {
				return ConnectionError(ErrCodeFlowControl)
			}
			if connAdd > 0 {
				cc.wmu.Lock()
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
				cc.bw.Flush()
				cc.wmu.Unlock()
			}
		}
		return nil
	}
	if cs.readClosed {
		cc.logf("protocol error: received DATA after END_STREAM")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if !cs.pastHeaders {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		if cs.isHead && len(data) > 0 {
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}
		// Check connection-level flow control.
		cc.mu.Lock()
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}
		// Return any padded flow control now, since we won't
		// refund it later on body reads.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}

		didReset := false
		var err error
		if len(data) > 0 {
			if _, err = cs.bufPipe.Write(data); err != nil {
				// Return len(data) now if the stream is already closed,
				// since data will never be read.
				didReset = true
				refund += len(data)
			}
		}

		// Credit the refund while still holding cc.mu, then write any
		// due WINDOW_UPDATE frames under wmu.
		sendConn := cc.inflow.add(refund)
		var sendStream int32
		if !didReset {
			sendStream = cs.inflow.add(refund)
		}
		cc.mu.Unlock()

		if sendConn > 0 || sendStream > 0 {
			cc.wmu.Lock()
			if sendConn > 0 {
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
			}
			if sendStream > 0 {
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}

		if err != nil {
			rl.endStreamError(cs, err)
			return nil
		}
	}

	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2732  
// endStream marks the read side of cs closed after the peer half-closes
// the stream (END_STREAM): the buffer pipe is closed with io.EOF (with
// trailers copied into the response), and cs.peerClosed is closed.
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
	// TODO: check that any declared content-length matches, like
	// server.go's (*stream).endStream method.
	if !cs.readClosed {
		cs.readClosed = true
		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
		// race condition: The caller can read io.EOF from Response.Body
		// and close the body before we close cs.peerClosed, causing
		// cleanupWriteRequest to send a RST_STREAM.
		rl.cc.mu.Lock()
		defer rl.cc.mu.Unlock()
		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
		close(cs.peerClosed)
	}
}
  2748  
// endStreamError terminates the stream with err, marking its read side
// aborted so streamByID stops returning it to the read loop.
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStream(err)
}
  2753  
// endStreamErrorLocked is endStreamError for callers that already hold
// cc.mu.
func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStreamLocked(err)
}
  2758  
// Constants passed to streamByID for documentation purposes.
// headerOrDataFrame indicates the frame may carry response data, which
// also clears the rstStreamPingsBlocked workaround flag (see streamByID).
const (
	headerOrDataFrame    = true
	notHeaderOrDataFrame = false
)
  2764  
// streamByID returns the stream with the given id, or nil if no stream has that id.
// If headerOrData is true, it clears rst.StreamPingsBlocked.
//
// It also records the highest stream ID created before this frame was
// read (readBeforeStreamID), and hides streams whose read side was
// aborted so the read loop ignores their subsequent frames.
func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
	rl.cc.mu.Lock()
	defer rl.cc.mu.Unlock()
	if headerOrData {
		// Work around an unfortunate gRPC behavior.
		// See comment on ClientConn.rstStreamPingsBlocked for details.
		rl.cc.rstStreamPingsBlocked = false
	}
	rl.cc.readBeforeStreamID = rl.cc.nextStreamID
	cs := rl.cc.streams[id]
	if cs != nil && !cs.readAborted {
		return cs
	}
	return nil
}
  2782  
  2783  func (cs *clientStream) copyTrailers() {
  2784  	for k, vv := range cs.trailer {
  2785  		t := cs.resTrailer
  2786  		if *t == nil {
  2787  			*t = make(Header)
  2788  		}
  2789  		(*t)[k] = vv
  2790  	}
  2791  }
  2792  
  2793  func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  2794  	cc := rl.cc
  2795  	cc.t.connPool().MarkDead(cc)
  2796  	if f.ErrCode != 0 {
  2797  		// TODO: deal with GOAWAY more. particularly the error code
  2798  		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  2799  		if fn := cc.t.CountError; fn != nil {
  2800  			fn("recv_goaway_" + f.ErrCode.stringToken())
  2801  		}
  2802  	}
  2803  	cc.setGoAway(f)
  2804  	return nil
  2805  }
  2806  
// processSettings handles a SETTINGS frame and, unless it is an ack,
// replies with a SETTINGS ack.
func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
	cc := rl.cc
	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	if err := rl.processSettingsNoWrite(f); err != nil {
		return err
	}
	if !f.IsAck() {
		cc.fr.WriteSettingsAck()
		cc.bw.Flush()
	}
	return nil
}
  2823  
// processSettingsNoWrite applies the settings in f to the connection's
// state without writing any frames (the caller holds cc.wmu and sends
// the ack). An unsolicited SETTINGS ack is a protocol error. After the
// first non-ack SETTINGS frame, seenSettings is recorded and waiters on
// seenSettingsChan are released.
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
	cc := rl.cc
	defer cc.maybeCallStateHook()
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if f.IsAck() {
		if cc.wantSettingsAck {
			cc.wantSettingsAck = false
			return nil
		}
		// Ack received without an outstanding SETTINGS of ours.
		return ConnectionError(ErrCodeProtocol)
	}

	var seenMaxConcurrentStreams bool
	err := f.ForeachSetting(func(s Setting) error {
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
			seenMaxConcurrentStreams = true
		case SettingMaxHeaderListSize:
			cc.peerMaxHeaderListSize = uint64(s.Val)
		case SettingInitialWindowSize:
			// Values above the maximum flow-control
			// window size of 2^31-1 MUST be treated as a
			// connection error (Section 5.4.1) of type
			// FLOW_CONTROL_ERROR.
			if s.Val > math.MaxInt32 {
				return ConnectionError(ErrCodeFlowControl)
			}

			// Adjust flow control of currently-open
			// frames by the difference of the old initial
			// window size and this one.
			delta := int32(s.Val) - int32(cc.initialWindowSize)
			for _, cs := range cc.streams {
				cs.flow.add(delta)
			}
			cc.cond.Broadcast()

			cc.initialWindowSize = s.Val
		case SettingHeaderTableSize:
			cc.henc.SetMaxDynamicTableSize(s.Val)
			cc.peerMaxHeaderTableSize = s.Val
		case SettingEnableConnectProtocol:
			if err := s.Valid(); err != nil {
				return err
			}
			// If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
			// we require that it do so in the first SETTINGS frame.
			//
			// When we attempt to use extended CONNECT, we wait for the first
			// SETTINGS frame to see if the server supports it. If we let the
			// server enable the feature with a later SETTINGS frame, then
			// users will see inconsistent results depending on whether we've
			// seen that frame or not.
			if !cc.seenSettings {
				cc.extendedConnectAllowed = s.Val == 1
			}
		default:
			cc.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})
	if err != nil {
		return err
	}

	if !cc.seenSettings {
		if !seenMaxConcurrentStreams {
			// This was the servers initial SETTINGS frame and it
			// didn't contain a MAX_CONCURRENT_STREAMS field so
			// increase the number of concurrent streams this
			// connection can establish to our default.
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
		}
		close(cc.seenSettingsChan)
		cc.seenSettings = true
	}

	return nil
}
  2908  
// processWindowUpdate handles a WINDOW_UPDATE frame, crediting the
// connection-level (stream ID 0) or stream-level outbound flow-control
// window. An increment that would overflow the window is a stream error
// for a stream, or a connection error for the connection window.
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if f.StreamID != 0 && cs == nil {
		// Update for an unknown/forgotten stream: ignore.
		return nil
	}

	cc.mu.Lock()
	defer cc.mu.Unlock()

	fl := &cc.flow
	if cs != nil {
		fl = &cs.flow
	}
	if !fl.add(int32(f.Increment)) {
		// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
		if cs != nil {
			rl.endStreamErrorLocked(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeFlowControl,
			})
			return nil
		}

		return ConnectionError(ErrCodeFlowControl)
	}
	// Wake any writers blocked waiting for window.
	cc.cond.Broadcast()
	return nil
}
  2938  
// processResetStream handles a RST_STREAM frame from the peer by
// aborting the affected stream with a StreamError whose Cause is
// errFromPeer. A PROTOCOL_ERROR reset marks the connection as
// do-not-reuse.
func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if cs == nil {
		// TODO: return error if server tries to RST_STREAM an idle stream
		return nil
	}
	serr := streamError(cs.ID, f.ErrCode)
	serr.Cause = errFromPeer
	if f.ErrCode == ErrCodeProtocol {
		rl.cc.SetDoNotReuse()
	}
	if fn := cs.cc.t.CountError; fn != nil {
		fn("recv_rststream_" + f.ErrCode.stringToken())
	}
	cs.abortStream(serr)

	cs.bufPipe.CloseWithError(serr)
	return nil
}
  2958  
// Ping sends a PING frame to the server and waits for the ack.
// It returns when the ack arrives, the write fails, ctx is done, or the
// connection's read loop exits (in which case the reader's error is
// returned).
func (cc *ClientConn) Ping(ctx context.Context) error {
	c := make(chan struct{})
	// Generate a random payload, retrying until it doesn't collide with
	// an outstanding ping's payload.
	var p [8]byte
	for {
		if _, err := rand.Read(p[:]); err != nil {
			return err
		}
		cc.mu.Lock()
		// check for dup before insert
		if _, found := cc.pings[p]; !found {
			cc.pings[p] = c
			cc.mu.Unlock()
			break
		}
		cc.mu.Unlock()
	}
	var pingError error
	errc := make(chan struct{})
	// Write the PING off the current goroutine so a blocked writer
	// doesn't prevent us from honoring ctx cancellation below.
	go func() {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if pingError = cc.fr.WritePing(false, p); pingError != nil {
			close(errc)
			return
		}
		if pingError = cc.bw.Flush(); pingError != nil {
			close(errc)
			return
		}
	}()
	select {
	case <-c:
		return nil
	case <-errc:
		return pingError
	case <-ctx.Done():
		return ctx.Err()
	case <-cc.readerDone:
		// connection closed
		return cc.readerErr
	}
}
  3003  
// processPing handles a PING frame. For an ack it notifies any Ping
// waiter registered for that payload and unblocks writers waiting on
// pending RST_STREAM pings; otherwise it echoes the payload back as a
// PING ack.
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
	if f.IsAck() {
		cc := rl.cc
		defer cc.maybeCallStateHook()
		cc.mu.Lock()
		defer cc.mu.Unlock()
		// If ack, notify listener if any
		if c, ok := cc.pings[f.Data]; ok {
			close(c)
			delete(cc.pings, f.Data)
		}
		if cc.pendingResets > 0 {
			// See clientStream.cleanupWriteRequest.
			cc.pendingResets = 0
			cc.rstStreamPingsBlocked = true
			cc.cond.Broadcast()
		}
		return nil
	}
	cc := rl.cc
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	if err := cc.fr.WritePing(true, f.Data); err != nil {
		return err
	}
	return cc.bw.Flush()
}
  3031  
// processPushPromise rejects any PUSH_PROMISE frame as a connection
// error, since this transport disables server push.
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	// We told the peer we don't want them.
	// Spec says:
	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
	// setting of the peer endpoint is set to 0. An endpoint that
	// has set this setting and has received acknowledgement MUST
	// treat the receipt of a PUSH_PROMISE frame as a connection
	// error (Section 5.4.1) of type PROTOCOL_ERROR."
	return ConnectionError(ErrCodeProtocol)
}
  3042  
  3043  // writeStreamReset sends a RST_STREAM frame.
  3044  // When ping is true, it also sends a PING frame with a random payload.
  3045  func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
  3046  	// TODO: map err to more interesting error codes, once the
  3047  	// HTTP community comes up with some. But currently for
  3048  	// RST_STREAM there's no equivalent to GOAWAY frame's debug
  3049  	// data, and the error codes are all pretty vague ("cancel").
  3050  	cc.wmu.Lock()
  3051  	cc.fr.WriteRSTStream(streamID, code)
  3052  	if ping {
  3053  		var payload [8]byte
  3054  		rand.Read(payload[:])
  3055  		cc.fr.WritePing(false, payload)
  3056  	}
  3057  	cc.bw.Flush()
  3058  	cc.wmu.Unlock()
  3059  }
  3060  
var (
	// errResponseHeaderListSize is returned when the server's response
	// headers exceed the limit we advertised in SETTINGS.
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	// errRequestHeaderListSize is returned when the outgoing request's
	// headers exceed the peer's advertised limit.
	errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
)
  3065  
// logf delegates to the owning Transport's logger.
func (cc *ClientConn) logf(format string, args ...any) {
	cc.t.logf(format, args...)
}
  3069  
// vlogf delegates to the owning Transport's verbose logger, which is a
// no-op unless VerboseLogs is enabled.
func (cc *ClientConn) vlogf(format string, args ...any) {
	cc.t.vlogf(format, args...)
}
  3073  
  3074  func (t *Transport) vlogf(format string, args ...any) {
  3075  	if VerboseLogs {
  3076  		t.logf(format, args...)
  3077  	}
  3078  }
  3079  
// logf writes to the standard logger; all transport logging funnels here.
func (t *Transport) logf(format string, args ...any) {
	log.Printf(format, args...)
}
  3083  
  3084  type missingBody struct{}
  3085  
  3086  func (missingBody) Close() error             { return nil }
  3087  func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
  3088  
// erringRoundTripper is a RoundTripper whose RoundTrip always fails with
// a fixed error.
type erringRoundTripper struct{ err error }

func (rt erringRoundTripper) RoundTripErr() error                               { return rt.err }
func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
  3093  
  3094  var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
  3095  
// gzipReader wraps a response body so it can lazily
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct {
	_    incomparable
	body io.ReadCloser // underlying Response.Body
	mu   sync.Mutex    // guards zr and zerr
	zr   *gzip.Reader  // stores gzip reader from the pool between reads
	zerr error         // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
}
  3107  
  3108  type eofReader struct{}
  3109  
  3110  func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
  3111  func (eofReader) ReadByte() (byte, error)  { return 0, io.EOF }
  3112  
  3113  var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
  3114  
  3115  // gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
  3116  func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
  3117  	zr := gzipPool.Get().(*gzip.Reader)
  3118  	if err := zr.Reset(r); err != nil {
  3119  		gzipPoolPut(zr)
  3120  		return nil, err
  3121  	}
  3122  	return zr, nil
  3123  }
  3124  
  3125  // gzipPoolPut puts a gzip.Reader back into the pool.
  3126  func gzipPoolPut(zr *gzip.Reader) {
  3127  	// Reset will allocate bufio.Reader if we pass it anything
  3128  	// other than a flate.Reader, so ensure that it's getting one.
  3129  	var r flate.Reader = eofReader{}
  3130  	zr.Reset(r)
  3131  	gzipPool.Put(zr)
  3132  }
  3133  
// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
//
// While the returned reader is out, gz.zerr holds the
// errConcurrentReadOnResBody sentinel so a second concurrent Read (or a
// Read after Close) fails instead of sharing the reader.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr != nil {
		// Sticky init error, concurrent-read sentinel, or closed.
		return nil, gz.zerr
	}
	if gz.zr == nil {
		// First Read: lazily fetch a pooled reader for the body.
		gz.zr, gz.zerr = gzipPoolGet(gz.body)
		if gz.zerr != nil {
			return nil, gz.zerr
		}
	}
	ret := gz.zr
	gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
	return ret, nil
}
  3152  
// release returns the gzip.Reader to the pool if Close was called during Read.
// Otherwise it stores the reader back for the next Read and clears the
// concurrent-read sentinel.
func (gz *gzipReader) release(zr *gzip.Reader) {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr == errConcurrentReadOnResBody {
		gz.zr, gz.zerr = zr, nil
	} else { // fs.ErrClosed
		gzipPoolPut(zr)
	}
}
  3163  
// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
// Afterwards gz.zerr is fs.ErrClosed, so further Reads fail.
func (gz *gzipReader) close() {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr == nil && gz.zr != nil {
		// No Read in flight: recycle now.
		gzipPoolPut(gz.zr)
		gz.zr = nil
	}
	gz.zerr = fs.ErrClosed
}
  3175  
  3176  func (gz *gzipReader) Read(p []byte) (n int, err error) {
  3177  	zr, err := gz.acquire()
  3178  	if err != nil {
  3179  		return 0, err
  3180  	}
  3181  	defer gz.release(zr)
  3182  
  3183  	return zr.Read(p)
  3184  }
  3185  
// Close recycles the pooled gzip.Reader (immediately, or after any
// in-flight Read completes) and closes the underlying response body.
func (gz *gzipReader) Close() error {
	gz.close()

	return gz.body.Close()
}
  3191  
  3192  // isConnectionCloseRequest reports whether req should use its own
  3193  // connection for a single request and then close the connection.
  3194  func isConnectionCloseRequest(req *ClientRequest) bool {
  3195  	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
  3196  }
  3197  
// NetHTTPClientConn wraps ClientConn and implements the interface net/http
// expects from the RoundTripper returned by NewClientConn.
type NetHTTPClientConn struct {
	cc *ClientConn
}
  3203  
// RoundTrip sends req over the wrapped connection.
func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.cc.RoundTrip(req)
}
  3207  
// Close closes the wrapped connection.
func (cc NetHTTPClientConn) Close() error {
	return cc.cc.Close()
}
  3211  
  3212  func (cc NetHTTPClientConn) Err() error {
  3213  	cc.cc.mu.Lock()
  3214  	defer cc.cc.mu.Unlock()
  3215  	if cc.cc.closed {
  3216  		return errors.New("connection closed")
  3217  	}
  3218  	return nil
  3219  }
  3220  
  3221  func (cc NetHTTPClientConn) Reserve() error {
  3222  	defer cc.cc.maybeCallStateHook()
  3223  	cc.cc.mu.Lock()
  3224  	defer cc.cc.mu.Unlock()
  3225  	if !cc.cc.canReserveLocked() {
  3226  		return errors.New("connection is unavailable")
  3227  	}
  3228  	cc.cc.streamsReserved++
  3229  	return nil
  3230  }
  3231  
  3232  func (cc NetHTTPClientConn) Release() {
  3233  	defer cc.cc.maybeCallStateHook()
  3234  	cc.cc.mu.Lock()
  3235  	defer cc.cc.mu.Unlock()
  3236  	// We don't complain if streamsReserved is 0.
  3237  	//
  3238  	// This is consistent with RoundTrip: both Release and RoundTrip will
  3239  	// consume a reservation iff one exists.
  3240  	if cc.cc.streamsReserved > 0 {
  3241  		cc.cc.streamsReserved--
  3242  	}
  3243  }
  3244  
  3245  func (cc NetHTTPClientConn) Available() int {
  3246  	cc.cc.mu.Lock()
  3247  	defer cc.cc.mu.Unlock()
  3248  	return cc.cc.availableLocked()
  3249  }
  3250  
  3251  func (cc NetHTTPClientConn) InFlight() int {
  3252  	cc.cc.mu.Lock()
  3253  	defer cc.cc.mu.Unlock()
  3254  	return cc.cc.currentRequestCountLocked()
  3255  }
  3256  
  3257  func (cc *ClientConn) maybeCallStateHook() {
  3258  	if cc.internalStateHook != nil {
  3259  		cc.internalStateHook()
  3260  	}
  3261  }
  3262  
  3263  func (t *Transport) idleConnTimeout() time.Duration {
  3264  	// to keep things backwards compatible, we use non-zero values of
  3265  	// IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
  3266  	// http1 transport, followed by 0
  3267  	if t.IdleConnTimeout != 0 {
  3268  		return t.IdleConnTimeout
  3269  	}
  3270  
  3271  	if t.t1 != nil {
  3272  		return t.t1.IdleConnTimeout()
  3273  	}
  3274  
  3275  	return 0
  3276  }
  3277  
  3278  func traceGetConn(req *ClientRequest, hostPort string) {
  3279  	trace := httptrace.ContextClientTrace(req.Context)
  3280  	if trace == nil || trace.GetConn == nil {
  3281  		return
  3282  	}
  3283  	trace.GetConn(hostPort)
  3284  }
  3285  
// traceGotConn fires the request's httptrace GotConn hook, if installed,
// reporting whether the connection was reused and, when it was idle, for
// how long.
func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
	trace := httptrace.ContextClientTrace(req.Context)
	if trace == nil || trace.GotConn == nil {
		return
	}
	ci := httptrace.GotConnInfo{Conn: cc.tconn}
	ci.Reused = reused
	cc.mu.Lock()
	// Idle means reused with no streams currently open.
	ci.WasIdle = len(cc.streams) == 0 && reused
	if ci.WasIdle && !cc.lastActive.IsZero() {
		ci.IdleTime = time.Since(cc.lastActive)
	}
	cc.mu.Unlock()

	trace.GotConn(ci)
}
  3302  
  3303  func traceWroteHeaders(trace *httptrace.ClientTrace) {
  3304  	if trace != nil && trace.WroteHeaders != nil {
  3305  		trace.WroteHeaders()
  3306  	}
  3307  }
  3308  
  3309  func traceGot100Continue(trace *httptrace.ClientTrace) {
  3310  	if trace != nil && trace.Got100Continue != nil {
  3311  		trace.Got100Continue()
  3312  	}
  3313  }
  3314  
  3315  func traceWait100Continue(trace *httptrace.ClientTrace) {
  3316  	if trace != nil && trace.Wait100Continue != nil {
  3317  		trace.Wait100Continue()
  3318  	}
  3319  }
  3320  
  3321  func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
  3322  	if trace != nil && trace.WroteRequest != nil {
  3323  		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
  3324  	}
  3325  }
  3326  
  3327  func traceFirstResponseByte(trace *httptrace.ClientTrace) {
  3328  	if trace != nil && trace.GotFirstResponseByte != nil {
  3329  		trace.GotFirstResponseByte()
  3330  	}
  3331  }
  3332  
  3333  func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
  3334  	if trace != nil {
  3335  		return trace.Got1xxResponse
  3336  	}
  3337  	return nil
  3338  }
  3339  
  3340  // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
  3341  // connection.
  3342  func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
  3343  	dialer := &tls.Dialer{
  3344  		Config: cfg,
  3345  	}
  3346  	cn, err := dialer.DialContext(ctx, network, addr)
  3347  	if err != nil {
  3348  		return nil, err
  3349  	}
  3350  	tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
  3351  	return tlsCn, nil
  3352  }
  3353  

View as plain text