Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ type openWriterParams struct {
// sendCRC32C - see `Writer.SendCRC32C`.
// Optional.
sendCRC32C bool
// disableAutoChecksum - see `Writer.DisableAutoChecksum`.
// Optional.
disableAutoChecksum bool
// append - Write with appendable object semantics.
// Optional.
append bool
Expand Down
131 changes: 111 additions & 20 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -51,6 +52,10 @@ func (w *gRPCWriter) Write(p []byte) (n int, err error) {
case <-w.donec:
return 0, w.streamResult
case w.writesChan <- cmd:
// update fullObjectChecksum on every write and send it on finalWrite
if !w.disableAutoChecksum {
w.fullObjectChecksum = crc32.Update(w.fullObjectChecksum, crc32cTable, p)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to note that you are doing the work twice here for each set of bytes between here and L945. In theory it should be possible to calculate the per-message checksum once per buffer and then use those sums to update the full object checksum as well. It doesn't look like there is an easy interface to do this with in Go, but maybe worth considering if you are trying to save CPU.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With no retries, yes, the same calculation is effectively being done twice. With retries, however, the buffer on this line and the buffer at L945 can be out of sync. I cannot use the checksum from L945 to update the global checksum, because the same bytes may be sent multiple times when a retry occurs. So I had to keep these two computations separate.

}
// write command successfully delivered to sender. We no longer own cmd.
break
}
Expand Down Expand Up @@ -170,6 +175,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage

flushSupported: params.append,
sendCRC32C: params.sendCRC32C,
disableAutoChecksum: params.disableAutoChecksum,
forceOneShot: params.chunkSize <= 0,
forceEmptyContentType: params.forceEmptyContentType,
append: params.append,
Expand Down Expand Up @@ -239,8 +245,11 @@ type gRPCWriter struct {
setSize func(int64)
setTakeoverOffset func(int64)

fullObjectChecksum uint32

flushSupported bool
sendCRC32C bool
disableAutoChecksum bool
forceOneShot bool
forceEmptyContentType bool
append bool
Expand Down Expand Up @@ -785,25 +794,62 @@ func completion(r *storagepb.BidiWriteObjectResponse) *gRPCBidiWriteCompletion {
}
}

func bidiWriteObjectRequest(buf []byte, offset int64, flush, finishWrite bool) *storagepb.BidiWriteObjectRequest {
// The server contract expects the full-object checksum to be sent only on the first or the last write.
// Full-object checksums are already sent on the first write, during initialization of the sender.
// Pass objectChecksums only for the final request, and nil in all other cases.
func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectChecksums *storagepb.ObjectChecksums) *storagepb.BidiWriteObjectRequest {
var data *storagepb.BidiWriteObjectRequest_ChecksummedData
if buf != nil {
if r.buf != nil {
data = &storagepb.BidiWriteObjectRequest_ChecksummedData{
ChecksummedData: &storagepb.ChecksummedData{
Content: buf,
Content: r.buf,
Crc32C: bufChecksum,
},
}
}
req := &storagepb.BidiWriteObjectRequest{
Data: data,
WriteOffset: offset,
FinishWrite: finishWrite,
Flush: flush,
StateLookup: flush,
Data: data,
WriteOffset: r.offset,
FinishWrite: r.finishWrite,
Flush: r.flush,
StateLookup: r.flush,
ObjectChecksums: objectChecksums,
}
return req
}

// getObjectChecksumsParams carries the inputs getObjectChecksums consults when
// deciding which object-level checksums, if any, to attach to a write request.
type getObjectChecksumsParams struct {
	// finishWrite reports whether the request being built is the final one;
	// getObjectChecksums returns nil whenever it is false.
	finishWrite bool
	// sendCRC32C selects the checksums built from attrs via toProtoChecksums.
	sendCRC32C bool
	// disableAutoChecksum suppresses the computed full-object checksum.
	disableAutoChecksum bool
	// attrs is handed to toProtoChecksums when sendCRC32C is set.
	attrs *ObjectAttrs
	// fullObjectChecksum is called to obtain the CRC32C that is sent when
	// neither sendCRC32C nor disableAutoChecksum is set.
	fullObjectChecksum func() uint32
}

// getObjectChecksums picks the object-level checksums to attach to a gRPC
// write request.
//
// Nothing is attached (nil is returned) unless params.finishWrite is set. For
// the final request the result is, in order of precedence:
//   - the checksums toProtoChecksums builds from params.attrs, when
//     params.sendCRC32C is set (the user-supplied values);
//   - nil, when params.disableAutoChecksum is set;
//   - otherwise the CRC32C reported by params.fullObjectChecksum.
func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectChecksums {
	switch {
	case !params.finishWrite:
		// Only the last request of a write may carry object checksums here.
		return nil
	case params.sendCRC32C:
		// A user-supplied checksum wins over anything computed automatically.
		return toProtoChecksums(params.sendCRC32C, params.attrs)
	case params.disableAutoChecksum:
		return nil
	default:
		computed := params.fullObjectChecksum()
		return &storagepb.ObjectChecksums{Crc32C: proto.Uint32(computed)}
	}
}

type gRPCBidiWriteBufferSender interface {
// connect implementations may attempt to establish a connection for issuing
// writes.
Expand Down Expand Up @@ -832,6 +878,9 @@ type gRPCOneshotBidiWriteBufferSender struct {
bucket string
firstMessage *storagepb.BidiWriteObjectRequest
streamErr error

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32
}

func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender {
Expand All @@ -843,11 +892,13 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite
WriteObjectSpec: w.spec,
},
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
// For a non-resumable upload, checksums must be sent in this message.
// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
},
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
}
}
Expand Down Expand Up @@ -888,7 +939,20 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB
continue
}

