1
2
3
4
5 package fuzz
6
7 import (
8 "bytes"
9 "context"
10 "crypto/sha256"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "os"
16 "os/exec"
17 "reflect"
18 "runtime"
19 "sync"
20 "time"
21 )
22
const (
	// workerFuzzDuration is a 100ms time budget associated with a worker's
	// fuzzing pass.
	// NOTE(review): not referenced in the visible part of this file — confirm
	// where it is consumed.
	workerFuzzDuration = 100 * time.Millisecond

	// workerTimeoutDuration is how long worker.stop waits for the worker
	// process to terminate before escalating to the next signal.
	workerTimeoutDuration = 1 * time.Second

	// workerExitCode is the process exit status that worker.coordinate
	// reports as "an internal failure" of the fuzzing process, as opposed to
	// a hang or an unexpected termination.
	workerExitCode = 70

	// workerSharedMemSize is the size, in bytes, requested by newWorker for
	// the shared memory file used to exchange data with a worker (100 MiB).
	workerSharedMemSize = 100 << 20
)
41
42
43
44
45
// worker manages a worker process running a fuzz binary. A worker value lives
// in the coordinating process; it starts and stops the child process and
// talks to it through a workerClient.
type worker struct {
	// dir is the working directory of the worker process (cmd.Dir).
	dir string
	// binPath is the path of the executable run by start.
	binPath string
	// args are the command-line arguments passed to binPath.
	args []string
	// env is the environment of the worker process.
	env []string

	// coordinator supplies inputs (inputC, minimizeC), receives results
	// (resultC), and provides options and logging.
	coordinator *coordinator

	// memMu is a channel with capacity 1 that holds the shared memory.
	// Receiving from it takes ownership of the memory; sending returns it.
	memMu chan *sharedMem

	// cmd is the running worker process, or nil if none is running.
	cmd *exec.Cmd
	// client sends calls to the worker process; set by start, cleared by stop.
	client *workerClient
	// waitErr is the result of cmd.Wait. It is written by the goroutine
	// launched in start before termC is closed.
	waitErr error
	// interrupted is set by stop when the process did not terminate within
	// workerTimeoutDuration and had to be signalled.
	interrupted bool
	// termC is closed once cmd.Wait has returned. It is nil until start
	// succeeds.
	termC chan struct{}
}
62
63 func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) {
64 mem, err := sharedMemTempFile(workerSharedMemSize)
65 if err != nil {
66 return nil, err
67 }
68 memMu := make(chan *sharedMem, 1)
69 memMu <- mem
70 return &worker{
71 dir: dir,
72 binPath: binPath,
73 args: args,
74 env: env[:len(env):len(env)],
75 coordinator: c,
76 memMu: memMu,
77 }, nil
78 }
79
80
81 func (w *worker) cleanup() error {
82 mem := <-w.memMu
83 if mem == nil {
84 return nil
85 }
86 close(w.memMu)
87 return mem.Close()
88 }
89
90
91
92
93
94
95
96
97
98
99
// coordinate runs the worker's event loop. On every iteration it makes sure a
// worker process is running, then waits for one of: context cancellation,
// unexpected process termination, a fuzz input, or a minimization request.
//
// It returns ctx.Err() when the context is done, nil when the process exited
// cleanly or because of an interrupt while idle, and a descriptive error when
// the process failed in a way that should not be recorded as a crash.
// Crashes found while fuzzing are not returned; they are delivered to the
// coordinator as fuzzResult values on resultC.
func (w *worker) coordinate(ctx context.Context) error {
	// Main event loop.
	for {
		// Start (or restart) the worker process if it is not running.
		if !w.isRunning() {
			if err := w.startAndPing(ctx); err != nil {
				return err
			}
		}

		select {
		case <-ctx.Done():
			// Told to stop. Errors caused by our own interruption of the
			// process are not interesting; report only genuine failures.
			err := w.stop()
			if err != nil && !w.interrupted && !isInterruptError(err) {
				return err
			}
			return ctx.Err()

		case <-w.termC:
			// The process terminated on its own while we were waiting for
			// work. stop returns immediately here because termC is closed.
			err := w.stop()
			if w.interrupted {
				// stop only sets interrupted after waiting on a live
				// process, which cannot happen once termC is closed.
				panic("worker interrupted after unexpected termination")
			}
			if err == nil || isInterruptError(err) {
				// Clean exit or interrupt: not treated as a failure.
				// NOTE(review): presumably the interrupt was delivered to
				// the whole process group (e.g. ^C) — confirm.
				return nil
			}
			if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
				// The process exited with the status reserved for internal
				// failures.
				return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
			}
			// Any other non-zero exit or signal. No input was being tested,
			// so there is nothing to record as a crasher.
			return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)

		case input := <-w.coordinator.inputC:
			// Received an input to fuzz from the coordinator.
			args := fuzzArgs{
				Limit:        input.limit,
				Timeout:      input.timeout,
				Warmup:       input.warmup,
				CoverageData: input.coverageData,
			}
			entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
			canMinimize := true
			if err != nil {
				// Error communicating with the worker process. Stop it so
				// that w.waitErr and w.interrupted are populated; the
				// return value is deliberately ignored.
				w.stop()
				if ctx.Err() != nil {
					// Timeout or cancellation.
					return ctx.Err()
				}
				if w.interrupted {
					// The process was still alive and had to be signalled,
					// so the failure was in communication, not a crash.
					return fmt.Errorf("communicating with fuzzing process: %v", err)
				}
				if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
					// Killed by a signal that is not considered a crash
					// (per isCrashSignal); do not record the input.
					return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
				}
				if isInternalError {
					// The client flagged the error as internal; it must not
					// be recorded as a crash.
					return err
				}
				// Unexpected termination: record it as a crasher and fall
				// through. The process is restarted on the next iteration.
				// The input is not minimized because it took the process
				// down.
				resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
				canMinimize = false
			}
			result := fuzzResult{
				limit:         input.limit,
				count:         resp.Count,
				totalDuration: resp.TotalDuration,
				entryDuration: resp.InterestingDuration,
				entry:         entry,
				crasherMsg:    resp.Err,
				coverageData:  resp.CoverageData,
				canMinimize:   canMinimize,
			}
			w.coordinator.resultC <- result

		case input := <-w.coordinator.minimizeC:
			// Received an input to minimize from the coordinator.
			result, err := w.minimize(ctx, input)
			if err != nil {
				// Minimization failed. Report the original input and its
				// original crasher message (or the minimization error if
				// there was none), and mark it as not minimizable so the
				// coordinator does not try again.
				result = fuzzResult{
					entry:       input.entry,
					crasherMsg:  input.crasherMsg,
					canMinimize: false,
					limit:       input.limit,
				}
				if result.crasherMsg == "" {
					result.crasherMsg = err.Error()
				}
			}
			if shouldPrintDebugInfo() {
				w.coordinator.debugLogf(
					"input minimized, id: %s, original id: %s, crasher: %t, originally crasher: %t, minimizing took: %s",
					result.entry.Path,
					input.entry.Path,
					result.crasherMsg != "",
					input.crasherMsg != "",
					result.totalDuration,
				)
			}
			w.coordinator.resultC <- result
		}
	}
}
233
234
235
236
237
238 func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
239 if w.coordinator.opts.MinimizeTimeout != 0 {
240 var cancel func()
241 ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
242 defer cancel()
243 }
244
245 args := minimizeArgs{
246 Limit: input.limit,
247 Timeout: input.timeout,
248 KeepCoverage: input.keepCoverage,
249 }
250 entry, resp, err := w.client.minimize(ctx, input.entry, args)
251 if err != nil {
252
253 w.stop()
254 if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
255
256
257
258
259
260 return fuzzResult{
261 entry: input.entry,
262 crasherMsg: input.crasherMsg,
263 coverageData: input.keepCoverage,
264 canMinimize: false,
265 limit: input.limit,
266 }, nil
267 }
268 return fuzzResult{
269 entry: entry,
270 crasherMsg: fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
271 canMinimize: false,
272 limit: input.limit,
273 count: resp.Count,
274 totalDuration: resp.Duration,
275 }, nil
276 }
277
278 if input.crasherMsg != "" && resp.Err == "" {
279 return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
280 }
281
282 return fuzzResult{
283 entry: entry,
284 crasherMsg: resp.Err,
285 coverageData: resp.CoverageData,
286 canMinimize: false,
287 limit: input.limit,
288 count: resp.Count,
289 totalDuration: resp.Duration,
290 }, nil
291 }
292
// isRunning reports whether the worker currently has a process attached,
// i.e. start has succeeded and stop has not yet cleared w.cmd.
func (w *worker) isRunning() bool {
	return w.cmd != nil
}
296
297
298
299
300
301
302
303
304
305 func (w *worker) startAndPing(ctx context.Context) error {
306 if ctx.Err() != nil {
307 return ctx.Err()
308 }
309 if err := w.start(); err != nil {
310 return err
311 }
312 if err := w.client.ping(ctx); err != nil {
313 w.stop()
314 if ctx.Err() != nil {
315 return ctx.Err()
316 }
317 if isInterruptError(err) {
318
319 return err
320 }
321
322 return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
323 }
324 return nil
325 }
326
327
328
329
330
331
332
333
334
335
336
// start launches a new worker process running w.binPath and connects a
// workerClient to it. It panics if a process is already attached.
//
// On success w.cmd, w.client and w.termC are set, and a goroutine waits for
// the process to exit, stores the result in w.waitErr and then closes
// w.termC. On failure every pipe end opened here is closed and the error is
// returned; w.cmd stays nil.
func (w *worker) start() (err error) {
	if w.isRunning() {
		panic("worker already started")
	}
	// Reset the per-process state left over from a previous run.
	w.waitErr = nil
	w.interrupted = false
	w.termC = nil

	cmd := exec.Command(w.binPath, w.args...)
	cmd.Dir = w.dir
	// Capacity is clamped so that appends to cmd.Env cannot write into
	// w.env's backing array.
	cmd.Env = w.env[:len(w.env):len(w.env)]

	// Create two pipes: one carrying calls to the process (fuzzIn) and one
	// carrying responses back (fuzzOut). The process gets the read end of
	// fuzzIn and the write end of fuzzOut; this side keeps the opposite
	// ends. The child's ends are closed in this process when start returns,
	// whether or not the process was started.
	fuzzInR, fuzzInW, err := os.Pipe()
	if err != nil {
		return err
	}
	defer fuzzInR.Close()
	fuzzOutR, fuzzOutW, err := os.Pipe()
	if err != nil {
		fuzzInW.Close()
		return err
	}
	defer fuzzOutW.Close()
	// NOTE(review): setWorkerComm is defined elsewhere; presumably it
	// arranges for the child to inherit these files and the shared memory —
	// confirm.
	setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})

	// Start the worker process.
	if err := cmd.Start(); err != nil {
		fuzzInW.Close()
		fuzzOutR.Close()
		return err
	}

	// Started successfully. From here on w.client owns fuzzInW and fuzzOutR,
	// so stop must call w.client.Close to release them.
	w.cmd = cmd
	w.termC = make(chan struct{})
	comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
	m := newMutator()
	w.client = newWorkerClient(comm, m)

	// Record the exit status before signalling termination, so that anyone
	// who observes termC closed can safely read waitErr.
	go func() {
		w.waitErr = w.cmd.Wait()
		close(w.termC)
	}()

	return nil
}
393
394
395
396
397
398
399
400
401
402
// stop tells the worker process to exit and waits for it to terminate. It
// returns the error from cmd.Wait (w.waitErr). It panics if start never
// succeeded (w.termC is nil).
//
// The process is first asked to exit by closing the client's pipes. If it is
// still alive after workerTimeoutDuration, w.interrupted is set and the
// process is sent os.Interrupt (os.Kill on Windows); after another timeout it
// is sent os.Kill; after a third timeout a message is logged and stop keeps
// waiting indefinitely.
//
// stop may be called again after the process has terminated; it then returns
// w.waitErr without doing anything else.
func (w *worker) stop() error {
	if w.termC == nil {
		panic("worker was not started successfully")
	}
	select {
	case <-w.termC:
		// Process already terminated.
		if w.client == nil {
			// stop has already run for this process.
			return w.waitErr
		}
		// Terminated without stop having been called: release the client's
		// pipes and detach the process.
		w.client.Close()
		w.cmd = nil
		w.client = nil
		return w.waitErr
	default:
		// Process still running.
	}

	// Ask the process to stop by closing its input pipe. Close may block
	// (it takes the client's mutex and drains fuzzOut), so it runs in its
	// own goroutine while this one watches for termination.
	closeC := make(chan struct{})
	go func() {
		w.client.Close()
		close(closeC)
	}()

	sig := os.Interrupt
	if runtime.GOOS == "windows" {
		// NOTE(review): the os package documents that Interrupt cannot be
		// sent with Process.Signal on Windows, hence Kill here — confirm.
		sig = os.Kill
	}

	t := time.NewTimer(workerTimeoutDuration)
	for {
		select {
		case <-w.termC:
			// Process terminated. Wait for Close to finish before clearing
			// w.client, which the goroutine above dereferences.
			t.Stop()
			<-closeC
			w.cmd = nil
			w.client = nil
			return w.waitErr

		case <-t.C:
			// Timer fired before the process terminated.
			w.interrupted = true
			switch sig {
			case os.Interrupt:
				// Send an interrupt, then wait a little longer.
				w.cmd.Process.Signal(sig)
				sig = os.Kill
				t.Reset(workerTimeoutDuration)

			case os.Kill:
				// Send a kill, then wait a little longer.
				w.cmd.Process.Signal(sig)
				sig = nil
				t.Reset(workerTimeoutDuration)

			case nil:
				// Nothing left to send. Tell the user why we are stuck; the
				// timer is not reset, so this is printed only once.
				fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
			}
		}
	}
}
473
474
475
476
477
478
479
480
481
482
483
484 func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error {
485 comm, err := getWorkerComm()
486 if err != nil {
487 return err
488 }
489 srv := &workerServer{
490 workerComm: comm,
491 fuzzFn: func(e CorpusEntry) (time.Duration, error) {
492 timer := time.AfterFunc(10*time.Second, func() {
493 panic("deadlocked!")
494 })
495 defer timer.Stop()
496 start := time.Now()
497 err := fn(e)
498 return time.Since(start), err
499 },
500 m: newMutator(),
501 }
502 return srv.serve(ctx)
503 }
504
505
506
507
// call is the JSON message sent from workerClient to workerServer. Exactly
// one field is expected to be non-nil; it selects the method to run and
// carries its arguments. workerServer.serve checks Fuzz, then Minimize, then
// Ping, and fails if all are nil.
type call struct {
	Ping     *pingArgs
	Fuzz     *fuzzArgs
	Minimize *minimizeArgs
}
513
514
515
// minimizeArgs contains the arguments of workerServer.minimize. The value to
// minimize is not part of this struct; it is passed through shared memory.
type minimizeArgs struct {
	// Timeout bounds the time spent minimizing. Zero means no timeout is
	// applied by the server.
	Timeout time.Duration

	// Limit is the maximum number of fuzz function calls. Values <= 0 mean
	// no limit is enforced by the server.
	Limit int64

	// KeepCoverage, when non-nil, is the coverage that a minimized value
	// must still exhibit. When nil, the value is minimized as a crasher: it
	// must keep causing an error.
	KeepCoverage []byte

	// Index is the position, within the unmarshaled values, of the value to
	// minimize. workerClient.minimize sets it for each minimizable value.
	Index int
}
534
535
// minimizeResponse contains the results of workerServer.minimize.
type minimizeResponse struct {
	// WroteToMem is true when the server wrote the (possibly) minimized
	// values, in marshaled form, to shared memory. When false, the client
	// keeps the values it already has.
	WroteToMem bool

	// Err is the message of the error returned by the fuzz function for the
	// minimized value, or empty if there was none.
	Err string

	// CoverageData is the coverage associated with the returned value. It is
	// set only when minimization succeeded without an error.
	CoverageData []byte

	// Duration is the time the server spent in minimize.
	Duration time.Duration

	// Count is the number of fuzz function calls; workerClient.minimize
	// fills it in from the shared memory header.
	Count int64
}
557
558
559
// fuzzArgs contains the arguments of workerServer.fuzz. The value to fuzz is
// not part of this struct; it is passed through shared memory.
type fuzzArgs struct {
	// Timeout bounds the time spent fuzzing. Zero means no timeout is
	// applied by the server.
	Timeout time.Duration

	// Limit is the maximum number of fuzz function calls. Values <= 0 mean
	// no limit is enforced by the server.
	Limit int64

	// Warmup makes the server run the input exactly once, without mutating
	// it, and report its coverage (when coverage is enabled).
	Warmup bool

	// CoverageData, when non-nil, replaces the server's coverage mask. Its
	// length must match the length of the previous mask, if any.
	CoverageData []byte
}
578
579
// fuzzResponse contains the results of workerServer.fuzz.
type fuzzResponse struct {
	// TotalDuration is the time the server spent in fuzz.
	TotalDuration time.Duration
	// InterestingDuration is the time the fuzz function took on the input
	// that is being reported (the warmup input, or the one that found new
	// coverage).
	InterestingDuration time.Duration

	// Count is the number of fuzz function calls, taken from the shared
	// memory header.
	Count int64

	// CoverageData is set when the reported input expanded coverage, or for
	// warmup runs when coverage is enabled.
	CoverageData []byte

	// Err is the message of the error returned by the fuzz function, if
	// any. A non-empty value marks the input as a crasher.
	Err string

	// InternalErr is set when the server could not do its job for a reason
	// unrelated to the input; workerClient.fuzz turns it into an error
	// flagged as internal.
	InternalErr string
}
600
601
// pingArgs contains the arguments of workerServer.ping. It has none.
type pingArgs struct{}
603
604
// pingResponse contains the results of workerServer.ping. It has none.
type pingResponse struct{}
606
607
608
609
610
611
612
613
614
615
// workerComm holds the channels through which workerClient and workerServer
// communicate.
//
// fuzzIn carries JSON-encoded calls towards the worker process and fuzzOut
// carries JSON-encoded responses back. Each side holds its own end of each
// pipe: the client writes fuzzIn and reads fuzzOut, the server does the
// opposite.
//
// memMu holds the shared memory used to pass values that are not part of the
// JSON messages. Whoever receives the *sharedMem from the channel owns it
// until it is sent back.
type workerComm struct {
	fuzzIn, fuzzOut *os.File
	memMu           chan *sharedMem
}
620
621
622
623
624
625
// workerServer is the side of the call protocol that runs inside the worker
// process. It decodes calls from fuzzIn, executes them, and encodes the
// responses to fuzzOut (see serve).
type workerServer struct {
	workerComm
	// m mutates values between fuzz function calls.
	m *mutator

	// coverageMask is the most recent non-nil CoverageData received in a
	// fuzz call. An input is reported as interesting only if it sets
	// coverage bits that are new relative to this mask.
	coverageMask []byte

	// fuzzFn runs the fuzz function once on the given entry. It returns how
	// long the call took and the error it produced, if any.
	fuzzFn func(CorpusEntry) (time.Duration, error)
}
642
643
644
645
646
647
648
649
650
651
652
653 func (ws *workerServer) serve(ctx context.Context) error {
654 enc := json.NewEncoder(ws.fuzzOut)
655 dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
656 for {
657 var c call
658 if err := dec.Decode(&c); err != nil {
659 if err == io.EOF || err == ctx.Err() {
660 return nil
661 } else {
662 return err
663 }
664 }
665
666 var resp any
667 switch {
668 case c.Fuzz != nil:
669 resp = ws.fuzz(ctx, *c.Fuzz)
670 case c.Minimize != nil:
671 resp = ws.minimize(ctx, *c.Minimize)
672 case c.Ping != nil:
673 resp = ws.ping(ctx, *c.Ping)
674 default:
675 return errors.New("no arguments provided for any call")
676 }
677
678 if err := enc.Encode(resp); err != nil {
679 return err
680 }
681 }
682 }
683
684
685
686
687
688
689
690
691
692
// chainedMutations is the number of mutations applied on top of each other
// before the worker resets the values to the original input and starts a new
// chain. workerClient.fuzz relies on the same constant to replay the
// mutations that produced a reported value.
const chainedMutations = 5
694
695
696
697
698
699
700
701
702
703
704
705
// fuzz runs the fuzz function on mutations of the value held in shared
// memory. It stops when the context is done, args.Timeout elapses,
// args.Limit calls have been made, the fuzz function returns an error, or an
// input sets coverage bits that are new relative to the coverage mask.
//
// With args.Warmup set, the value is run once without mutation and its
// coverage is reported instead.
//
// The mutated value itself is never sent back. The pseudo-random generator
// state at the start of the current mutation chain is stored in the shared
// memory header so that the client can replay the mutations.
func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
	if args.CoverageData != nil {
		if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
			resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
			return resp
		}
		ws.coverageMask = args.CoverageData
	}
	start := time.Now()
	defer func() { resp.TotalDuration = time.Since(start) }()

	if args.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
		defer cancel()
	}
	// Take ownership of the shared memory for the duration of the call.
	mem := <-ws.memMu
	// Record the generator state before any mutation takes place.
	ws.m.r.save(&mem.header().randState, &mem.header().randInc)
	defer func() {
		// Report the call count and hand the shared memory back.
		resp.Count = mem.header().count
		ws.memMu <- mem
	}()
	if args.Limit > 0 && mem.header().count >= args.Limit {
		resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
		return resp
	}

	originalVals, err := unmarshalCorpusFile(mem.valueCopy())
	if err != nil {
		resp.InternalErr = err.Error()
		return resp
	}
	// vals is the working copy that gets mutated; originalVals is kept to
	// reset it at the start of each mutation chain.
	vals := make([]any, len(originalVals))
	copy(vals, originalVals)

	shouldStop := func() bool {
		return args.Limit > 0 && mem.header().count >= args.Limit
	}
	// fuzzOnce runs the fuzz function on entry once and bumps the call
	// counter in shared memory. It returns the duration of the call, the
	// coverage snapshot if the entry set new coverage bits (nil otherwise),
	// and the error message if the fuzz function failed (empty otherwise).
	fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
		mem.header().count++
		var err error
		dur, err = ws.fuzzFn(entry)
		if err != nil {
			errMsg = err.Error()
			if errMsg == "" {
				// Make sure a failure is never reported with an empty
				// message, since an empty message means "no error".
				errMsg = "fuzz function failed with no input"
			}
			return dur, nil, errMsg
		}
		if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
			return dur, coverageSnapshot, ""
		}
		return dur, nil, ""
	}

	if args.Warmup {
		// Run the unmodified input once and report its coverage.
		dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
		if errMsg != "" {
			resp.Err = errMsg
			return resp
		}
		resp.InterestingDuration = dur
		if coverageEnabled {
			resp.CoverageData = coverageSnapshot
		}
		return resp
	}

	for {
		select {
		case <-ctx.Done():
			return resp
		default:
			if mem.header().count%chainedMutations == 0 {
				// Start a new chain: reset the values to the original
				// input and save the generator state so the client can
				// replay this chain from its beginning.
				copy(vals, originalVals)
				ws.m.r.save(&mem.header().randState, &mem.header().randInc)
			}
			// NOTE(review): the second argument is presumably the maximum
			// size a mutated value may have — confirm against mutate.
			ws.m.mutate(vals, cap(mem.valueRef()))

			entry := CorpusEntry{Values: vals}
			dur, cov, errMsg := fuzzOnce(entry)
			if errMsg != "" {
				resp.Err = errMsg
				return resp
			}
			if cov != nil {
				resp.CoverageData = cov
				resp.InterestingDuration = dur
				return resp
			}
			if shouldStop() {
				return resp
			}
		}
	}
}
802
803 func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
804 start := time.Now()
805 defer func() { resp.Duration = time.Since(start) }()
806 mem := <-ws.memMu
807 defer func() { ws.memMu <- mem }()
808 vals, err := unmarshalCorpusFile(mem.valueCopy())
809 if err != nil {
810 panic(err)
811 }
812 inpHash := sha256.Sum256(mem.valueCopy())
813 if args.Timeout != 0 {
814 var cancel func()
815 ctx, cancel = context.WithTimeout(ctx, args.Timeout)
816 defer cancel()
817 }
818
819
820
821 success, err := ws.minimizeInput(ctx, vals, mem, args)
822 if success {
823 writeToMem(vals, mem)
824 outHash := sha256.Sum256(mem.valueCopy())
825 mem.header().rawInMem = false
826 resp.WroteToMem = true
827 if err != nil {
828 resp.Err = err.Error()
829 } else {
830
831
832
833
834
835 if outHash != inpHash {
836 resp.CoverageData = coverageSnapshot
837 } else {
838 resp.CoverageData = args.KeepCoverage
839 }
840 }
841 }
842 return resp
843 }
844
845
846
847
848
849
// minimizeInput minimizes vals[args.Index] in place, which must be a string
// or a []byte (anything else panics), using minimizeBytes.
//
// It first re-runs the fuzz function on the unmodified values to confirm
// that they are still interesting: with args.KeepCoverage set they must
// keep that coverage without failing; otherwise they must fail. If that
// check does not hold, or the context is done or the call limit is reached
// before starting, it returns (false, nil) and leaves everything untouched.
//
// Otherwise it returns true together with the last error returned by the
// fuzz function for an accepted value (nil if none failed).
//
// While minimizing, each candidate is written raw (unmarshaled) to shared
// memory and header().rawInMem is set.
// NOTE(review): this looks intended to let the coordinator recover the
// candidate if the process dies mid-call — confirm against workerClient.
func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
	keepCoverage := args.KeepCoverage
	memBytes := mem.valueRef()
	bPtr := &memBytes
	count := &mem.header().count
	shouldStop := func() bool {
		return ctx.Err() != nil ||
			(args.Limit > 0 && *count >= args.Limit)
	}
	if shouldStop() {
		return false, nil
	}

	// Check that the original value is still interesting. If it is not,
	// whatever made it look interesting may have been a flake, and there is
	// nothing to minimize.
	*count++
	_, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
	if keepCoverage != nil {
		if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
			return false, nil
		}
	} else if retErr == nil {
		return false, nil
	}
	mem.header().rawInMem = true

	// tryMinimized runs the fuzz function with candidate in place of
	// vals[args.Index] and reports whether the candidate is still
	// interesting: it fails, or it keeps the required coverage. An accepted
	// candidate stays in vals; a rejected one is replaced by the previous
	// value.
	tryMinimized := func(candidate []byte) bool {
		prev := vals[args.Index]
		switch prev.(type) {
		case []byte:
			vals[args.Index] = candidate
		case string:
			vals[args.Index] = string(candidate)
		default:
			panic("impossible")
		}
		// Mirror the candidate into shared memory before running it.
		copy(*bPtr, candidate)
		*bPtr = (*bPtr)[:len(candidate)]
		mem.setValueLen(len(candidate))
		*count++
		_, err := ws.fuzzFn(CorpusEntry{Values: vals})
		if err != nil {
			retErr = err
			if keepCoverage != nil {
				// A failure matters more than preserving coverage. From now
				// on only candidates that fail are accepted.
				keepCoverage = nil
			}
			return true
		}
		// No failure: accept only if the required coverage is preserved.
		if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
			return true
		}
		vals[args.Index] = prev
		return false
	}
	switch v := vals[args.Index].(type) {
	case string:
		minimizeBytes([]byte(v), tryMinimized, shouldStop)
	case []byte:
		minimizeBytes(v, tryMinimized, shouldStop)
	default:
		panic("impossible")
	}
	return true, retErr
}
923
924 func writeToMem(vals []any, mem *sharedMem) {
925 b := marshalCorpusFile(vals...)
926 mem.setValue(b)
927 }
928
929
930
// ping does nothing and returns an empty response. The coordinator calls it
// after starting a worker process to check that the process responds (see
// worker.startAndPing).
func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
	return pingResponse{}
}
934
935
936
937
// workerClient is the side of the call protocol that runs in the
// coordinating process. It sends calls to a workerServer over fuzzIn and
// reads the responses from fuzzOut.
type workerClient struct {
	workerComm
	// m replays, on the client side, the mutations performed by the server
	// (see fuzz).
	m *mutator

	// mu is held for the whole duration of every call and by Close, so that
	// calls do not interleave on the pipes and Close waits for a call in
	// progress to finish.
	mu sync.Mutex
}
949
950 func newWorkerClient(comm workerComm, m *mutator) *workerClient {
951 return &workerClient{workerComm: comm, m: m}
952 }
953
954
955
956
957 func (wc *workerClient) Close() error {
958 wc.mu.Lock()
959 defer wc.mu.Unlock()
960
961
962
963 if err := wc.fuzzIn.Close(); err != nil {
964 wc.fuzzOut.Close()
965 return err
966 }
967
968
969
970 if _, err := io.Copy(io.Discard, wc.fuzzOut); err != nil {
971 wc.fuzzOut.Close()
972 return err
973 }
974 return wc.fuzzOut.Close()
975 }
976
977
978
979
980
981
982
983
// errSharedMemClosed is returned by workerClient methods that find the memMu
// channel closed when they try to take the shared memory, which happens
// after worker.cleanup has run.
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
985
986
987
// minimize asks the worker process to minimize entryIn, one minimizable
// value at a time, and returns the resulting entry together with the last
// response received.
//
// The input is passed to the server through shared memory. For each call the
// shared memory is handed to the server by sending it on memMu and taken
// back afterwards. The remaining time and call budgets in args are reduced
// after each value.
//
// If a call fails before the server started minimizing, entryIn is returned
// unchanged with the error. If it fails during minimization, the value last
// tried by the server is recovered from shared memory and returned along
// with the error. entryOut.Path is derived from a hash of entryOut.Data.
func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
	wc.mu.Lock()
	defer wc.mu.Unlock()

	mem, ok := <-wc.memMu
	if !ok {
		return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
	}
	// Whatever happens below, the shared memory goes back on the channel.
	defer func() { wc.memMu <- mem }()
	mem.header().count = 0
	inp, err := corpusEntryData(entryIn)
	if err != nil {
		return CorpusEntry{}, minimizeResponse{}, err
	}
	mem.setValue(inp)
	entryOut = entryIn
	entryOut.Values, err = unmarshalCorpusFile(inp)
	if err != nil {
		return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
	}
	for i, v := range entryOut.Values {
		if !isMinimizable(reflect.TypeOf(v)) {
			continue
		}

		// Lend the shared memory to the server for the duration of the
		// call, then take it back.
		wc.memMu <- mem
		args.Index = i
		c := call{Minimize: &args}
		callErr := wc.callLocked(ctx, c, &resp)
		mem, ok = <-wc.memMu
		if !ok {
			return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
		}

		if callErr != nil {
			retErr = callErr
			if !mem.header().rawInMem {
				// The failure happened before the server began minimizing.
				return entryIn, minimizeResponse{}, retErr
			}
			// The failure happened during minimization. Shared memory now
			// holds the raw (unmarshaled) bytes of the candidate for value
			// i that was being tried.
			switch entryOut.Values[i].(type) {
			case string:
				entryOut.Values[i] = string(mem.valueCopy())
			case []byte:
				entryOut.Values[i] = mem.valueCopy()
			default:
				panic("impossible")
			}
			entryOut.Data = marshalCorpusFile(entryOut.Values...)
			// Stop here: the remaining values are not minimized after a
			// failed call.
			break
		}

		if resp.WroteToMem {
			// Minimization succeeded and shared memory holds the marshaled
			// result.
			entryOut.Data = mem.valueCopy()
			entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
			if err != nil {
				return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
			}
		}

		// Charge this call against the budgets before the next value.
		if args.Timeout != 0 {
			args.Timeout -= resp.Duration
			if args.Timeout <= 0 {
				break
			}
		}
		if args.Limit != 0 {
			args.Limit -= mem.header().count
			if args.Limit <= 0 {
				break
			}
		}
	}
	resp.Count = mem.header().count
	h := sha256.Sum256(entryOut.Data)
	entryOut.Path = fmt.Sprintf("%x", h[:4])
	return entryOut, resp, retErr
}
1072
1073
// fuzz asks the worker process to fuzz entryIn, passed through shared
// memory, and returns the server's response.
//
// entryOut is populated only when there is something to report: the call
// failed, the fuzz function returned an error, or new coverage was found
// outside of a warmup run. Because the server does not send the mutated
// value back, it is rebuilt here by restoring the generator state saved in
// shared memory and replaying the same mutations.
//
// isInternalError is true when err is caused by something other than the
// input being fuzzed; such errors must not be recorded as crashes. err is
// otherwise the error from the call itself, if any.
func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
	wc.mu.Lock()
	defer wc.mu.Unlock()

	mem, ok := <-wc.memMu
	if !ok {
		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
	}
	mem.header().count = 0
	inp, err := corpusEntryData(entryIn)
	if err != nil {
		wc.memMu <- mem
		return CorpusEntry{}, fuzzResponse{}, true, err
	}
	mem.setValue(inp)
	// Lend the shared memory to the server for the duration of the call.
	wc.memMu <- mem

	c := call{Fuzz: &args}
	callErr := wc.callLocked(ctx, c, &resp)
	if resp.InternalErr != "" {
		return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
	}
	mem, ok = <-wc.memMu
	if !ok {
		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
	}
	defer func() { wc.memMu <- mem }()
	// The header is authoritative for the count: it is valid even when the
	// call failed and resp was never filled in.
	resp.Count = mem.header().count

	// The server must leave the input in shared memory untouched; mutations
	// are applied to a copy.
	if !bytes.Equal(inp, mem.valueRef()) {
		return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
	}
	needEntryOut := callErr != nil || resp.Err != "" ||
		(!args.Warmup && resp.CoverageData != nil)
	if needEntryOut {
		valuesOut, err := unmarshalCorpusFile(inp)
		if err != nil {
			return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
		}
		// Rewind the generator to the start of the server's current
		// mutation chain.
		wc.m.r.restore(mem.header().randState, mem.header().randInc)
		if !args.Warmup {
			// Replay the chain. The server resets to the original values
			// whenever count%chainedMutations == 0, so the reported value
			// is the result of ((count-1) % chainedMutations) + 1 mutations.
			numMutations := ((resp.Count - 1) % chainedMutations) + 1
			for i := int64(0); i < numMutations; i++ {
				wc.m.mutate(valuesOut, cap(mem.valueRef()))
			}
		}
		dataOut := marshalCorpusFile(valuesOut...)

		h := sha256.Sum256(dataOut)
		name := fmt.Sprintf("%x", h[:4])
		entryOut = CorpusEntry{
			Parent:     entryIn.Path,
			Path:       name,
			Data:       dataOut,
			Generation: entryIn.Generation + 1,
		}
		if args.Warmup {
			// Nothing was mutated during warmup, so the entry is a seed
			// exactly when the input was.
			entryOut.IsSeed = entryIn.IsSeed
		}
	}

	return entryOut, resp, false, callErr
}
1140
1141
1142 func (wc *workerClient) ping(ctx context.Context) error {
1143 wc.mu.Lock()
1144 defer wc.mu.Unlock()
1145 c := call{Ping: &pingArgs{}}
1146 var resp pingResponse
1147 return wc.callLocked(ctx, c, &resp)
1148 }
1149
1150
1151
1152 func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) {
1153 enc := json.NewEncoder(wc.fuzzIn)
1154 dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
1155 if err := enc.Encode(c); err != nil {
1156 return err
1157 }
1158 return dec.Decode(resp)
1159 }
1160
1161
1162
1163
1164
1165
1166
1167
1168
// contextReader wraps an io.Reader so that Read gives up as soon as ctx is
// done, even if the underlying Read is still blocked.
type contextReader struct {
	ctx context.Context
	r   io.Reader
}

// Read reads from the underlying reader into b. If the context is already
// done, it returns (0, ctx.Err()) without reading. If the context becomes
// done while the underlying Read is in progress, it returns (0, ctx.Err())
// immediately; the goroutine performing the read is left behind and may
// still write into b when the underlying Read eventually completes.
func (cr *contextReader) Read(b []byte) (int, error) {
	if err := cr.ctx.Err(); err != nil {
		return 0, err
	}

	type readResult struct {
		n   int
		err error
	}
	// Buffered so the reading goroutine can always deliver its result and
	// exit, even when nobody is listening any more.
	resultC := make(chan readResult, 1)
	go func() {
		n, err := cr.r.Read(b)
		resultC <- readResult{n: n, err: err}
	}()

	select {
	case <-cr.ctx.Done():
		return 0, cr.ctx.Err()
	case res := <-resultC:
		return res.n, res.err
	}
}
1196
View as plain text