Source file
src/os/pipe_test.go
1
2
3
4
5
6
7
8
9 package os_test
10
11 import (
12 "bufio"
13 "bytes"
14 "fmt"
15 "internal/testenv"
16 "io"
17 "io/fs"
18 "os"
19 "os/exec"
20 "os/signal"
21 "runtime"
22 "strconv"
23 "strings"
24 "sync"
25 "syscall"
26 "testing"
27 "time"
28 )
29
30 func TestEPIPE(t *testing.T) {
31
32
33
34
35
36
37
38
39
40
41 r, w, err := os.Pipe()
42 if err != nil {
43 t.Fatal(err)
44 }
45 if err := r.Close(); err != nil {
46 t.Fatal(err)
47 }
48
49 expect := syscall.EPIPE
50 if runtime.GOOS == "windows" {
51
52 expect = syscall.Errno(232)
53 }
54
55 for i := 0; i < 20; i++ {
56 _, err = w.Write([]byte("hi"))
57 if err == nil {
58 t.Fatal("unexpected success of Write to broken pipe")
59 }
60 if pe, ok := err.(*fs.PathError); ok {
61 err = pe.Err
62 }
63 if se, ok := err.(*os.SyscallError); ok {
64 err = se.Err
65 }
66 if err != expect {
67 t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
68 }
69 }
70 }
71
72 func TestStdPipe(t *testing.T) {
73 switch runtime.GOOS {
74 case "windows":
75 t.Skip("Windows doesn't support SIGPIPE")
76 }
77
78 if os.Getenv("GO_TEST_STD_PIPE_HELPER") != "" {
79 if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
80 signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
81 }
82 switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
83 case "1":
84 os.Stdout.Write([]byte("stdout"))
85 case "2":
86 os.Stderr.Write([]byte("stderr"))
87 case "3":
88 if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
89 os.Exit(3)
90 }
91 default:
92 panic("unrecognized value for GO_TEST_STD_PIPE_HELPER")
93 }
94
95
96
97
98 os.Exit(0)
99 }
100
101 testenv.MustHaveExec(t)
102
103
104
105
106 r, w, err := os.Pipe()
107 if err != nil {
108 t.Fatal(err)
109 }
110 if err := r.Close(); err != nil {
111 t.Fatal(err)
112 }
113
114
115
116
117
118
119 for _, sig := range []bool{false, true} {
120 for dest := 1; dest < 4; dest++ {
121 cmd := testenv.Command(t, os.Args[0], "-test.run", "TestStdPipe")
122 cmd.Stdout = w
123 cmd.Stderr = w
124 cmd.ExtraFiles = []*os.File{w}
125 cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
126 if sig {
127 cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
128 }
129 if err := cmd.Run(); err == nil {
130 if !sig && dest < 3 {
131 t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
132 }
133 } else if ee, ok := err.(*exec.ExitError); !ok {
134 t.Errorf("unexpected exec error type %T: %v", err, err)
135 } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
136 t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
137 } else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
138 if sig || dest > 2 {
139 t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
140 }
141 } else {
142 t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
143 }
144 }
145 }
146
147
148 cmd := testenv.Command(t, os.Args[0], "-test.run", "TestStdPipe")
149 cmd.Stdout = w
150 var stderr bytes.Buffer
151 cmd.Stderr = &stderr
152 cmd.Env = append(cmd.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
153 if err := cmd.Run(); err == nil {
154 t.Errorf("unexpected success of write to closed stdout")
155 } else if ee, ok := err.(*exec.ExitError); !ok {
156 t.Errorf("unexpected exec error type %T: %v", err, err)
157 } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
158 t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
159 } else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
160 t.Errorf("unexpected exit status %v for write to closed stdout", err)
161 }
162 if output := stderr.Bytes(); len(output) > 0 {
163 t.Errorf("unexpected output on stderr: %s", output)
164 }
165 }
166
167 func testClosedPipeRace(t *testing.T, read bool) {
168
169
170
171
172 limit := 1
173 if !read {
174
175
176 limit = 131073
177 if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
178 if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
179 limit = i + 1
180 }
181 }
182 t.Logf("using pipe write limit of %d", limit)
183 }
184
185 r, w, err := os.Pipe()
186 if err != nil {
187 t.Fatal(err)
188 }
189 defer r.Close()
190 defer w.Close()
191
192
193
194 go func() {
195
196
197
198 time.Sleep(20 * time.Millisecond)
199
200 var err error
201 if read {
202 err = r.Close()
203 } else {
204 err = w.Close()
205 }
206 if err != nil {
207 t.Error(err)
208 }
209 }()
210
211 b := make([]byte, limit)
212 if read {
213 _, err = r.Read(b[:])
214 } else {
215 _, err = w.Write(b[:])
216 }
217 if err == nil {
218 t.Error("I/O on closed pipe unexpectedly succeeded")
219 } else if pe, ok := err.(*fs.PathError); !ok {
220 t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", pe)
221 } else if pe.Err != fs.ErrClosed {
222 t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
223 } else {
224 t.Logf("I/O returned expected error %q", err)
225 }
226 }
227
228 func TestClosedPipeRaceRead(t *testing.T) {
229 testClosedPipeRace(t, true)
230 }
231
232 func TestClosedPipeRaceWrite(t *testing.T) {
233 testClosedPipeRace(t, false)
234 }
235
236
237
238
239 func TestReadNonblockingFd(t *testing.T) {
240 switch runtime.GOOS {
241 case "windows":
242 t.Skip("Windows doesn't support SetNonblock")
243 }
244 if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
245 fd := syscallDescriptor(os.Stdin.Fd())
246 syscall.SetNonblock(fd, true)
247 defer syscall.SetNonblock(fd, false)
248 _, err := os.Stdin.Read(make([]byte, 1))
249 if err != nil {
250 if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
251 t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
252 }
253 }
254 os.Exit(0)
255 }
256
257 testenv.MustHaveExec(t)
258 t.Parallel()
259
260 r, w, err := os.Pipe()
261 if err != nil {
262 t.Fatal(err)
263 }
264 defer r.Close()
265 defer w.Close()
266 cmd := testenv.Command(t, os.Args[0], "-test.run=^"+t.Name()+"$")
267 cmd.Env = append(cmd.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
268 cmd.Stdin = r
269 output, err := cmd.CombinedOutput()
270 t.Logf("%s", output)
271 if err != nil {
272 t.Errorf("child process failed: %v", err)
273 }
274 }
275
276 func TestCloseWithBlockingReadByNewFile(t *testing.T) {
277 t.Parallel()
278
279 var p [2]syscallDescriptor
280 err := syscall.Pipe(p[:])
281 if err != nil {
282 t.Fatal(err)
283 }
284
285 testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
286 }
287
288 func TestCloseWithBlockingReadByFd(t *testing.T) {
289 t.Parallel()
290
291 r, w, err := os.Pipe()
292 if err != nil {
293 t.Fatal(err)
294 }
295
296 _ = r.Fd()
297 testCloseWithBlockingRead(t, r, w)
298 }
299
300
301 func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
302 var (
303 enteringRead = make(chan struct{})
304 done = make(chan struct{})
305 )
306 go func() {
307 var b [1]byte
308 close(enteringRead)
309 _, err := r.Read(b[:])
310 if err == nil {
311 t.Error("I/O on closed pipe unexpectedly succeeded")
312 }
313
314 if pe, ok := err.(*fs.PathError); ok {
315 err = pe.Err
316 }
317 if err != io.EOF && err != fs.ErrClosed {
318 t.Errorf("got %v, expected EOF or closed", err)
319 }
320 close(done)
321 }()
322
323
324
325
326 <-enteringRead
327 time.Sleep(20 * time.Millisecond)
328
329 if err := r.Close(); err != nil {
330 t.Error(err)
331 }
332
333
334 w.Close()
335 <-done
336 }
337
338 func TestPipeEOF(t *testing.T) {
339 t.Parallel()
340
341 r, w, err := os.Pipe()
342 if err != nil {
343 t.Fatal(err)
344 }
345
346 testPipeEOF(t, r, w)
347 }
348
349
350
351
352
353
354 func testPipeEOF(t *testing.T, r io.ReadCloser, w io.WriteCloser) {
355
356
357
358
359 parkDelay := 10 * time.Millisecond
360 if testing.Short() {
361 parkDelay = 100 * time.Microsecond
362 }
363 writerDone := make(chan struct{})
364 defer func() {
365 if err := r.Close(); err != nil {
366 t.Errorf("error closing reader: %v", err)
367 }
368 <-writerDone
369 }()
370
371 write := make(chan int, 1)
372 go func() {
373 defer close(writerDone)
374
375 for i := range write {
376 time.Sleep(parkDelay)
377 _, err := fmt.Fprintf(w, "line %d\n", i)
378 if err != nil {
379 t.Errorf("error writing to fifo: %v", err)
380 return
381 }
382 }
383
384 time.Sleep(parkDelay)
385 if err := w.Close(); err != nil {
386 t.Errorf("error closing writer: %v", err)
387 }
388 }()
389
390 rbuf := bufio.NewReader(r)
391 for i := 0; i < 3; i++ {
392 write <- i
393 b, err := rbuf.ReadBytes('\n')
394 if err != nil {
395 t.Fatal(err)
396 }
397 t.Logf("%s\n", bytes.TrimSpace(b))
398 }
399
400 close(write)
401 b, err := rbuf.ReadBytes('\n')
402 if err != io.EOF || len(b) != 0 {
403 t.Errorf(`ReadBytes: %q, %v; want "", io.EOF`, b, err)
404 }
405 }
406
407
408 func TestFdRace(t *testing.T) {
409
410
411
412
413
414 r, w, err := os.Pipe()
415 if err != nil {
416 t.Fatal(err)
417 }
418 defer r.Close()
419 defer w.Close()
420
421 var wg sync.WaitGroup
422 call := func() {
423 defer wg.Done()
424 w.Fd()
425 }
426
427 const tries = 100
428 for i := 0; i < tries; i++ {
429 wg.Add(1)
430 go call()
431 }
432 wg.Wait()
433 }
434
435 func TestFdReadRace(t *testing.T) {
436 t.Parallel()
437
438 r, w, err := os.Pipe()
439 if err != nil {
440 t.Fatal(err)
441 }
442 defer r.Close()
443 defer w.Close()
444
445 const count = 10
446
447 c := make(chan bool, 1)
448 var wg sync.WaitGroup
449 wg.Add(1)
450 go func() {
451 defer wg.Done()
452 var buf [count]byte
453 r.SetReadDeadline(time.Now().Add(time.Minute))
454 c <- true
455 if _, err := r.Read(buf[:]); os.IsTimeout(err) {
456 t.Error("read timed out")
457 }
458 }()
459
460 wg.Add(1)
461 go func() {
462 defer wg.Done()
463 <-c
464
465
466
467 time.Sleep(10 * time.Millisecond)
468 r.Fd()
469
470
471
472
473 w.Write(make([]byte, count))
474 r.Close()
475 }()
476
477 wg.Wait()
478 }
479
View as plain text