From eecbfc4aa2b50b27b56e67125a788a05253064ff Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sat, 16 Apr 2022 13:44:10 -0700 Subject: [PATCH 1/2] upgrade github.com/klauspost/compress to v1.15.1 and remove the need for runtime finalizers --- compress/zstd/zstd.go | 61 ++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/compress/zstd/zstd.go b/compress/zstd/zstd.go index c458053bf..1f74d876e 100644 --- a/compress/zstd/zstd.go +++ b/compress/zstd/zstd.go @@ -3,7 +3,6 @@ package zstd import ( "io" - "runtime" "sync" "github.com/klauspost/compress/zstd" @@ -29,19 +28,18 @@ func (c *Codec) Name() string { return "zstd" } // NewReader implements the compress.Codec interface. func (c *Codec) NewReader(r io.Reader) io.ReadCloser { p := new(reader) - if dec, _ := decoderPool.Get().(*decoder); dec == nil { - z, err := zstd.NewReader(r) + if p.dec, _ = decoderPool.Get().(*zstd.Decoder); p.dec != nil { + p.dec.Reset(r) + } else { + z, err := zstd.NewReader(r, + zstd.WithDecoderConcurrency(1), + zstd.WithDecoderLowmem(true), + ) if err != nil { p.err = err } else { - p.dec = &decoder{z} - // We need a finalizer because the reader spawns goroutines - // that will only be stopped if the Close method is called. - runtime.SetFinalizer(p.dec, (*decoder).finalize) + p.dec = z } - } else { - p.dec = dec - p.err = dec.Reset(r) } return p } @@ -57,18 +55,10 @@ func (c *Codec) zstdLevel() zstd.EncoderLevel { return zstd.EncoderLevelFromZstd(c.level()) } -var decoderPool sync.Pool // *decoder - -type decoder struct { - *zstd.Decoder -} - -func (d *decoder) finalize() { - d.Close() -} +var decoderPool sync.Pool // *zstd.Decoder type reader struct { - dec *decoder + dec *zstd.Decoder err error } @@ -88,6 +78,9 @@ func (r *reader) Read(p []byte) (int, error) { if r.err != nil { return 0, r.err } + if r.dec == nil { + return 0, io.EOF + } return r.dec.Read(p) } @@ -96,21 +89,25 @@ func (r *reader) WriteTo(w io.Writer) (int64, error) { if r.err != nil { return 0, r.err } + if r.dec == nil { + return 0, io.ErrClosedPipe + } return r.dec.WriteTo(w) } // NewWriter implements the compress.Codec interface. func (c *Codec) NewWriter(w io.Writer) io.WriteCloser { p := new(writer) - if enc, _ := c.encoderPool.Get().(*encoder); enc == nil { - z, err := zstd.NewWriter(w, zstd.WithEncoderLevel(c.zstdLevel())) + if enc, _ := c.encoderPool.Get().(*zstd.Encoder); enc == nil { + z, err := zstd.NewWriter(w, + zstd.WithEncoderLevel(c.zstdLevel()), + zstd.WithEncoderConcurrency(1), + zstd.WithZeroFrames(true), + ) if err != nil { p.err = err } else { - p.enc = &encoder{z} - // We need a finalizer because the writer spawns goroutines - // that will only be stopped if the Close method is called. - runtime.SetFinalizer(p.enc, (*encoder).finalize) + p.enc = z } } else { p.enc = enc @@ -120,17 +117,9 @@ func (c *Codec) NewWriter(w io.Writer) io.WriteCloser { return p } -type encoder struct { - *zstd.Encoder -} - -func (e *encoder) finalize() { - e.Close() -} - type writer struct { c *Codec - enc *encoder + enc *zstd.Encoder err error } @@ -149,7 +138,7 @@ func (w *writer) Close() error { w.enc = nil return err } - return nil + return w.err } // WriteTo implements the io.WriterTo interface. From e7f5ac12d854d1af6e200af9de733423bd1f6528 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 17 Apr 2022 12:51:40 -0700 Subject: [PATCH 2/2] remove lowmem config --- compress/zstd/zstd.go | 1 - 1 file changed, 1 deletion(-) diff --git a/compress/zstd/zstd.go b/compress/zstd/zstd.go index 1f74d876e..1cc5e8490 100644 --- a/compress/zstd/zstd.go +++ b/compress/zstd/zstd.go @@ -33,7 +33,6 @@ func (c *Codec) NewReader(r io.Reader) io.ReadCloser { } else { z, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1), - zstd.WithDecoderLowmem(true), ) if err != nil { p.err = err