Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
bfad232
make data size optional
Hitenjain14 Feb 23, 2024
dd8def5
add opt for stream upload
Hitenjain14 Feb 24, 2024
1f523d3
Merge branch 'sprint-1.13' into feat/optional-size
Hitenjain14 Feb 28, 2024
8681af2
default nonce as 0 when http badrequest occurs
storybehind Feb 26, 2024
351d6c8
Merge branch 'sprint-1.13' into fix-mint-nonce
dabasov Mar 5, 2024
453af81
fix leaf index in vt
Hitenjain14 Mar 6, 2024
9a8a182
Merge branch 'feat/optional-size' of https://github.com/0chain/gosdk …
Hitenjain14 Mar 6, 2024
d8a7fc8
Merge branch 'sprint-1.13' of https://github.com/0chain/gosdk into fe…
Hitenjain14 Mar 6, 2024
0ae2713
fix shard and actual size for stream
Hitenjain14 Mar 6, 2024
cb8d73a
fix actual size for stream upload
Hitenjain14 Mar 6, 2024
7b2f71c
add errChan in memChan
Hitenjain14 Mar 7, 2024
7a17f61
strict timeouts and break when consensus thresh is met
Hitenjain14 Mar 8, 2024
3707045
fix list resp chan
Hitenjain14 Mar 8, 2024
d38ff81
rmv err check
Hitenjain14 Mar 9, 2024
14c6161
Merge pull request #1421 from 0chain/feat/list-thresh
dabasov Mar 10, 2024
ba68b3a
Merge pull request #1416 from 0chain/fix-mint-nonce
dabasov Mar 10, 2024
193e6f9
add buffer check (#1422)
Hitenjain14 Mar 11, 2024
69f9906
fix initialize download queue
Hitenjain14 Mar 13, 2024
385236b
change buf cap
Hitenjain14 Mar 13, 2024
83f0cc9
Merge pull request #1424 from 0chain/hotfix/time-queue
dabasov Mar 13, 2024
08f3de9
Merge branch 'sprint-1.14' of https://github.com/0chain/gosdk into fe…
Hitenjain14 Mar 14, 2024
5053d2d
Merge remote-tracking branch 'origin' into feat/optional-size
Hitenjain14 Mar 14, 2024
c5ad726
Merge branch 'sprint-1.14' of https://github.com/0chain/gosdk into fe…
Hitenjain14 Mar 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions core/sys/fs_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,23 +284,27 @@ type MemChanFile struct {
ModTime time.Time // FileInfo.ModTime
ChunkWriteSize int // 0 value means no limit
Sys interface{} // FileInfo.Sys
reader io.Reader
ErrChan chan error
data []byte
}

func (f *MemChanFile) Stat() (fs.FileInfo, error) {
return &MemFileChanInfo{name: f.Name, f: f}, nil
}
func (f *MemChanFile) Read(p []byte) (int, error) {
recieveData, ok := <-f.Buffer
if !ok {
return 0, io.EOF
}
if len(recieveData) > len(p) {
return 0, io.ErrShortBuffer
select {
case err := <-f.ErrChan:
return 0, err
case recieveData, ok := <-f.Buffer:
if !ok {
return 0, io.EOF
}
if len(recieveData) > len(p) {
return 0, io.ErrShortBuffer
}
n := copy(p, recieveData)
return n, nil
}
n := copy(p, recieveData)
return n, nil
}

func (f *MemChanFile) Write(p []byte) (n int, err error) {
Expand All @@ -310,7 +314,7 @@ func (f *MemChanFile) Write(p []byte) (n int, err error) {
f.Buffer <- data
} else {
if cap(f.data) == 0 {
f.data = make([]byte, 0, f.ChunkWriteSize)
f.data = make([]byte, 0, len(p))
}
f.data = append(f.data, p...)
}
Expand All @@ -326,15 +330,14 @@ func (f *MemChanFile) Sync() error {
}
f.Buffer <- f.data[current:end]
}
f.data = make([]byte, 0, f.ChunkWriteSize)
f.data = nil
return nil
}
func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) {
return 0, nil
}

func (f *MemChanFile) Close() error {
f.reader = nil
close(f.Buffer)
return nil
}
Expand Down
24 changes: 21 additions & 3 deletions core/util/validation_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ const (
Right
)

const (
START_LENGTH = 64
ADD_LENGTH = 320
)

