Source file src/vendor/golang.org/x/net/quic/stream.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package quic
     6  
     7  import (
     8  	"context"
     9  	"errors"
    10  	"fmt"
    11  	"io"
    12  	"math"
    13  	"sync"
    14  
    15  	"golang.org/x/net/internal/quic/quicwire"
    16  )
    17  
// A Stream is an ordered byte stream.
//
// Streams may be bidirectional, read-only, or write-only.
// Methods inappropriate for a stream's direction
// (for example, [Stream.Write] to a read-only stream)
// return errors.
//
// It is not safe to perform concurrent reads from or writes to a stream.
// It is safe, however, to read and write at the same time.
//
// Reads and writes are buffered.
// It is generally not necessary to wrap a stream in a [bufio.ReadWriter]
// or otherwise apply additional buffering.
//
// To cancel reads or writes, use the [Stream.SetReadContext] and
// [Stream.SetWriteContext] methods.
type Stream struct {
	// id is the QUIC stream ID, as reported by ID.
	// conn is the connection the stream was created on.
	id   streamID
	conn *Conn

	// Contexts used for read/write operations.
	// Intentionally not mutex-guarded, to allow the race detector to catch concurrent access.
	inctx  context.Context
	outctx context.Context

	// ingate's lock guards receive-related state.
	//
	// The gate condition is set if a read from the stream will not block,
	// either because the stream has available data or because the read will fail.
	ingate      gate
	in          pipe            // received data
	inwin       int64           // last MAX_STREAM_DATA sent to the peer
	insendmax   sentVal         // set when we should send MAX_STREAM_DATA to the peer
	inmaxbuf    int64           // maximum amount of data we will buffer
	insize      int64           // stream final size; -1 before this is known
	inset       rangeset[int64] // received ranges
	inclosed    sentVal         // set by CloseRead
	inresetcode int64           // RESET_STREAM code received from the peer; -1 if not reset

	// outgate's lock guards send-related state.
	//
	// The gate condition is set if a write to the stream will not block,
	// either because the stream has available flow control or because
	// the write will fail.
	outgate      gate
	out          pipe            // buffered data to send
	outflushed   int64           // offset of last flush call
	outwin       int64           // maximum MAX_STREAM_DATA received from the peer
	outmaxsent   int64           // maximum data offset we've sent to the peer
	outmaxbuf    int64           // maximum amount of data we will buffer
	outunsent    rangeset[int64] // ranges buffered but not yet sent (only flushed data)
	outacked     rangeset[int64] // ranges sent and acknowledged
	outopened    sentVal         // set if we should open the stream
	outclosed    sentVal         // set by CloseWrite
	outblocked   sentVal         // set when a write to the stream is blocked by flow control
	outreset     sentVal         // set by Reset
	outresetcode uint64          // reset code to send in RESET_STREAM
	outdone      chan struct{}   // closed when all data sent

	// Buffers used for fast path; mutex-guarded, but uncontended in normal operations.
	//
	// inbuf holds data Read and ReadByte can return without locking ingate.
	inbufmu  sync.Mutex
	inbuf    []byte // received data
	inbufoff int    // bytes of inbuf which have been consumed

	// outbuf is free space in s.out which Write and WriteByte can fill
	// without locking outgate; flushFastOutputBuffer commits it.
	outbufmu  sync.Mutex
	outbuf    []byte // written data
	outbufoff int    // bytes of outbuf which contain data to write

	// Atomic stream state bits.
	//
	// These bits provide a fast way to coordinate between the
	// send and receive sides of the stream, and the conn's loop.
	//
	// streamIn* bits must be set with ingate held.
	// streamOut* bits must be set with outgate held.
	// streamConn* bits are set by the conn's loop.
	// streamQueue* bits must be set with streamsState.sendMu held.
	state atomicBits[streamState]

	prev, next *Stream // guarded by streamsState.sendMu
}
    98  
// streamState is a set of bit flags describing the send and receive
// state of a stream. It is the value stored in Stream.state.
type streamState uint32

const (
	// streamInSendMeta is set when there are frames to send for the
	// inbound side of the stream. For example, MAX_STREAM_DATA.
	// Inbound frames are never flow-controlled.
	streamInSendMeta = streamState(1 << iota)

	// streamOutSendMeta is set when there are non-flow-controlled frames
	// to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED.
	// streamOutSendData is set when there are no non-flow-controlled outbound frames
	// and the stream has data to send.
	//
	// At most one of streamOutSendMeta and streamOutSendData is set at any time.
	streamOutSendMeta
	streamOutSendData

	// streamInDone and streamOutDone are set when the inbound or outbound
	// sides of the stream are finished. When both are set, the stream
	// can be removed from the Conn and forgotten.
	streamInDone
	streamOutDone

	// streamConnRemoved is set when the stream has been removed from the conn.
	streamConnRemoved

	// streamQueueMeta and streamQueueData indicate which of the streamsState
	// send queues the stream is currently on.
	streamQueueMeta
	streamQueueData
)
   130  
// streamQueue identifies one of the streamsState send queues,
// or no queue at all.
type streamQueue int

const (
	// noQueue is returned by wantQueue and inQueue when no send queue applies.
	noQueue   = streamQueue(iota)
	metaQueue // streamsState.queueMeta
	dataQueue // streamsState.queueData
)
   138  
