Source file src/vendor/golang.org/x/net/quic/conn_streams.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package quic
     6  
     7  import (
     8  	"context"
     9  	"sync"
    10  	"sync/atomic"
    11  	"time"
    12  )
    13  
// streamsState holds a connection's stream bookkeeping: the table of open
// streams, stream-count limits, connection-level flow control, and the
// queues of streams that have frames waiting to be sent.
type streamsState struct {
	queue queue[*Stream] // new, peer-created streams

	// All open streams, keyed by stream ID.
	//
	// Locally created streams are added by newLocalStream and peer-created
	// streams by streamForFrame; finished streams are deleted by appendStreamFrames.
	//
	// Implicitly created streams are included as an empty entry in the map.
	// (For example, if we receive a frame for stream 4, we implicitly create stream 0 and
	// insert an empty entry for it to the map.)
	//
	// The map value is maybeStream rather than *Stream as a reminder that values can be nil.
	streams map[streamID]maybeStream

	// Limits on the number of streams, indexed by streamType.
	localLimit  [streamTypeCount]localStreamLimits
	remoteLimit [streamTypeCount]remoteStreamLimits

	// Peer configuration provided in transport parameters.
	// These become the initial send window (outwin) of newly created streams.
	peerInitialMaxStreamDataRemote    [streamTypeCount]int64 // streams opened by us
	peerInitialMaxStreamDataBidiLocal int64                  // streams opened by them

	// Connection-level flow control.
	inflow  connInflow
	outflow connOutflow

	// Streams with frames to send are stored in one of two circular linked lists,
	// depending on whether they require connection-level flow control.
	//
	// needSend is set by maybeQueueStreamForSend and cleared by appendStreamFrames
	// once both rings are empty; it lets appendStreamFrames skip taking sendMu
	// when nothing is queued.
	// sendMu is held while the rings below are read or modified.
	needSend  atomic.Bool
	sendMu    sync.Mutex
	queueMeta streamRing // streams with any non-flow-controlled frames
	queueData streamRing // streams with only flow-controlled frames
}
    45  
// maybeStream is a possibly nil *Stream. See streamsState.streams.
//
// A zero maybeStream (s == nil) is stored as the placeholder for a stream
// that was implicitly created but has not yet been materialized.
type maybeStream struct {
	s *Stream
}
    50  
    51  func (c *Conn) streamsInit() {
    52  	c.streams.streams = make(map[streamID]maybeStream)
    53  	c.streams.queue = newQueue[*Stream]()
    54  	c.streams.localLimit[bidiStream].init()
    55  	c.streams.localLimit[uniStream].init()
    56  	c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
    57  	c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
    58  	c.inflowInit()
    59  }
    60  
    61  func (c *Conn) streamsCleanup() {
    62  	c.streams.queue.close(errConnClosed)
    63  	c.streams.localLimit[bidiStream].connHasClosed()
    64  	c.streams.localLimit[uniStream].connHasClosed()
    65  	for _, s := range c.streams.streams {
    66  		if s.s != nil {
    67  			s.s.connHasClosed()
    68  		}
    69  	}
    70  }
    71  
// AcceptStream waits for and returns the next stream created by the peer.
//
// Streams are delivered from the connection's accept queue, which
// streamForFrame fills as peer-created streams are materialized.
// The error, if any, is whatever the queue's get reports; the queue is
// closed with errConnClosed when the connection's streams are cleaned up.
func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) {
	return c.streams.queue.get(ctx)
}
    76  
// NewStream creates a bidirectional stream.
//
// If the peer's maximum stream limit for the connection has been reached,
// NewStream blocks until the limit is increased or the context expires.
func (c *Conn) NewStream(ctx context.Context) (*Stream, error) {
	return c.newLocalStream(ctx, bidiStream)
}
    84  
// NewSendOnlyStream creates a unidirectional, send-only stream.
//
// If the peer's maximum stream limit for the connection has been reached,
// NewSendOnlyStream blocks until the limit is increased or the context expires.
func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) {
	return c.newLocalStream(ctx, uniStream)
}
    92  
    93  func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
    94  	num, err := c.streams.localLimit[styp].open(ctx, c)
    95  	if err != nil {
    96  		return nil, err
    97  	}
    98  
    99  	s := newStream(c, newStreamID(c.side, styp, num))
   100  	s.outmaxbuf = c.config.maxStreamWriteBufferSize()
   101  	s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
   102  	if styp == bidiStream {
   103  		s.inmaxbuf = c.config.maxStreamReadBufferSize()
   104  		s.inwin = c.config.maxStreamReadBufferSize()
   105  	}
   106  	s.inUnlock()
   107  	s.outUnlock()
   108  
   109  	// Modify c.streams on the conn's loop.
   110  	if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) {
   111  		c.streams.streams[s.id] = maybeStream{s}
   112  	}); err != nil {
   113  		return nil, err
   114  	}
   115  	return s, nil
   116  }
   117  
