Source file src/vendor/golang.org/x/net/quic/conn_flow.go
1 // Copyright 2023 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package quic 6 7 import ( 8 "sync/atomic" 9 "time" 10 ) 11 12 // connInflow tracks connection-level flow control for data sent by the peer to us. 13 // 14 // There are four byte offsets of significance in the stream of data received from the peer, 15 // each >= to the previous: 16 // 17 // - bytes read by the user 18 // - bytes received from the peer 19 // - limit sent to the peer in a MAX_DATA frame 20 // - potential new limit to sent to the peer 21 // 22 // We maintain a flow control window, so as bytes are read by the user 23 // the potential limit is extended correspondingly. 24 // 25 // We keep an atomic counter of bytes read by the user and not yet applied to the 26 // potential limit (credit). When this count grows large enough, we update the 27 // new limit to send and mark that we need to send a new MAX_DATA frame. 28 type connInflow struct { 29 sent sentVal // set when we need to send a MAX_DATA update to the peer 30 usedLimit int64 // total bytes sent by the peer, must be less than sentLimit 31 sentLimit int64 // last MAX_DATA sent to the peer 32 newLimit int64 // new MAX_DATA to send 33 34 credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window 35 } 36 37 func (c *Conn) inflowInit() { 38 // The initial MAX_DATA limit is sent as a transport parameter. 39 c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize() 40 c.streams.inflow.newLimit = c.streams.inflow.sentLimit 41 } 42 43 // handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream. 44 // We may extend the peer's flow control window. 45 // 46 // This is called indirectly by the user, via Read or CloseRead. 47 func (c *Conn) handleStreamBytesReadOffLoop(n int64) { 48 if n == 0 { 49 return 50 } 51 if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) { 52 // We should send a MAX_DATA update to the peer. 53 // Record this on the Conn's main loop. 54 c.sendMsg(func(now time.Time, c *Conn) { 55 // A MAX_DATA update may have already happened, so check again. 56 if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) { 57 c.sendMaxDataUpdate() 58 } 59 }) 60 } 61 } 62 63 // handleStreamBytesReadOnLoop extends the peer's flow control window after 64 // data has been discarded due to a RESET_STREAM frame. 65 // 66 // This is called on the conn's loop. 67 func (c *Conn) handleStreamBytesReadOnLoop(n int64) { 68 if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) { 69 c.sendMaxDataUpdate() 70 } 71 } 72 73 func (c *Conn) sendMaxDataUpdate() { 74 c.streams.inflow.sent.setUnsent() 75 // Apply current credit to the limit. 76 // We don't strictly need to do this here 77 // since appendMaxDataFrame will do so as well, 78 // but this avoids redundant trips down this path 79 // if the MAX_DATA frame doesn't go out right away. 80 c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0) 81 } 82 83 func (c *Conn) shouldUpdateFlowControl(credit int64) bool { 84 return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit) 85 } 86 87 // handleStreamBytesReceived records that the peer has sent us stream data. 88 func (c *Conn) handleStreamBytesReceived(n int64) error { 89 c.streams.inflow.usedLimit += n 90 if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit { 91 return localTransportError{ 92 code: errFlowControl, 93 reason: "stream exceeded flow control limit", 94 } 95 } 96 return nil 97 } 98 99 // appendMaxDataFrame appends a MAX_DATA frame to the current packet. 100 // 101 // It returns true if no more frames need appending, 102 // false if it could not fit a frame in the current packet. 103 func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool { 104 if c.streams.inflow.sent.shouldSendPTO(pto) { 105 // Add any unapplied credit to the new limit now. 106 c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0) 107 if !w.appendMaxDataFrame(c.streams.inflow.newLimit) { 108 return false 109 } 110 c.streams.inflow.sentLimit += c.streams.inflow.newLimit 111 c.streams.inflow.sent.setSent(pnum) 112 } 113 return true 114 } 115 116 // ackOrLossMaxData records the fate of a MAX_DATA frame. 117 func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) { 118 c.streams.inflow.sent.ackLatestOrLoss(pnum, fate) 119 } 120 121 // connOutflow tracks connection-level flow control for data sent by us to the peer. 122 type connOutflow struct { 123 max int64 // largest MAX_DATA received from peer 124 used int64 // total bytes of STREAM data sent to peer 125 } 126 127 // setMaxData updates the connection-level flow control limit 128 // with the initial limit conveyed in transport parameters 129 // or an update from a MAX_DATA frame. 130 func (f *connOutflow) setMaxData(maxData int64) { 131 f.max = max(f.max, maxData) 132 } 133 134 // avail returns the number of connection-level flow control bytes available. 135 func (f *connOutflow) avail() int64 { 136 return f.max - f.used 137 } 138 139 // consume records consumption of n bytes of flow. 140 func (f *connOutflow) consume(n int64) { 141 f.used += n 142 } 143