// streamResetByConnClose is assigned to Stream.inresetcode and Stream.outresetcode
// to indicate that a stream was implicitly reset when the connection closed.
// It's out of the range of possible reset codes the peer can send.
const streamResetByConnClose = math.MaxInt64
   143  
   144  // wantQueue returns the send queue the stream should be on.
   145  func (s streamState) wantQueue() streamQueue {
   146  	switch {
   147  	case s&(streamInSendMeta|streamOutSendMeta) != 0:
   148  		return metaQueue
   149  	case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
   150  		return metaQueue
   151  	case s&streamOutSendData != 0:
   152  		// The stream has no non-flow-controlled frames to send,
   153  		// but does have data. Put it on the data queue, which is only
   154  		// processed when flow control is available.
   155  		return dataQueue
   156  	}
   157  	return noQueue
   158  }
   159  
   160  // inQueue returns the send queue the stream is currently on.
   161  func (s streamState) inQueue() streamQueue {
   162  	switch {
   163  	case s&streamQueueMeta != 0:
   164  		return metaQueue
   165  	case s&streamQueueData != 0:
   166  		return dataQueue
   167  	}
   168  	return noQueue
   169  }
   170  
   171  // newStream returns a new stream.
   172  //
   173  // The stream's ingate and outgate are locked.
   174  // (We create the stream with locked gates so after the caller
   175  // initializes the flow control window,
   176  // unlocking outgate will set the stream writability state.)
   177  func newStream(c *Conn, id streamID) *Stream {
   178  	s := &Stream{
   179  		conn:        c,
   180  		id:          id,
   181  		insize:      -1, // -1 indicates the stream size is unknown
   182  		inresetcode: -1, // -1 indicates no RESET_STREAM received
   183  		ingate:      newLockedGate(),
   184  		outgate:     newLockedGate(),
   185  		inctx:       context.Background(),
   186  		outctx:      context.Background(),
   187  	}
   188  	if !s.IsReadOnly() {
   189  		s.outdone = make(chan struct{})
   190  	}
   191  	return s
   192  }
   193  
   194  // ID returns the QUIC stream ID of s.
   195  //
   196  // As specified in RFC 9000, the two least significant bits of a stream ID
   197  // indicate the initiator and directionality of the stream. The upper bits are
   198  // the stream number.
   199  func (s *Stream) ID() int64 {
   200  	return int64(s.id)
   201  }
   202  
   203  // SetReadContext sets the context used for reads from the stream.
   204  //
   205  // It is not safe to call SetReadContext concurrently.
   206  func (s *Stream) SetReadContext(ctx context.Context) {
   207  	s.inctx = ctx
   208  }
   209  
   210  // SetWriteContext sets the context used for writes to the stream.
   211  // The write context is also used by Close when waiting for writes to be
   212  // received by the peer.
   213  //
   214  // It is not safe to call SetWriteContext concurrently.
   215  func (s *Stream) SetWriteContext(ctx context.Context) {
   216  	s.outctx = ctx
   217  }
   218  
   219  // IsReadOnly reports whether the stream is read-only
   220  // (a unidirectional stream created by the peer).
   221  func (s *Stream) IsReadOnly() bool {
   222  	return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side
   223  }
   224  
   225  // IsWriteOnly reports whether the stream is write-only
   226  // (a unidirectional stream created locally).
   227  func (s *Stream) IsWriteOnly() bool {
   228  	return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side
   229  }
   230  
