Skip to content

Commit 96ae92b

Browse files
feat(array): less leaky string array (#5483)
* feat(array): less leaky string array Change the behviour of the string array back to the old behaviour where accessing the Value function returns a string that is backed by the arrow memory buffer. This avoids data allocations to memory outside of the memory allocator. The implementation of array.String has been simplified somewhat as part of the new behaviour. There are a number of places where correct behviour relies on copies of the data being made. To avoid having to fix all of these in the same PR a temporary ValueCopy function has been added to maintain the old semantics. This is being used everywhere the Value function was previously, except for cases where the value is obviously immediately processed, then discarded. * chore: update internal/arrowutil/iterator.gen.go.tmpl Co-authored-by: Chunchun Ye <[email protected]> --------- Co-authored-by: Chunchun Ye <[email protected]>
1 parent bea9586 commit 96ae92b

36 files changed

+268
-309
lines changed

array/array.go

Lines changed: 74 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package array
22

33
import (
44
"strconv"
5-
"sync/atomic"
65

76
"github.com/apache/arrow/go/v7/arrow"
87
"github.com/apache/arrow/go/v7/arrow/array"
98
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
9+
1010
"github.com/influxdata/flux/codes"
1111
"github.com/influxdata/flux/internal/errors"
1212
"github.com/influxdata/flux/memory"
@@ -103,10 +103,23 @@ type Builder interface {
103103
NewArray() Array
104104
}
105105

106+
type binaryArray interface {
107+
NullN() int
108+
NullBitmapBytes() []byte
109+
IsNull(i int) bool
110+
IsValid(i int) bool
111+
Data() arrow.ArrayData
112+
Len() int
113+
ValueBytes() []byte
114+
ValueLen(i int) int
115+
ValueOffset(i int) int
116+
ValueString(i int) string
117+
Retain()
118+
Release()
119+
}
120+
106121
type String struct {
107-
length int
108-
data *array.Binary
109-
value *stringValue
122+
binaryArray
110123
}
111124

112125
// NewStringFromBinaryArray creates an instance of String from
@@ -118,140 +131,90 @@ type String struct {
118131
func NewStringFromBinaryArray(data *array.Binary) *String {
119132
data.Retain()
120133
return &String{
121-
data: data,
134+
binaryArray: data,
122135
}
123136
}
124137

125138
func (a *String) DataType() DataType {
126139
return StringType
127140
}
128-
func (a *String) NullN() int {
129-
if a.data != nil {
130-
return a.data.NullN()
131-
}
132-
return 0
133-
}
134-
func (a *String) NullBitmapBytes() []byte {
135-
if a.data != nil {
136-
return a.data.NullBitmapBytes()
137-
}
138-
return nil
139-
}
140-
func (a *String) IsNull(i int) bool {
141-
if a.data != nil {
142-
return a.data.IsNull(i)
143-
}
144-
return false
145-
}
146-
func (a *String) IsValid(i int) bool {
147-
if a.data != nil {
148-
return a.data.IsValid(i)
149-
}
150-
return true
151-
}
152-
func (a *String) Data() arrow.ArrayData {
153-
if a.data != nil {
154-
return a.data.Data()
155-
}
156-
return nil
157-
}
158-
func (a *String) Len() int {
159-
if a.data != nil {
160-
return a.data.Len()
161-
}
162-
return a.length
163-
}
164-
func (a *String) Retain() {
165-
if a.data != nil {
166-
a.data.Retain()
167-
return
168-
}
169-
a.value.Retain()
170-
}
171-
func (a *String) Release() {
172-
if a.data != nil {
173-
a.data.Release()
174-
return
175-
}
176-
a.value.Release()
177-
}
141+
178142
func (a *String) Slice(i, j int) Array {
179-
if a.data != nil {
180-
data := array.NewSliceData(a.data.Data(), int64(i), int64(j))
181-
defer data.Release()
182-
return &String{
183-
data: array.NewBinaryData(data),
184-
}
143+
slice, ok := a.binaryArray.(interface{ Slice(i, j int) binaryArray })
144+
if ok {
145+
return &String{binaryArray: slice.Slice(i, j)}
185146
}
186-
a.value.Retain()
147+
data := array.NewSliceData(a.binaryArray.Data(), int64(i), int64(j))
148+
defer data.Release()
187149
return &String{
188-
value: a.value,
189-
length: j - i,
150+
binaryArray: array.NewBinaryData(data),
190151
}
191152
}
192153

193-
// ValueBytes returns a byte slice containing the value of this string
194-
// at index i. This slice points to the contents of the data buffer and
195-
// is only valid for the lifetime of the array.
196-
func (a *String) ValueBytes(i int) []byte {
197-
if a.data != nil {
198-
return a.data.Value(i)
199-
}
200-
return a.value.Bytes()
201-
}
202-
203-
// Value returns a string copy of the value stored at index i. The
204-
// returned value will outlive the array and is safe to use like any
205-
// other go string. The memory backing the string will be allocated by
206-
// the runtime, rather than any provided allocator.
154+
// Value returns a string view of the bytes in the array. The string
155+
// is only valid for the lifetime of the array. Care should be taken not
156+
// to store this string without also retaining the array.
207157
func (a *String) Value(i int) string {
208-
return string(a.ValueBytes(i))
158+
return a.ValueString(i)
209159
}
210-
func (a *String) ValueLen(i int) int {
211-
if a.data != nil {
212-
return a.data.ValueLen(i)
160+
161+
// ValueRef returns a reference to the memory buffer and location that
162+
// stores the value at i. The reference is only valid for as long as the
163+
// array is, the buffer needs to be retained if further access is
164+
// required.
165+
func (a *String) ValueRef(i int) StringRef {
166+
if vr, ok := a.binaryArray.(interface{ ValueRef(int) StringRef }); ok {
167+
return vr.ValueRef(i)
168+
}
169+
return StringRef{
170+
buf: a.Data().Buffers()[2],
171+
off: a.ValueOffset(i),
172+
len: a.ValueLen(i),
213173
}
214-
return a.value.Len()
215174
}
216-
func (a *String) IsConstant() bool {
217-
return a.data == nil
175+
176+
// ValueCopy returns the value at the requested position copied into a
177+
// new memory location. This value will remain valid after the array is
178+
// released, but is not tracked by the memory allocator.
179+
//
180+
// This function is intended to be temporary while changes are being
181+
// made to reduce the amount of unaccounted data memory.
182+
func (a *String) ValueCopy(i int) string {
183+
return string(a.ValueRef(i).Bytes())
218184
}
219185

220-
type stringValue struct {
221-
rc int64
222-
data []byte
186+
func (a *String) IsConstant() bool {
187+
ic, ok := a.binaryArray.(interface{ IsConstant() bool })
188+
return ok && ic.IsConstant()
189+
}
223190

224-
mem arrowmem.Allocator
191+
// StringRef contains a referenct to the storage for a value.
192+
type StringRef struct {
193+
buf *arrowmem.Buffer
194+
off int
195+
len int
225196
}
226197

227-
func (v *stringValue) Retain() {
228-
if v == nil {
229-
return
230-
}
231-
atomic.AddInt64(&v.rc, 1)
198+
// Buffer returns the memory buffer that contains the value.
199+
func (r StringRef) Buffer() *arrowmem.Buffer {
200+
return r.buf
232201
}
233202

234-
func (v *stringValue) Release() {
235-
if v == nil {
236-
return
237-
}
238-
if atomic.AddInt64(&v.rc, -1) == 0 {
239-
v.mem.Free(v.data)
240-
}
203+
// Offset returns the offset into the memory buffer at which the value
204+
// starts.
205+
func (r StringRef) Offset() int {
206+
return r.off
241207
}
242208

243-
func (v *stringValue) Bytes() []byte {
244-
if v == nil {
245-
return nil
246-
}
247-
return v.data
209+
// Len returns the length of the value.
210+
func (r StringRef) Len() int {
211+
return r.len
248212
}
249213

250-
func (v *stringValue) Len() int {
251-
if v == nil {
252-
return 0
253-
}
254-
return len(v.data)
214+
// Bytes returns the bytes from the memory buffer that contain the
215+
// value.
216+
func (r StringRef) Bytes() []byte {
217+
return r.buf.Bytes()[r.off : r.off+r.len]
255218
}
256219

257220
type sliceable interface {

array/array_test.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,30 @@ func TestString(t *testing.T) {
1515
for _, tc := range []struct {
1616
name string
1717
build func(b *array.StringBuilder)
18+
bsz int
1819
sz int
1920
want []interface{}
2021
}{
2122
{
2223
name: "Constant",
2324
build: func(b *array.StringBuilder) {
2425
for i := 0; i < 10; i++ {
25-
b.Append("a")
26+
b.Append("abcdefghij")
2627
}
2728
},
28-
sz: 1,
29+
bsz: 256, // 64 bytes nulls + 192 bytes data.
30+
sz: 64, // The minimum size of a buffer is 64 bytes
2931
want: []interface{}{
30-
"a", "a", "a", "a", "a",
31-
"a", "a", "a", "a", "a",
32+
"abcdefghij",
33+
"abcdefghij",
34+
"abcdefghij",
35+
"abcdefghij",
36+
"abcdefghij",
37+
"abcdefghij",
38+
"abcdefghij",
39+
"abcdefghij",
40+
"abcdefghij",
41+
"abcdefghij",
3242
},
3343
},
3444
{
@@ -41,7 +51,8 @@ func TestString(t *testing.T) {
4151
b.Append("b")
4252
}
4353
},
44-
sz: 192,
54+
bsz: 192,
55+
sz: 192,
4556
want: []interface{}{
4657
"a", "a", "a", "a", "a",
4758
"b", "b", "b", "b", "b",
@@ -58,7 +69,8 @@ func TestString(t *testing.T) {
5869
b.Append(v)
5970
}
6071
},
61-
sz: 192,
72+
bsz: 192,
73+
sz: 192,
6274
want: []interface{}{
6375
"a", "b", "c", "d", "e",
6476
nil, "g", "h", "i", "j",
@@ -90,7 +102,7 @@ func TestString(t *testing.T) {
90102
if want, got := len(tc.want)+2, b.Cap(); want != got {
91103
t.Errorf("unexpected builder cap -want/+got:\n\t- %d\n\t+ %d", want, got)
92104
}
93-
assert.Equal(t, tc.sz, mem.CurrentAlloc(), "unexpected memory allocation.")
105+
assert.Equal(t, tc.bsz, mem.CurrentAlloc(), "unexpected memory allocation.")
94106

95107
arr := b.NewStringArray()
96108
defer arr.Release()
@@ -165,7 +177,7 @@ func TestStringBuilder_NewArray(t *testing.T) {
165177
}
166178

167179
arr := b.NewArray()
168-
assert.Equal(t, 1, mem.CurrentAlloc(), "unexpected memory allocation.")
180+
assert.Equal(t, 64, mem.CurrentAlloc(), "unexpected memory allocation.")
169181
arr.Release()
170182
mem.AssertSize(t, 0)
171183

array/binary.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,11 @@ func StringAdd(l, r *String, mem memory.Allocator) (*String, error) {
155155
b.Resize(n)
156156
for i := 0; i < n; i++ {
157157
if l.IsValid(i) && r.IsValid(i) {
158-
lb := l.ValueBytes(i)
159-
rb := r.ValueBytes(i)
160-
buf := make([]byte, len(lb)+len(rb))
161-
copy(buf, lb)
162-
copy(buf[len(lb):], rb)
158+
ls := l.Value(i)
159+
rs := r.Value(i)
160+
buf := make([]byte, len(ls)+len(rs))
161+
copy(buf, ls)
162+
copy(buf[len(ls):], rs)
163163
b.AppendBytes(buf)
164164

165165
} else {
@@ -177,10 +177,10 @@ func StringAddLConst(l string, r *String, mem memory.Allocator) (*String, error)
177177
b.Resize(n)
178178
for i := 0; i < n; i++ {
179179
if r.IsValid(i) {
180-
rb := r.ValueBytes(i)
181-
buf := make([]byte, len(l)+len(rb))
180+
rs := r.Value(i)
181+
buf := make([]byte, len(l)+len(rs))
182182
copy(buf, l)
183-
copy(buf[len(l):], rb)
183+
copy(buf[len(l):], rs)
184184
b.AppendBytes(buf)
185185

186186
} else {
@@ -198,10 +198,10 @@ func StringAddRConst(l *String, r string, mem memory.Allocator) (*String, error)
198198
b.Resize(n)
199199
for i := 0; i < n; i++ {
200200
if l.IsValid(i) {
201-
lb := l.ValueBytes(i)
202-
buf := make([]byte, len(lb)+len(r))
203-
copy(buf, lb)
204-
copy(buf[len(lb):], r)
201+
ls := l.Value(i)
202+
buf := make([]byte, len(ls)+len(r))
203+
copy(buf, ls)
204+
copy(buf[len(ls):], r)
205205
b.AppendBytes(buf)
206206

207207
} else {

0 commit comments

Comments
 (0)