Source file src/net/http/internal/http2/writesched.go

     1  // Copyright 2014 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2
     6  
     7  import "fmt"
     8  
// WriteScheduler is the interface implemented by HTTP/2 write schedulers.
// A scheduler holds the frames queued for writing and chooses which one is
// written next.
//
// Methods are never called concurrently.
type WriteScheduler interface {
	// OpenStream opens a new stream in the write scheduler.
	// It is illegal to call this with streamID=0 or with a streamID that is
	// already open -- the call may panic.
	OpenStream(streamID uint32, options OpenStreamOptions)

	// CloseStream closes a stream in the write scheduler. Any frames queued on
	// this stream should be discarded. It is illegal to call this on a stream
	// that is not open -- the call may panic.
	CloseStream(streamID uint32)

	// AdjustStream adjusts the priority of the given stream. This may be called
	// on a stream that has not yet been opened or that has already been closed,
	// because RFC 7540 allows PRIORITY frames to be sent on streams in any
	// state. See:
	// https://www.rfc-editor.org/rfc/rfc7540#section-5.1
	AdjustStream(streamID uint32, priority PriorityParam)

	// Push queues a frame in the scheduler. In most cases, this will not be
	// called with wr.StreamID()!=0 unless that stream is currently open. The one
	// exception is RST_STREAM frames, which may be sent on idle or closed streams.
	Push(wr FrameWriteRequest)

	// Pop dequeues the next frame to write. ok is false if no frames can
	// be written. Frames with a given wr.StreamID() are Pop'd in the same
	// order they are Push'd, except RST_STREAM frames. No frames should be
	// discarded except by CloseStream.
	Pop() (wr FrameWriteRequest, ok bool)
}
    39  
// OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.
type OpenStreamOptions struct {
	// PusherID is zero if the stream was initiated by the client. Otherwise,
	// PusherID names the stream that pushed the newly opened stream.
	PusherID uint32
	// priority is the priority assigned to the newly opened stream.
	priority PriorityParam
}
    48  
// FrameWriteRequest is a request to write a frame.
type FrameWriteRequest struct {
	// write is the interface value that does the writing, once the
	// WriteScheduler has selected this frame to write. The write
	// functions are all defined in write.go.
	write writeFramer

	// stream is the stream on which this frame will be written.
	// It is nil for non-stream frames like PING and SETTINGS.
	// It is also nil for RST_STREAM frames, which carry their stream ID in
	// the StreamError.StreamID field instead.
	stream *stream

	// done, if non-nil, must be a buffered channel with space for
	// 1 message and is sent the return value from write (or an
	// earlier error) when the frame has been written.
	done chan error
}
    66  
    67  // StreamID returns the id of the stream this frame will be written to.
    68  // 0 is used for non-stream frames such as PING and SETTINGS.
    69  func (wr FrameWriteRequest) StreamID() uint32 {
    70  	if wr.stream == nil {
    71  		if se, ok := wr.write.(StreamError); ok {
    72  			// (*serverConn).resetStream doesn't set
    73  			// stream because it doesn't necessarily have
    74  			// one. So special case this type of write
    75  			// message.
    76  			return se.StreamID
    77  		}
    78  		return 0
    79  	}
    80  	return wr.stream.id
    81  }
    82  
// isControl reports whether wr is a control frame for MaxQueuedControlFrames
// purposes. That includes non-stream frames and RST_STREAM frames, both of
// which are queued with a nil stream.
func (wr FrameWriteRequest) isControl() bool {
	return wr.stream == nil
}
    88  
    89  // DataSize returns the number of flow control bytes that must be consumed
    90  // to write this entire frame. This is 0 for non-DATA frames.
    91  func (wr FrameWriteRequest) DataSize() int {
    92  	if wd, ok := wr.write.(*writeData); ok {
    93  		return len(wd.p)
    94  	}
    95  	return 0
    96  }
    97  
    98  // Consume consumes min(n, available) bytes from this frame, where available
    99  // is the number of flow control bytes available on the stream. Consume returns
   100  // 0, 1, or 2 frames, where the integer return value gives the number of frames
   101  // returned.
   102  //
   103  // If flow control prevents consuming any bytes, this returns (_, _, 0). If
   104  // the entire frame was consumed, this returns (wr, _, 1). Otherwise, this
   105  // returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and
   106  // 'rest' contains the remaining bytes. The consumed bytes are deducted from the
   107  // underlying stream's flow control budget.
   108  func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
   109  	var empty FrameWriteRequest
   110  
   111  	// Non-DATA frames are always consumed whole.
   112  	wd, ok := wr.write.(*writeData)
   113  	if !ok || len(wd.p) == 0 {
   114  		return wr, empty, 1
   115  	}
   116  
   117  	// Might need to split after applying limits.
   118  	allowed := min(n, wr.stream.flow.available())
   119  	if wr.stream.sc.maxFrameSize < allowed {
   120  		allowed = wr.stream.sc.maxFrameSize
   121  	}
   122  	if allowed <= 0 {
   123  		return empty, empty, 0
   124  	}
   125  	if len(wd.p) > int(allowed) {
   126  		wr.stream.flow.take(allowed)
   127  		consumed := FrameWriteRequest{
   128  			stream: wr.stream,
   129  			write: &writeData{
   130  				streamID: wd.streamID,
   131  				p:        wd.p[:allowed],
   132  				// Even if the original had endStream set, there
   133  				// are bytes remaining because len(wd.p) > allowed,
   134  				// so we know endStream is false.
   135  				endStream: false,
   136  			},
   137  			// Our caller is blocking on the final DATA frame, not
   138  			// this intermediate frame, so no need to wait.
   139  			done: nil,
   140  		}
   141  		rest := FrameWriteRequest{
   142  			stream: wr.stream,
   143  			write: &writeData{
   144  				streamID:  wd.streamID,
   145  				p:         wd.p[allowed:],
   146  				endStream: wd.endStream,
   147  			},
   148  			done: wr.done,
   149  		}
   150  		return consumed, rest, 2
   151  	}
   152  
   153  	// The frame is consumed whole.
   154  	// NB: This cast cannot overflow because allowed is <= math.MaxInt32.
   155  	wr.stream.flow.take(int32(len(wd.p)))
   156  	return wr, empty, 1
   157  }
   158  
   159  // String is for debugging only.
   160  func (wr FrameWriteRequest) String() string {
   161  	var des string
   162  	if s, ok := wr.write.(fmt.Stringer); ok {
   163  		des = s.String()
   164  	} else {
   165  		des = fmt.Sprintf("%T", wr.write)
   166  	}
   167  	return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
   168  }
   169  
   170  // replyToWriter sends err to wr.done and panics if the send must block
   171  // This does nothing if wr.done is nil.
   172  func (wr *FrameWriteRequest) replyToWriter(err error) {
   173  	if wr.done == nil {
   174  		return
   175  	}
   176  	select {
   177  	case wr.done <- err:
   178  	default:
   179  		panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
   180  	}
   181  	wr.write = nil // prevent use (assume it's tainted after wr.done send)
   182  }
   183  
