Commit f1df1ea

Merge branch 'main' into filelogreceiver-line-numbers
2 parents: b6c56e4 + eb40c47

23 files changed: +475 −233 lines
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: filelogreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for gzip compressed log files
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [2328]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

CONTRIBUTING.md

Lines changed: 9 additions & 4 deletions
@@ -164,10 +164,15 @@ and its contributors.
   available configuration settings so users can copy and modify them as needed.
 - Run `make crosslink` to update intra-repository dependencies. It will add a `replace` directive to `go.mod` file of every intra-repository dependant. This is necessary for your component to be included in the contrib executable.
 - Add your component to `versions.yaml`.
-- All components included in the distribution must be included in [`cmd/otelcontribcol/builder-config.yaml`](./cmd/otelcontribcol/builder-config.yaml)
-  and in the respective testing harnesses. To align with the test goal of the project, components must be testable within the framework defined within
-  the folder. If a component can not be properly tested within the existing framework, it must increase the non testable
-  components number with a comment within the PR explaining as to why it can not be tested.
+- All components included in the distribution must be included in
+  [`cmd/otelcontribcol/builder-config.yaml`](./cmd/otelcontribcol/builder-config.yaml)
+  and in the respective testing harnesses. To align with the test goal of the
+  project, components must be testable within the framework defined within the
+  folder. If a component can not be properly tested within the existing
+  framework, it must increase the non testable components number with a comment
+  within the PR explaining as to why it can not be tested. **(Note: this does
+  not automatically include any components in official release binaries. See
+  [Releasing new components](#releasing-new-components).)**

 - Create a `metadata.yaml` file with at minimum the required fields defined in [metadata-schema.yaml](https://github.com/open-telemetry/opentelemetry-collector/blob/main/cmd/mdatagen/metadata-schema.yaml).
   Here is a minimal representation:

pkg/ottl/ottlfuncs/README.md

Lines changed: 21 additions & 0 deletions
@@ -1400,3 +1400,24 @@ Functions should be named and formatted according to the following standards.
 - Functions that interact with multiple items MUST have plurality in the name. Ex: `truncate_all`, `keep_keys`, `replace_all_matches`.
 - Functions that interact with a single item MUST NOT have plurality in the name. If a function would interact with multiple items due to a condition, like `where`, it is still considered singular. Ex: `set`, `delete`, `replace_match`.
 - Functions that change a specific target MUST set the target as the first parameter.
+
+## Adding New Editors/Converters
+
+Before raising a PR with a new Editor or Converter, raise an issue to verify its acceptance. While acceptance is strongly specific to a specific use case, consider these guidelines for early assessment.
+
+Your proposal likely will be accepted if:
+- The proposed functionality is missing,
+- The proposed solution significantly improves user experience and readability for very common use cases,
+- The proposed solution is more performant in cases where it is possible to achieve the same result with existing options.
+
+It will be up for discussion if your proposal solves an issue that can be achieved in another way but does not improve user experience or performance.
+
+Your proposal likely won't be accepted if:
+- User experience is worse and assumes a highly technical user,
+- The performance of your proposal very negatively affects the processing pipeline.
+
+As with code, OTTL aims for readability first. This means:
+- Using short, meaningful, and descriptive names,
+- Ensuring naming consistency across Editors and Converters,
+- Avoiding deep nesting to achieve desired transformations,
+- Ensuring Editors and Converters have a single responsibility.

pkg/stanza/fileconsumer/config.go

Lines changed: 2 additions & 0 deletions
@@ -86,6 +86,7 @@ type Config struct {
 	Header                  *HeaderConfig `mapstructure:"header,omitempty"`
 	DeleteAfterRead         bool          `mapstructure:"delete_after_read,omitempty"`
 	IncludeFileRecordNumber bool          `mapstructure:"include_file_record_number,omitempty"`
+	Compression             string        `mapstructure:"compression,omitempty"`
 }

 type HeaderConfig struct {
@@ -168,6 +169,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		HeaderConfig:            hCfg,
 		DeleteAtEOF:             c.DeleteAfterRead,
 		IncludeFileRecordNumber: c.IncludeFileRecordNumber,
+		Compression:             c.Compression,
 	}

 	var t tracker.Tracker

pkg/stanza/fileconsumer/config_test.go

Lines changed: 6 additions & 0 deletions
@@ -832,6 +832,12 @@ func (c *Config) withHeader(headerMatchPattern, extractRegex string) *Config {
 	return c
 }

+// withGzip is a builder-like helper for quickly setting up support for gzip compressed log files
+func (c *Config) withGzip() *Config {
+	c.Compression = "gzip"
+	return c
+}
+
 const mockOperatorType = "mock"

 func init() {

pkg/stanza/fileconsumer/file_test.go

Lines changed: 66 additions & 0 deletions
@@ -4,6 +4,7 @@
 package fileconsumer

 import (
+	"compress/gzip"
 	"context"
 	"fmt"
 	"os"
@@ -1528,3 +1529,68 @@ func symlinkTestCreateLogFile(t *testing.T, tempDir string, fileIdx, numLogLines
 	temp1.Close()
 	return tokens
 }
+
+// TestReadGzipCompressedLogsFromBeginning tests that, when starting from beginning of a gzip compressed file, we
+// read all the lines that are already there
+func TestReadGzipCompressedLogsFromBeginning(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir).withGzip()
+	cfg.StartAt = "beginning"
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTempWithPattern(t, tempDir, "*.gz")
+	writer := gzip.NewWriter(temp)
+
+	_, err := writer.Write([]byte("testlog1\ntestlog2\n"))
+	require.NoError(t, err)
+
+	require.NoError(t, writer.Close())
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectToken(t, []byte("testlog1"))
+	sink.ExpectToken(t, []byte("testlog2"))
+}
+
+// TestReadGzipCompressedLogsFromEnd tests that, when starting at the end of a gzip compressed file, we
+// read all the lines that are added afterward
+func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir).withGzip()
+	cfg.StartAt = "end"
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTempWithPattern(t, tempDir, "*.gz")
+
+	appendToLog := func(t *testing.T, content string) {
+		writer := gzip.NewWriter(temp)
+		_, err := writer.Write([]byte(content))
+		require.NoError(t, err)
+		require.NoError(t, writer.Close())
+	}
+
+	appendToLog(t, "testlog1\ntestlog2\n")
+
+	// poll for the first time - this should not lead to emitted
+	// logs as those were already in the existing file
+	operator.poll(context.TODO())
+
+	// append new content to the log and poll again - this should be picked up
+	appendToLog(t, "testlog3\n")
+	operator.poll(context.TODO())
+	sink.ExpectToken(t, []byte("testlog3"))
+
+	// do another iteration to verify correct setting of compressed reader offset
+	appendToLog(t, "testlog4\n")
+	operator.poll(context.TODO())
+	sink.ExpectToken(t, []byte("testlog4"))
+}

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 4 additions & 1 deletion
@@ -43,6 +43,7 @@ type Factory struct {
 	Attributes              attrs.Resolver
 	DeleteAtEOF             bool
 	IncludeFileRecordNumber bool
+	Compression             string
 }

 func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
@@ -75,14 +76,15 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		lineSplitFunc:        f.SplitFunc,
 		deleteAtEOF:          f.DeleteAtEOF,
 		includeFileRecordNum: f.IncludeFileRecordNumber,
+		compression:          f.Compression,
 	}
 	r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

 	if r.Fingerprint.Len() > r.fingerprintSize {
 		// User has reconfigured fingerprint_size
 		shorter, rereadErr := fingerprint.NewFromFile(file, r.fingerprintSize)
 		if rereadErr != nil {
-			return nil, fmt.Errorf("reread fingerprint: %w", err)
+			return nil, fmt.Errorf("reread fingerprint: %w", rereadErr)
 		}
 		if !r.Fingerprint.StartsWith(shorter) {
 			return nil, errors.New("file truncated")
@@ -121,5 +123,6 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 	for k, v := range attributes {
 		r.FileAttributes[k] = v
 	}
+
 	return r, nil
 }

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 36 additions & 1 deletion
@@ -5,8 +5,10 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont

 import (
 	"bufio"
+	"compress/gzip"
 	"context"
 	"errors"
+	"io"
 	"os"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
@@ -37,6 +39,7 @@ type Reader struct {
 	set               component.TelemetrySettings
 	fileName          string
 	file              *os.File
+	reader            io.Reader
 	fingerprintSize   int
 	initialBufferSize int
 	maxLogSize        int
@@ -49,10 +52,42 @@ type Reader struct {
 	deleteAtEOF            bool
 	needsUpdateFingerprint bool
 	includeFileRecordNum   bool
+	compression            string
 }

 // ReadToEnd will read until the end of the file
 func (r *Reader) ReadToEnd(ctx context.Context) {
+	switch r.compression {
+	case "gzip":
+		// We need to create a gzip reader each time ReadToEnd is called because the underlying
+		// SectionReader can only read a fixed window (from previous offset to EOF).
+		info, err := r.file.Stat()
+		if err != nil {
+			r.set.Logger.Error("Failed to stat", zap.Error(err))
+			return
+		}
+		currentEOF := info.Size()
+
+		// use a gzip Reader with an underlying SectionReader to pick up at the last
+		// offset of a gzip compressed file
+		gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
+		if err != nil {
+			if !errors.Is(err, io.EOF) {
+				r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
+			}
+			return
+		} else {
+			r.reader = gzipReader
+		}
+		// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
+		// we need to set the offset to the end of the file.
+		defer func() {
+			r.Offset = currentEOF
+		}()
+	default:
+		r.reader = r.file
+	}
+
 	if _, err := r.file.Seek(r.Offset, 0); err != nil {
 		r.set.Logger.Error("Failed to seek", zap.Error(err))
 		return
@@ -163,7 +198,7 @@ func (r *Reader) close() {

 // Read from the file and update the fingerprint if necessary
 func (r *Reader) Read(dst []byte) (n int, err error) {
-	n, err = r.file.Read(dst)
+	n, err = r.reader.Read(dst)
 	if n == 0 || err != nil {
 		return
 	}

receiver/filelogreceiver/README.md

Lines changed: 17 additions & 0 deletions
@@ -57,6 +57,7 @@ Tails and parses logs from files.
 | `ordering_criteria.sort_by.location` | | Relevant if `sort_type` is set to `timestamp`. Defines the location of the timestamp of the file. |
 | `ordering_criteria.sort_by.format` | | Relevant if `sort_type` is set to `timestamp`. Defines the strptime format of the timestamp being sorted. |
 | `ordering_criteria.sort_by.ascending` | | Sort direction |
+| `compression` | | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are `` or `gzip` |

 Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`.

@@ -179,6 +180,22 @@ Exception in thread 2 "main" java.lang.NullPointerException
 	at com.example.myproject.Bootstrap.main(Bootstrap.java:44)
 ```
+
+## Example - Reading compressed log files
+
+Receiver Configuration
+```yaml
+receivers:
+  filelog:
+    include:
+      - /var/log/example/compressed.log.gz
+    compression: gzip
+```
+
+The above configuration will be able to read gzip compressed log files by setting the `compression` option to `gzip`.
+When this option is set, all files ending with that suffix are scanned using a gzip reader that decompresses the file content
+before scanning through it. Please note that if the compressed file is expected to be updated, the additional compressed logs must be appended to the
+compressed file, rather than recompressing the whole content and overwriting the previous file.
+
 ## Offset tracking

 The `storage` setting allows you to define the proper storage extension for storing file offsets.

receiver/fluentforwardreceiver/collector.go

Lines changed: 13 additions & 12 deletions
@@ -6,32 +6,33 @@ package fluentforwardreceiver // import "github.com/open-telemetry/opentelemetry
 import (
 	"context"

-	"go.opencensus.io/stats"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/receiver/receiverhelper"
 	"go.uber.org/zap"

-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver/observ"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver/internal/metadata"
 )

 // Collector acts as an aggregator of LogRecords so that we don't have to
 // generate as many plog.Logs instances...we can pre-batch the LogRecord
 // instances from several Forward events into one to hopefully reduce
 // allocations and GC overhead.
 type Collector struct {
-	nextConsumer consumer.Logs
-	eventCh      <-chan Event
-	logger       *zap.Logger
-	obsrecv      *receiverhelper.ObsReport
+	nextConsumer     consumer.Logs
+	eventCh          <-chan Event
+	logger           *zap.Logger
+	obsrecv          *receiverhelper.ObsReport
+	telemetryBuilder *metadata.TelemetryBuilder
 }

-func newCollector(eventCh <-chan Event, next consumer.Logs, logger *zap.Logger, obsrecv *receiverhelper.ObsReport) *Collector {
+func newCollector(eventCh <-chan Event, next consumer.Logs, logger *zap.Logger, obsrecv *receiverhelper.ObsReport, telemetryBuilder *metadata.TelemetryBuilder) *Collector {
 	return &Collector{
-		nextConsumer: next,
-		eventCh:      eventCh,
-		logger:       logger,
-		obsrecv:      obsrecv,
+		nextConsumer:     next,
+		eventCh:          eventCh,
+		logger:           logger,
+		obsrecv:          obsrecv,
+		telemetryBuilder: telemetryBuilder,
 	}
 }

@@ -55,7 +56,7 @@ func (c *Collector) processEvents(ctx context.Context) {
 	c.fillBufferUntilChanEmpty(logSlice)

 	logRecordCount := out.LogRecordCount()
-	stats.Record(context.Background(), observ.RecordsGenerated.M(int64(logRecordCount)))
+	c.telemetryBuilder.FluentRecordsGenerated.Add(ctx, int64(logRecordCount))
 	obsCtx := c.obsrecv.StartLogsOp(ctx)
 	err := c.nextConsumer.ConsumeLogs(obsCtx, out)
 	c.obsrecv.EndLogsOp(obsCtx, "fluent", logRecordCount, err)