Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/sys/fs_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (f *MemChanFile) Write(p []byte) (n int, err error) {
f.Buffer <- data
} else {
if cap(f.data) == 0 {
f.data = make([]byte, 0, f.ChunkWriteSize)
f.data = make([]byte, 0, len(p))
}
f.data = append(f.data, p...)
}
Expand All @@ -326,7 +326,7 @@ func (f *MemChanFile) Sync() error {
}
f.Buffer <- f.data[current:end]
}
f.data = make([]byte, 0, f.ChunkWriteSize)
f.data = nil
return nil
}
func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) {
Expand Down
8 changes: 8 additions & 0 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,13 @@ func (a *Allocation) RepairFile(file sys.File, remotepath string, statusCallback
WithEncrypt(true),
WithStatusCallback(statusCallback),
WithEncryptedPoint(ref.EncryptedKeyPoint),
WithChunkNumber(100),
}
} else {
opts = []ChunkedUploadOption{
WithMask(mask),
WithStatusCallback(statusCallback),
WithChunkNumber(100),
}
}
op := &OperationRequest{
Expand Down Expand Up @@ -1106,6 +1108,9 @@ func (a *Allocation) generateDownloadRequest(
downloadReq.contentMode = contentMode
downloadReq.connectionID = connectionID
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
for i := 0; i < len(a.Blobbers); i++ {
downloadReq.downloadQueue[i].timeTaken = 1000000
}

return downloadReq, nil
}
Expand Down Expand Up @@ -2219,6 +2224,9 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str
downloadReq.fullconsensus = a.fullconsensus
downloadReq.consensusThresh = a.consensusThreshold
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
for i := 0; i < len(a.Blobbers); i++ {
downloadReq.downloadQueue[i].timeTaken = 1000000
}
downloadReq.connectionID = zboxutil.NewConnectionId()
downloadReq.completedCallback = func(remotepath string, remotepathHash string) {
a.mutex.Lock()
Expand Down
10 changes: 8 additions & 2 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type DownloadRequest struct {
bufferMap map[int]zboxutil.DownloadBuffer
downloadStorer DownloadProgressStorer
workdir string
downloadQueue downloadQueue
downloadQueue downloadQueue // Always initialize this queue with max time taken
}

type downloadPriority struct {
Expand Down Expand Up @@ -681,14 +681,19 @@ func (req *DownloadRequest) processDownload() {
var progressLock sync.Mutex
firstReqWG := sync.WaitGroup{}
firstReqWG.Add(1)
eg, _ := errgroup.WithContext(ctx)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(downloadWorkerCount + EXTRA_COUNT)
for i := 0; i < n; i++ {
j := i
if i == 1 {
firstReqWG.Wait()
heap.Init(&req.downloadQueue)
}
select {
case <-egCtx.Done():
goto breakDownloadLoop
default:
}
eg.Go(func() error {

if j == 0 {
Expand Down Expand Up @@ -738,6 +743,7 @@ func (req *DownloadRequest) processDownload() {
}
return nil
})
breakDownloadLoop:
}
if err := eg.Wait(); err != nil {
writeCancel()
Expand Down
4 changes: 2 additions & 2 deletions zboxcore/sdk/repairworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *RepairRequest) processRepair(ctx context.Context, a *Allocation) {
if r.checkForCancel(a) {
return
}

SetNumBlockDownloads(100)
r.iterateDir(a, r.listDir)

if r.statusCB != nil {
Expand Down Expand Up @@ -177,7 +177,7 @@ func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationR
return nil
}
memFile := &sys.MemChanFile{
Buffer: make(chan []byte, 10),
Buffer: make(chan []byte, 100),
ChunkWriteSize: int(a.GetChunkReadSize(ref.EncryptedKey != "")),
}
op = a.RepairFile(memFile, file.Path, statusCB, found, ref)
Expand Down