// streamFrameType identifies which direction of a stream,
// from the local perspective, a frame is associated with.
//
// For example, STREAM is a recvStream frame,
// because it carries data from the peer to us.
type streamFrameType uint8

const (
	// sendStream frames concern the half of the stream we send on.
	sendStream = streamFrameType(iota) // for example, MAX_STREAM_DATA
	// recvStream frames concern the half of the stream we receive on.
	recvStream // for example, STREAM_DATA_BLOCKED
)
   129  
   130  // streamForID returns the stream with the given id.
   131  // If the stream does not exist, it returns nil.
   132  func (c *Conn) streamForID(id streamID) *Stream {
   133  	return c.streams.streams[id].s
   134  }
   135  
   136  // streamForFrame returns the stream with the given id.
   137  // If the stream does not exist, it may be created.
   138  //
   139  // streamForFrame aborts the connection if the stream id, state, and frame type don't align.
   140  // For example, it aborts the connection with a STREAM_STATE error if a MAX_DATA frame
   141  // is received for a receive-only stream, or if the peer attempts to create a stream that
   142  // should be originated locally.
   143  //
   144  // streamForFrame returns nil if the stream no longer exists or if an error occurred.
   145  func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream {
   146  	if id.streamType() == uniStream {
   147  		if (id.initiator() == c.side) != (ftype == sendStream) {
   148  			// Received an invalid frame for unidirectional stream.
   149  			// For example, a RESET_STREAM frame for a send-only stream.
   150  			c.abort(now, localTransportError{
   151  				code:   errStreamState,
   152  				reason: "invalid frame for unidirectional stream",
   153  			})
   154  			return nil
   155  		}
   156  	}
   157  
   158  	ms, isOpen := c.streams.streams[id]
   159  	if ms.s != nil {
   160  		return ms.s
   161  	}
   162  
   163  	num := id.num()
   164  	styp := id.streamType()
   165  	if id.initiator() == c.side {
   166  		// This stream was created by us, and has been closed.
   167  		if c.streams.localLimit[styp].wasOpened(num) {
   168  			return nil
   169  		}
   170  		// Received a frame for a stream that should be originated by us,
   171  		// but which we never created.
   172  		c.abort(now, localTransportError{
   173  			code:   errStreamState,
   174  			reason: "received frame for unknown stream",
   175  		})
   176  		return nil
   177  	} else {
   178  		// if isOpen, this is a stream that was implicitly opened by a
   179  		// previous frame for a larger-numbered stream, but we haven't
   180  		// actually created it yet.
   181  		if !isOpen && num < c.streams.remoteLimit[styp].opened {
   182  			// This stream was created by the peer, and has been closed.
   183  			return nil
   184  		}
   185  	}
   186  
   187  	prevOpened := c.streams.remoteLimit[styp].opened
   188  	if err := c.streams.remoteLimit[styp].open(id); err != nil {
   189  		c.abort(now, err)
   190  		return nil
   191  	}
   192  
   193  	// Receiving a frame for a stream implicitly creates all streams
   194  	// with the same initiator and type and a lower number.
   195  	// Add a nil entry to the streams map for each implicitly created stream.
   196  	for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
   197  		c.streams.streams[n] = maybeStream{}
   198  	}
   199  
   200  	s := newStream(c, id)
   201  	s.inmaxbuf = c.config.maxStreamReadBufferSize()
   202  	s.inwin = c.config.maxStreamReadBufferSize()
   203  	if id.streamType() == bidiStream {
   204  		s.outmaxbuf = c.config.maxStreamWriteBufferSize()
   205  		s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
   206  	}
   207  	s.inUnlock()
   208  	s.outUnlock()
   209  
   210  	c.streams.streams[id] = maybeStream{s}
   211  	c.streams.queue.put(s)
   212  	return s
   213  }
   214  
   215  // maybeQueueStreamForSend marks a stream as containing frames that need sending.
   216  func (c *Conn) maybeQueueStreamForSend(s *Stream, state streamState) {
   217  	if state.wantQueue() == state.inQueue() {
   218  		return // already on the right queue
   219  	}
   220  	c.streams.sendMu.Lock()
   221  	defer c.streams.sendMu.Unlock()
   222  	state = s.state.load() // may have changed while waiting
   223  	c.queueStreamForSendLocked(s, state)
   224  
   225  	c.streams.needSend.Store(true)
   226  	c.wake()
   227  }
   228  
   229  // queueStreamForSendLocked moves a stream to the correct send queue,
   230  // or removes it from all queues.
   231  //
   232  // state is the last known stream state.
   233  func (c *Conn) queueStreamForSendLocked(s *Stream, state streamState) {
   234  	for {
   235  		wantQueue := state.wantQueue()
   236  		inQueue := state.inQueue()
   237  		if inQueue == wantQueue {
   238  			return // already on the right queue
   239  		}
   240  
   241  		switch inQueue {
   242  		case metaQueue:
   243  			c.streams.queueMeta.remove(s)
   244  		case dataQueue:
   245  			c.streams.queueData.remove(s)
   246  		}
   247  
   248  		switch wantQueue {
   249  		case metaQueue:
   250  			c.streams.queueMeta.append(s)
   251  			state = s.state.set(streamQueueMeta, streamQueueMeta|streamQueueData)
   252  		case dataQueue:
   253  			c.streams.queueData.append(s)
   254  			state = s.state.set(streamQueueData, streamQueueMeta|streamQueueData)
   255  		case noQueue:
   256  			state = s.state.set(0, streamQueueMeta|streamQueueData)
   257  		}
   258  
   259  		// If the stream state changed while we were moving the stream,
   260  		// we might now be on the wrong queue.
   261  		//
   262  		// For example:
   263  		//   - stream has data to send: streamOutSendData|streamQueueData
   264  		//   - appendStreamFrames sends all the data: streamQueueData
   265  		//   - concurrently, more data is written: streamOutSendData|streamQueueData
   266  		//   - appendStreamFrames calls us with the last state it observed
   267  		//     (streamQueueData).
   268  		//   - We remove the stream from the queue and observe the updated state:
   269  		//     streamOutSendData
   270  		//   - We realize that the stream needs to go back on the data queue.
   271  		//
   272  		// Go back around the loop to confirm we're on the correct queue.
   273  	}
   274  }
   275  
