-
Notifications
You must be signed in to change notification settings - Fork 538
Expand file tree
/
Copy pathreader.go
More file actions
234 lines (195 loc) · 5.38 KB
/
reader.go
File metadata and controls
234 lines (195 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package tail
import (
"bufio"
"bytes"
"compress/bzip2"
"compress/gzip"
"compress/zlib"
"errors"
"io"
"os"
"unsafe"
"golang.org/x/text/encoding"
)
// defaultBufSize is the initial capacity (in bytes) allocated for a reader's
// pending buffer of not-yet-complete line data.
const defaultBufSize = 4 << 10
// newReader creates a new reader that is used to read lines from f.
// It is important that the provided file is positioned at the start of the file.
//
// The start of the stream is passed to detectBOM, which returns a possibly
// adjusted offset and the detected BOM. The BOM may in turn override enc via
// resolveEncodingFromBOM. The encoded forms of '\n' and '\r' are computed once so
// that lines can be split on raw bytes before decoding. When the resulting offset
// is non-zero, the file is reopened at that offset through newReaderAt.
//
// compression is stored on the returned reader so that reset reopens later
// files with the same decompression.
func newReader(f *os.File, offset int64, enc encoding.Encoding, compression string) (*reader, error) {
	rr, err := newReaderAt(f, compression, 0)
	if err != nil {
		return nil, err
	}
	br := bufio.NewReader(rr)

	var bom BOM
	offset, bom = detectBOM(br, offset)
	enc = resolveEncodingFromBOM(bom, enc)

	decoder := enc.NewDecoder()
	encoder := enc.NewEncoder()

	// NOTE: nl and cr are produced by the same encoder, in this order.
	nl, err := encodedNewline(encoder)
	if err != nil {
		return nil, err
	}
	if len(nl) == 0 {
		// lastNl below takes the final byte of nl; an empty sequence would panic.
		return nil, errors.New("encoded newline is empty")
	}
	cr, err := encodedCarriageReturn(encoder)
	if err != nil {
		return nil, err
	}

	if offset != 0 {
		rr, err = newReaderAt(f, compression, offset)
		if err != nil {
			return nil, err
		}
		br.Reset(rr)
	}

	return &reader{
		pos:         offset,
		br:          br,
		compression: compression, // needed by reset to reopen with the same decompressor
		decoder:     decoder,
		nl:          nl,
		lastNl:      nl[len(nl)-1],
		cr:          cr,
		pending:     make([]byte, 0, defaultBufSize),
	}, nil
}
// reader reads newline-delimited lines in a given text encoding from a
// (possibly compressed) file and returns them decoded.
type reader struct {
	// pos is the byte offset just past the last complete line consumed (or the
	// pending data flushed). It starts at the BOM-adjusted offset computed in
	// newReader/reset. Bytes still held in pending are not counted.
	pos int64
	// br buffers reads from the stream returned by newReaderAt.
	br *bufio.Reader
	// pending accumulates bytes read from br that do not yet form a complete line.
	pending []byte
	// compression is passed to newReaderAt when reset reopens a file.
	compression string
	// decoder converts line bytes from the file's encoding (used in decode).
	decoder *encoding.Decoder
	// nl is '\n' encoded in the file's encoding; lines are split on it.
	nl []byte
	// lastNl is the final byte of nl, used as the ReadSlice delimiter in next.
	lastNl byte
	// cr is '\r' encoded in the file's encoding; decode trims one trailing cr.
	cr []byte
}
// next returns the next complete line from the file, decoded and without its
// trailing newline (and without a trailing carriage return).
//
// Bytes are read from br up to each occurrence of the last newline byte and
// collected in pending until pending contains the full encoded newline.
// bufio.ErrBufferFull only means the buffered slice filled up before the
// delimiter was seen, so reading simply continues. Any other read error (e.g.
// io.EOF) is returned as-is. Bytes read so far are kept in pending so a
// later call, or flush, can pick them up.
func (r *reader) next() (string, error) {
	for {
		// ReadSlice does not allocate: chunk points into br's buffer and is only
		// valid until the next read, so it is copied into pending right away.
		chunk, readErr := r.br.ReadSlice(r.lastNl)
		if len(chunk) != 0 {
			r.pending = append(r.pending, chunk...)
			line, found := r.consumeLine()
			if found {
				return r.decode(line)
			}
		}
		if readErr == nil || errors.Is(readErr, bufio.ErrBufferFull) {
			continue
		}
		return "", readErr
	}
}
// flush returns whatever is left in pending as a final line, even if it does
// not end with a newline. It is meant for use at EOF to emit a trailing partial
// line.
//
// The whole pending length is added to the position, pending is emptied, and
// the data (minus a trailing encoded newline, if any) is decoded. If nothing is
// pending, flush returns io.EOF.
func (r *reader) flush() (string, error) {
	remaining := r.pending
	if len(remaining) == 0 {
		return "", io.EOF
	}
	// remaining still aliases pending's backing array; that is fine because it
	// is decoded (and copied) below before anything is appended to pending again.
	r.pending = remaining[:0]
	r.pos += int64(len(remaining))
	return r.decode(bytes.TrimSuffix(remaining, r.nl))
}
// decode drops one trailing encoded carriage return from line, converts the
// remaining bytes with r.decoder and returns the result as a string.
func (r *reader) decode(line []byte) (string, error) {
	raw := bytes.TrimSuffix(line, r.cr)
	out, err := r.decoder.Bytes(raw)
	if err != nil {
		return "", err
	}
	// Zero-copy string conversion. This is safe because the decoder always
	// copies its input into a new slice (even the Nop decoder), so out does not
	// alias line or any buffer the reader reuses.
	return unsafe.String(unsafe.SliceData(out), len(out)), nil
}
// consumeLine searches pending for the encoded newline.
//
// If found, it returns the bytes before the newline, empties pending and
// advances pos past the line and its newline. Otherwise it returns (nil, false)
// and leaves all state untouched.
//
// The returned slice shares pending's backing array, so it must be used before
// anything else is appended to pending.
func (r *reader) consumeLine() ([]byte, bool) {
	idx := bytes.Index(r.pending, r.nl)
	if idx == -1 {
		return nil, false
	}
	line := r.pending[:idx]
	// next reads only up to the last newline byte before calling us, so pending
	// never holds data past the newline and can be emptied wholesale.
	r.pending = r.pending[:0]
	r.pos += int64(idx + len(r.nl))
	return line, true
}
// position reports the byte offset reached by completed (or flushed) lines.
// Bytes read from the file but still waiting in pending are not included, so
// this is not necessarily the number of bytes consumed from the file.
func (r *reader) position() int64 { return r.pos }
// reset prepares the reader for a new file handle, assuming the same encoding
// and compression that the reader was created with.
// It is important that the provided file is positioned at the start of the file.
//
// detectBOM is run again and may adjust offset, but the detected BOM is not used
// to change the decoder. When the resulting offset is non-zero, the file is
// reopened at that offset. On success, any pending partial line is discarded and
// the position is set to the resulting offset. Errors from reopening the file
// are returned to the caller.
func (r *reader) reset(f *os.File, offset int64) error {
	rr, err := newReaderAt(f, r.compression, 0)
	if err != nil {
		return err
	}
	r.br.Reset(rr)

	offset, _ = detectBOM(r.br, offset)
	if offset != 0 {
		rr, err = newReaderAt(f, r.compression, offset)
		if err != nil {
			// Must propagate: returning nil would report success while br is
			// still on the stream opened at offset 0 and pos keeps its old value.
			return err
		}
		r.br.Reset(rr)
	}

	r.pos = offset
	r.pending = make([]byte, 0, defaultBufSize)
	return nil
}
// encodedNewline returns '\n' as produced by e, together with any error e
// reports.
//
// Transform is called directly on the caller's encoder with atEOF=true, so any
// state the encoder carries is shared with other calls made on the same encoder.
// NOTE(review): if e is stateful (e.g. emits a BOM before its first output),
// that output may end up in the result and affect later calls on e. Confirm
// this is harmless for the encodings passed in.
func encodedNewline(e *encoding.Encoder) ([]byte, error) {
	buf := make([]byte, 10) // presumably ample room for one encoded character
	written, _, err := e.Transform(buf, []byte("\n"), true)
	return buf[:written], err
}
// encodedCarriageReturn returns '\r' as produced by e, together with any error
// e reports.
//
// Like encodedNewline, this calls Transform directly on the caller's encoder
// with atEOF=true, so any state left behind by earlier calls on e applies here.
// NOTE(review): confirm that is harmless for the encodings passed in.
func encodedCarriageReturn(e *encoding.Encoder) ([]byte, error) {
	buf := make([]byte, 10) // presumably ample room for one encoded character
	written, _, err := e.Transform(buf, []byte("\r"), true)
	return buf[:written], err
}
// newReaderAt returns a reader over the uncompressed contents of f, starting at
// the uncompressed byte offset.
//
// Recognized compression values are "gz" (gzip), "z" (zlib) and "bz2" (bzip2).
// Any other value, including "", is treated as uncompressed, so the offset is
// applied exactly once.
//
// Uncompressed: a non-zero offset is applied with Seek. With offset 0, the file
// is used from its current position, so callers must position it themselves.
//
// Compressed: the stored offset refers to uncompressed data, so there is no easy
// way to seek to it directly. For a non-zero offset, the file is rewound to its
// start, a new decompressor is created, and offset decompressed bytes are
// discarded. With offset 0, the decompressor reads from the current file
// position. Errors from seeking, from creating the decompressor, or from
// discarding (e.g. a stream shorter than offset) are returned.
func newReaderAt(f *os.File, compression string, offset int64) (io.Reader, error) {
	var decompress func(io.Reader) (io.Reader, error)
	switch compression {
	case "gz":
		decompress = func(src io.Reader) (io.Reader, error) { return gzip.NewReader(src) }
	case "z":
		decompress = func(src io.Reader) (io.Reader, error) { return zlib.NewReader(src) }
	case "bz2":
		decompress = func(src io.Reader) (io.Reader, error) { return bzip2.NewReader(src), nil }
	}

	if decompress == nil {
		// Plain file: offsets map 1:1 to file positions.
		if offset != 0 {
			if _, err := f.Seek(offset, io.SeekStart); err != nil {
				return nil, err
			}
		}
		return f, nil
	}

	// Decompression has to start from the beginning of the compressed stream.
	if offset != 0 {
		if _, err := f.Seek(0, io.SeekStart); err != nil {
			return nil, err
		}
	}
	reader, err := decompress(f)
	if err != nil {
		return nil, err
	}
	if offset != 0 {
		if _, err := io.CopyN(io.Discard, reader, offset); err != nil {
			return nil, err
		}
	}
	return reader, nil
}