req := bidiWriteObjectRequest(r.buf, r.offset, r.flush, r.finishWrite)
sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: r.finishWrite,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)

if firstSend {
proto.Merge(req, s.firstMessage)
firstSend = false
Expand Down Expand Up @@ -932,6 +996,9 @@ type gRPCResumableBidiWriteBufferSender struct {
startWriteRequest *storagepb.StartResumableWriteRequest
upid string

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32

streamErr error
}

Expand All @@ -942,10 +1009,13 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW
startWriteRequest: &storagepb.StartResumableWriteRequest{
WriteObjectSpec: w.spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
// TODO: Currently the checksums are only sent on the request to initialize
// the upload, but in the future, we must also support sending it
// on the *last* message of the stream.
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
},
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
}
}
Expand Down Expand Up @@ -1005,7 +1075,21 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP
cs.requestAcks <- struct{}{}
continue
}
req := bidiWriteObjectRequest(r.buf, r.offset, r.flush, r.finishWrite)

sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: r.finishWrite,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)

if firstSend {
req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: s.upid}
firstSend = false
Expand Down Expand Up @@ -1318,7 +1402,14 @@ func (s *gRPCAppendBidiWriteBufferSender) maybeHandleRedirectionError(err error)
func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWriteObjectClient, buf []byte, offset int64, flush, finishWrite, sendFirstMessage bool) error {
finalizeObject := finishWrite && s.finalizeOnClose
flush = flush || finishWrite
req := bidiWriteObjectRequest(buf, offset, flush, finalizeObject)
r := gRPCBidiWriteRequest{
buf: buf,
offset: offset,
flush: flush,
finishWrite: finalizeObject,
}
// TODO(b/453869602): implement default checksumming for appendable writes
req := bidiWriteObjectRequest(r, nil, nil)
if finalizeObject {
// appendable objects pass checksums on the finalize message only
req.ObjectChecksums = s.objectChecksums
Expand Down
92 changes: 92 additions & 0 deletions storage/grpc_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"testing"

"cloud.google.com/go/storage/internal/apiv2/storagepb"
"google.golang.org/protobuf/proto"
)

// TestGetObjectChecksums exercises every branch of getObjectChecksums: the
// non-final request, the user-supplied checksum (with and without automatic
// checksumming disabled), the fully disabled case, and the automatically
// computed full-object checksum.
func TestGetObjectChecksums(t *testing.T) {
	tests := []struct {
		name                string
		fullObjectChecksum  func() uint32
		finishWrite         bool
		sendCRC32C          bool
		disableAutoChecksum bool
		attrs               *ObjectAttrs
		want                *storagepb.ObjectChecksums
	}{
		{
			name:        "finishWrite is false",
			finishWrite: false,
			want:        nil,
		},
		{
			// finishWrite is checked first, so a user checksum must not leak
			// into a non-final request.
			name:        "finishWrite is false and sendCRC32C is true",
			finishWrite: false,
			sendCRC32C:  true,
			attrs:       &ObjectAttrs{CRC32C: 123},
			want:        nil,
		},
		{
			name:        "sendCRC32C is true, attrs have CRC32C",
			finishWrite: true,
			sendCRC32C:  true,
			attrs:       &ObjectAttrs{CRC32C: 123},
			want: &storagepb.ObjectChecksums{
				Crc32C: proto.Uint32(123),
			},
		},
		{
			// The user-supplied checksum takes precedence over the computed one.
			name:               "sendCRC32C is true, computed checksum differs",
			fullObjectChecksum: func() uint32 { return 456 },
			finishWrite:        true,
			sendCRC32C:         true,
			attrs:              &ObjectAttrs{CRC32C: 123},
			want: &storagepb.ObjectChecksums{
				Crc32C: proto.Uint32(123),
			},
		},
		{
			name:                "disableAutoChecksum is true and sendCRC32C is true",
			finishWrite:         true,
			sendCRC32C:          true,
			disableAutoChecksum: true,
			attrs:               &ObjectAttrs{CRC32C: 123},
			want: &storagepb.ObjectChecksums{
				Crc32C: proto.Uint32(123),
			},
		},
		{
			name:                "disableAutoChecksum is true and sendCRC32C is false",
			finishWrite:         true,
			sendCRC32C:          false,
			disableAutoChecksum: true,
			want:                nil,
		},
		{
			name:                "auto checksum enabled, no user-provided checksum",
			fullObjectChecksum:  func() uint32 { return 456 },
			finishWrite:         true,
			sendCRC32C:          false,
			disableAutoChecksum: false,
			attrs:               &ObjectAttrs{},
			want: &storagepb.ObjectChecksums{
				Crc32C: proto.Uint32(456),
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := getObjectChecksums(&getObjectChecksumsParams{
				fullObjectChecksum:  tt.fullObjectChecksum,
				finishWrite:         tt.finishWrite,
				sendCRC32C:          tt.sendCRC32C,
				disableAutoChecksum: tt.disableAutoChecksum,
				attrs:               tt.attrs,
			})
			// proto.Equal treats two nil messages as equal, which covers the
			// cases where no checksums are expected.
			if !proto.Equal(got, tt.want) {
				t.Errorf("getObjectChecksums() = %v, want %v", got, tt.want)
			}
		})
	}
}
35 changes: 27 additions & 8 deletions storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,35 @@ type Writer struct {
// attributes are ignored.
ObjectAttrs

// SendCRC32C specifies whether to transmit a CRC32C field. It should be set
// to true in addition to setting the Writer's CRC32C field, because zero
// is a valid CRC and normally a zero would not be transmitted.
// If a CRC32C is sent, and the data written does not match the checksum,
// the write will be rejected.
// SendCRC32C specifies whether to transmit a CRC32C checksum. When this is
// true and the Writer's CRC32C field is set, that checksum is sent to GCS.
// If the data written does not match the checksum, the write is rejected.
// It is necessary to set this field to true in addition to setting the
// Writer's CRC32C field because zero is a valid CRC.
//
// Note: SendCRC32C must be set to true BEFORE the first call to
// Writer.Write() in order to send the checksum. If it is set after that
// point, the checksum will be ignored.
// When using gRPC, the client automatically calculates and sends checksums
// per-chunk and for the full object. However, a user-provided checksum takes
// precedence over the auto-calculated checksum for the full object.
// To disable auto checksum behavior, see DisableAutoChecksum.
//
// Note: SendCRC32C must be set before the first call to Writer.Write().
SendCRC32C bool

// DisableAutoChecksum disables automatic CRC32C checksum calculation and
// validation in gRPC Writer. By default when using gRPC, the Writer
// automatically performs checksum validation for both individual chunks and
// the entire object. Setting this to true disables this behavior. This flag
// is ignored when not using gRPC.
//
// Disabling automatic checksumming does not prevent a user-provided checksum
// from being sent. If SendCRC32C is true and the Writer's CRC32C field is
// populated, that checksum will still be sent to GCS for validation.
//
// Note: DisableAutoChecksum must be set before the first call to
// Writer.Write(). Automatic checksumming is not enabled for writes
// using the HTTP client or for unfinalized writes to appendable objects in gRPC.
DisableAutoChecksum bool

// ChunkSize controls the maximum number of bytes of the object that the
// Writer will attempt to send to the server in a single request. Objects
// smaller than the size will be sent in a single request, while larger
Expand Down Expand Up @@ -286,6 +304,7 @@ func (w *Writer) openWriter() (err error) {
appendGen: w.o.gen,
encryptionKey: w.o.encryptionKey,
sendCRC32C: w.SendCRC32C,
disableAutoChecksum: w.DisableAutoChecksum,
append: w.Append,
finalizeOnClose: w.FinalizeOnClose,
donec: w.donec,
Expand Down
Loading