// Read reads data from the stream.
//
// Read returns as soon as at least one byte of data is available.
// It waits for data using the context set by SetReadContext and
// returns the error from that wait if it fails.
//
// If the peer closes the stream cleanly, Read returns io.EOF after
// returning all data sent by the peer.
// If the stream has been reset by the peer, Read returns
// an error wrapping a StreamErrorCode.
// If the stream was implicitly reset because the connection closed,
// Read returns the connection's final error when there is one.
// Reading a write-only stream, or a stream on which CloseRead
// has been called, returns an error.
//
// It is not safe to call Read concurrently.
func (s *Stream) Read(b []byte) (n int, err error) {
	if s.IsWriteOnly() {
		return 0, errors.New("read from write-only stream")
	}

	fastPath := false
	s.inbufmu.Lock()
	if len(s.inbuf) > s.inbufoff {
		// Fast path: If s.inbuf contains unread bytes, return them immediately
		// without taking the ingate lock (only inbufmu is held).
		n = copy(b, s.inbuf[s.inbufoff:])
		s.inbufoff += n
		fastPath = true
	}
	s.inbufmu.Unlock()
	if fastPath {
		return n, nil
	}

	// Slow path: wait until a read will not block, then lock receive state.
	if err := s.ingate.waitAndLock(s.inctx); err != nil {
		return 0, err
	}

	if s.inbufoff > 0 {
		// Discard bytes consumed by the fast path above.
		s.in.discardBefore(s.in.start + int64(s.inbufoff))
		s.inbufmu.Lock()
		s.inbufoff = 0
		s.inbuf = nil
		s.inbufmu.Unlock()
	}

	// bytesRead contains the number of bytes of connection-level flow control to return.
	// We return flow control for bytes read by this Read call, as well as bytes moved
	// to the fast-path read buffer (s.inbuf).
	var bytesRead int64
	defer func() {
		s.inUnlock()
		s.conn.handleStreamBytesReadOffLoop(bytesRead) // must be done with ingate unlocked
	}()
	if s.inresetcode != -1 {
		if s.inresetcode == streamResetByConnClose {
			// The reset was caused by the conn closing;
			// prefer the conn's own error when it has one.
			if err := s.conn.finalError(); err != nil {
				return 0, err
			}
		}
		return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
	}
	if s.inclosed.isSet() {
		return 0, errors.New("read from closed stream")
	}
	if s.insize == s.in.start {
		// Everything up to the final size has already been consumed.
		return 0, io.EOF
	}
	// Getting here indicates the stream contains data to be read.
	if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
		panic("BUG: inconsistent input stream state")
	}
	// Limit the read to the contiguous data at the front of the stream.
	if size := int(s.inset[0].end - s.in.start); size < len(b) {
		b = b[:size]
	}
	bytesRead = int64(len(b))
	start := s.in.start
	end := start + int64(len(b))
	s.in.copy(start, b)
	s.in.discardBefore(end)
	if end == s.insize {
		// We have read up to the end of the stream.
		// No need to update stream flow control.
		return len(b), io.EOF
	}

	if len(s.inset) > 0 && s.inset[0].start <= s.in.start && s.inset[0].end > s.in.start {
		// If we have more readable bytes available, put the next chunk of data
		// in s.inbuf for fast-path reads which skip the ingate lock.
		s.inbufmu.Lock()
		s.inbuf = s.in.peek(s.inset[0].end - s.in.start)
		s.inbufmu.Unlock()
		bytesRead += int64(len(s.inbuf))
	}
	if s.insize == -1 || s.insize > s.inwin {
		// The peer may still need more stream-level flow control:
		// either the final size is unknown or it exceeds the current window.
		newWindow := s.in.start + int64(len(s.inbuf)) + s.inmaxbuf
		addedWindow := newWindow - s.inwin
		if shouldUpdateFlowControl(s.inmaxbuf, addedWindow) {
			// Update stream flow control with a MAX_STREAM_DATA frame.
			s.insendmax.setUnsent()
		}
	}

	return len(b), nil
}
   332  
   333  // ReadByte reads and returns a single byte from the stream.
   334  //
   335  // It is not safe to call ReadByte concurrently.
   336  func (s *Stream) ReadByte() (byte, error) {
   337  	fastPath := false
   338  	s.inbufmu.Lock()
   339  	var readByte byte
   340  	if len(s.inbuf) > s.inbufoff {
   341  		readByte = s.inbuf[s.inbufoff]
   342  		s.inbufoff++
   343  		fastPath = true
   344  	}
   345  	s.inbufmu.Unlock()
   346  	if fastPath {
   347  		return readByte, nil
   348  	}
   349  
   350  	var b [1]byte
   351  	n, err := s.Read(b[:])
   352  	if n > 0 {
   353  		return b[0], nil
   354  	}
   355  	return 0, err
   356  }
   357  
   358  // shouldUpdateFlowControl determines whether to send a flow control window update.
   359  //
   360  // We want to balance keeping the peer well-supplied with flow control with not sending
   361  // many small updates.
   362  func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
   363  	return addedWindow >= maxWindow/8
   364  }
   365  