// appendStreamFrames writes stream-related frames to the current packet.
//
// Frames are appended in this order: MAX_DATA, then frames for streams on
// queueMeta (non-flow-controlled frames), then MAX_STREAMS, then frames for
// streams on queueData (flow-controlled frames). PTO probes are handled
// separately by appendStreamFramesPTO.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
	// MAX_DATA
	if !c.appendMaxDataFrame(w, pnum, pto) {
		return false
	}

	if pto {
		return c.appendStreamFramesPTO(w, pnum)
	}
	if !c.streams.needSend.Load() {
		// If queueMeta includes newly-finished streams, we may extend the peer's
		// stream limits. When there are no streams to process, add MAX_STREAMS
		// frames here. Otherwise, wait until after we've processed queueMeta.
		return c.appendMaxStreams(w, pnum, pto)
	}
	// sendMu is held for the rest of the function, covering every access
	// to queueMeta and queueData below.
	c.streams.sendMu.Lock()
	defer c.streams.sendMu.Unlock()
	// queueMeta contains streams with non-flow-controlled frames to send.
	for c.streams.queueMeta.head != nil {
		s := c.streams.queueMeta.head
		state := s.state.load()
		// A stream on queueMeta must have streamQueueMeta set and must not
		// already have been removed from the conn.
		if state&(streamQueueMeta|streamConnRemoved) != streamQueueMeta {
			panic("BUG: queueMeta stream is not streamQueueMeta")
		}
		if state&streamInSendMeta != 0 {
			s.ingate.lock()
			ok := s.appendInFramesLocked(w, pnum, pto)
			state = s.inUnlockNoQueue()
			if !ok {
				return false
			}
			if state&streamInSendMeta != 0 {
				panic("BUG: streamInSendMeta set after successfully appending frames")
			}
		}
		if state&streamOutSendMeta != 0 {
			s.outgate.lock()
			// This might also append flow-controlled frames if we have any
			// and available conn-level quota. That's fine.
			ok := s.appendOutFramesLocked(w, pnum, pto)
			state = s.outUnlockNoQueue()
			// We're checking both ok and state, because appendOutFramesLocked
			// might have filled up the packet with flow-controlled data.
			// If so, we want to move the stream to queueData for any remaining frames.
			if !ok && state&streamOutSendMeta != 0 {
				return false
			}
			if state&streamOutSendMeta != 0 {
				panic("BUG: streamOutSendMeta set after successfully appending frames")
			}
		}
		// We've sent all frames for this stream, so remove it from the send queue.
		c.streams.queueMeta.remove(s)
		if state&(streamInDone|streamOutDone) == streamInDone|streamOutDone {
			// Stream is finished, remove it from the conn.
			state = s.state.set(streamConnRemoved, streamQueueMeta|streamConnRemoved)
			delete(c.streams.streams, s.id)

			// Record finalization of remote streams, to know when
			// to extend the peer's stream limit.
			if s.id.initiator() != c.side {
				c.streams.remoteLimit[s.id.streamType()].close()
			}
		} else {
			state = s.state.set(0, streamQueueMeta|streamConnRemoved)
		}
		// The stream may have flow-controlled data to send,
		// or something might have added non-flow-controlled frames after we
		// unlocked the stream.
		// If so, put the stream back on a queue.
		c.queueStreamForSendLocked(s, state)
	}

	// MAX_STREAMS (possibly triggered by finalization of remote streams above).
	if !c.appendMaxStreams(w, pnum, pto) {
		return false
	}

	// queueData contains streams with flow-controlled frames.
	for c.streams.queueData.head != nil {
		avail := c.streams.outflow.avail()
		if avail == 0 {
			break // no flow control quota available
		}
		s := c.streams.queueData.head
		s.outgate.lock()
		ok := s.appendOutFramesLocked(w, pnum, pto)
		state := s.outUnlockNoQueue()
		if !ok {
			// We've sent some data for this stream, but it still has more to send.
			// If the stream got a reasonable chance to put data in a packet,
			// advance queueData.head to the next stream in line, to avoid starvation.
			// We'll come back to this stream after going through the others.
			//
			// If the packet was already mostly out of space, leave queueData.head
			// alone and come back to this stream again on the next packet.
			//
			// NOTE(review): avail is the connection-level flow control quota
			// sampled before the append, not the space remaining in the packet
			// as the paragraph above suggests. Confirm this is the intended
			// heuristic.
			if avail > 512 {
				c.streams.queueData.head = s.next
			}
			return false
		}
		if state&streamQueueData == 0 {
			panic("BUG: queueData stream is not streamQueueData")
		}
		if state&streamOutSendData != 0 {
			// We must have run out of connection-level flow control:
			// appendOutFramesLocked says it wrote all it can, but there's
			// still data to send.
			//
			// Advance queueData.head to the next stream in line to avoid starvation.
			if c.streams.outflow.avail() != 0 {
				panic("BUG: streamOutSendData set and flow control available after send")
			}
			c.streams.queueData.head = s.next
			return true
		}
		c.streams.queueData.remove(s)
		state = s.state.set(0, streamQueueData)
		c.queueStreamForSendLocked(s, state)
	}
	// Only clear needSend once both rings are empty; queueData may still hold
	// streams if we stopped above for lack of flow control quota.
	if c.streams.queueMeta.head == nil && c.streams.queueData.head == nil {
		c.streams.needSend.Store(false)
	}
	return true
}
   405  
   406  // appendStreamFramesPTO writes stream-related frames to the current packet
   407  // for a PTO probe.
   408  //
   409  // It returns true if no more frames need appending,
   410  // false if not everything fit in the current packet.
   411  func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
   412  	const pto = true
   413  	if !c.appendMaxStreams(w, pnum, pto) {
   414  		return false
   415  	}
   416  	c.streams.sendMu.Lock()
   417  	defer c.streams.sendMu.Unlock()
   418  	for _, ms := range c.streams.streams {
   419  		s := ms.s
   420  		if s == nil {
   421  			continue
   422  		}
   423  		const pto = true
   424  		s.ingate.lock()
   425  		inOK := s.appendInFramesLocked(w, pnum, pto)
   426  		s.inUnlockNoQueue()
   427  		if !inOK {
   428  			return false
   429  		}
   430  
   431  		s.outgate.lock()
   432  		outOK := s.appendOutFramesLocked(w, pnum, pto)
   433  		s.outUnlockNoQueue()
   434  		if !outOK {
   435  			return false
   436  		}
   437  	}
   438  	return true
   439  }
   440  
   441  func (c *Conn) appendMaxStreams(w *packetWriter, pnum packetNumber, pto bool) bool {
   442  	if !c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto) {
   443  		return false
   444  	}
   445  	if !c.streams.remoteLimit[bidiStream].appendFrame(w, bidiStream, pnum, pto) {
   446  		return false
   447  	}
   448  	return true
   449  }
   450  
// A streamRing is a circular linked list of streams.
//
// Streams are linked through their next and prev fields.
// head is nil when the ring is empty.
type streamRing struct {
	head *Stream
}
   455  
   456  // remove removes s from the ring.
   457  // s must be on the ring.
   458  func (r *streamRing) remove(s *Stream) {
   459  	if s.next == s {
   460  		r.head = nil // s was the last stream in the ring
   461  	} else {
   462  		s.prev.next = s.next
   463  		s.next.prev = s.prev
   464  		if r.head == s {
   465  			r.head = s.next
   466  		}
   467  	}
   468  }
   469  
   470  // append places s at the last position in the ring.
   471  // s must not be attached to any ring.
   472  func (r *streamRing) append(s *Stream) {
   473  	if r.head == nil {
   474  		r.head = s
   475  		s.next = s
   476  		s.prev = s
   477  	} else {
   478  		s.prev = r.head.prev
   479  		s.next = r.head
   480  		s.prev.next = s
   481  		s.next.prev = s
   482  	}
   483  }
   484  

View as plain text