1
2
3
4
5 package cache
6
7 import (
8 "bufio"
9 "cmd/go/internal/base"
10 "cmd/internal/quoted"
11 "context"
12 "crypto/sha256"
13 "encoding/base64"
14 "encoding/json"
15 "errors"
16 "fmt"
17 "internal/goexperiment"
18 "io"
19 "log"
20 "os"
21 "os/exec"
22 "sync"
23 "sync/atomic"
24 "time"
25 )
26
27
28
29
30
31
32 type ProgCache struct {
33 cmd *exec.Cmd
34 stdout io.ReadCloser
35 stdin io.WriteCloser
36 bw *bufio.Writer
37 jenc *json.Encoder
38
39
40
41 can map[ProgCmd]bool
42
43
44
45
46
47
48
49 fuzzDirCache Cache
50
51 closing atomic.Bool
52 ctx context.Context
53 ctxCancel context.CancelFunc
54 readLoopDone chan struct{}
55
56 mu sync.Mutex
57 nextID int64
58 inFlight map[int64]chan<- *ProgResponse
59 outputFile map[OutputID]string
60
61
62
63 writeMu sync.Mutex
64 }
65
66
67
68
69
// ProgCmd names a command carried in the Command field of a ProgRequest
// and advertised by the child in ProgResponse.KnownCommands.
type ProgCmd string

