Source file src/vendor/golang.org/x/net/quic/stream_limits.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package quic
     6  
     7  import (
     8  	"context"
     9  )
    10  
    11  // Limits on the number of open streams.
    12  // Every connection has separate limits for bidirectional and unidirectional streams.
    13  //
    14  // Note that the MAX_STREAMS limit includes closed as well as open streams.
    15  // Closing a stream doesn't enable an endpoint to open a new one;
    16  // only an increase in the MAX_STREAMS limit does.
    17  
    18  // localStreamLimits are limits on the number of open streams created by us.
    19  type localStreamLimits struct {
    20  	gate   gate
    21  	max    int64 // peer-provided MAX_STREAMS
    22  	opened int64 // number of streams opened by us, -1 when conn is closed
    23  }
    24  
    25  func (lim *localStreamLimits) init() {
    26  	lim.gate = newGate()
    27  }
    28  
    29  // open creates a new local stream, blocking until MAX_STREAMS quota is available.
    30  func (lim *localStreamLimits) open(ctx context.Context, c *Conn) (num int64, err error) {
    31  	// TODO: Send a STREAMS_BLOCKED when blocked.
    32  	if err := lim.gate.waitAndLock(ctx); err != nil {
    33  		return 0, err
    34  	}
    35  	defer lim.unlock()
    36  	if lim.opened < 0 {
    37  		return 0, errConnClosed
    38  	}
    39  	num = lim.opened
    40  	lim.opened++
    41  	return num, nil
    42  }
    43  
    44  // unlock is a wrapper around lim.gate.unlock. This should be used in lieu of
    45  // lim.gate.unlock directly so that out gate-state-setting logic is consistent
    46  // across multiple calls.
    47  func (lim *localStreamLimits) unlock() {
    48  	lim.gate.unlock(lim.opened < lim.max)
    49  }
    50  
    51  // wasOpened reports whether the given stream was opened by us.
    52  func (lim *localStreamLimits) wasOpened(num int64) bool {
    53  	lim.gate.lock()
    54  	defer lim.unlock()
    55  	return num < lim.opened
    56  }
    57  
    58  // connHasClosed indicates the connection has been closed, locally or by the peer.
    59  func (lim *localStreamLimits) connHasClosed() {
    60  	lim.gate.lock()
    61  	lim.opened = -1
    62  	lim.unlock()
    63  }
    64  
    65  // setMax sets the MAX_STREAMS provided by the peer.
    66  func (lim *localStreamLimits) setMax(maxStreams int64) {
    67  	lim.gate.lock()
    68  	lim.max = max(lim.max, maxStreams)
    69  	lim.unlock()
    70  }
    71  
    72  // remoteStreamLimits are limits on the number of open streams created by the peer.
    73  type remoteStreamLimits struct {
    74  	max     int64   // last MAX_STREAMS sent to the peer
    75  	opened  int64   // number of streams opened by the peer (including subsequently closed ones)
    76  	closed  int64   // number of peer streams in the "closed" state
    77  	maxOpen int64   // how many streams we want to let the peer simultaneously open
    78  	sendMax sentVal // set when we should send MAX_STREAMS
    79  }
    80  
    81  func (lim *remoteStreamLimits) init(maxOpen int64) {
    82  	lim.maxOpen = maxOpen
    83  	lim.max = min(maxOpen, implicitStreamLimit) // initial limit sent in transport parameters
    84  	lim.opened = 0
    85  }
    86  
    87  // open handles the peer opening a new stream.
    88  func (lim *remoteStreamLimits) open(id streamID) error {
    89  	num := id.num()
    90  	if num >= lim.max {
    91  		return localTransportError{
    92  			code:   errStreamLimit,
    93  			reason: "stream limit exceeded",
    94  		}
    95  	}
    96  	if num >= lim.opened {
    97  		lim.opened = num + 1
    98  		lim.maybeUpdateMax()
    99  	}
   100  	return nil
   101  }
   102  
   103  // close handles the peer closing an open stream.
   104  func (lim *remoteStreamLimits) close() {
   105  	lim.closed++
   106  	lim.maybeUpdateMax()
   107  }
   108  
   109  // maybeUpdateMax updates the MAX_STREAMS value we will send to the peer.
   110  func (lim *remoteStreamLimits) maybeUpdateMax() {
   111  	newMax := min(
   112  		// Max streams the peer can have open at once.
   113  		lim.closed+lim.maxOpen,
   114  		// Max streams the peer can open with a single frame.
   115  		lim.opened+implicitStreamLimit,
   116  	)
   117  	avail := lim.max - lim.opened
   118  	if newMax > lim.max && (avail < 8 || newMax-lim.max >= 2*avail) {
   119  		// If the peer has less than 8 streams, or if increasing the peer's
   120  		// stream limit would double it, then send a MAX_STREAMS.
   121  		lim.max = newMax
   122  		lim.sendMax.setUnsent()
   123  	}
   124  }
   125  
   126  // appendFrame appends a MAX_STREAMS frame to the current packet, if necessary.
   127  //
   128  // It returns true if no more frames need appending,
   129  // false if not everything fit in the current packet.
   130  func (lim *remoteStreamLimits) appendFrame(w *packetWriter, typ streamType, pnum packetNumber, pto bool) bool {
   131  	if lim.sendMax.shouldSendPTO(pto) {
   132  		if !w.appendMaxStreamsFrame(typ, lim.max) {
   133  			return false
   134  		}
   135  		lim.sendMax.setSent(pnum)
   136  	}
   137  	return true
   138  }
   139  

View as plain text