Skip to content

Commit ab06f98

Browse files
committed
Revert "[pkg/stanza][fileconsumer] Remove/unexport deprecated fields (open-telemetry#24688)"
This reverts commit d4146f5.
1 parent e61d6ce commit ab06f98

18 files changed

+183
-224
lines changed

.chloggen/pkg-stanza-fileconsumer-deprecated.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.

.chloggen/pkg-stanza-fileconsumer-deprecated2.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.

.chloggen/pkg-stanza-fileconsumer-deprecated3.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.

.chloggen/pkg-stanza-fileconsumer-deprecated4.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.

.chloggen/pkg-stanza-fileconsumer-deprecated5.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.

pkg/stanza/fileconsumer/attributes.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
5+
6+
import (
7+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
8+
)
9+
10+
// Deprecated: [v0.82.0] This will be removed in a future release, tentatively v0.84.0.
11+
type FileAttributes struct {
12+
Name string `json:"-"`
13+
Path string `json:"-"`
14+
NameResolved string `json:"-"`
15+
PathResolved string `json:"-"`
16+
HeaderAttributes map[string]any
17+
}
18+
19+
// HeaderAttributesCopy gives a copy of the HeaderAttributes, in order to restrict mutation of the HeaderAttributes.
20+
//
21+
// Deprecated: [v0.82.0] This will be removed in a future release, tentatively v0.84.0.
22+
func (f *FileAttributes) HeaderAttributesCopy() map[string]any {
23+
return util.MapCopy(f.HeaderAttributes)
24+
}

pkg/stanza/fileconsumer/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
165165
maxBatchFiles: c.MaxConcurrentFiles / 2,
166166
maxBatches: c.MaxBatches,
167167
deleteAfterRead: c.DeleteAfterRead,
168-
knownFiles: make([]*reader, 0, 10),
168+
knownFiles: make([]*Reader, 0, 10),
169169
seenPaths: make(map[string]struct{}, 100),
170170
}, nil
171171
}