// Commands that ProgCache issues to the child process.
const (
	cmdGet   ProgCmd = "get"
	cmdPut   ProgCmd = "put"
	cmdClose ProgCmd = "close"
)
77
78
79
80
81
82 type ProgRequest struct {
83
84
85 ID int64
86
87
88
89
90 Command ProgCmd
91
92
93 ActionID []byte `json:",omitempty"`
94
95
96
97
98
99 OutputID []byte `json:",omitempty"`
100
101
102
103
104
105
106
107 Body io.Reader `json:"-"`
108
109
110 BodySize int64 `json:",omitempty"`
111
112
113
114
115
116
117
118 ObjectID []byte `json:",omitempty"`
119 }
120
121
122
123
124
125
126
127
128
129 type ProgResponse struct {
130 ID int64
131 Err string `json:",omitempty"`
132
133
134
135
136
137
138
139
140 KnownCommands []ProgCmd `json:",omitempty"`
141
142
143
144 Miss bool `json:",omitempty"`
145 OutputID []byte `json:",omitempty"`
146 Size int64 `json:",omitempty"`
147 Time *time.Time `json:",omitempty"`
148
149
150
151
152 DiskPath string `json:",omitempty"`
153 }
154
155
156
157
158
159
160 func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
161 if fuzzDirCache == nil {
162 panic("missing fuzzDirCache")
163 }
164 args, err := quoted.Split(progAndArgs)
165 if err != nil {
166 base.Fatalf("GOCACHEPROG args: %v", err)
167 }
168 var prog string
169 if len(args) > 0 {
170 prog = args[0]
171 args = args[1:]
172 }
173
174 ctx, ctxCancel := context.WithCancel(context.Background())
175
176 cmd := exec.CommandContext(ctx, prog, args...)
177 out, err := cmd.StdoutPipe()
178 if err != nil {
179 base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
180 }
181 in, err := cmd.StdinPipe()
182 if err != nil {
183 base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
184 }
185 cmd.Stderr = os.Stderr
186 cmd.Cancel = in.Close
187
188 if err := cmd.Start(); err != nil {
189 base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
190 }
191
192 pc := &ProgCache{
193 ctx: ctx,
194 ctxCancel: ctxCancel,
195 fuzzDirCache: fuzzDirCache,
196 cmd: cmd,
197 stdout: out,
198 stdin: in,
199 bw: bufio.NewWriter(in),
200 inFlight: make(map[int64]chan<- *ProgResponse),
201 outputFile: make(map[OutputID]string),
202 readLoopDone: make(chan struct{}),
203 }
204
205
206
207 capResc := make(chan *ProgResponse, 1)
208 pc.inFlight[0] = capResc
209
210 pc.jenc = json.NewEncoder(pc.bw)
211 go pc.readLoop(pc.readLoopDone)
212
213
214
215 timer := time.NewTicker(5 * time.Second)
216 defer timer.Stop()
217 for {
218 select {
219 case <-timer.C:
220 log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
221 case capRes := <-capResc:
222 can := map[ProgCmd]bool{}
223 for _, cmd := range capRes.KnownCommands {
224 can[cmd] = true
225 }
226 if len(can) == 0 {
227 base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
228 }
229 pc.can = can
230 return pc
231 }
232 }
233 }
234
235 func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
236 defer close(readLoopDone)
237 jd := json.NewDecoder(c.stdout)
238 for {
239 res := new(ProgResponse)
240 if err := jd.Decode(res); err != nil {
241 if c.closing.Load() {
242 return
243 }
244 if err == io.EOF {
245 c.mu.Lock()
246 inFlight := len(c.inFlight)
247 c.mu.Unlock()
248 base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
249 }
250 base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
251 }
252 c.mu.Lock()
253 ch, ok := c.inFlight[res.ID]
254 delete(c.inFlight, res.ID)
255 c.mu.Unlock()
256 if ok {
257 ch <- res
258 } else {
259 base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
260 }
261 }
262 }
263
264 func (c *ProgCache) send(ctx context.Context, req *ProgRequest) (*ProgResponse, error) {
265 resc := make(chan *ProgResponse, 1)
266 if err := c.writeToChild(req, resc); err != nil {
267 return nil, err
268 }
269 select {
270 case res := <-resc:
271 if res.Err != "" {
272 return nil, errors.New(res.Err)
273 }
274 return res, nil
275 case <-ctx.Done():
276 return nil, ctx.Err()
277 }
278 }
279
280 func (c *ProgCache) writeToChild(req *ProgRequest, resc chan<- *ProgResponse) (err error) {
281 c.mu.Lock()
282 c.nextID++
283 req.ID = c.nextID
284 c.inFlight[req.ID] = resc
285 c.mu.Unlock()
286
287 defer func() {
288 if err != nil {
289 c.mu.Lock()
290 delete(c.inFlight, req.ID)
291 c.mu.Unlock()
292 }
293 }()
294
295 c.writeMu.Lock()
296 defer c.writeMu.Unlock()
297
298 if err := c.jenc.Encode(req); err != nil {
299 return err
300 }
301 if err := c.bw.WriteByte('\n'); err != nil {
302 return err
303 }
304 if req.Body != nil && req.BodySize > 0 {
305 if err := c.bw.WriteByte('"'); err != nil {
306 return err
307 }
308 e := base64.NewEncoder(base64.StdEncoding, c.bw)
309 wrote, err := io.Copy(e, req.Body)
310 if err != nil {
311 return err
312 }
313 if err := e.Close(); err != nil {
314 return nil
315 }
316 if wrote != req.BodySize {
317 return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, output %x: wrote %v; expected %v",
318 req.ActionID, req.OutputID, wrote, req.BodySize)
319 }
320 if _, err := c.bw.WriteString("\"\n"); err != nil {
321 return err
322 }
323 }
324 if err := c.bw.Flush(); err != nil {
325 return err
326 }
327 return nil
328 }
329
330 func (c *ProgCache) Get(a ActionID) (Entry, error) {
331 if !c.can[cmdGet] {
332
333
334
335
336
337
338
339 return Entry{}, &entryNotFoundError{}
340 }
341 res, err := c.send(c.ctx, &ProgRequest{
342 Command: cmdGet,
343 ActionID: a[:],
344 })
345 if err != nil {
346 return Entry{}, err
347 }
348 if res.Miss {
349 return Entry{}, &entryNotFoundError{}
350 }
351 e := Entry{
352 Size: res.Size,
353 }
354 if res.Time != nil {
355 e.Time = *res.Time
356 } else {
357 e.Time = time.Now()
358 }
359 if res.DiskPath == "" {
360 return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
361 }
362 if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
363 return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
364 }
365 c.noteOutputFile(e.OutputID, res.DiskPath)
366 return e, nil
367 }
368
369 func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
370 c.mu.Lock()
371 defer c.mu.Unlock()
372 c.outputFile[o] = diskPath
373 }
374
375 func (c *ProgCache) OutputFile(o OutputID) string {
376 c.mu.Lock()
377 defer c.mu.Unlock()
378 return c.outputFile[o]
379 }
380
381 func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
382
383 h := sha256.New()
384 if _, err := file.Seek(0, 0); err != nil {
385 return OutputID{}, 0, err
386 }
387 size, err := io.Copy(h, file)
388 if err != nil {
389 return OutputID{}, 0, err
390 }
391 var out OutputID
392 h.Sum(out[:0])
393
394 if _, err := file.Seek(0, 0); err != nil {
395 return OutputID{}, 0, err
396 }
397
398 if !c.can[cmdPut] {
399
400 return out, size, nil
401 }
402
403
404
405 var deprecatedValue []byte
406 if goexperiment.CacheProg {
407 deprecatedValue = out[:]
408 }
409
410 res, err := c.send(c.ctx, &ProgRequest{
411 Command: cmdPut,
412 ActionID: a[:],
413 OutputID: out[:],
414 ObjectID: deprecatedValue,
415 Body: file,
416 BodySize: size,
417 })
418 if err != nil {
419 return OutputID{}, 0, err
420 }
421 if res.DiskPath == "" {
422 return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
423 }
424 c.noteOutputFile(out, res.DiskPath)
425 return out, size, err
426 }
427
428 func (c *ProgCache) Close() error {
429 c.closing.Store(true)
430 var err error
431
432
433
434
435 if c.can[cmdClose] {
436 _, err = c.send(c.ctx, &ProgRequest{Command: cmdClose})
437 }
438 c.ctxCancel()
439 <-c.readLoopDone
440 return err
441 }
442
443 func (c *ProgCache) FuzzDir() string {
444
445
446 return c.fuzzDirCache.FuzzDir()
447 }
448
View as plain text