8
8
"github.com/Debian/dcs/cmd/dcs-web/common"
9
9
"github.com/Debian/dcs/cmd/dcs-web/search"
10
10
dcsregexp "github.com/Debian/dcs/regexp"
11
+ "github.com/Debian/dcs/stringpool"
11
12
"github.com/influxdb/influxdb-go"
13
+ "hash/fnv"
12
14
"io"
13
15
"log"
14
16
"math"
@@ -115,6 +117,38 @@ func (s ByRanking) Swap(i, j int) {
115
117
s [i ], s [j ] = s [j ], s [i ]
116
118
}
117
119
120
// resultPointer locates one JSON-serialized result inside a backend's
// temporary results file, carrying just enough metadata to sort results
// globally without keeping them in memory.
type resultPointer struct {
	// Index of the source backend whose temp file holds the result.
	backendidx int
	// Combined ranking used as the primary sort key.
	ranking float32
	// Byte offset and length of the serialized result in the temp file.
	offset int64
	length int64

	// Hash of the result path, used as a tie-breaker when rankings are
	// equal so that sort order is stable regardless of the order in which
	// backends delivered their results.
	pathHash uint64

	// Interned package name for per-package result files; points into a
	// stringpool.StringPool shared by the whole query.
	packageName *string
}

// pointerByRanking sorts resultPointers by descending ranking, breaking
// ties by descending pathHash; it implements sort.Interface.
type pointerByRanking []resultPointer

func (p pointerByRanking) Len() int {
	return len(p)
}

func (p pointerByRanking) Less(i, j int) bool {
	a, b := p[i], p[j]
	if a.ranking != b.ranking {
		return a.ranking > b.ranking
	}
	return a.pathHash > b.pathHash
}

