@@ -88,35 +88,35 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
88
88
}
89
89
90
90
// Copy creates a deep copy of a Reader
91
- func (f * Reader ) Copy (file * os.File ) (* Reader , error ) {
92
- reader , err := f .fileInput .NewReader (f .fileAttributes .Path , file , f .Fingerprint .Copy ())
91
+ func (r * Reader ) Copy (file * os.File ) (* Reader , error ) {
92
+ reader , err := r .fileInput .NewReader (r .fileAttributes .Path , file , r .Fingerprint .Copy ())
93
93
if err != nil {
94
94
return nil , err
95
95
}
96
- reader .Offset = f .Offset
96
+ reader .Offset = r .Offset
97
97
return reader , nil
98
98
}
99
99
100
100
// InitializeOffset sets the starting offset
101
- func (f * Reader ) InitializeOffset (startAtBeginning bool ) error {
101
+ func (r * Reader ) InitializeOffset (startAtBeginning bool ) error {
102
102
if ! startAtBeginning {
103
- info , err := f .file .Stat ()
103
+ info , err := r .file .Stat ()
104
104
if err != nil {
105
105
return fmt .Errorf ("stat: %s" , err )
106
106
}
107
- f .Offset = info .Size ()
107
+ r .Offset = info .Size ()
108
108
}
109
109
return nil
110
110
}
111
111
112
112
// ReadToEnd will read until the end of the file
113
- func (f * Reader ) ReadToEnd (ctx context.Context ) {
114
- if _ , err := f .file .Seek (f .Offset , 0 ); err != nil {
115
- f .Errorw ("Failed to seek" , zap .Error (err ))
113
+ func (r * Reader ) ReadToEnd (ctx context.Context ) {
114
+ if _ , err := r .file .Seek (r .Offset , 0 ); err != nil {
115
+ r .Errorw ("Failed to seek" , zap .Error (err ))
116
116
return
117
117
}
118
118
119
- scanner := NewPositionalScanner (f , f .fileInput .MaxLogSize , f .Offset , f .fileInput .SplitFunc )
119
+ scanner := NewPositionalScanner (r , r .fileInput .MaxLogSize , r .Offset , r .fileInput .SplitFunc )
120
120
121
121
// Iterate over the tokenized file, emitting entries as we go
122
122
for {
@@ -129,76 +129,76 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
129
129
ok := scanner .Scan ()
130
130
if ! ok {
131
131
if err := getScannerError (scanner ); err != nil {
132
- f .Errorw ("Failed during scan" , zap .Error (err ))
132
+ r .Errorw ("Failed during scan" , zap .Error (err ))
133
133
}
134
134
break
135
135
}
136
136
137
- if err := f .emit (ctx , scanner .Bytes ()); err != nil {
138
- f .Error ("Failed to emit entry" , zap .Error (err ))
137
+ if err := r .emit (ctx , scanner .Bytes ()); err != nil {
138
+ r .Error ("Failed to emit entry" , zap .Error (err ))
139
139
}
140
- f .Offset = scanner .Pos ()
140
+ r .Offset = scanner .Pos ()
141
141
}
142
142
}
143
143
144
144
// Close will close the file
145
- func (f * Reader ) Close () {
146
- if f .file != nil {
147
- if err := f .file .Close (); err != nil {
148
- f .Debugf ("Problem closing reader" , "Error" , err .Error ())
145
+ func (r * Reader ) Close () {
146
+ if r .file != nil {
147
+ if err := r .file .Close (); err != nil {
148
+ r .Debugf ("Problem closing reader" , "Error" , err .Error ())
149
149
}
150
150
}
151
151
}
152
152
153
153
// Emit creates an entry with the decoded message and sends it to the next
154
154
// operator in the pipeline
155
- func (f * Reader ) emit (ctx context.Context , msgBuf []byte ) error {
155
+ func (r * Reader ) emit (ctx context.Context , msgBuf []byte ) error {
156
156
// Skip the entry if it's empty
157
157
if len (msgBuf ) == 0 {
158
158
return nil
159
159
}
160
160
161
- msg , err := f .decode (msgBuf )
161
+ msg , err := r .decode (msgBuf )
162
162
if err != nil {
163
163
return fmt .Errorf ("decode: %s" , err )
164
164
}
165
165
166
- e , err := f .fileInput .NewEntry (msg )
166
+ e , err := r .fileInput .NewEntry (msg )
167
167
if err != nil {
168
168
return fmt .Errorf ("create entry: %s" , err )
169
169
}
170
170
171
- if err := e .Set (f .fileInput .FilePathField , f .fileAttributes .Path ); err != nil {
171
+ if err := e .Set (r .fileInput .FilePathField , r .fileAttributes .Path ); err != nil {
172
172
return err
173
173
}
174
- if err := e .Set (f .fileInput .FileNameField , f .fileAttributes .Name ); err != nil {
174
+ if err := e .Set (r .fileInput .FileNameField , r .fileAttributes .Name ); err != nil {
175
175
return err
176
176
}
177
177
178
- if err := e .Set (f .fileInput .FilePathResolvedField , f .fileAttributes .ResolvedPath ); err != nil {
178
+ if err := e .Set (r .fileInput .FilePathResolvedField , r .fileAttributes .ResolvedPath ); err != nil {
179
179
return err
180
180
}
181
181
182
- if err := e .Set (f .fileInput .FileNameResolvedField , f .fileAttributes .ResolvedName ); err != nil {
182
+ if err := e .Set (r .fileInput .FileNameResolvedField , r .fileAttributes .ResolvedName ); err != nil {
183
183
return err
184
184
}
185
185
186
- f .fileInput .Write (ctx , e )
186
+ r .fileInput .Write (ctx , e )
187
187
return nil
188
188
}
189
189
190
190
// decode converts the bytes in msgBuf to utf-8 from the configured encoding
191
- func (f * Reader ) decode (msgBuf []byte ) (string , error ) {
191
+ func (r * Reader ) decode (msgBuf []byte ) (string , error ) {
192
192
for {
193
- f .decoder .Reset ()
194
- nDst , _ , err := f .decoder .Transform (f .decodeBuffer , msgBuf , true )
193
+ r .decoder .Reset ()
194
+ nDst , _ , err := r .decoder .Transform (r .decodeBuffer , msgBuf , true )
195
195
if err != nil && err == transform .ErrShortDst {
196
- f .decodeBuffer = make ([]byte , len (f .decodeBuffer )* 2 )
196
+ r .decodeBuffer = make ([]byte , len (r .decodeBuffer )* 2 )
197
197
continue
198
198
} else if err != nil {
199
199
return "" , fmt .Errorf ("transform encoding: %s" , err )
200
200
}
201
- return string (f .decodeBuffer [:nDst ]), nil
201
+ return string (r .decodeBuffer [:nDst ]), nil
202
202
}
203
203
}
204
204
@@ -213,13 +213,13 @@ func getScannerError(scanner *PositionalScanner) error {
213
213
}
214
214
215
215
// Read from the file and update the fingerprint if necessary
216
- func (f * Reader ) Read (dst []byte ) (int , error ) {
217
- if len (f .Fingerprint .FirstBytes ) == f .fileInput .fingerprintSize {
218
- return f .file .Read (dst )
216
+ func (r * Reader ) Read (dst []byte ) (int , error ) {
217
+ if len (r .Fingerprint .FirstBytes ) == r .fileInput .fingerprintSize {
218
+ return r .file .Read (dst )
219
219
}
220
- n , err := f .file .Read (dst )
221
- appendCount := min0 (n , f .fileInput .fingerprintSize - int (f .Offset ))
222
- f .Fingerprint .FirstBytes = append (f .Fingerprint .FirstBytes [:f .Offset ], dst [:appendCount ]... )
220
+ n , err := r .file .Read (dst )
221
+ appendCount := min0 (n , r .fileInput .fingerprintSize - int (r .Offset ))
222
+ r .Fingerprint .FirstBytes = append (r .Fingerprint .FirstBytes [:r .Offset ], dst [:appendCount ]... )
223
223
return n , err
224
224
}
225
225
0 commit comments