1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "bytes"
10 "cmp"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "slices"
15 "strings"
16
17 "internal/trace/event"
18 "internal/trace/event/go122"
19 )
20
21
22
23
24
25 type generation struct {
26 gen uint64
27 batches map[ThreadID][]batch
28 batchMs []ThreadID
29 cpuSamples []cpuSample
30 *evTable
31 }
32
33
34
35
36 type spilledBatch struct {
37 gen uint64
38 *batch
39 }
40
41
42
43
44
45
46
47
48 func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
49 g := &generation{
50 evTable: &evTable{
51 pcs: make(map[uint64]frame),
52 },
53 batches: make(map[ThreadID][]batch),
54 }
55
56 if spill != nil {
57 g.gen = spill.gen
58 if err := processBatch(g, *spill.batch); err != nil {
59 return nil, nil, err
60 }
61 spill = nil
62 }
63
64
65 var spillErr error
66 for {
67 b, gen, err := readBatch(r)
68 if err == io.EOF {
69 break
70 }
71 if err != nil {
72 if g.gen != 0 {
73
74
75
76 spillErr = err
77 break
78 }
79 return nil, nil, err
80 }
81 if gen == 0 {
82
83 return nil, nil, fmt.Errorf("invalid generation number %d", gen)
84 }
85 if g.gen == 0 {
86
87 g.gen = gen
88 }
89 if gen == g.gen+1 {
90 spill = &spilledBatch{gen: gen, batch: &b}
91 break
92 }
93 if gen != g.gen {
94
95
96
97
98
99
100
101 return nil, nil, fmt.Errorf("generations out of order")
102 }
103 if err := processBatch(g, b); err != nil {
104 return nil, nil, err
105 }
106 }
107
108
109 if g.freq == 0 {
110 return nil, nil, fmt.Errorf("no frequency event found")
111 }
112
113
114
115
116
117
118
119 g.stacks.compactify()
120 g.strings.compactify()
121
122
123 if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
124 return nil, nil, err
125 }
126
127
128 for i := range g.cpuSamples {
129 s := &g.cpuSamples[i]
130 s.time = g.freq.mul(timestamp(s.time))
131 }
132
133 slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
134 return cmp.Compare(a.time, b.time)
135 })
136 return g, spill, spillErr
137 }
138
139
140 func processBatch(g *generation, b batch) error {
141 switch {
142 case b.isStringsBatch():
143 if err := addStrings(&g.strings, b); err != nil {
144 return err
145 }
146 case b.isStacksBatch():
147 if err := addStacks(&g.stacks, g.pcs, b); err != nil {
148 return err
149 }
150 case b.isCPUSamplesBatch():
151 samples, err := addCPUSamples(g.cpuSamples, b)
152 if err != nil {
153 return err
154 }
155 g.cpuSamples = samples
156 case b.isFreqBatch():
157 freq, err := parseFreq(b)
158 if err != nil {
159 return err
160 }
161 if g.freq != 0 {
162 return fmt.Errorf("found multiple frequency events")
163 }
164 g.freq = freq
165 case b.exp != event.NoExperiment:
166 if g.expData == nil {
167 g.expData = make(map[event.Experiment]*ExperimentalData)
168 }
169 if err := addExperimentalData(g.expData, b); err != nil {
170 return err
171 }
172 default:
173 if _, ok := g.batches[b.m]; !ok {
174 g.batchMs = append(g.batchMs, b.m)
175 }
176 g.batches[b.m] = append(g.batches[b.m], b)
177 }
178 return nil
179 }
180
181
182
183 func validateStackStrings(
184 stacks *dataTable[stackID, stack],
185 strings *dataTable[stringID, string],
186 frames map[uint64]frame,
187 ) error {
188 var err error
189 stacks.forEach(func(id stackID, stk stack) bool {
190 for _, pc := range stk.pcs {
191 frame, ok := frames[pc]
192 if !ok {
193 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
194 return false
195 }
196 _, ok = strings.get(frame.funcID)
197 if !ok {
198 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
199 return false
200 }
201 _, ok = strings.get(frame.fileID)
202 if !ok {
203 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
204 return false
205 }
206 }
207 return true
208 })
209 return err
210 }
211
212
213
214
215 func addStrings(stringTable *dataTable[stringID, string], b batch) error {
216 if !b.isStringsBatch() {
217 return fmt.Errorf("internal error: addStrings called on non-string batch")
218 }
219 r := bytes.NewReader(b.data)
220 hdr, err := r.ReadByte()
221 if err != nil || event.Type(hdr) != go122.EvStrings {
222 return fmt.Errorf("missing strings batch header")
223 }
224
225 var sb strings.Builder
226 for r.Len() != 0 {
227
228 ev, err := r.ReadByte()
229 if err != nil {
230 return err
231 }
232 if event.Type(ev) != go122.EvString {
233 return fmt.Errorf("expected string event, got %d", ev)
234 }
235
236
237 id, err := binary.ReadUvarint(r)
238 if err != nil {
239 return err
240 }
241
242
243 len, err := binary.ReadUvarint(r)
244 if err != nil {
245 return err
246 }
247 if len > go122.MaxStringSize {
248 return fmt.Errorf("invalid string size %d, maximum is %d", len, go122.MaxStringSize)
249 }
250
251
252 n, err := io.CopyN(&sb, r, int64(len))
253 if n != int64(len) {
254 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
255 }
256 if err != nil {
257 return fmt.Errorf("copying string data: %w", err)
258 }
259
260
261 s := sb.String()
262 sb.Reset()
263 if err := stringTable.insert(stringID(id), s); err != nil {
264 return err
265 }
266 }
267 return nil
268 }
269
270
271
272
273 func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
274 if !b.isStacksBatch() {
275 return fmt.Errorf("internal error: addStacks called on non-stacks batch")
276 }
277 r := bytes.NewReader(b.data)
278 hdr, err := r.ReadByte()
279 if err != nil || event.Type(hdr) != go122.EvStacks {
280 return fmt.Errorf("missing stacks batch header")
281 }
282
283 for r.Len() != 0 {
284
285 ev, err := r.ReadByte()
286 if err != nil {
287 return err
288 }
289 if event.Type(ev) != go122.EvStack {
290 return fmt.Errorf("expected stack event, got %d", ev)
291 }
292
293
294 id, err := binary.ReadUvarint(r)
295 if err != nil {
296 return err
297 }
298
299
300 nFrames, err := binary.ReadUvarint(r)
301 if err != nil {
302 return err
303 }
304 if nFrames > go122.MaxFramesPerStack {
305 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, go122.MaxFramesPerStack)
306 }
307
308
309 frames := make([]uint64, 0, nFrames)
310 for i := uint64(0); i < nFrames; i++ {
311
312 pc, err := binary.ReadUvarint(r)
313 if err != nil {
314 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
315 }
316 funcID, err := binary.ReadUvarint(r)
317 if err != nil {
318 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
319 }
320 fileID, err := binary.ReadUvarint(r)
321 if err != nil {
322 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
323 }
324 line, err := binary.ReadUvarint(r)
325 if err != nil {
326 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
327 }
328 frames = append(frames, pc)
329
330 if _, ok := pcs[pc]; !ok {
331 pcs[pc] = frame{
332 pc: pc,
333 funcID: stringID(funcID),
334 fileID: stringID(fileID),
335 line: line,
336 }
337 }
338 }
339
340
341 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
342 return err
343 }
344 }
345 return nil
346 }
347
348
349
350
351 func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
352 if !b.isCPUSamplesBatch() {
353 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
354 }
355 r := bytes.NewReader(b.data)
356 hdr, err := r.ReadByte()
357 if err != nil || event.Type(hdr) != go122.EvCPUSamples {
358 return nil, fmt.Errorf("missing CPU samples batch header")
359 }
360
361 for r.Len() != 0 {
362
363 ev, err := r.ReadByte()
364 if err != nil {
365 return nil, err
366 }
367 if event.Type(ev) != go122.EvCPUSample {
368 return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
369 }
370
371
372 ts, err := binary.ReadUvarint(r)
373 if err != nil {
374 return nil, err
375 }
376
377
378 m, err := binary.ReadUvarint(r)
379 if err != nil {
380 return nil, err
381 }
382 mid := ThreadID(m)
383
384
385 p, err := binary.ReadUvarint(r)
386 if err != nil {
387 return nil, err
388 }
389 pid := ProcID(p)
390
391
392 g, err := binary.ReadUvarint(r)
393 if err != nil {
394 return nil, err
395 }
396 goid := GoID(g)
397 if g == 0 {
398 goid = NoGoroutine
399 }
400
401
402 s, err := binary.ReadUvarint(r)
403 if err != nil {
404 return nil, err
405 }
406
407
408 samples = append(samples, cpuSample{
409 schedCtx: schedCtx{
410 M: mid,
411 P: pid,
412 G: goid,
413 },
414 time: Time(ts),
415 stack: stackID(s),
416 })
417 }
418 return samples, nil
419 }
420
421
422 func parseFreq(b batch) (frequency, error) {
423 if !b.isFreqBatch() {
424 return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
425 }
426 r := bytes.NewReader(b.data)
427 r.ReadByte()
428
429
430 f, err := binary.ReadUvarint(r)
431 if err != nil {
432 return 0, err
433 }
434
435 return frequency(1.0 / (float64(f) / 1e9)), nil
436 }
437
438
439
440 func addExperimentalData(expData map[event.Experiment]*ExperimentalData, b batch) error {
441 if b.exp == event.NoExperiment {
442 return fmt.Errorf("internal error: addExperimentalData called on non-experimental batch")
443 }
444 ed, ok := expData[b.exp]
445 if !ok {
446 ed = new(ExperimentalData)
447 expData[b.exp] = ed
448 }
449 ed.Batches = append(ed.Batches, ExperimentalBatch{
450 Thread: b.m,
451 Data: b.data,
452 })
453 return nil
454 }
455
View as plain text