// Write writes data to the stream.
//
// Write writes data to the stream write buffer.
// Buffered data is only sent when the buffer is sufficiently full.
// Call the Flush method to ensure buffered data is sent.
//
// When the write buffer is full, Write waits for space using the context
// set by SetWriteContext. If that wait fails, or the stream has been
// reset or closed for writing, Write returns the number of bytes buffered
// so far along with the error. Writing to a read-only stream returns an error.
//
// It is not safe to call Write concurrently.
func (s *Stream) Write(b []byte) (n int, err error) {
	if s.IsReadOnly() {
		return 0, errors.New("write to read-only stream")
	}

	fastPath := false
	s.outbufmu.Lock()
	if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
		// Fast path: The data to write fits in s.outbuf.
		// Only outbufmu is held; outgate is not locked.
		copy(s.outbuf[s.outbufoff:], b)
		s.outbufoff += len(b)
		fastPath = true
	}
	s.outbufmu.Unlock()
	if fastPath {
		return len(b), nil
	}

	canWrite := s.outgate.lock()
	// Commit any bytes written by earlier fast-path writes before
	// touching s.out directly.
	s.flushFastOutputBuffer()
	for {
		// The first time through this loop, we may or may not be write blocked.
		// We exit the loop after writing all data, so on subsequent passes through
		// the loop we are always write blocked.
		if len(b) > 0 && !canWrite {
			// Our send buffer is full. Wait for the peer to ack some data.
			s.outUnlock()
			if err := s.outgate.waitAndLock(s.outctx); err != nil {
				return n, err
			}
			// Successfully returning from waitAndLock means we are no longer
			// write blocked. (Unlike traditional condition variables, gates do not
			// have spurious wakeups.)
		}
		if err := s.writeErrorLocked(); err != nil {
			s.outUnlock()
			return n, err
		}
		if len(b) == 0 {
			break
		}
		// Write limit is our send buffer limit.
		// This is a stream offset.
		lim := s.out.start + s.outmaxbuf
		// Amount to write is min(the full buffer, data up to the write limit).
		// This is a number of bytes.
		nn := min(int64(len(b)), lim-s.out.end)
		// Copy the data into the output buffer.
		s.out.writeAt(b[:nn], s.out.end)
		b = b[nn:]
		n += int(nn)
		// Possibly flush the output buffer.
		// We automatically flush if:
		//   - We have enough data to consume the send window.
		//     Sending this data may cause the peer to extend the window.
		//   - We have buffered as much data as we're willing to.
		//     We need to send data to clear out buffer space.
		//   - We have enough data to fill a 1-RTT packet using the smallest
		//     possible maximum datagram size (1200 bytes, less header byte,
		//     connection ID, packet number, and AEAD overhead).
		const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
		shouldFlush := s.out.end >= s.outwin || // peer send window is full
			s.out.end >= lim || // local send buffer is full
			(s.out.end-s.outflushed) >= autoFlushSize // enough data buffered
		if shouldFlush {
			s.flushLocked()
		}
		if s.out.end > s.outwin {
			// We're blocked by flow control.
			// Send a STREAM_DATA_BLOCKED frame to let the peer know.
			s.outblocked.set()
		}
		// If we have bytes left to send, we're blocked.
		canWrite = false
	}
	if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
		// If s.out has space allocated and available to be written into,
		// then reference it in s.outbuf for fast-path writes.
		//
		// It's perhaps a bit pointless to limit s.outbuf to the send buffer limit.
		// We've already allocated this buffer so we aren't saving any memory
		// by not using it.
		// For now, we limit it anyway to make it easier to reason about limits.
		//
		// We set the limit to one less than the send buffer limit (the -1 above)
		// so that a write which completely fills the buffer will overflow
		// s.outbuf and trigger a flush.
		s.outbufmu.Lock()
		s.outbuf = s.out.availableBuffer()
		if int64(len(s.outbuf)) > lim {
			s.outbuf = s.outbuf[:lim]
		}
		s.outbufmu.Unlock()
	}
	s.outUnlock()
	return n, nil
}
   468  
   469  // WriteByte writes a single byte to the stream.
   470  func (s *Stream) WriteByte(c byte) error {
   471  	fastPath := false
   472  	s.outbufmu.Lock()
   473  	if s.outbufoff < len(s.outbuf) {
   474  		s.outbuf[s.outbufoff] = c
   475  		s.outbufoff++
   476  		fastPath = true
   477  	}
   478  	s.outbufmu.Unlock()
   479  	if fastPath {
   480  		return nil
   481  	}
   482  
   483  	b := [1]byte{c}
   484  	_, err := s.Write(b[:])
   485  	return err
   486  }
   487  
   488  func (s *Stream) flushFastOutputBuffer() {
   489  	s.outbufmu.Lock()
   490  	defer s.outbufmu.Unlock()
   491  	if s.outbuf == nil {
   492  		return
   493  	}
   494  	// Commit data previously written to s.outbuf.
   495  	// s.outbuf is a reference to a buffer in s.out, so we just need to record
   496  	// that the output buffer has been extended.
   497  	s.out.end += int64(s.outbufoff)
   498  	s.outbuf = nil
   499  	s.outbufoff = 0
   500  }
   501  
   502  // Flush flushes data written to the stream.
   503  // It does not wait for the peer to acknowledge receipt of the data.
   504  // Use Close to wait for the peer's acknowledgement.
   505  func (s *Stream) Flush() error {
   506  	if s.IsReadOnly() {
   507  		return errors.New("flush of read-only stream")
   508  	}
   509  	s.outgate.lock()
   510  	defer s.outUnlock()
   511  	if err := s.writeErrorLocked(); err != nil {
   512  		return err
   513  	}
   514  	s.flushLocked()
   515  	return nil
   516  }
   517  
   518  // writeErrorLocked returns the error (if any) which should be returned by write operations
   519  // due to the stream being reset or closed.
   520  func (s *Stream) writeErrorLocked() error {
   521  	if s.outreset.isSet() {
   522  		if s.outresetcode == streamResetByConnClose {
   523  			if err := s.conn.finalError(); err != nil {
   524  				return err
   525  			}
   526  		}
   527  		return errors.New("write to reset stream")
   528  	}
   529  	if s.outclosed.isSet() {
   530  		return errors.New("write to closed stream")
   531  	}
   532  	return nil
   533  }
   534  
   535  func (s *Stream) flushLocked() {
   536  	s.flushFastOutputBuffer()
   537  	s.outopened.set()
   538  	if s.outflushed < s.outwin {
   539  		s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
   540  	}
   541  	s.outflushed = s.out.end
   542  }
   543  
   544  // Close closes the stream.
   545  // Any blocked stream operations will be unblocked and return errors.
   546  //
   547  // Close flushes any data in the stream write buffer and waits for the peer to
   548  // acknowledge receipt of the data.
   549  // If the stream has been reset, it waits for the peer to acknowledge the reset.
   550  // If the context expires before the peer receives the stream's data,
   551  // Close discards the buffer and returns the context error.
   552  func (s *Stream) Close() error {
   553  	s.CloseRead()
   554  	if s.IsReadOnly() {
   555  		return nil
   556  	}
   557  	s.CloseWrite()
   558  	// TODO: Return code from peer's RESET_STREAM frame?
   559  	if err := s.conn.waitOnDone(s.outctx, s.outdone); err != nil {
   560  		return err
   561  	}
   562  	s.outgate.lock()
   563  	defer s.outUnlock()
   564  	if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
   565  		return nil
   566  	}
   567  	return errors.New("stream reset")
   568  }
   569  
   570  // CloseRead aborts reads on the stream.
   571  // Any blocked reads will be unblocked and return errors.
   572  //
   573  // CloseRead notifies the peer that the stream has been closed for reading.
   574  // It does not wait for the peer to acknowledge the closure.
   575  // Use Close to wait for the peer's acknowledgement.
   576  func (s *Stream) CloseRead() {
   577  	if s.IsWriteOnly() {
   578  		return
   579  	}
   580  	s.ingate.lock()
   581  	if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
   582  		// We've already received all data from the peer,
   583  		// so there's no need to send STOP_SENDING.
   584  		// This is the same as saying we sent one and they got it.
   585  		s.inclosed.setReceived()
   586  	} else {
   587  		s.inclosed.set()
   588  	}
   589  	discarded := s.in.end - s.in.start
   590  	s.in.discardBefore(s.in.end)
   591  	s.inUnlock()
   592  	s.conn.handleStreamBytesReadOffLoop(discarded) // must be done with ingate unlocked
   593  }
   594  
   595  // CloseWrite aborts writes on the stream.
   596  // Any blocked writes will be unblocked and return errors.
   597  //
   598  // CloseWrite sends any data in the stream write buffer to the peer.
   599  // It does not wait for the peer to acknowledge receipt of the data.
   600  // Use Close to wait for the peer's acknowledgement.
   601  func (s *Stream) CloseWrite() {
   602  	if s.IsReadOnly() {
   603  		return
   604  	}
   605  	s.outgate.lock()
   606  	defer s.outUnlock()
   607  	s.outclosed.set()
   608  	s.flushLocked()
   609  }
   610  
   611  // Reset aborts writes on the stream and notifies the peer
   612  // that the stream was terminated abruptly.
   613  // Any blocked writes will be unblocked and return errors.
   614  //
   615  // Reset sends the application protocol error code, which must be
   616  // less than 2^62, to the peer.
   617  // It does not wait for the peer to acknowledge receipt of the error.
   618  // Use Close to wait for the peer's acknowledgement.
   619  //
   620  // Reset does not affect reads.
   621  // Use CloseRead to abort reads on the stream.
   622  func (s *Stream) Reset(code uint64) {
   623  	const userClosed = true
   624  	s.resetInternal(code, userClosed)
   625  }
   626  
   627  // resetInternal resets the send side of the stream.
   628  //
   629  // If userClosed is true, this is s.Reset.
   630  // If userClosed is false, this is a reaction to a STOP_SENDING frame.
   631  func (s *Stream) resetInternal(code uint64, userClosed bool) {
   632  	s.outgate.lock()
   633  	defer s.outUnlock()
   634  	if s.IsReadOnly() {
   635  		return
   636  	}
   637  	if userClosed {
   638  		// Mark that the user closed the stream.
   639  		s.outclosed.set()
   640  	}
   641  	if s.outreset.isSet() {
   642  		return
   643  	}
   644  	if code > quicwire.MaxVarint {
   645  		code = quicwire.MaxVarint
   646  	}
   647  	// We could check here to see if the stream is closed and the
   648  	// peer has acked all the data and the FIN, but sending an
   649  	// extra RESET_STREAM in this case is harmless.
   650  	s.outreset.set()
   651  	s.outresetcode = code
   652  	s.outbufmu.Lock()
   653  	s.outbuf = nil
   654  	s.outbufoff = 0
   655  	s.outbufmu.Unlock()
   656  	s.out.discardBefore(s.out.end)
   657  	s.outunsent = rangeset[int64]{}
   658  	s.outblocked.clear()
   659  }
   660  
   661  // connHasClosed indicates the stream's conn has closed.
   662  func (s *Stream) connHasClosed() {
   663  	// If we're in the closing state, the user closed the conn.
   664  	// Otherwise, we the peer initiated the close.
   665  	// This only matters for the error we're going to return from stream operations.
   666  	localClose := s.conn.lifetime.state == connStateClosing
   667  
   668  	s.ingate.lock()
   669  	if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
   670  		if localClose {
   671  			s.inclosed.set()
   672  		} else {
   673  			s.inresetcode = streamResetByConnClose
   674  		}
   675  	}
   676  	s.inUnlock()
   677  
   678  	s.outgate.lock()
   679  	if localClose {
   680  		s.outclosed.set()
   681  		s.outreset.set()
   682  	} else {
   683  		s.outresetcode = streamResetByConnClose
   684  		s.outreset.setReceived()
   685  	}
   686  	s.outUnlock()
   687  }
   688  
   689  // inUnlock unlocks s.ingate.
   690  // It sets the gate condition if reads from s will not block.
   691  // If s has receive-related frames to write or if both directions
   692  // are done and the stream should be removed, it notifies the Conn.
   693  func (s *Stream) inUnlock() {
   694  	state := s.inUnlockNoQueue()
   695  	s.conn.maybeQueueStreamForSend(s, state)
   696  }
   697  
   698  // inUnlockNoQueue is inUnlock,
   699  // but reports whether s has frames to write rather than notifying the Conn.
   700  func (s *Stream) inUnlockNoQueue() streamState {
   701  	nextByte := s.in.start + int64(len(s.inbuf))
   702  	canRead := s.inset.contains(nextByte) || // data available to read
   703  		s.insize == s.in.start+int64(len(s.inbuf)) || // at EOF
   704  		s.inresetcode != -1 || // reset by peer
   705  		s.inclosed.isSet() // closed locally
   706  	defer s.ingate.unlock(canRead)
   707  	var state streamState
   708  	switch {
   709  	case s.IsWriteOnly():
   710  		state = streamInDone
   711  	case s.inresetcode != -1: // reset by peer
   712  		fallthrough
   713  	case s.in.start == s.insize: // all data received and read
   714  		// We don't increase MAX_STREAMS until the user calls ReadClose or Close,
   715  		// so the receive side is not finished until inclosed is set.
   716  		if s.inclosed.isSet() {
   717  			state = streamInDone
   718  		}
   719  	case s.insendmax.shouldSend(): // STREAM_MAX_DATA
   720  		state = streamInSendMeta
   721  	case s.inclosed.shouldSend(): // STOP_SENDING
   722  		state = streamInSendMeta
   723  	}
   724  	const mask = streamInDone | streamInSendMeta
   725  	return s.state.set(state, mask)
   726  }
   727  
   728  // outUnlock unlocks s.outgate.
   729  // It sets the gate condition if writes to s will not block.
   730  // If s has send-related frames to write or if both directions
   731  // are done and the stream should be removed, it notifies the Conn.
   732  func (s *Stream) outUnlock() {
   733  	state := s.outUnlockNoQueue()
   734  	s.conn.maybeQueueStreamForSend(s, state)
   735  }
   736  
