-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstream.js
More file actions
67 lines (56 loc) · 1.74 KB
/
stream.js
File metadata and controls
67 lines (56 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
var ReadStream = require("read-stream")
, LevelWriteStream = require("level-write-stream")
, extend = require("xtend")
, getOptions = require("./utils/getOptions")
, toKeyBuffer = require("level-encoding/toKeyBuffer")
, makeStreamData = require("level-encoding/makeStreamData")
module.exports = Streams
/**
 * Build the stream API (read / write / key / value streams) for a store.
 *
 * @param {Function} onReady - Invoked as `onReady(callback)`; `callback` is
 *   later called with the store handle, on which `makeKeyRange` and
 *   `iterate` are used.
 * @param {Object} db - Handed to `LevelWriteStream` and used as the emitter
 *   for iteration errors (`db.emit("error", err)`).
 * @param {Object} defaults - Default options, combined with the per-call
 *   options through `getOptions`.
 * @returns {{readStream: Function, writeStream: *, keyStream: Function,
 *   valueStream: Function}} The stream factories.
 */
function Streams(onReady, db, defaults) {
    return {
        readStream: readStream
        , writeStream: LevelWriteStream(db)
        , keyStream: keyStream
        , valueStream: valueStream
    }

    /**
     * Create a stream fed by iterating the store.
     *
     * @param {Object} [options] - Per-call options; `start`, `end` and
     *   `reverse` are read here, and the whole object is also forwarded to
     *   `iterate` and `makeStreamData`.
     * @returns {*} The `stream` member of a fresh `ReadStream()` queue.
     */
    function readStream(options) {
        options = getOptions(defaults, options)

        var start = options.start
            , end = options.end
            , range = null
            , queue = ReadStream()
            , stream = queue.stream

        // Iteration starts once the store reports ready; the stream itself
        // is handed back to the caller right away.
        onReady(_open)

        return stream

        function _open(idb) {
            // Test for null/undefined instead of truthiness so that falsy
            // but valid bounds such as 0 or "" still restrict the range.
            if (start != null || end != null) {
                // NOTE(review): a missing bound is still passed through
                // toKeyBuffer, exactly as before - confirm it maps
                // undefined to "no bound".
                range = idb.makeKeyRange({
                    lower: toKeyBuffer(start, options)
                    , upper: toKeyBuffer(end, options)
                })
            }

            // `options` is merged last, so caller-supplied keys take
            // precedence over the ones computed here.
            idb.iterate(function onItem(value) {
                queue.push(makeStreamData(value, options))
            }, extend({
                keyRange: range
                , order: options.reverse ? "DESC" : "ASC"
                , onEnd: queue.end
                , onError: emit
            }, options))
        }
    }

    /**
     * Same as `readStream`, with `keys: true` and `values: false` forced.
     *
     * @param {Object} [options] - Per-call options.
     * @returns {*} The stream returned by `readStream`.
     */
    function keyStream(options) {
        return readStream(extend(options || {}, {
            keys: true
            , values: false
        }))
    }

    /**
     * Same as `readStream`, with `keys: false` and `values: true` forced.
     *
     * @param {Object} [options] - Per-call options.
     * @returns {*} The stream returned by `readStream`.
     */
    function valueStream(options) {
        return readStream(extend(options || {}, {
            keys: false
            , values: true
        }))
    }

    // Iteration errors are reported on the database emitter.
    function emit(err) {
        db.emit("error", err)
    }
}