// writeQueue is used by implementations of WriteScheduler.
//
// Each writeQueue contains a queue of FrameWriteRequests, meant to store all
// FrameWriteRequests associated with a given stream. This is implemented as a
// two-stage queue: currQueue[currPos:] and nextQueue. Removing an item is done
// by incrementing currPos of currQueue. Adding an item is done by appending it
// to the nextQueue. If currQueue is empty when trying to remove an item, we
// can swap currQueue and nextQueue to remedy the situation.
// This two-stage queue is analogous to the use of two lists in Okasaki's
// purely functional queue but without the overhead of reversing the list when
// swapping stages.
//
// writeQueue also contains prev and next, which implementations of
// WriteScheduler can use to construct data structures that represent the
// order of writing between different streams (e.g. a circular linked list).
type writeQueue struct {
	currQueue []FrameWriteRequest // front stage; items are removed from currQueue[currPos:]
	nextQueue []FrameWriteRequest // back stage; new items are appended here
	currPos   int                 // index of the next item to remove from currQueue

	prev, next *writeQueue
}
   206  
   207  func (q *writeQueue) empty() bool {
   208  	return (len(q.currQueue) - q.currPos + len(q.nextQueue)) == 0
   209  }
   210  
// push adds wr to the back of the queue by appending it to nextQueue.
func (q *writeQueue) push(wr FrameWriteRequest) {
	q.nextQueue = append(q.nextQueue, wr)
}
   214  
   215  func (q *writeQueue) shift() FrameWriteRequest {
   216  	if q.empty() {
   217  		panic("invalid use of queue")
   218  	}
   219  	if q.currPos >= len(q.currQueue) {
   220  		q.currQueue, q.currPos, q.nextQueue = q.nextQueue, 0, q.currQueue[:0]
   221  	}
   222  	wr := q.currQueue[q.currPos]
   223  	q.currQueue[q.currPos] = FrameWriteRequest{}
   224  	q.currPos++
   225  	return wr
   226  }
   227  
   228  func (q *writeQueue) peek() *FrameWriteRequest {
   229  	if q.currPos < len(q.currQueue) {
   230  		return &q.currQueue[q.currPos]
   231  	}
   232  	if len(q.nextQueue) > 0 {
   233  		return &q.nextQueue[0]
   234  	}
   235  	return nil
   236  }
   237  
   238  // consume consumes up to n bytes from q.s[0]. If the frame is
   239  // entirely consumed, it is removed from the queue. If the frame
   240  // is partially consumed, the frame is kept with the consumed
   241  // bytes removed. Returns true iff any bytes were consumed.
   242  func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
   243  	if q.empty() {
   244  		return FrameWriteRequest{}, false
   245  	}
   246  	consumed, rest, numresult := q.peek().Consume(n)
   247  	switch numresult {
   248  	case 0:
   249  		return FrameWriteRequest{}, false
   250  	case 1:
   251  		q.shift()
   252  	case 2:
   253  		*q.peek() = rest
   254  	}
   255  	return consumed, true
   256  }
   257  
// writeQueuePool is a stack of unused writeQueues kept for reuse: put appends
// a queue to the end and get takes one from the end.
type writeQueuePool []*writeQueue
   259  
   260  // put inserts an unused writeQueue into the pool.
   261  func (p *writeQueuePool) put(q *writeQueue) {
   262  	for i := range q.currQueue {
   263  		q.currQueue[i] = FrameWriteRequest{}
   264  	}
   265  	for i := range q.nextQueue {
   266  		q.nextQueue[i] = FrameWriteRequest{}
   267  	}
   268  	q.currQueue = q.currQueue[:0]
   269  	q.nextQueue = q.nextQueue[:0]
   270  	q.currPos = 0
   271  	*p = append(*p, q)
   272  }
   273  
   274  // get returns an empty writeQueue.
   275  func (p *writeQueuePool) get() *writeQueue {
   276  	ln := len(*p)
   277  	if ln == 0 {
   278  		return new(writeQueue)
   279  	}
   280  	x := ln - 1
   281  	q := (*p)[x]
   282  	(*p)[x] = nil
   283  	*p = (*p)[:x]
   284  	return q
   285  }
   286  

View as plain text