type ValidationTree struct {
writeLock sync.Mutex
writeCount int
Expand Down Expand Up @@ -61,7 +66,7 @@ func (v *ValidationTree) Write(b []byte) (int, error) {
return 0, nil
}

if v.writtenSize+int64(len(b)) > v.dataSize {
if v.dataSize > 0 && v.writtenSize+int64(len(b)) > v.dataSize {
return 0, fmt.Errorf("data size overflow. expected %d, got %d", v.dataSize, v.writtenSize+int64(len(b)))
}

Expand All @@ -80,6 +85,12 @@ func (v *ValidationTree) Write(b []byte) (int, error) {
n, _ := v.h.Write(b[i:j])
v.writeCount += n // update write count
if v.writeCount == MaxMerkleLeavesSize {
if v.leafIndex >= len(v.leaves) {
// increase leaves size
leaves := make([][]byte, len(v.leaves)+ADD_LENGTH)
copy(leaves, v.leaves)
v.leaves = leaves
}
v.leaves[v.leafIndex] = v.h.Sum(nil)
v.leafIndex++
v.writeCount = 0 // reset writeCount
Expand Down Expand Up @@ -137,21 +148,28 @@ func (v *ValidationTree) Finalize() error {
if v.isFinalized {
return errors.New("already finalized")
}
if v.writtenSize != v.dataSize {
if v.dataSize > 0 && v.writtenSize != v.dataSize {
return fmt.Errorf("invalid size. Expected %d got %d", v.dataSize, v.writtenSize)
}

v.isFinalized = true

if v.writeCount > 0 {
v.leaves[v.leafIndex] = v.h.Sum(nil)
} else {
v.leafIndex--
}
if v.leafIndex < len(v.leaves) {
v.leaves = v.leaves[:v.leafIndex+1]
}
return nil
}

func NewValidationTree(dataSize int64) *ValidationTree {
totalLeaves := (dataSize + MaxMerkleLeavesSize - 1) / MaxMerkleLeavesSize

if totalLeaves == 0 {
totalLeaves = START_LENGTH
}
return &ValidationTree{
dataSize: dataSize,
h: sha256.New(),
Expand Down
2 changes: 2 additions & 0 deletions wasmsdk/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/0chain/gosdk/zcnbridge"
"github.com/0chain/gosdk/zcnbridge/errors"
"github.com/0chain/gosdk/zcnbridge/log"
"github.com/0chain/gosdk/zcnbridge/transaction"
"github.com/0chain/gosdk/zcnbridge/wallet"
"github.com/0chain/gosdk/zcncore"
Expand Down Expand Up @@ -124,6 +125,7 @@ func getNotProcessedWZCNBurnEvents() string {
return errors.New("getNotProcessedWZCNBurnEvents", "failed to retreive last ZCN processed mint nonce").Error()
}

log.Logger.Debug("MintNonce = " + strconv.Itoa(int(mintNonce)))
burnEvents, err := bridge.QueryEthereumBurnEvents(strconv.Itoa(int(mintNonce)))
if err != nil {
return errors.Wrap("getNotProcessedWZCNBurnEvents", "failed to retreive WZCN burn events", err).Error()
Expand Down
13 changes: 11 additions & 2 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ type OperationRequest struct {
FileReader io.Reader
Mask *zboxutil.Uint128 // Required for delete repair operation
DownloadFile bool // Required for upload repair operation
StreamUpload bool // Required for streaming file when actualSize is not available
Opts []ChunkedUploadOption
}

Expand Down Expand Up @@ -402,11 +403,13 @@ func (a *Allocation) RepairFile(file sys.File, remotepath string, statusCallback
WithEncrypt(true),
WithStatusCallback(statusCallback),
WithEncryptedPoint(ref.EncryptedKeyPoint),
WithChunkNumber(100),
}
} else {
opts = []ChunkedUploadOption{
WithMask(mask),
WithStatusCallback(statusCallback),
WithChunkNumber(100),
}
}
op := &OperationRequest{
Expand Down Expand Up @@ -894,7 +897,7 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul
cancelLock.Lock()
CancelOpCtx[op.FileMeta.RemotePath] = mo.ctxCncl
cancelLock.Unlock()
operation, newConnectionID, err = NewUploadOperation(mo.ctx, op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.Opts...)
operation, newConnectionID, err = NewUploadOperation(mo.ctx, op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.StreamUpload, op.Opts...)

case constants.FileOperationDelete:
if op.Mask != nil {
Expand All @@ -907,7 +910,7 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul
cancelLock.Lock()
CancelOpCtx[op.FileMeta.RemotePath] = mo.ctxCncl
cancelLock.Unlock()
operation, newConnectionID, err = NewUploadOperation(mo.ctx, op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.Opts...)
operation, newConnectionID, err = NewUploadOperation(mo.ctx, op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.StreamUpload, op.Opts...)

case constants.FileOperationCreateDir:
operation = NewDirOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
Expand Down Expand Up @@ -1106,6 +1109,9 @@ func (a *Allocation) generateDownloadRequest(
downloadReq.contentMode = contentMode
downloadReq.connectionID = connectionID
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
for i := 0; i < len(a.Blobbers); i++ {
downloadReq.downloadQueue[i].timeTaken = 1000000
}

return downloadReq, nil
}
Expand Down Expand Up @@ -2219,6 +2225,9 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str
downloadReq.fullconsensus = a.fullconsensus
downloadReq.consensusThresh = a.consensusThreshold
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
for i := 0; i < len(a.Blobbers); i++ {
downloadReq.downloadQueue[i].timeTaken = 1000000
}
downloadReq.connectionID = zboxutil.NewConnectionId()
downloadReq.completedCallback = func(remotepath string, remotepathHash string) {
a.mutex.Lock()
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func (su *ChunkedUpload) process() error {
}
if su.fileMeta.ActualSize == 0 {
su.fileMeta.ActualSize = su.progress.ReadLength
su.shardSize = getShardSize(su.fileMeta.ActualSize, su.allocationObj.DataShards, su.encryptOnUpload)
} else if su.fileMeta.ActualSize != su.progress.ReadLength && su.thumbnailBytes == nil {
if su.statusCallback != nil {
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, thrown.New("upload_failed", "Upload failed. Uploaded size does not match with actual size: "+fmt.Sprintf("%d != %d", su.fileMeta.ActualSize, su.progress.ReadLength)))
Expand Down Expand Up @@ -568,7 +569,6 @@ func (su *ChunkedUpload) readChunks(num int) (*batchChunksData, error) {

// upload entire thumbnail in first chunk request only
if chunk.Index == 0 && len(su.thumbnailBytes) > 0 {
data.totalReadSize += int64(su.fileMeta.ActualThumbnailSize)

data.thumbnailShards, err = su.chunkReader.Read(su.thumbnailBytes)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions zboxcore/sdk/chunked_upload_blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
}

respbody := resp.Body()
if err != nil {
logger.Logger.Error("Error: Resp ", err)
return fmt.Errorf("Error while reading body. Error %s", err), false
}

if resp.StatusCode() == http.StatusTooManyRequests {
logger.Logger.Error("Got too many request error")
var r int
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/dirworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u

for i := 0; i < 3; i++ {
err, shouldContinue = func() (err error, shouldContinue bool) {
ctx, cncl := context.WithTimeout(req.ctx, (time.Second * 30))
ctx, cncl := context.WithTimeout(req.ctx, (time.Second * 10))
resp, err = zboxutil.Client.Do(httpreq.WithContext(ctx))
cncl()
if err != nil {
Expand Down
24 changes: 15 additions & 9 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type DownloadRequest struct {
bufferMap map[int]zboxutil.DownloadBuffer
downloadStorer DownloadProgressStorer
workdir string
downloadQueue downloadQueue
downloadQueue downloadQueue // Always initialize this queue with max time taken
}

type downloadPriority struct {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (req *DownloadRequest) downloadBlock(
if req.shouldVerify {
go AddBlockDownloadReq(req.ctx, blockDownloadReq, nil, req.effectiveBlockSize)
} else {
go AddBlockDownloadReq(req.ctx, blockDownloadReq, req.bufferMap[int(pos)], req.effectiveBlockSize)
go AddBlockDownloadReq(req.ctx, blockDownloadReq, req.bufferMap[blobberIdx], req.effectiveBlockSize)
}
}

Expand All @@ -308,7 +308,9 @@ func (req *DownloadRequest) downloadBlock(
downloadErrors[i] = fmt.Sprintf("Error %s from %s",
err.Error(), req.blobbers[result.idx].Baseurl)
logger.Logger.Error(err)
req.bufferMap[result.idx].ReleaseChunk(int(req.startBlock / req.numBlocks))
if req.bufferMap != nil {
req.bufferMap[result.idx].ReleaseChunk(int(req.startBlock / req.numBlocks))
}
} else if timeRequest {
req.downloadQueue[result.idx].timeTaken = result.timeTaken
}
Expand Down Expand Up @@ -538,10 +540,11 @@ func (req *DownloadRequest) processDownload() {
}
for i := req.downloadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
pos = uint64(i.TrailingZeros())
blobberIdx := req.downloadQueue[pos].blobberIdx
if writerAt {
req.bufferMap[int(pos)] = zboxutil.NewDownloadBufferWithChan(sz, int(numBlocks), req.effectiveBlockSize)
req.bufferMap[blobberIdx] = zboxutil.NewDownloadBufferWithChan(sz, int(numBlocks), req.effectiveBlockSize)
} else {
req.bufferMap[int(pos)] = zboxutil.NewDownloadBufferWithMask(sz, int(numBlocks), req.effectiveBlockSize)
req.bufferMap[blobberIdx] = zboxutil.NewDownloadBufferWithMask(sz, int(numBlocks), req.effectiveBlockSize)
}
}
}
Expand Down Expand Up @@ -678,14 +681,19 @@ func (req *DownloadRequest) processDownload() {
var progressLock sync.Mutex
firstReqWG := sync.WaitGroup{}
firstReqWG.Add(1)
eg, _ := errgroup.WithContext(ctx)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(downloadWorkerCount + EXTRA_COUNT)
for i := 0; i < n; i++ {
j := i
if i == 1 {
firstReqWG.Wait()
heap.Init(&req.downloadQueue)
}
select {
case <-egCtx.Done():
goto breakDownloadLoop
default:
}
eg.Go(func() error {

if j == 0 {
Expand Down Expand Up @@ -735,6 +743,7 @@ func (req *DownloadRequest) processDownload() {
}
return nil
})
breakDownloadLoop:
}
if err := eg.Wait(); err != nil {
writeCancel()
Expand Down Expand Up @@ -1082,11 +1091,8 @@ func GetFileRefFromBlobber(allocationID, blobberId, remotePath string) (fRef *fi
listReq.ctx = ctx
listReq.remotefilepath = remotePath

listReq.wg = &sync.WaitGroup{}
listReq.wg.Add(1)
rspCh := make(chan *fileMetaResponse, 1)
go listReq.getFileMetaInfoFromBlobber(listReq.blobbers[0], 0, rspCh)
listReq.wg.Wait()
resp := <-rspCh
return resp.fileref, resp.err
}
Expand Down
5 changes: 0 additions & 5 deletions zboxcore/sdk/filemetaworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"mime/multipart"
"net/http"
"sync"
"time"

"github.com/0chain/errors"
Expand All @@ -25,7 +24,6 @@ type fileMetaResponse struct {
}

func (req *ListRequest) getFileMetaInfoFromBlobber(blobber *blockchain.StorageNode, blobberIdx int, rspCh chan<- *fileMetaResponse) {
defer req.wg.Done()
body := new(bytes.Buffer)
formWriter := multipart.NewWriter(body)

Expand Down Expand Up @@ -92,13 +90,10 @@ func (req *ListRequest) getFileMetaInfoFromBlobber(blobber *blockchain.StorageNo

func (req *ListRequest) getFileMetaFromBlobbers() []*fileMetaResponse {
numList := len(req.blobbers)
req.wg = &sync.WaitGroup{}
req.wg.Add(numList)
rspCh := make(chan *fileMetaResponse, numList)
for i := 0; i < numList; i++ {
go req.getFileMetaInfoFromBlobber(req.blobbers[i], i, rspCh)
}
req.wg.Wait()
fileInfos := make([]*fileMetaResponse, len(req.blobbers))
for i := 0; i < numList; i++ {
ch := <-rspCh
Expand Down
4 changes: 0 additions & 4 deletions zboxcore/sdk/filemetaworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,9 @@ func TestListRequest_getFileMetaInfoFromBlobber(t *testing.T) {
authToken: &marker.AuthTicket{
Signature: mockSignature,
},
wg: &sync.WaitGroup{},
}
rspCh := make(chan *fileMetaResponse, 1)
req.wg.Add(1)
go req.getFileMetaInfoFromBlobber(blobber, 73, rspCh)
req.wg.Wait()

var resp *fileMetaResponse
select {
Expand Down Expand Up @@ -280,7 +277,6 @@ func TestListRequest_getFileConsensusFromBlobbers(t *testing.T) {
allocationTx: mockAllocationTxId,
ctx: context.TODO(),
blobbers: []*blockchain.StorageNode{},
wg: &sync.WaitGroup{},
Consensus: tt.consensus, //nolint
}
for i := 0; i < tt.numBlobbers; i++ {
Expand Down
Loading