Source file src/runtime/netpoll_kqueue.go

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build darwin || dragonfly || freebsd || netbsd || openbsd
     6  
     7  package runtime
     8  
     9  // Integrated network poller (kqueue-based implementation).
    10  
    11  import (
    12  	"internal/goarch"
    13  	"internal/runtime/atomic"
    14  	"unsafe"
    15  )
    16  
    17  var (
    18  	kq             int32         = -1
    19  	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
    20  )
    21  
    22  func netpollinit() {
    23  	kq = kqueue()
    24  	if kq < 0 {
    25  		println("runtime: kqueue failed with", -kq)
    26  		throw("runtime: netpollinit failed")
    27  	}
    28  	closeonexec(kq)
    29  	addWakeupEvent(kq)
    30  }
    31  
    32  func netpollopen(fd uintptr, pd *pollDesc) int32 {
    33  	// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
    34  	// for the whole fd lifetime. The notifications are automatically unregistered
    35  	// when fd is closed.
    36  	var ev [2]keventt
    37  	*(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
    38  	ev[0].filter = _EVFILT_READ
    39  	ev[0].flags = _EV_ADD | _EV_CLEAR
    40  	ev[0].fflags = 0
    41  	ev[0].data = 0
    42  
    43  	if goarch.PtrSize == 4 {
    44  		// We only have a pointer-sized field to store into,
    45  		// so on a 32-bit system we get no sequence protection.
    46  		// TODO(iant): If we notice any problems we could at least
    47  		// steal the low-order 2 bits for a tiny sequence number.
    48  		ev[0].udata = (*byte)(unsafe.Pointer(pd))
    49  	} else {
    50  		tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
    51  		ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
    52  	}
    53  	ev[1] = ev[0]
    54  	ev[1].filter = _EVFILT_WRITE
    55  	n := kevent(kq, &ev[0], 2, nil, 0, nil)
    56  	if n < 0 {
    57  		return -n
    58  	}
    59  	return 0
    60  }
    61  
    62  func netpollclose(fd uintptr) int32 {
    63  	// Don't need to unregister because calling close()
    64  	// on fd will remove any kevents that reference the descriptor.
    65  	return 0
    66  }
    67  
    68  func netpollarm(pd *pollDesc, mode int) {
    69  	throw("runtime: unused")
    70  }
    71  
    72  // netpollBreak interrupts a kevent.
    73  func netpollBreak() {
    74  	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
    75  	if !netpollWakeSig.CompareAndSwap(0, 1) {
    76  		return
    77  	}
    78  
    79  	wakeNetpoll(kq)
    80  }
    81  
    82  // netpoll checks for ready network connections.
    83  // Returns a list of goroutines that become runnable,
    84  // and a delta to add to netpollWaiters.
    85  // This must never return an empty list with a non-zero delta.
    86  //
    87  // delay < 0: blocks indefinitely
    88  // delay == 0: does not block, just polls
    89  // delay > 0: block for up to that many nanoseconds
    90  func netpoll(delay int64) (gList, int32) {
    91  	if kq == -1 {
    92  		return gList{}, 0
    93  	}
    94  	var tp *timespec
    95  	var ts timespec
    96  	if delay < 0 {
    97  		tp = nil
    98  	} else if delay == 0 {
    99  		tp = &ts
   100  	} else {
   101  		ts.setNsec(delay)
   102  		if ts.tv_sec > 1e6 {
   103  			// Darwin returns EINVAL if the sleep time is too long.
   104  			ts.tv_sec = 1e6
   105  		}
   106  		tp = &ts
   107  	}
   108  	var events [64]keventt
   109  retry:
   110  	n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
   111  	if n < 0 {
   112  		// Ignore the ETIMEDOUT error for now, but try to dive deep and
   113  		// figure out what really happened with n == ETIMEOUT,
   114  		// see https://go.dev/issue/59679 for details.
   115  		if n != -_EINTR && n != -_ETIMEDOUT {
   116  			println("runtime: kevent on fd", kq, "failed with", -n)
   117  			throw("runtime: netpoll failed")
   118  		}
   119  		// If a timed sleep was interrupted, just return to
   120  		// recalculate how long we should sleep now.
   121  		if delay > 0 {
   122  			return gList{}, 0
   123  		}
   124  		goto retry
   125  	}
   126  	var toRun gList
   127  	delta := int32(0)
   128  	for i := 0; i < int(n); i++ {
   129  		ev := &events[i]
   130  
   131  		if isWakeup(ev) {
   132  			if delay != 0 {
   133  				// netpollBreak could be picked up by a nonblocking poll.
   134  				// Only call drainWakeupEvent and reset the netpollWakeSig if blocking.
   135  				drainWakeupEvent(kq)
   136  				netpollWakeSig.Store(0)
   137  			}
   138  			continue
   139  		}
   140  
   141  		var mode int32
   142  		switch ev.filter {
   143  		case _EVFILT_READ:
   144  			mode += 'r'
   145  
   146  			// On some systems when the read end of a pipe
   147  			// is closed the write end will not get a
   148  			// _EVFILT_WRITE event, but will get a
   149  			// _EVFILT_READ event with EV_EOF set.
   150  			// Note that setting 'w' here just means that we
   151  			// will wake up a goroutine waiting to write;
   152  			// that goroutine will try the write again,
   153  			// and the appropriate thing will happen based
   154  			// on what that write returns (success, EPIPE, EAGAIN).
   155  			if ev.flags&_EV_EOF != 0 {
   156  				mode += 'w'
   157  			}
   158  		case _EVFILT_WRITE:
   159  			mode += 'w'
   160  		}
   161  		if mode != 0 {
   162  			var pd *pollDesc
   163  			var tag uintptr
   164  			if goarch.PtrSize == 4 {
   165  				// No sequence protection on 32-bit systems.
   166  				// See netpollopen for details.
   167  				pd = (*pollDesc)(unsafe.Pointer(ev.udata))
   168  				tag = 0
   169  			} else {
   170  				tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
   171  				pd = (*pollDesc)(tp.pointer())
   172  				tag = tp.tag()
   173  				if pd.fdseq.Load() != tag {
   174  					continue
   175  				}
   176  			}
   177  			pd.setEventErr(ev.flags == _EV_ERROR, tag)
   178  			delta += netpollready(&toRun, pd, mode)
   179  		}
   180  	}
   181  	return toRun, delta
   182  }
   183  

View as plain text