Source file src/vendor/golang.org/x/net/quic/conn_flow.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package quic
     6  
     7  import (
     8  	"sync/atomic"
     9  	"time"
    10  )
    11  
    12  // connInflow tracks connection-level flow control for data sent by the peer to us.
    13  //
    14  // There are four byte offsets of significance in the stream of data received from the peer,
    15  // each >= to the previous:
    16  //
    17  //   - bytes read by the user
    18  //   - bytes received from the peer
    19  //   - limit sent to the peer in a MAX_DATA frame
    20  //   - potential new limit to sent to the peer
    21  //
    22  // We maintain a flow control window, so as bytes are read by the user
    23  // the potential limit is extended correspondingly.
    24  //
    25  // We keep an atomic counter of bytes read by the user and not yet applied to the
    26  // potential limit (credit). When this count grows large enough, we update the
    27  // new limit to send and mark that we need to send a new MAX_DATA frame.
    28  type connInflow struct {
    29  	sent      sentVal // set when we need to send a MAX_DATA update to the peer
    30  	usedLimit int64   // total bytes sent by the peer, must be less than sentLimit
    31  	sentLimit int64   // last MAX_DATA sent to the peer
    32  	newLimit  int64   // new MAX_DATA to send
    33  
    34  	credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
    35  }
    36  
    37  func (c *Conn) inflowInit() {
    38  	// The initial MAX_DATA limit is sent as a transport parameter.
    39  	c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
    40  	c.streams.inflow.newLimit = c.streams.inflow.sentLimit
    41  }
    42  
    43  // handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
    44  // We may extend the peer's flow control window.
    45  //
    46  // This is called indirectly by the user, via Read or CloseRead.
    47  func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
    48  	if n == 0 {
    49  		return
    50  	}
    51  	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
    52  		// We should send a MAX_DATA update to the peer.
    53  		// Record this on the Conn's main loop.
    54  		c.sendMsg(func(now time.Time, c *Conn) {
    55  			// A MAX_DATA update may have already happened, so check again.
    56  			if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) {
    57  				c.sendMaxDataUpdate()
    58  			}
    59  		})
    60  	}
    61  }
    62  
    63  // handleStreamBytesReadOnLoop extends the peer's flow control window after
    64  // data has been discarded due to a RESET_STREAM frame.
    65  //
    66  // This is called on the conn's loop.
    67  func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
    68  	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
    69  		c.sendMaxDataUpdate()
    70  	}
    71  }
    72  
    73  func (c *Conn) sendMaxDataUpdate() {
    74  	c.streams.inflow.sent.setUnsent()
    75  	// Apply current credit to the limit.
    76  	// We don't strictly need to do this here
    77  	// since appendMaxDataFrame will do so as well,
    78  	// but this avoids redundant trips down this path
    79  	// if the MAX_DATA frame doesn't go out right away.
    80  	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
    81  }
    82  
    83  func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
    84  	return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
    85  }
    86  
    87  // handleStreamBytesReceived records that the peer has sent us stream data.
    88  func (c *Conn) handleStreamBytesReceived(n int64) error {
    89  	c.streams.inflow.usedLimit += n
    90  	if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
    91  		return localTransportError{
    92  			code:   errFlowControl,
    93  			reason: "stream exceeded flow control limit",
    94  		}
    95  	}
    96  	return nil
    97  }
    98  
    99  // appendMaxDataFrame appends a MAX_DATA frame to the current packet.
   100  //
   101  // It returns true if no more frames need appending,
   102  // false if it could not fit a frame in the current packet.
   103  func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
   104  	if c.streams.inflow.sent.shouldSendPTO(pto) {
   105  		// Add any unapplied credit to the new limit now.
   106  		c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
   107  		if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
   108  			return false
   109  		}
   110  		c.streams.inflow.sentLimit += c.streams.inflow.newLimit
   111  		c.streams.inflow.sent.setSent(pnum)
   112  	}
   113  	return true
   114  }
   115  
   116  // ackOrLossMaxData records the fate of a MAX_DATA frame.
   117  func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
   118  	c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
   119  }
   120  
   121  // connOutflow tracks connection-level flow control for data sent by us to the peer.
   122  type connOutflow struct {
   123  	max  int64 // largest MAX_DATA received from peer
   124  	used int64 // total bytes of STREAM data sent to peer
   125  }
   126  
   127  // setMaxData updates the connection-level flow control limit
   128  // with the initial limit conveyed in transport parameters
   129  // or an update from a MAX_DATA frame.
   130  func (f *connOutflow) setMaxData(maxData int64) {
   131  	f.max = max(f.max, maxData)
   132  }
   133  
   134  // avail returns the number of connection-level flow control bytes available.
   135  func (f *connOutflow) avail() int64 {
   136  	return f.max - f.used
   137  }
   138  
   139  // consume records consumption of n bytes of flow.
   140  func (f *connOutflow) consume(n int64) {
   141  	f.used += n
   142  }
   143  

View as plain text