-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
118 lines (100 loc) · 3.14 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
'use strict'
var pull = require('pull-stream')
var mfr = require('map-filter-reduce')
var Flatmap = require('pull-flatmap')
var FlumeViewLevel = require('flumeview-level')
var query = require('./query')
var select = require('./select')
var u = require('./util')
var isArray = Array.isArray
// sorted index.
module.exports = function (indexes, emitLinks, version) {
if (!emitLinks) { emitLinks = function (data, emit) { emit(data) } }
function getIndexes (data, seq) {
var A = []
indexes.forEach(function (index) {
var a = [index.key]
for (var i = 0; i < index.value.length; i++) {
var key = index.value[i]
if (!u.has(key, data)) return
a.push(u.get(key, data))
}
a.push(seq)
A.push(a)
})
return A
}
var create = FlumeViewLevel(version || 2, function (value, seq) {
var A = []
emitLinks(value, function (value) {
A = A.concat(getIndexes(value, seq))
})
return A
})
return function (log, name) {
var index = create(log, name)
var _read = index.read
index.methods.explain = 'async'
index.explain = function (opts = {}, cb) {
var q, sort
if (isArray(opts.query)) {
q = opts.query[0].$filter || {}
sort = opts.query[opts.query.length - 1].$sort
if (sort) opts.query.pop()
} else if (opts.query) {
q = opts.query
} else { q = {} }
var index = sort ? u.findByPath(indexes, sort) : select(indexes, q)
if (sort && !index) return cb(new Error('could not sort by:' + JSON.stringify(sort)))
return cb(null, {
index
})
}
index.read = function (opts = {}) {
var q, sort
if (isArray(opts.query)) {
q = opts.query[0].$filter || {}
sort = opts.query[opts.query.length - 1].$sort
if (sort) opts.query.pop()
} else if (opts.query) {
q = opts.query
} else { q = {} }
var index = sort ? u.findByPath(indexes, sort) : select(indexes, q)
if (sort && !index) return pull.error(new Error('could not sort by:' + JSON.stringify(sort)))
if (!index) {
return pull(
log.stream({
values: true, seqs: false, live: opts.live, old: opts.old, limit: opts.limit, reverse: opts.reverse
}),
Flatmap(function (data) {
var emit = []
emitLinks(data, function (a) {
emit.push(a)
})
return emit
})
)
}
var _opts = query(index, q)
_opts.values = true
_opts.keys = true
_opts.reverse = !!opts.reverse
_opts.live = opts.live
_opts.old = opts.old
_opts.sync = opts.sync
_opts.unlinkedValues = opts.unlinkedValues
return pull(
_read(_opts),
pull.map(function (data) {
if (data.sync) return data
var o = opts.unlinkedValues ? data.value : {}
for (var i = 0; i < index.value.length; i++) { u.set(index.value[i], data.key[i + 1], o) }
return o
}),
isArray(opts.query) ? mfr(opts.query) : pull.through(),
opts.limit ? pull.take(opts.limit) : pull.through()
)
}
return index
}
}