// outUnlockNoQueue is outUnlock,
// but reports whether s has frames to write rather than notifying the Conn.
//
// Before unlocking it closes s.outdone if the send side has finished
// (all data acked after a close, or the stream was reset),
// and updates the outbound state bits. It returns the value
// produced by updating those bits.
func (s *Stream) outUnlockNoQueue() streamState {
	isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked
		s.outreset.isSet() // reset locally
	if isDone {
		// Close outdone at most once: the receive case fires
		// only if the channel has already been closed.
		select {
		case <-s.outdone:
		default:
			if !s.IsReadOnly() {
				close(s.outdone)
			}
		}
	}
	lim := s.out.start + s.outmaxbuf
	canWrite := lim > s.out.end || // available send buffer
		s.outclosed.isSet() || // closed locally
		s.outreset.isSet() // reset locally
	// Deferred so that the state bits below are set while outgate is still held.
	defer s.outgate.unlock(canWrite)
	var state streamState
	switch {
	case s.IsReadOnly():
		state = streamOutDone
	case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end): // all data sent and acked
		fallthrough
	case s.outreset.isReceived(): // RESET_STREAM sent and acked
		// We don't increase MAX_STREAMS until the user calls CloseWrite or Close,
		// so the send side is not finished until outclosed is set.
		if s.outclosed.isSet() {
			state = streamOutDone
		}
	case s.outreset.shouldSend(): // RESET_STREAM
		state = streamOutSendMeta
	case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged
		// Nothing to send; this case keeps the cases below from running.
	case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
		state = streamOutSendMeta
	case len(s.outunsent) > 0: // STREAM frame with data
		if s.outunsent.min() < s.outmaxsent {
			state = streamOutSendMeta // resent data, will not consume flow control
		} else {
			state = streamOutSendData // new data, requires flow control
		}
	case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
		state = streamOutSendMeta
	case s.outopened.shouldSend(): // STREAM frame with no data
		state = streamOutSendMeta
	}
	const mask = streamOutDone | streamOutSendMeta | streamOutSendData
	return s.state.set(state, mask)
}
   787  
   788  // handleData handles data received in a STREAM frame.
   789  func (s *Stream) handleData(off int64, b []byte, fin bool) error {
   790  	s.ingate.lock()
   791  	defer s.inUnlock()
   792  	end := off + int64(len(b))
   793  	if err := s.checkStreamBounds(end, fin); err != nil {
   794  		return err
   795  	}
   796  	if s.inclosed.isSet() || s.inresetcode != -1 {
   797  		// The user read-closed the stream, or the peer reset it.
   798  		// Either way, we can discard this frame.
   799  		return nil
   800  	}
   801  	if s.insize == -1 && end > s.in.end {
   802  		added := end - s.in.end
   803  		if err := s.conn.handleStreamBytesReceived(added); err != nil {
   804  			return err
   805  		}
   806  	}
   807  	if len(s.inset) > 0 && s.inset[0].contains(off) {
   808  		// We've received at least some of this data,
   809  		// and potentially moved it into s.inbuf
   810  		// (since it's part of the first range of received data).
   811  		// Avoid rewriting this data into s.in, since doing so could race
   812  		// with a reader reading the same data.
   813  		//
   814  		// (Note: We could apply additional checks here, to detect the peer
   815  		// sending us different data than we received the first time.
   816  		// We currently don't bother.)
   817  		newOff := min(end, s.inset[0].end)
   818  		b = b[newOff-off:]
   819  		off = newOff
   820  	}
   821  	s.in.writeAt(b, off)
   822  	s.inset.add(off, end)
   823  	if fin {
   824  		s.insize = end
   825  		// The peer has enough flow control window to send the entire stream.
   826  		s.insendmax.clear()
   827  	}
   828  	return nil
   829  }
   830  
   831  // handleReset handles a RESET_STREAM frame.
   832  func (s *Stream) handleReset(code uint64, finalSize int64) error {
   833  	s.ingate.lock()
   834  	defer s.inUnlock()
   835  	const fin = true
   836  	if err := s.checkStreamBounds(finalSize, fin); err != nil {
   837  		return err
   838  	}
   839  	if s.inresetcode != -1 {
   840  		// The stream was already reset.
   841  		return nil
   842  	}
   843  	if s.insize == -1 {
   844  		added := finalSize - s.in.end
   845  		if err := s.conn.handleStreamBytesReceived(added); err != nil {
   846  			return err
   847  		}
   848  	}
   849  	s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
   850  	s.in.discardBefore(s.in.end)
   851  	s.inresetcode = int64(code)
   852  	s.insize = finalSize
   853  	return nil
   854  }
   855  
   856  // checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame.
   857  func (s *Stream) checkStreamBounds(end int64, fin bool) error {
   858  	if end > s.inwin {
   859  		// The peer sent us data past the maximum flow control window we gave them.
   860  		return localTransportError{
   861  			code:   errFlowControl,
   862  			reason: "stream flow control window exceeded",
   863  		}
   864  	}
   865  	if s.insize != -1 && end > s.insize {
   866  		// The peer sent us data past the final size of the stream they previously gave us.
   867  		return localTransportError{
   868  			code:   errFinalSize,
   869  			reason: "data received past end of stream",
   870  		}
   871  	}
   872  	if fin && s.insize != -1 && end != s.insize {
   873  		// The peer changed the final size of the stream.
   874  		return localTransportError{
   875  			code:   errFinalSize,
   876  			reason: "final size of stream changed",
   877  		}
   878  	}
   879  	if fin && end < s.in.end {
   880  		// The peer has previously sent us data past the final size.
   881  		return localTransportError{
   882  			code:   errFinalSize,
   883  			reason: "end of stream occurs before prior data",
   884  		}
   885  	}
   886  	return nil
   887  }
   888  
   889  // handleStopSending handles a STOP_SENDING frame.
   890  func (s *Stream) handleStopSending(code uint64) error {
   891  	// Peer requests that we reset this stream.
   892  	// https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
   893  	const userReset = false
   894  	s.resetInternal(code, userReset)
   895  	return nil
   896  }
   897  
   898  // handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame.
   899  func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
   900  	s.outgate.lock()
   901  	defer s.outUnlock()
   902  	if maxStreamData <= s.outwin {
   903  		return nil
   904  	}
   905  	if s.outflushed > s.outwin {
   906  		s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
   907  	}
   908  	s.outwin = maxStreamData
   909  	if s.out.end > s.outwin {
   910  		// We've still got more data than flow control window.
   911  		s.outblocked.setUnsent()
   912  	} else {
   913  		s.outblocked.clear()
   914  	}
   915  	return nil
   916  }
   917  
   918  // ackOrLoss handles the fate of stream frames other than STREAM.
   919  func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
   920  	// Frames which carry new information each time they are sent
   921  	// (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked
   922  	// as received if the most recent packet carrying this frame is acked.
   923  	//
   924  	// Frames which are always the same (STOP_SENDING, RESET_STREAM)
   925  	// can be marked as received if any packet carrying this frame is acked.
   926  	switch ftype {
   927  	case frameTypeResetStream:
   928  		s.outgate.lock()
   929  		s.outreset.ackOrLoss(pnum, fate)
   930  		s.outUnlock()
   931  	case frameTypeStopSending:
   932  		s.ingate.lock()
   933  		s.inclosed.ackOrLoss(pnum, fate)
   934  		s.inUnlock()
   935  	case frameTypeMaxStreamData:
   936  		s.ingate.lock()
   937  		s.insendmax.ackLatestOrLoss(pnum, fate)
   938  		s.inUnlock()
   939  	case frameTypeStreamDataBlocked:
   940  		s.outgate.lock()
   941  		s.outblocked.ackLatestOrLoss(pnum, fate)
   942  		s.outUnlock()
   943  	default:
   944  		panic("unhandled frame type")
   945  	}
   946  }
   947  
   948  // ackOrLossData handles the fate of a STREAM frame.
   949  func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
   950  	s.outgate.lock()
   951  	defer s.outUnlock()
   952  	s.outopened.ackOrLoss(pnum, fate)
   953  	if fin {
   954  		s.outclosed.ackOrLoss(pnum, fate)
   955  	}
   956  	if s.outreset.isSet() {
   957  		// If the stream has been reset, we don't care any more.
   958  		return
   959  	}
   960  	switch fate {
   961  	case packetAcked:
   962  		s.outacked.add(start, end)
   963  		s.outunsent.sub(start, end)
   964  		// If this ack is for data at the start of the send buffer, we can now discard it.
   965  		if s.outacked.contains(s.out.start) {
   966  			s.out.discardBefore(s.outacked[0].end)
   967  		}
   968  	case packetLost:
   969  		// Mark everything lost, but not previously acked, as needing retransmission.
   970  		// We do this by adding all the lost bytes to outunsent, and then
   971  		// removing everything already acked.
   972  		s.outunsent.add(start, end)
   973  		for _, a := range s.outacked {
   974  			s.outunsent.sub(a.start, a.end)
   975  		}
   976  	}
   977  }
   978  
   979  // appendInFramesLocked appends STOP_SENDING and MAX_STREAM_DATA frames
   980  // to the current packet.
   981  //
   982  // It returns true if no more frames need appending,
   983  // false if not everything fit in the current packet.
   984  func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
   985  	if s.inclosed.shouldSendPTO(pto) {
   986  		// We don't currently have an API for setting the error code.
   987  		// Just send zero.
   988  		code := uint64(0)
   989  		if !w.appendStopSendingFrame(s.id, code) {
   990  			return false
   991  		}
   992  		s.inclosed.setSent(pnum)
   993  	}
   994  	// TODO: STOP_SENDING
   995  	if s.insendmax.shouldSendPTO(pto) {
   996  		// MAX_STREAM_DATA
   997  		maxStreamData := s.in.start + s.inmaxbuf
   998  		if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
   999  			return false
  1000  		}
  1001  		s.inwin = maxStreamData
  1002  		s.insendmax.setSent(pnum)
  1003  	}
  1004  	return true
  1005  }
  1006  