func (p pointerByRanking) Swap(i, j int) {
	p[j], p[i] = p[i], p[j]
}
151
+
118
152
type queryState struct {
119
153
started time.Time
120
154
events []event
@@ -132,9 +166,12 @@ type queryState struct {
132
166
resultPages int
133
167
numResults int
134
168
135
- // TODO: this will be deleted once we write everything directly to disk
136
- allResults []Result
137
- allResultsMu * sync.Mutex
169
+ // One file per backend, containing JSON-serialized results. When writing,
170
+ // we keep the offsets, so that we can later sort the pointers and write
171
+ // the resulting files.
172
+ tempFiles []* os.File
173
+ packagePool * stringpool.StringPool
174
+ resultPointers []resultPointer
138
175
139
176
allPackages map [string ]bool
140
177
allPackagesSorted []string
@@ -198,7 +235,7 @@ func queryBackend(queryid string, backend string, backendidx int, query string)
198
235
}
199
236
}
200
237
if r .Type == "result" {
201
- storeResult (queryid , r )
238
+ storeResult (queryid , backendidx , r )
202
239
} else if r .Type == "progress" {
203
240
storeProgress (queryid , backendidx , r )
204
241
}
@@ -214,7 +251,7 @@ func maybeStartQuery(queryid, src, query string) bool {
214
251
// XXX: Starting a new query while there may still be clients reading that
215
252
// query is not a great idea. Best fix may be to make getEvent() use a
216
253
// querystate instead of the string identifier.
217
- if ! running || time .Since (querystate .started ) > 15 * time .Minute {
254
+ if ! running || time .Since (querystate .started ) > 30 * time .Minute {
218
255
backends := strings .Split (* common .SourceBackends , "," )
219
256
state [queryid ] = queryState {
220
257
started : time .Now (),
@@ -224,13 +261,32 @@ func maybeStartQuery(queryid, src, query string) bool {
224
261
filesTotal : make ([]int , len (backends )),
225
262
filesProcessed : make ([]int , len (backends )),
226
263
filesMu : & sync.Mutex {},
227
- allResults : make ([]Result , 0 ),
228
- allResultsMu : & sync.Mutex {},
264
+ tempFiles : make ([]* os.File , len (backends )),
229
265
allPackages : make (map [string ]bool ),
230
266
allPackagesMu : & sync.Mutex {},
267
+ packagePool : stringpool .NewStringPool (),
268
+ }
269
+
270
+ var err error
271
+ dir := filepath .Join (* queryResultsPath , queryid )
272
+ if err := os .MkdirAll (dir , os .FileMode (0755 )); err != nil {
273
+ // TODO: mark the query as failed
274
+ log .Printf ("[%s] could not create %q: %v\n " , queryid , dir , err )
275
+ return false
231
276
}
277
+
278
+ // TODO: it’d be so much better if we would correctly handle ESPACE errors
279
+ // in the code below (and above), but for that we need to carefully test it.
280
+ ensureEnoughSpaceAvailable ()
281
+
232
282
for i := 0 ; i < len (backends ); i ++ {
233
283
state [queryid ].filesTotal [i ] = - 1
284
+ path := filepath .Join (dir , fmt .Sprintf ("unsorted_%d.json" , i ))
285
+ state [queryid ].tempFiles [i ], err = os .Create (path )
286
+ if err != nil {
287
+ log .Printf ("[%s] could not create %q: %v\n " , queryid , path , err )
288
+ // TODO: mark query as failed
289
+ }
234
290
}
235
291
log .Printf ("initial results = %v\n " , state [queryid ])
236
292
for idx , backend := range backends {
@@ -260,7 +316,7 @@ func sendPaginationUpdate(queryid string, s queryState) {
260
316
}
261
317
}
262
318
263
- func storeResult (queryid string , result Result ) {
319
+ func storeResult (queryid string , backendidx int , result Result ) {
264
320
result .Type = "result"
265
321
266
322
result .Package = result .Path [:strings .Index (result .Path , "_" )]
@@ -269,8 +325,6 @@ func storeResult(queryid string, result Result) {
269
325
// for the top 10 at all.
270
326
s := state [queryid ]
271
327
272
- log .Printf ("[%s] (currently %d) result %v\n " , queryid , len (s .allResults ), result )
273
-
274
328
if s .FirstPathRank > 0 {
275
329
// Now store the combined ranking of PathRanking (pre) and Ranking (post).
276
330
// We add the values because they are both percentages.
@@ -302,24 +356,51 @@ func storeResult(queryid string, result Result) {
302
356
addEventMarshal (queryid , & result )
303
357
}
304
358
305
- // TODO: as a first POC, keep all results in memory, sort them, write them out to files.
359
+ tmpOffset , err := state [queryid ].tempFiles [backendidx ].Seek (0 , os .SEEK_CUR )
360
+ if err != nil {
361
+ log .Printf ("[%s] could not seek: %v\n " , queryid , err )
362
+ // TODO: mark query as failed
363
+ return
364
+ }
365
+
366
+ if err := json .NewEncoder (s .tempFiles [backendidx ]).Encode (result ); err != nil {
367
+ log .Printf ("[%s] could not write %v: %v\n " , queryid , result , err )
368
+ // TODO: mark query as failed
369
+ }
370
+
371
+ offsetAfterWriting , err := state [queryid ].tempFiles [backendidx ].Seek (0 , os .SEEK_CUR )
372
+ if err != nil {
373
+ log .Printf ("[%s] could not seek: %v\n " , queryid , err )
374
+ // TODO: mark query as failed
375
+ return
376
+ }
377
+
378
+ h := fnv .New64 ()
379
+ io .WriteString (h , result .Path )
380
+
306
381
stateMu .Lock ()
307
382
s = state [queryid ]
308
- s .allResults = append (s .allResults , result )
383
+ s .resultPointers = append (s .resultPointers , resultPointer {
384
+ backendidx : backendidx ,
385
+ ranking : result .Ranking ,
386
+ offset : tmpOffset ,
387
+ length : offsetAfterWriting - tmpOffset ,
388
+ pathHash : h .Sum64 (),
389
+ packageName : s .packagePool .Get (result .Package )})
309
390
s .allPackages [result .Package ] = true
310
391
s .numResults ++
311
392
state [queryid ] = s
312
393
stateMu .Unlock ()
313
-
314
- // TODO: write the result to disk, no matter what
315
- // TODO: eventually, we’ll want to write it to unsorted.json and sort it afterwards. we could do that by reading through the file, storing (ranking, file_offset) tuples, sorting them, then writing out the sorted files. note that we can even store the (ranking, file_offset) tuples at the time when the results come in.
316
394
}
317
395
318
396
func finishQuery (queryid string ) {
319
397
log .Printf ("[%s] done, closing all client channels.\n " , queryid )
320
398
stateMu .Lock ()
321
399
s := state [queryid ]
322
400
s .done = true
401
+ for _ , f := range s .tempFiles {
402
+ f .Close ()
403
+ }
323
404
state [queryid ] = s
324
405
stateMu .Unlock ()
325
406
addEvent (queryid , []byte {}, nil )
@@ -414,18 +495,48 @@ func ensureEnoughSpaceAvailable() {
414
495
}
415
496
}
416
497
498
+ func createFromPointers (queryid string , name string , pointers []resultPointer ) error {
499
+ log .Printf ("[%s] writing %q\n " , queryid , name )
500
+ f , err := os .Create (name )
501
+ if err != nil {
502
+ return err
503
+ }
504
+ defer f .Close ()
505
+ if _ , err := f .Write ([]byte ("[" )); err != nil {
506
+ return err
507
+ }
508
+ for idx , pointer := range pointers {
509
+ src := state [queryid ].tempFiles [pointer .backendidx ]
510
+ if _ , err := src .Seek (pointer .offset , os .SEEK_SET ); err != nil {
511
+ return err
512
+ }
513
+ if idx > 0 {
514
+ if _ , err := f .Write ([]byte ("," )); err != nil {
515
+ return err
516
+ }
517
+ }
518
+ if _ , err := io .CopyN (f , src , pointer .length ); err != nil {
519
+ return err
520
+ }
521
+ }
522
+ if _ , err := f .Write ([]byte ("]\n " )); err != nil {
523
+ return err
524
+ }
525
+ return nil
526
+ }
527
+
417
528
func writeToDisk (queryid string ) {
418
529
// Get the slice with results and unset it on the state so that processing can continue.
419
530
stateMu .Lock ()
420
531
s := state [queryid ]
421
- results := s .allResults
422
- if len (results ) == 0 {
532
+ pointers := s .resultPointers
533
+ if len (pointers ) == 0 {
423
534
log .Printf ("[%s] not writing, no results.\n " , queryid )
424
535
stateMu .Unlock ()
425
536
finishQuery (queryid )
426
537
return
427
538
}
428
- s .allResults = make ([] Result , 0 )
539
+ s .resultPointers = nil
429
540
idx := 0
430
541
packages := make ([]string , len (s .allPackages ))
431
542
// TODO: sort by ranking as soon as we store the best ranking with each package. (at the moment it’s first result, first stored)
@@ -437,10 +548,10 @@ func writeToDisk(queryid string) {
437
548
state [queryid ] = s
438
549
stateMu .Unlock ()
439
550
440
- log .Printf ("[%s] writing, %d results.\n " , queryid , len (results ))
551
+ log .Printf ("[%s] writing, %d results.\n " , queryid , len (pointers ))
441
552
log .Printf ("[%s] packages: %v\n " , queryid , packages )
442
553
443
- sort .Sort (ByRanking ( results ))
554
+ sort .Sort (pointerByRanking ( pointers ))
444
555
445
556
resultsPerPage := 10
446
557
dir := filepath .Join (* queryResultsPath , queryid )
@@ -466,57 +577,40 @@ func writeToDisk(queryid string) {
466
577
}
467
578
f .Close ()
468
579
469
- pages := int (math .Ceil (float64 (len (results )) / float64 (resultsPerPage )))
580
+ pages := int (math .Ceil (float64 (len (pointers )) / float64 (resultsPerPage )))
470
581
for page := 0 ; page < pages ; page ++ {
471
582
start := page * resultsPerPage
472
583
end := (page + 1 ) * resultsPerPage
473
- if end > len (results ) {
474
- end = len (results )
584
+ if end > len (pointers ) {
585
+ end = len (pointers )
475
586
}
587
+
476
588
name := filepath .Join (dir , fmt .Sprintf ("page_%d.json" , page ))
477
- log .Printf ("[%s] writing %q\n " , queryid , name )
478
- f , err := os .Create (name )
479
- if err != nil {
480
- log .Printf ("[%s] could not create %q: %v\n " , queryid , f , err )
589
+ if err := createFromPointers (queryid , name , pointers [start :end ]); err != nil {
590
+ log .Printf ("[%s] could not create %q from pointers: %v\n " , queryid , name , err )
481
591
// TODO: mark query as failed
482
592
return
483
593
}
484
- encoder := json .NewEncoder (f )
485
- if err := encoder .Encode (results [start :end ]); err != nil {
486
- log .Printf ("[%s] could not write %v: %v\n " , queryid , results [start :end ], err )
487
- // TODO: mark query as failed
488
- return
489
- }
490
- // We don’t use defer f.Close() because that would only be executed once the function returns.
491
- f .Close ()
492
594
}
493
595
494
596
// Now save the results into their package-specific files.
495
- bypkg := make (map [string ][]Result )
496
- for _ , result := range results {
497
- pkgresults := bypkg [result . Package ]
597
+ bypkg := make (map [string ][]resultPointer )
598
+ for _ , pointer := range pointers {
599
+ pkgresults := bypkg [* pointer . packageName ]
498
600
if len (pkgresults ) >= resultsPerPackage {
499
601
continue
500
602
}
501
- pkgresults = append (pkgresults , result )
502
- bypkg [result . Package ] = pkgresults
603
+ pkgresults = append (pkgresults , pointer )
604
+ bypkg [* pointer . packageName ] = pkgresults
503
605
}
504
606
505
607
for pkg , pkgresults := range bypkg {
506
608
name := filepath .Join (dir , fmt .Sprintf ("pkg_%s.json" , pkg ))
507
- log .Printf ("[%s] writing %q\n " , queryid , name )
508
- f , err := os .Create (name )
509
- if err != nil {
510
- log .Printf ("[%s] could not create %q: %v\n " , queryid , f , err )
609
+ if err := createFromPointers (queryid , name , pkgresults ); err != nil {
610
+ log .Printf ("[%s] could not create %q from pointers: %v\n " , queryid , name , err )
511
611
// TODO: mark query as failed
512
612
return
513
613
}
514
- if err := json .NewEncoder (f ).Encode (pkgresults ); err != nil {
515
- log .Printf ("[%s] could not write results: %v\n " , queryid , err )
516
- // TODO: mark query as failed
517
- return
518
- }
519
- f .Close ()
520
614
}
521
615
522
616
stateMu .Lock ()
0 commit comments