Skip to content

Commit 373d164

Browse files
authored
Merge pull request #9 from OpenDroneMap/chunked
Chunked upload support
2 parents 5a5249b + ef42976 commit 373d164

File tree

3 files changed

+265
-30
lines changed

3 files changed

+265
-30
lines changed

internal/cmd/root.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
var outputPath string
3434
var nodeName string
3535
var force bool
36+
var parallelConnections int
3637

3738
var rootCmd = &cobra.Command{
3839
Use: "odm [flags] <images> [<gcp>] [args]",
@@ -91,7 +92,7 @@ var rootCmd = &cobra.Command{
9192
}
9293
}
9394

94-
odm.Run(inputFiles, parseOptions(options, nodeOptions), *node, outputPath)
95+
odm.Run(inputFiles, parseOptions(options, nodeOptions), *node, outputPath, parallelConnections)
9596
},
9697

9798
TraverseChildren: true,
@@ -115,6 +116,7 @@ func init() {
115116
rootCmd.Flags().BoolVarP(&force, "force", "f", false, "replace the contents of the output directory if it already exists")
116117
rootCmd.Flags().StringVarP(&outputPath, "output", "o", "./output", "directory where to store processing results")
117118
rootCmd.Flags().StringVarP(&nodeName, "node", "n", "default", "Processing node to use")
119+
rootCmd.Flags().IntVarP(&parallelConnections, "parallel-connections", "p", 5, "Parallel upload connections. Set to 1 to disable parallel uploads")
118120

119121
rootCmd.Flags().SetInterspersed(false)
120122
}

internal/odm/node.go

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"io/ioutil"
2424
"math"
25+
"mime/multipart"
2526
"net/http"
2627
"net/url"
2728
"os"
@@ -76,6 +77,11 @@ type ApiActionResponse struct {
7677
Error string `json:"error"`
7778
}
7879

