@@ -18,6 +18,7 @@ import (
1818 "context"
1919 "errors"
2020 "fmt"
21+ "hash/crc32"
2122 "io"
2223 "net/http"
2324 "net/url"
@@ -51,6 +52,10 @@ func (w *gRPCWriter) Write(p []byte) (n int, err error) {
5152 case <- w .donec :
5253 return 0 , w .streamResult
5354 case w .writesChan <- cmd :
55+ // update fullObjectChecksum on every write and send it on finalWrite
56+ if ! w .disableAutoChecksum {
57+ w .fullObjectChecksum = crc32 .Update (w .fullObjectChecksum , crc32cTable , p )
58+ }
5459 // write command successfully delivered to sender. We no longer own cmd.
5560 break
5661 }
@@ -170,6 +175,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
170175
171176 flushSupported : params .append ,
172177 sendCRC32C : params .sendCRC32C ,
178+ disableAutoChecksum : params .disableAutoChecksum ,
173179 forceOneShot : params .chunkSize <= 0 ,
174180 forceEmptyContentType : params .forceEmptyContentType ,
175181 append : params .append ,
@@ -239,8 +245,11 @@ type gRPCWriter struct {
239245 setSize func (int64 )
240246 setTakeoverOffset func (int64 )
241247
248+ fullObjectChecksum uint32
249+
242250 flushSupported bool
243251 sendCRC32C bool
252+ disableAutoChecksum bool
244253 forceOneShot bool
245254 forceEmptyContentType bool
246255 append bool
@@ -785,25 +794,62 @@ func completion(r *storagepb.BidiWriteObjectResponse) *gRPCBidiWriteCompletion {
785794 }
786795}
787796
788- func bidiWriteObjectRequest (buf []byte , offset int64 , flush , finishWrite bool ) * storagepb.BidiWriteObjectRequest {
797+ // Server contract expects full object checksum to be sent only on first or last write.
798+ // Checksums of full object are already being sent on first Write during initialization of sender.
799+ // Send objectChecksums only on final request and nil in other cases.
800+ func bidiWriteObjectRequest (r gRPCBidiWriteRequest , bufChecksum * uint32 , objectChecksums * storagepb.ObjectChecksums ) * storagepb.BidiWriteObjectRequest {
789801 var data * storagepb.BidiWriteObjectRequest_ChecksummedData
790- if buf != nil {
802+ if r . buf != nil {
791803 data = & storagepb.BidiWriteObjectRequest_ChecksummedData {
792804 ChecksummedData : & storagepb.ChecksummedData {
793- Content : buf ,
805+ Content : r .buf ,
806+ Crc32C : bufChecksum ,
794807 },
795808 }
796809 }
797810 req := & storagepb.BidiWriteObjectRequest {
798- Data : data ,
799- WriteOffset : offset ,
800- FinishWrite : finishWrite ,
801- Flush : flush ,
802- StateLookup : flush ,
811+ Data : data ,
812+ WriteOffset : r .offset ,
813+ FinishWrite : r .finishWrite ,
814+ Flush : r .flush ,
815+ StateLookup : r .flush ,
816+ ObjectChecksums : objectChecksums ,
803817 }
804818 return req
805819}
806820
821+ type getObjectChecksumsParams struct {
822+ fullObjectChecksum func () uint32
823+ finishWrite bool
824+ sendCRC32C bool
825+ disableAutoChecksum bool
826+ attrs * ObjectAttrs
827+ }
828+
829+ // getObjectChecksums determines what checksum information to include in the final
830+ // gRPC request
831+ //
832+ // function returns a populated ObjectChecksums only when finishWrite is true
833+ // If CRC32C is disabled, it returns the user-provided checksum if available.
834+ // If CRC32C is enabled, it returns the user-provided checksum if available,
835+ // or the computed checksum of the entire object.
836+ func getObjectChecksums (params * getObjectChecksumsParams ) * storagepb.ObjectChecksums {
837+ if ! params .finishWrite {
838+ return nil
839+ }
840+
841+ // send user's checksum on last write op if available
842+ if params .sendCRC32C {
843+ return toProtoChecksums (params .sendCRC32C , params .attrs )
844+ }
845+ if params .disableAutoChecksum {
846+ return nil
847+ }
848+ return & storagepb.ObjectChecksums {
849+ Crc32C : proto .Uint32 (params .fullObjectChecksum ()),
850+ }
851+ }
852+
807853type gRPCBidiWriteBufferSender interface {
808854 // connect implementations may attempt to establish a connection for issuing
809855 // writes.
@@ -832,6 +878,9 @@ type gRPCOneshotBidiWriteBufferSender struct {
832878 bucket string
833879 firstMessage * storagepb.BidiWriteObjectRequest
834880 streamErr error
881+
882+ checksumSettings func () (bool , bool , * ObjectAttrs )
883+ fullObjectChecksum func () uint32
835884}
836885
837886func (w * gRPCWriter ) newGRPCOneshotBidiWriteBufferSender () * gRPCOneshotBidiWriteBufferSender {
@@ -843,11 +892,13 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite
843892 WriteObjectSpec : w .spec ,
844893 },
845894 CommonObjectRequestParams : toProtoCommonObjectRequestParams (w .encryptionKey ),
846- // For a non-resumable upload, checksums must be sent in this message.
847- // TODO: Currently the checksums are only sent on the first message
848- // of the stream, but in the future, we must also support sending it
849- // on the *last* message of the stream (instead of the first).
850- ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
895+ ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
896+ },
897+ checksumSettings : func () (bool , bool , * ObjectAttrs ) {
898+ return w .sendCRC32C , w .disableAutoChecksum , w .attrs
899+ },
900+ fullObjectChecksum : func () uint32 {
901+ return w .fullObjectChecksum
851902 },
852903 }
853904}
@@ -888,7 +939,20 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB
888939 continue
889940 }
890941
891- req := bidiWriteObjectRequest (r .buf , r .offset , r .flush , r .finishWrite )
942+ sendCrc32C , disableAutoChecksum , attrs := s .checksumSettings ()
943+ var bufChecksum * uint32
944+ if ! disableAutoChecksum {
945+ bufChecksum = proto .Uint32 (crc32 .Checksum (r .buf , crc32cTable ))
946+ }
947+ objectChecksums := getObjectChecksums (& getObjectChecksumsParams {
948+ fullObjectChecksum : s .fullObjectChecksum ,
949+ finishWrite : r .finishWrite ,
950+ sendCRC32C : sendCrc32C ,
951+ disableAutoChecksum : disableAutoChecksum ,
952+ attrs : attrs ,
953+ })
954+ req := bidiWriteObjectRequest (r , bufChecksum , objectChecksums )
955+
892956 if firstSend {
893957 proto .Merge (req , s .firstMessage )
894958 firstSend = false
@@ -932,6 +996,9 @@ type gRPCResumableBidiWriteBufferSender struct {
932996 startWriteRequest * storagepb.StartResumableWriteRequest
933997 upid string
934998
999+ checksumSettings func () (bool , bool , * ObjectAttrs )
1000+ fullObjectChecksum func () uint32
1001+
9351002 streamErr error
9361003}
9371004
@@ -942,10 +1009,13 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW
9421009 startWriteRequest : & storagepb.StartResumableWriteRequest {
9431010 WriteObjectSpec : w .spec ,
9441011 CommonObjectRequestParams : toProtoCommonObjectRequestParams (w .encryptionKey ),
945- // TODO: Currently the checksums are only sent on the request to initialize
946- // the upload, but in the future, we must also support sending it
947- // on the *last* message of the stream.
948- ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
1012+ ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
1013+ },
1014+ checksumSettings : func () (bool , bool , * ObjectAttrs ) {
1015+ return w .sendCRC32C , w .disableAutoChecksum , w .attrs
1016+ },
1017+ fullObjectChecksum : func () uint32 {
1018+ return w .fullObjectChecksum
9491019 },
9501020 }
9511021}
@@ -1005,7 +1075,21 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP
10051075 cs .requestAcks <- struct {}{}
10061076 continue
10071077 }
1008- req := bidiWriteObjectRequest (r .buf , r .offset , r .flush , r .finishWrite )
1078+
1079+ sendCrc32C , disableAutoChecksum , attrs := s .checksumSettings ()
1080+ var bufChecksum * uint32
1081+ if ! disableAutoChecksum {
1082+ bufChecksum = proto .Uint32 (crc32 .Checksum (r .buf , crc32cTable ))
1083+ }
1084+ objectChecksums := getObjectChecksums (& getObjectChecksumsParams {
1085+ fullObjectChecksum : s .fullObjectChecksum ,
1086+ finishWrite : r .finishWrite ,
1087+ sendCRC32C : sendCrc32C ,
1088+ disableAutoChecksum : disableAutoChecksum ,
1089+ attrs : attrs ,
1090+ })
1091+ req := bidiWriteObjectRequest (r , bufChecksum , objectChecksums )
1092+
10091093 if firstSend {
10101094 req .FirstMessage = & storagepb.BidiWriteObjectRequest_UploadId {UploadId : s .upid }
10111095 firstSend = false
@@ -1319,7 +1403,14 @@ func (s *gRPCAppendBidiWriteBufferSender) maybeHandleRedirectionError(err error)
13191403func (s * gRPCAppendBidiWriteBufferSender ) send (stream storagepb.Storage_BidiWriteObjectClient , buf []byte , offset int64 , flush , finishWrite , sendFirstMessage bool ) error {
13201404 finalizeObject := finishWrite && s .finalizeOnClose
13211405 flush = flush || finishWrite
1322- req := bidiWriteObjectRequest (buf , offset , flush , finalizeObject )
1406+ r := gRPCBidiWriteRequest {
1407+ buf : buf ,
1408+ offset : offset ,
1409+ flush : flush ,
1410+ finishWrite : finalizeObject ,
1411+ }
1412+ // TODO(b/453869602): implement default checksumming for appendable writes
1413+ req := bidiWriteObjectRequest (r , nil , nil )
13231414 if finalizeObject {
13241415 // appendable objects pass checksums on the finalize message only
13251416 req .ObjectChecksums = s .objectChecksums
0 commit comments