Source file src/vendor/golang.org/x/net/quic/pipe.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package quic
     6  
     7  import (
     8  	"sync"
     9  )
    10  
    11  // A pipe is a byte buffer used in implementing streams.
    12  //
    13  // A pipe contains a window of stream data.
    14  // Random access reads and writes are supported within the window.
    15  // Writing past the end of the window extends it.
    16  // Data may be discarded from the start of the pipe, advancing the window.
    17  type pipe struct {
    18  	start int64    // stream position of first stored byte
    19  	end   int64    // stream position just past the last stored byte
    20  	head  *pipebuf // if non-nil, then head.off + len(head.b) > start
    21  	tail  *pipebuf // if non-nil, then tail.off + len(tail.b) == end
    22  }
    23  
    24  type pipebuf struct {
    25  	off  int64 // stream position of b[0]
    26  	b    []byte
    27  	next *pipebuf
    28  }
    29  
    30  func (pb *pipebuf) end() int64 {
    31  	return pb.off + int64(len(pb.b))
    32  }
    33  
    34  var pipebufPool = sync.Pool{
    35  	New: func() any {
    36  		return &pipebuf{
    37  			b: make([]byte, 4096),
    38  		}
    39  	},
    40  }
    41  
    42  func newPipebuf() *pipebuf {
    43  	return pipebufPool.Get().(*pipebuf)
    44  }
    45  
    46  func (b *pipebuf) recycle() {
    47  	b.off = 0
    48  	b.next = nil
    49  	pipebufPool.Put(b)
    50  }
    51  
    52  // writeAt writes len(b) bytes to the pipe at offset off.
    53  //
    54  // Writes to offsets before p.start are discarded.
    55  // Writes to offsets after p.end extend the pipe window.
    56  func (p *pipe) writeAt(b []byte, off int64) {
    57  	end := off + int64(len(b))
    58  	if end > p.end {
    59  		p.end = end
    60  	} else if end <= p.start {
    61  		return
    62  	}
    63  
    64  	if off < p.start {
    65  		// Discard the portion of b which falls before p.start.
    66  		trim := p.start - off
    67  		b = b[trim:]
    68  		off = p.start
    69  	}
    70  
    71  	if p.head == nil {
    72  		p.head = newPipebuf()
    73  		p.head.off = p.start
    74  		p.tail = p.head
    75  	}
    76  	pb := p.head
    77  	if off >= p.tail.off {
    78  		// Common case: Writing past the end of the pipe.
    79  		pb = p.tail
    80  	}
    81  	for {
    82  		pboff := off - pb.off
    83  		if pboff < int64(len(pb.b)) {
    84  			n := copy(pb.b[pboff:], b)
    85  			if n == len(b) {
    86  				return
    87  			}
    88  			off += int64(n)
    89  			b = b[n:]
    90  		}
    91  		if pb.next == nil {
    92  			pb.next = newPipebuf()
    93  			pb.next.off = pb.off + int64(len(pb.b))
    94  			p.tail = pb.next
    95  		}
    96  		pb = pb.next
    97  	}
    98  }
    99  
   100  // copy copies len(b) bytes into b starting from off.
   101  // The pipe must contain [off, off+len(b)).
   102  func (p *pipe) copy(off int64, b []byte) {
   103  	dst := b[:0]
   104  	p.read(off, len(b), func(c []byte) error {
   105  		dst = append(dst, c...)
   106  		return nil
   107  	})
   108  }
   109  
   110  // read calls f with the data in [off, off+n)
   111  // The data may be provided sequentially across multiple calls to f.
   112  // Note that read (unlike an io.Reader) does not consume the read data.
   113  func (p *pipe) read(off int64, n int, f func([]byte) error) error {
   114  	if off < p.start {
   115  		panic("invalid read range")
   116  	}
   117  	for pb := p.head; pb != nil && n > 0; pb = pb.next {
   118  		if off >= pb.end() {
   119  			continue
   120  		}
   121  		b := pb.b[off-pb.off:]
   122  		if len(b) > n {
   123  			b = b[:n]
   124  		}
   125  		off += int64(len(b))
   126  		n -= len(b)
   127  		if err := f(b); err != nil {
   128  			return err
   129  		}
   130  	}
   131  	if n > 0 {
   132  		panic("invalid read range")
   133  	}
   134  	return nil
   135  }
   136  
   137  // peek returns a reference to up to n bytes of internal data buffer, starting at p.start.
   138  // The returned slice is valid until the next call to discardBefore.
   139  // The length of the returned slice will be in the range [0,n].
   140  func (p *pipe) peek(n int64) []byte {
   141  	pb := p.head
   142  	if pb == nil {
   143  		return nil
   144  	}
   145  	b := pb.b[p.start-pb.off:]
   146  	return b[:min(int64(len(b)), n)]
   147  }
   148  
   149  // availableBuffer returns the available contiguous, allocated buffer space
   150  // following the pipe window.
   151  //
   152  // This is used by the stream write fast path, which makes multiple writes into the pipe buffer
   153  // without a lock, and then adjusts p.end at a later time with a lock held.
   154  func (p *pipe) availableBuffer() []byte {
   155  	if p.tail == nil {
   156  		return nil
   157  	}
   158  	return p.tail.b[p.end-p.tail.off:]
   159  }
   160  
   161  // discardBefore discards all data prior to off.
   162  func (p *pipe) discardBefore(off int64) {
   163  	for p.head != nil && p.head.end() < off {
   164  		head := p.head
   165  		p.head = p.head.next
   166  		head.recycle()
   167  	}
   168  	if p.head == nil {
   169  		p.tail = nil
   170  	}
   171  	p.start = off
   172  	p.end = max(p.end, off)
   173  }
   174  

View as plain text