@@ -17,22 +17,24 @@ var (
17
17
errOverCapacity = errors .New ("over capacity" )
18
18
)
19
19
20
- // Minimum number of bytes to compress. 1500 is the MTU of an ethernet frame.
21
- const minCompressionLen = 1500
22
-
23
20
// bufferState encapsulates intermediate buffer state when pushing data
24
21
type bufferState struct {
25
- compressionAvailable bool
26
- bufferMaxLen uint
27
- maxEventLength uint
28
- writer io.Writer
29
- buf * bytes.Buffer
30
- jsonStream * jsoniter.Stream
31
- rawLength int
22
+ maxEventLength uint
23
+ buf buffer
24
+ jsonStream * jsoniter.Stream
25
+ rawLength int
26
+ }
27
+
28
+ type buffer interface {
29
+ io.Writer
30
+ io.Reader
31
+ io.Closer
32
+ Reset ()
33
+ Len () int
32
34
}
33
35
34
36
func (b * bufferState ) compressionEnabled () bool {
35
- _ , ok := b .writer .(* cancellableGzipWriter )
37
+ _ , ok := b .buf .(* cancellableGzipWriter )
36
38
return ok
37
39
}
38
40
@@ -42,9 +44,6 @@ func (b *bufferState) containsData() bool {
42
44
43
45
func (b * bufferState ) reset () {
44
46
b .buf .Reset ()
45
- if _ , ok := b .writer .(* cancellableBytesWriter ); ! ok {
46
- b .writer = & cancellableBytesWriter {innerWriter : b .buf , maxCapacity : b .bufferMaxLen }
47
- }
48
47
b .rawLength = 0
49
48
}
50
49
@@ -53,64 +52,23 @@ func (b *bufferState) Read(p []byte) (n int, err error) {
53
52
}
54
53
55
54
func (b * bufferState ) Close () error {
56
- if _ , ok := b .writer .(* cancellableGzipWriter ); ok {
57
- return b .writer .(* cancellableGzipWriter ).close ()
58
- }
59
- return nil
55
+ return b .buf .Close ()
60
56
}
61
57
62
58
// accept returns true if data is accepted by the buffer
63
59
func (b * bufferState ) accept (data []byte ) (bool , error ) {
64
60
if len (data )+ b .rawLength > int (b .maxEventLength ) {
65
61
return false , nil
66
62
}
67
- _ , err := b .writer .Write (data )
68
- overCapacity := errors .Is (err , errOverCapacity )
69
- bufLen := b .buf .Len ()
70
- if overCapacity {
71
- bufLen += len (data )
72
- }
73
- if b .compressionAvailable && ! b .compressionEnabled () && bufLen > minCompressionLen {
74
- // switch over to a zip buffer.
75
- tmpBuf := bytes .NewBuffer (make ([]byte , 0 , b .bufferMaxLen + bufCapPadding ))
76
- writer := gzip .NewWriter (tmpBuf )
77
- writer .Reset (tmpBuf )
78
- zipWriter := & cancellableGzipWriter {
79
- innerBuffer : tmpBuf ,
80
- innerWriter : writer ,
81
- // 8 bytes required for the zip footer.
82
- maxCapacity : b .bufferMaxLen - 8 ,
83
- }
84
-
85
- if b .bufferMaxLen == 0 {
86
- zipWriter .maxCapacity = 0
87
- }
88
-
89
- // we write the bytes buffer into the zip buffer. Any error from this is I/O, and should stop the process.
90
- if _ , err2 := zipWriter .Write (b .buf .Bytes ()); err2 != nil {
91
- return false , err2
92
- }
93
- b .writer = zipWriter
94
- b .buf = tmpBuf
95
- // if the byte writer was over capacity, try to write the new entry in the zip writer:
96
- if overCapacity {
97
- if _ , err2 := zipWriter .Write (data ); err2 != nil {
98
- overCapacity2 := errors .Is (err2 , errOverCapacity )
99
- if overCapacity2 {
100
- return false , nil
101
- }
102
- return false , err2
103
- }
104
-
105
- }
63
+ _ , err := b .buf .Write (data )
64
+ if err == nil {
106
65
b .rawLength += len (data )
107
66
return true , nil
108
67
}
109
- if overCapacity {
68
+ if errors . Is ( err , errOverCapacity ) {
110
69
return false , nil
111
70
}
112
- b .rawLength += len (data )
113
- return true , err
71
+ return false , err
114
72
}
115
73
116
74
type cancellableBytesWriter struct {
@@ -128,6 +86,22 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) {
128
86
return c .innerWriter .Write (b )
129
87
}
130
88
89
+ func (c * cancellableBytesWriter ) Read (p []byte ) (int , error ) {
90
+ return c .innerWriter .Read (p )
91
+ }
92
+
93
+ func (c * cancellableBytesWriter ) Reset () {
94
+ c .innerWriter .Reset ()
95
+ }
96
+
97
+ func (c * cancellableBytesWriter ) Close () error {
98
+ return nil
99
+ }
100
+
101
+ func (c * cancellableBytesWriter ) Len () int {
102
+ return c .innerWriter .Len ()
103
+ }
104
+
131
105
type cancellableGzipWriter struct {
132
106
innerBuffer * bytes.Buffer
133
107
innerWriter * gzip.Writer
@@ -168,10 +142,24 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
168
142
return c .innerWriter .Write (b )
169
143
}
170
144
171
- func (c * cancellableGzipWriter ) close () error {
145
+ func (c * cancellableGzipWriter ) Read (p []byte ) (int , error ) {
146
+ return c .innerBuffer .Read (p )
147
+ }
148
+
149
+ func (c * cancellableGzipWriter ) Reset () {
150
+ c .innerBuffer .Reset ()
151
+ c .innerWriter .Reset (c .innerBuffer )
152
+ c .len = 0
153
+ }
154
+
155
+ func (c * cancellableGzipWriter ) Close () error {
172
156
return c .innerWriter .Close ()
173
157
}
174
158
159
+ func (c * cancellableGzipWriter ) Len () int {
160
+ return c .innerBuffer .Len ()
161
+ }
162
+
175
163
// bufferStatePool is a pool of bufferState objects.
176
164
type bufferStatePool struct {
177
165
pool * sync.Pool
@@ -189,18 +177,28 @@ func (p bufferStatePool) put(bf *bufferState) {
189
177
190
178
const initBufferCap = 512
191
179
192
- func newBufferStatePool (bufCap uint , compressionAvailable bool , maxEventLength uint ) bufferStatePool {
180
+ func newBufferStatePool (bufCap uint , compressionEnabled bool , maxEventLength uint ) bufferStatePool {
193
181
return bufferStatePool {
194
182
& sync.Pool {
195
183
New : func () interface {} {
196
- buf := bytes .NewBuffer (make ([]byte , 0 , initBufferCap ))
184
+ innerBuffer := bytes .NewBuffer (make ([]byte , 0 , initBufferCap ))
185
+ var buf buffer
186
+ if compressionEnabled {
187
+ buf = & cancellableGzipWriter {
188
+ innerBuffer : innerBuffer ,
189
+ innerWriter : gzip .NewWriter (buf ),
190
+ maxCapacity : bufCap ,
191
+ }
192
+ } else {
193
+ buf = & cancellableBytesWriter {
194
+ innerWriter : innerBuffer ,
195
+ maxCapacity : bufCap ,
196
+ }
197
+ }
197
198
return & bufferState {
198
- compressionAvailable : compressionAvailable ,
199
- writer : & cancellableBytesWriter {innerWriter : buf , maxCapacity : bufCap },
200
- buf : buf ,
201
- jsonStream : jsoniter .NewStream (jsoniter .ConfigDefault , nil , initBufferCap ),
202
- bufferMaxLen : bufCap ,
203
- maxEventLength : maxEventLength ,
199
+ buf : buf ,
200
+ jsonStream : jsoniter .NewStream (jsoniter .ConfigDefault , nil , initBufferCap ),
201
+ maxEventLength : maxEventLength ,
204
202
}
205
203
},
206
204
},
0 commit comments