// appendOutFramesLocked appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames
// to the current packet.
//
// pnum is the number of the packet being built. pto is forwarded to the
// shouldSendPTO checks and to dataToSend; when it is set, at most one
// STREAM frame is appended per call.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
	if s.outreset.isSet() {
		// RESET_STREAM
		//
		// Once the stream is reset, this is the only frame considered:
		// the function returns without looking at STREAM_DATA_BLOCKED or STREAM.
		if s.outreset.shouldSendPTO(pto) {
			// The frame carries the reset code and s.outmaxsent,
			// the highest stream offset sent so far.
			if !w.appendResetStreamFrame(s.id, s.outresetcode, s.outmaxsent) {
				return false
			}
			s.outreset.setSent(pnum)
			s.frameOpensStream(pnum)
		}
		return true
	}
	if s.outblocked.shouldSendPTO(pto) {
		// STREAM_DATA_BLOCKED
		//
		// Reports the stream-level window (s.outwin) we are blocked on.
		if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
			return false
		}
		s.outblocked.setSent(pnum)
		s.frameOpensStream(pnum)
	}
	for {
		// STREAM
		//
		// Pick the next range to send. Both bounds are clamped to s.outwin,
		// so nothing past the stream-level window is selected.
		off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
		if end := off + size; end > s.outmaxsent {
			// This will require connection-level flow control to send.
			// Trim the range to what the connection window allows;
			// max(end, off) keeps size from going negative when the
			// allowed end falls before off.
			end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
			end = max(end, off)
			size = end - off
		}
		// The frame carries FIN if the stream is closed and this range
		// reaches the end of the buffered data.
		fin := s.outclosed.isSet() && off+size == s.out.end
		shouldSend := size > 0 || // have data to send
			s.outopened.shouldSendPTO(pto) || // should open the stream
			(fin && s.outclosed.shouldSendPTO(pto)) // should close the stream
		if !shouldSend {
			return true
		}
		// b is the frame's data buffer; it may be shorter than size
		// (see the truncation check at the bottom of the loop).
		b, added := w.appendStreamFrame(s.id, off, int(size), fin)
		if !added {
			return false
		}
		s.out.copy(off, b)
		end := off + int64(len(b))
		if end > s.outmaxsent {
			// Only bytes beyond the previous high-water mark are new
			// and consume connection-level flow control.
			s.conn.streams.outflow.consume(end - s.outmaxsent)
			s.outmaxsent = end
		}
		s.outunsent.sub(off, end)
		s.frameOpensStream(pnum)
		// NOTE(review): fin was computed from the requested size, not len(b).
		// Presumably appendStreamFrame handles a truncated frame that was
		// asked to carry FIN — confirm against packetWriter.
		if fin {
			s.outclosed.setSent(pnum)
		}
		if pto {
			// A probe sends a single STREAM frame.
			return true
		}
		if int64(len(b)) < size {
			// The frame was truncated: the packet is full and data remains.
			return false
		}
	}
}
  1071  
  1072  // frameOpensStream records that we're sending a frame that will open the stream.
  1073  //
  1074  // If we don't have an acknowledgement from the peer for a previous frame opening the stream,
  1075  // record this packet as being the latest one to open it.
  1076  func (s *Stream) frameOpensStream(pnum packetNumber) {
  1077  	if !s.outopened.isReceived() {
  1078  		s.outopened.setSent(pnum)
  1079  	}
  1080  }
  1081  
  1082  // dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM.
  1083  func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
  1084  	switch {
  1085  	case pto:
  1086  		// On PTO, resend unacked data that fits in the probe packet.
  1087  		// For simplicity, we send the range starting at s.out.start
  1088  		// (which is definitely unacked, or else we would have discarded it)
  1089  		// up to the next acked byte (if any).
  1090  		//
  1091  		// This may miss unacked data starting after that acked byte,
  1092  		// but avoids resending data the peer has acked.
  1093  		for _, r := range outacked {
  1094  			if r.start > start {
  1095  				return start, r.start - start
  1096  			}
  1097  		}
  1098  		return start, end - start
  1099  	case outunsent.numRanges() > 0:
  1100  		return outunsent.min(), outunsent[0].size()
  1101  	default:
  1102  		return end, 0
  1103  	}
  1104  }
  1105  

View as plain text