pkg/stanza/fileconsumer/config_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,10 @@ func TestUnmarshal(t *testing.T) {
164164
Expect: func() *mockOperatorConfig {
165165
cfg := NewConfig()
166166
cfg.OrderingCriteria.Regex = `err\.[a-zA-Z]\.\d+\.(?P<rotation_time>\d{10})\.log`
167-
cfg.OrderingCriteria.SortBy = []sortRuleImpl{
167+
cfg.OrderingCriteria.SortBy = []SortRuleImpl{
168168
{
169169
&TimestampSortRule{
170-
baseSortRule: baseSortRule{
170+
BaseSortRule: BaseSortRule{
171171
SortType: sortTypeTimestamp,
172172
RegexKey: "rotation_time",
173173
Ascending: true,
@@ -185,10 +185,10 @@ func TestUnmarshal(t *testing.T) {
185185
Expect: func() *mockOperatorConfig {
186186
cfg := NewConfig()
187187
cfg.OrderingCriteria.Regex = `err\.(?P<file_num>[a-zA-Z])\.\d+\.\d{10}\.log`
188-
cfg.OrderingCriteria.SortBy = []sortRuleImpl{
188+
cfg.OrderingCriteria.SortBy = []SortRuleImpl{
189189
{
190190
&NumericSortRule{
191-
baseSortRule: baseSortRule{
191+
BaseSortRule: BaseSortRule{
192192
SortType: sortTypeNumeric,
193193
RegexKey: "file_num",
194194
},
@@ -575,10 +575,10 @@ func TestBuild(t *testing.T) {
575575
{
576576
"BadOrderingCriteriaRegex",
577577
func(f *Config) {
578-
f.OrderingCriteria.SortBy = []sortRuleImpl{
578+
f.OrderingCriteria.SortBy = []SortRuleImpl{
579579
{
580580
&NumericSortRule{
581-
baseSortRule: baseSortRule{
581+
BaseSortRule: BaseSortRule{
582582
RegexKey: "value",
583583
SortType: sortTypeNumeric,
584584
},
@@ -593,10 +593,10 @@ func TestBuild(t *testing.T) {
593593
"BasicOrderingCriteriaTimetsamp",
594594
func(f *Config) {
595595
f.OrderingCriteria.Regex = ".*"
596-
f.OrderingCriteria.SortBy = []sortRuleImpl{
596+
f.OrderingCriteria.SortBy = []SortRuleImpl{
597597
{
598598
&TimestampSortRule{
599-
baseSortRule: baseSortRule{
599+
BaseSortRule: BaseSortRule{
600600
RegexKey: "value",
601601
SortType: sortTypeTimestamp,
602602
},
@@ -611,10 +611,10 @@ func TestBuild(t *testing.T) {
611611
"GoodOrderingCriteriaTimestamp",
612612
func(f *Config) {
613613
f.OrderingCriteria.Regex = ".*"
614-
f.OrderingCriteria.SortBy = []sortRuleImpl{
614+
f.OrderingCriteria.SortBy = []SortRuleImpl{
615615
{
616616
&TimestampSortRule{
617-
baseSortRule: baseSortRule{
617+
BaseSortRule: BaseSortRule{
618618
RegexKey: "value",
619619
SortType: sortTypeTimestamp,
620620
},

pkg/stanza/fileconsumer/file.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@ const (
2525
logFilePathResolved = "log.file.path_resolved"
2626
)
2727

28+
// Deprecated: [v0.82.0] Use emit.Callback instead. This will be removed in a future release, tentatively v0.84.0.
29+
type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)
30+
2831
type Manager struct {
2932
*zap.SugaredLogger
3033
wg sync.WaitGroup
3134
cancel context.CancelFunc
3235

3336
readerFactory readerFactory
34-
finder MatchingCriteria
37+
finder Finder
3538
roller roller
3639
persister operator.Persister
3740

@@ -40,7 +43,7 @@ type Manager struct {
4043
maxBatchFiles int
4144
deleteAfterRead bool
4245

43-
knownFiles []*reader
46+
knownFiles []*Reader
4447
seenPaths map[string]struct{}
4548

4649
currentFps []*fingerprint.Fingerprint
@@ -56,7 +59,7 @@ func (m *Manager) Start(persister operator.Persister) error {
5659
return fmt.Errorf("read known files from database: %w", err)
5760
}
5861

59-
if files, err := m.finder.findFiles(); err != nil {
62+
if files, err := m.finder.FindFiles(); err != nil {
6063
m.Warnw("error occurred while finding files", "error", err.Error())
6164
} else if len(files) == 0 {
6265
m.Warnw("no files match the configured include patterns",
@@ -116,7 +119,7 @@ func (m *Manager) poll(ctx context.Context) {
116119
batchesProcessed := 0
117120

118121
// Get the list of paths on disk
119-
matches, err := m.finder.findFiles()
122+
matches, err := m.finder.FindFiles()
120123
if err != nil {
121124
m.Errorf("error finding files: %s", err)
122125
}
@@ -139,7 +142,7 @@ func (m *Manager) poll(ctx context.Context) {
139142

140143
func (m *Manager) consume(ctx context.Context, paths []string) {
141144
m.Debug("Consuming files")
142-
readers := make([]*reader, 0, len(paths))
145+
readers := make([]*Reader, 0, len(paths))
143146
for _, path := range paths {
144147
r := m.makeReader(path)
145148
if r != nil {
@@ -153,9 +156,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
153156
m.roller.readLostFiles(ctx, readers)
154157

155158
var wg sync.WaitGroup
156-
for _, r := range readers {
159+
for _, reader := range readers {
157160
wg.Add(1)
158-
go func(r *reader) {
161+
go func(r *Reader) {
159162
defer wg.Done()
160163
r.ReadToEnd(ctx)
161164
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
@@ -165,13 +168,13 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
165168
m.Errorf("could not delete %s", r.file.Name())
166169
}
167170
}
168-
}(r)
171+
}(reader)
169172
}
170173
wg.Wait()
171174

172175
// Save off any files that were not fully read
173176
if m.deleteAfterRead {
174-
unfinished := make([]*reader, 0, len(readers))
177+
unfinished := make([]*Reader, 0, len(readers))
175178
for _, r := range readers {
176179
if !r.eof {
177180
unfinished = append(unfinished, r)
@@ -239,7 +242,7 @@ func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
239242
// makeReader take a file path, then creates reader,
240243
// discarding any that have a duplicate fingerprint to other files that have already
241244
// been read this polling interval
242-
func (m *Manager) makeReader(path string) *reader {
245+
func (m *Manager) makeReader(path string) *Reader {
243246
// Open the files first to minimize the time between listing and opening
244247
fp, file := m.makeFingerprint(path)
245248
if fp == nil {
@@ -271,7 +274,7 @@ func (m *Manager) clearCurrentFingerprints() {
271274
// saveCurrent adds the readers from this polling interval to this list of
272275
// known files, then increments the generation of all tracked old readers
273276
// before clearing out readers that have existed for 3 generations.
274-
func (m *Manager) saveCurrent(readers []*reader) {
277+
func (m *Manager) saveCurrent(readers []*Reader) {
275278
// Add readers from the current, completed poll interval to the list of known files
276279
m.knownFiles = append(m.knownFiles, readers...)
277280

@@ -287,7 +290,7 @@ func (m *Manager) saveCurrent(readers []*reader) {
287290
}
288291
}
289292

290-
func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
293+
func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
291294
// Check if the new path has the same fingerprint as an old path
292295
if oldReader, ok := m.findFingerprintMatch(fp); ok {
293296
return m.readerFactory.copy(oldReader, file)
@@ -297,7 +300,7 @@ func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader
297300
return m.readerFactory.newReader(file, fp)
298301
}
299302

300-
func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader, bool) {
303+
func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*Reader, bool) {
301304
// Iterate backwards to match newest first
302305
for i := len(m.knownFiles) - 1; i >= 0; i-- {
303306
oldReader := m.knownFiles[i]
@@ -344,7 +347,7 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
344347
}
345348

346349
if encoded == nil {
347-
m.knownFiles = make([]*reader, 0, 10)
350+
m.knownFiles = make([]*Reader, 0, 10)
348351
return nil
349352
}
350353

@@ -362,7 +365,7 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
362365
}
363366

364367
// Decode each of the known files
365-
m.knownFiles = make([]*reader, 0, knownFileCount)
368+
m.knownFiles = make([]*Reader, 0, knownFileCount)
366369
for i := 0; i < knownFileCount; i++ {
367370
// Only the offset, fingerprint, and splitter
368371
// will be used before this reader is discarded

pkg/stanza/fileconsumer/file_sort.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type sortRule interface {
2727
sort(re *regexp.Regexp, files []string) ([]string, error)
2828
}
2929

30-
func (sr *sortRuleImpl) Unmarshal(component *confmap.Conf) error {
30+
func (sr *SortRuleImpl) Unmarshal(component *confmap.Conf) error {
3131
if !component.IsSet("sort_type") {
3232
return fmt.Errorf("missing required field 'sort_type'")
3333
}

pkg/stanza/fileconsumer/file_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -731,10 +731,10 @@ func TestMultiFileSort(t *testing.T) {
731731
cfg := NewConfig().includeDir(tempDir)
732732
cfg.StartAt = "beginning"
733733
cfg.MatchingCriteria.OrderingCriteria.Regex = `.*(?P<value>\d)`
734-
cfg.MatchingCriteria.OrderingCriteria.SortBy = []sortRuleImpl{
734+
cfg.MatchingCriteria.OrderingCriteria.SortBy = []SortRuleImpl{
735735
{
736736
&NumericSortRule{
737-
baseSortRule: baseSortRule{
737+
BaseSortRule: BaseSortRule{
738738
RegexKey: `value`,
739739
},
740740
},
@@ -765,10 +765,10 @@ func TestMultiFileSortTimestamp(t *testing.T) {
765765
cfg := NewConfig().includeDir(tempDir)
766766
cfg.StartAt = "beginning"
767767
cfg.MatchingCriteria.OrderingCriteria.Regex = `.(?P<value>\d{10})\.log`
768-
cfg.MatchingCriteria.OrderingCriteria.SortBy = []sortRuleImpl{
768+
cfg.MatchingCriteria.OrderingCriteria.SortBy = []SortRuleImpl{
769769
{
770770
&TimestampSortRule{
771-
baseSortRule: baseSortRule{
771+
BaseSortRule: BaseSortRule{
772772
RegexKey: `value`,
773773
SortType: "timestamp",
774774
},

0 commit comments

Comments
 (0)