1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "fmt"
10 "io"
11 "slices"
12 "strings"
13
14 "internal/trace/event/go122"
15 "internal/trace/internal/oldtrace"
16 "internal/trace/version"
17 )
18
19
20 type Reader struct {
21 r *bufio.Reader
22 lastTs Time
23 gen *generation
24 spill *spilledBatch
25 spillErr error
26 frontier []*batchCursor
27 cpuSamples []cpuSample
28 order ordering
29 emittedSync bool
30
31 go121Events *oldTraceConverter
32 }
33
34
35 func NewReader(r io.Reader) (*Reader, error) {
36 br := bufio.NewReader(r)
37 v, err := version.ReadHeader(br)
38 if err != nil {
39 return nil, err
40 }
41 switch v {
42 case version.Go111, version.Go119, version.Go121:
43 tr, err := oldtrace.Parse(br, v)
44 if err != nil {
45 return nil, err
46 }
47 return &Reader{
48 go121Events: convertOldFormat(tr),
49 }, nil
50 case version.Go122, version.Go123:
51 return &Reader{
52 r: br,
53 order: ordering{
54 mStates: make(map[ThreadID]*mState),
55 pStates: make(map[ProcID]*pState),
56 gStates: make(map[GoID]*gState),
57 activeTasks: make(map[TaskID]taskState),
58 },
59
60 emittedSync: true,
61 }, nil
62 default:
63 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
64 }
65 }
66
67
68
69
70
71 func (r *Reader) ReadEvent() (e Event, err error) {
72 if r.go121Events != nil {
73 ev, err := r.go121Events.next()
74 if err != nil {
75
76 return Event{}, err
77 }
78 return ev, nil
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 defer func() {
99 if err != nil {
100 return
101 }
102 if err = e.validateTableIDs(); err != nil {
103 return
104 }
105 if e.base.time <= r.lastTs {
106 e.base.time = r.lastTs + 1
107 }
108 r.lastTs = e.base.time
109 }()
110
111
112 if ev, ok := r.order.Next(); ok {
113 return ev, nil
114 }
115
116
117 if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
118 if !r.emittedSync {
119 r.emittedSync = true
120 return syncEvent(r.gen.evTable, r.lastTs), nil
121 }
122 if r.spillErr != nil {
123 return Event{}, r.spillErr
124 }
125 if r.gen != nil && r.spill == nil {
126
127
128
129
130
131 return Event{}, io.EOF
132 }
133
134 var err error
135 r.gen, r.spill, err = readGeneration(r.r, r.spill)
136 if r.gen == nil {
137 return Event{}, err
138 }
139 r.spillErr = err
140
141
142 r.cpuSamples = r.gen.cpuSamples
143
144
145 for _, m := range r.gen.batchMs {
146 batches := r.gen.batches[m]
147 bc := &batchCursor{m: m}
148 ok, err := bc.nextEvent(batches, r.gen.freq)
149 if err != nil {
150 return Event{}, err
151 }
152 if !ok {
153
154 continue
155 }
156 r.frontier = heapInsert(r.frontier, bc)
157 }
158
159
160 r.emittedSync = false
161 }
162 tryAdvance := func(i int) (bool, error) {
163 bc := r.frontier[i]
164
165 if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
166 return ok, err
167 }
168
169
170 ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
171 if err != nil {
172 return false, err
173 }
174 if ok {
175
176 heapUpdate(r.frontier, i)
177 } else {
178
179 r.frontier = heapRemove(r.frontier, i)
180 }
181 return true, nil
182 }
183
184 if len(r.cpuSamples) != 0 {
185 if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
186 e := r.cpuSamples[0].asEvent(r.gen.evTable)
187 r.cpuSamples = r.cpuSamples[1:]
188 return e, nil
189 }
190 }
191
192
193 if len(r.frontier) == 0 {
194 return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
195 }
196 if ok, err := tryAdvance(0); err != nil {
197 return Event{}, err
198 } else if !ok {
199
200
201
202
203
204 slices.SortFunc(r.frontier, (*batchCursor).compare)
205 success := false
206 for i := 1; i < len(r.frontier); i++ {
207 if ok, err = tryAdvance(i); err != nil {
208 return Event{}, err
209 } else if ok {
210 success = true
211 break
212 }
213 }
214 if !success {
215 return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
216 }
217 }
218
219
220 ev, ok := r.order.Next()
221 if !ok {
222 panic("invariant violation: advance successful, but queue is empty")
223 }
224 return ev, nil
225 }
226
227 func dumpFrontier(frontier []*batchCursor) string {
228 var sb strings.Builder
229 for _, bc := range frontier {
230 spec := go122.Specs()[bc.ev.typ]
231 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
232 for i, arg := range spec.Args[1:] {
233 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
234 }
235 fmt.Fprintf(&sb, "]\n")
236 }
237 return sb.String()
238 }
239
View as plain text