80+
type TaskNewResponse struct {
81+
UUID string `json:"uuid"`
82+
Error string `json:"error"`
83+
}
84+
7985
// Node is a NodeODM processing node
8086
type Node struct {
8187
URL string `json:"url"`
@@ -258,7 +264,12 @@ func (n Node) TaskDownload(uuid string, asset string, outputFile string) error {
258264
bar.Prefix("[" + asset + "]")
259265
}
260266

261-
writer := io.MultiWriter(out, bar)
267+
var writer io.Writer
268+
if bar != nil {
269+
writer = io.MultiWriter(out, bar)
270+
} else {
271+
writer = out
272+
}
262273

263274
written, err := io.Copy(writer, resp.Body)
264275
if written == 0 {
@@ -291,6 +302,120 @@ func (n Node) TaskCancel(uuid string) error {
291302
return nil
292303
}
293304

305+
// TaskNewInit POST: /task/new/init
306+
func (n Node) TaskNewInit(jsonOptions []byte) TaskNewResponse {
307+
var err error
308+
reqBody := &bytes.Buffer{}
309+
mpw := multipart.NewWriter(reqBody)
310+
mpw.WriteField("skipPostProcessing", "true")
311+
mpw.WriteField("options", string(jsonOptions))
312+
if err = mpw.Close(); err != nil {
313+
return TaskNewResponse{"", err.Error()}
314+
}
315+
316+
resp, err := http.Post(n.URLFor("/task/new/init"), mpw.FormDataContentType(), reqBody)
317+
defer resp.Body.Close()
318+
if err != nil {
319+
return TaskNewResponse{"", err.Error()}
320+
}
321+
322+
body, err := ioutil.ReadAll(resp.Body)
323+
if err != nil {
324+
return TaskNewResponse{"", err.Error()}
325+
}
326+
327+
var res TaskNewResponse
328+
if err := json.Unmarshal(body, &res); err != nil {
329+
return TaskNewResponse{"", err.Error()}
330+
}
331+
332+
return res
333+
}
334+
335+
// TaskNewUpload POST: /task/new/upload/{uuid}
336+
func (n Node) TaskNewUpload(file string, uuid string, bar *pb.ProgressBar) error {
337+
var f *os.File
338+
var fi os.FileInfo
339+
var err error
340+
r, w := io.Pipe()
341+
mpw := multipart.NewWriter(w)
342+
343+
go func() {
344+
var part io.Writer
345+
defer w.Close()
346+
defer f.Close()
347+
348+
if f, err = os.Open(file); err != nil {
349+
return
350+
}
351+
if fi, err = f.Stat(); err != nil {
352+
return
353+
}
354+
355+
if bar != nil {
356+
bar.SetTotal64(fi.Size())
357+
bar.Set64(0)
358+
bar.Prefix("[" + fi.Name() + "]")
359+
}
360+
361+
if part, err = mpw.CreateFormFile("images", fi.Name()); err != nil {
362+
return
363+
}
364+
365+
if bar != nil {
366+
part = io.MultiWriter(part, bar)
367+
}
368+
369+
if _, err = io.Copy(part, f); err != nil {
370+
return
371+
}
372+
373+
if err = mpw.Close(); err != nil {
374+
return
375+
}
376+
}()
377+
378+
resp, err := http.Post(n.URLFor("/task/new/upload/"+uuid), mpw.FormDataContentType(), r)
379+
if err != nil {
380+
return err
381+
}
382+
defer resp.Body.Close()
383+
body, err := ioutil.ReadAll(resp.Body)
384+
if err != nil {
385+
return err
386+
}
387+
388+
var res ApiActionResponse
389+
if err := json.Unmarshal(body, &res); err != nil {
390+
return err
391+
}
392+
393+
if res.Error != "" {
394+
return errors.New(res.Error)
395+
}
396+
397+
if !res.Success {
398+
return errors.New("Cannot complete upload. /task/new/upload failed with success: false")
399+
}
400+
401+
return nil
402+
}
403+
404+
// TaskNewCommit POST: /task/new/commit/{uuid}
405+
func (n Node) TaskNewCommit(uuid string) TaskNewResponse {
406+
var res TaskNewResponse
407+
408+
body, err := n.apiPOST("/task/new/commit/"+uuid, map[string]string{})
409+
if err != nil {
410+
return TaskNewResponse{"", err.Error()}
411+
}
412+
if err := json.Unmarshal(body, &res); err != nil {
413+
return TaskNewResponse{"", err.Error()}
414+
}
415+
416+
return res
417+
}
418+
294419
func (n Node) CheckAuthentication(err error) error {
295420
if err != nil {
296421
if err == ErrUnauthorized {

internal/odm/run.go

Lines changed: 136 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package odm
1717

1818
import (
1919
"encoding/json"
20+
"errors"
2021
"io"
2122
"io/ioutil"
2223
"mime/multipart"
@@ -34,44 +35,41 @@ import (
3435
"github.com/cheggaaa/pb"
3536
)
3637

37-
type TaskNewResponse struct {
38-
UUID string `json:"uuid"`
39-
Error string `json:"error"`
38+
type fileUpload struct {
39+
filename string
40+
retries int
4041
}
4142

42-
// Run processes a dataset
43-
func Run(files []string, options []Option, node Node, outputPath string) {
44-
var err error
45-
var bar *pb.ProgressBar
43+
type fileUploadResult struct {
44+
filename string
45+
err error
46+
retries int
47+
}
4648

49+
func singleUpload(node Node, files []string, jsonOptions []byte) TaskNewResponse {
4750
var f *os.File
4851
var fi os.FileInfo
49-
50-
var totalBytes int64
52+
var err error
53+
var bar *pb.ProgressBar
54+
var res TaskNewResponse
5155

5256
showProgress := !logger.QuietFlag
5357

54-
// Calculate total upload size
55-
for _, file := range files {
56-
if fi, err = os.Stat(file); err != nil {
57-
logger.Error(err)
58+
if showProgress {
59+
var totalBytes int64
60+
61+
// Calculate total upload size
62+
for _, file := range files {
63+
if fi, err = os.Stat(file); err != nil {
64+
logger.Error(err)
65+
}
66+
totalBytes += fi.Size()
5867
}
59-
totalBytes += fi.Size()
60-
f.Close()
61-
}
6268

63-
if showProgress {
6469
bar = pb.New64(totalBytes).SetUnits(pb.U_BYTES).SetRefreshRate(time.Millisecond * 10)
6570
bar.Start()
6671
}
6772

68-
// Convert options to JSON
69-
jsonOptions, err := json.Marshal(options)
70-
if err != nil {
71-
logger.Error(err)
72-
}
73-
74-
// Setup pipe
7573
r, w := io.Pipe()
7674
mpw := multipart.NewWriter(w)
7775

@@ -122,18 +120,128 @@ func Run(files []string, options []Option, node Node, outputPath string) {
122120
logger.Error(err)
123121
}
124122

125-
res := TaskNewResponse{}
123+
if showProgress {
124+
bar.Finish()
125+
}
126+
126127
if err := json.Unmarshal(body, &res); err != nil {
127128
logger.Error(err)
128129
}
129130

130-
// Handle error response from API
131+
return res
132+
}
133+
134+
func uploadWorker(id int, node Node, uuid string, barPool *pb.Pool, filesToProcess <-chan fileUpload, results chan<- fileUploadResult) {
135+
var bar *pb.ProgressBar
136+
137+
if barPool != nil {
138+
bar = pb.New64(0).SetUnits(pb.U_BYTES).SetRefreshRate(time.Millisecond * 10)
139+
barPool.Add(bar)
140+
}
141+
142+
for f := range filesToProcess {
143+
results <- fileUploadResult{f.filename, node.TaskNewUpload(f.filename, uuid, bar), f.retries}
144+
}
145+
}
146+
147+
func chunkedUpload(node Node, files []string, jsonOptions []byte, parallelUploads int) TaskNewResponse {
148+
var err error
149+
var barPool *pb.Pool
150+
var mainBar *pb.ProgressBar
151+
var res TaskNewResponse
152+
153+
showProgress := !logger.QuietFlag
154+
155+
// Invoke /task/new/init
156+
res = node.TaskNewInit(jsonOptions)
131157
if res.Error != "" {
132-
logger.Error(res.Error)
158+
logger.Error(err)
133159
}
134160

135161
if showProgress {
136-
bar.Finish()
162+
barPool = pb.NewPool()
163+
}
164+
165+
// Create workers
166+
filesToProcess := make(chan fileUpload, len(files))
167+
results := make(chan fileUploadResult, len(files))
168+
169+
for w := 1; w <= parallelUploads; w++ {
170+
go uploadWorker(w, node, res.UUID, barPool, filesToProcess, results)
171+
}
172+
173+
if barPool != nil {
174+
barPool.Start()
175+
176+
mainBar = pb.New(len(files)).SetUnits(pb.U_NO).SetRefreshRate(time.Millisecond * 10)
177+
mainBar.Format("[\x00#\x00\x00_\x00]")
178+
mainBar.Prefix("Files Uploaded:")
179+
mainBar.Start()
180+
}
181+
182+
// Fill queue
183+
for _, file := range files {
184+
filesToProcess <- fileUpload{file, 0}
185+
}
186+
187+
// Wait
188+
MaxRetries := 10
189+
filesLeft := len(files)
190+
for filesLeft > 0 {
191+
fur := <-results
192+
193+
if fur.err != nil {
194+
if fur.retries < MaxRetries {
195+
// Retry
196+
filesToProcess <- fileUpload{fur.filename, fur.retries + 1}
197+
} else {
198+
logger.Error(errors.New("Cannot upload " + fur.filename + ", exceeded max retries (" + string(MaxRetries) + ")"))
199+
}
200+
} else {
201+
filesLeft--
202+
if mainBar != nil {
203+
mainBar.Set(len(files) - filesLeft)
204+
}
205+
}
206+
}
207+
close(filesToProcess)
208+
209+
if barPool != nil {
210+
barPool.Stop()
211+
}
212+
if mainBar != nil {
213+
mainBar.Finish()
214+
}
215+
216+
// Commit
217+
res = node.TaskNewCommit(res.UUID)
218+
if res.Error != "" {
219+
logger.Error(res.Error)
220+
}
221+
222+
return res
223+
}
224+
225+
// Run processes a dataset
226+
func Run(files []string, options []Option, node Node, outputPath string, parallelConnections int) {
227+
var err error
228+
229+
// Convert options to JSON
230+
jsonOptions, err := json.Marshal(options)
231+
if err != nil {
232+
logger.Error(err)
233+
}
234+
235+
var res TaskNewResponse
236+
if parallelConnections <= 1 {
237+
res = singleUpload(node, files, jsonOptions)
238+
} else {
239+
res = chunkedUpload(node, files, jsonOptions, parallelConnections)
240+
}
241+
242+
// Handle error response from API
243+
if res.Error != "" {
244+
logger.Error(res.Error)
137245
}
138246

139247
// We should have a UUID

0 commit comments

Comments
 (0)