Skip to content

Commit ec15aba

Browse files
authored
feat(arrow/cdata): Add ReleaseCArrowArrayStream function (#373)
### Rationale for this change See #371 @zeroshade ### What changes are included in this PR? A new `ReleaseCArrowArrayStream` function to facilitate triggering the release callback of a `CArrowArrayStream`. ### Are these changes tested? ``` $ go test ./arrow/cdata -tags=test -count=1 ok github.com/apache/arrow-go/v18/arrow/cdata 5.413s ``` ### Are there any user-facing changes? Yes, a new function is added.
1 parent 436b24d commit ec15aba

File tree

4 files changed

+11
-7
lines changed

4 files changed

+11
-7
lines changed

arrow/cdata/cdata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,3 +1035,7 @@ func releaseArr(arr *CArrowArray) {
10351035
func releaseSchema(schema *CArrowSchema) {
10361036
C.ArrowSchemaRelease(schema)
10371037
}
1038+
1039+
func releaseStream(stream *CArrowArrayStream) {
1040+
C.ArrowArrayStreamRelease(stream)
1041+
}

arrow/cdata/cdata_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,7 @@ func TestRecordBatch(t *testing.T) {
768768

769769
func TestRecordReaderStream(t *testing.T) {
770770
stream := arrayStreamTest()
771-
defer releaseStream(stream)
771+
defer ReleaseCArrowArrayStream(stream)
772772

773773
rdr := ImportCArrayStream(stream, nil)
774774
i := 0
@@ -852,7 +852,7 @@ func TestExportRecordReaderStreamLifetime(t *testing.T) {
852852

853853
// C Stream is holding on to memory
854854
assert.NotEqual(t, 0, mem.CurrentAlloc())
855-
releaseStream(out)
855+
ReleaseCArrowArrayStream(out)
856856
}
857857

858858
func TestEmptyListExport(t *testing.T) {

arrow/cdata/cdata_test_framework.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ func exportInt32TypeSchema() CArrowSchema {
9898
return s
9999
}
100100

101-
func releaseStream(s *CArrowArrayStream) {
102-
C.ArrowArrayStreamRelease(s)
103-
}
104-
105101
func schemaIsReleased(s *CArrowSchema) bool {
106102
return C.ArrowSchemaIsReleased(s) == 1
107103
}

arrow/cdata/interface.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema
268268
// callbacks to be a working ArrowArrayStream utilizing the passed in RecordReader. The
269269
// CArrowArrayStream takes ownership of the RecordReader until the consumer calls the release
270270
// callback, as such it is unnecessary to call Release on the passed in reader unless it has
271-
// previously been retained.
271+
// previously been retained. To call that release callback and prevent a memory leak, you can
272+
// call ReleaseCArrowArrayStream on the CArrowArrayStream once it is no longer needed.
272273
//
273274
// WARNING: the output ArrowArrayStream MUST BE ZERO INITIALIZED, or the Go garbage
274275
// collector may error at runtime, due to CGO rules ("the current implementation may
@@ -284,6 +285,9 @@ func ReleaseCArrowArray(arr *CArrowArray) { releaseArr(arr) }
284285
// ReleaseCArrowSchema calls ArrowSchemaRelease on the passed in cdata schema
285286
func ReleaseCArrowSchema(schema *CArrowSchema) { releaseSchema(schema) }
286287

288+
// ReleaseCArrowArrayStream calls ArrowArrayStreamRelease on the passed in cdata stream
289+
func ReleaseCArrowArrayStream(stream *CArrowArrayStream) { releaseStream(stream) }
290+
287291
// RecordMessage is a simple container for a record batch channel to stream for
288292
// using the Async C Data Interface via ExportAsyncRecordBatchStream.
289293
type RecordMessage struct {

0 commit comments

Comments
 (0)