Skip to content

Commit 338f410

Browse files
authored
Add ability to log upload rates (#39)
1 parent 028f4eb commit 338f410

File tree

4 files changed

+27
-17
lines changed

4 files changed

+27
-17
lines changed

src/blobs.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ API.startMultipartUpload(x::Container, key; kw...) = nothing
4646

4747
function API.uploadPart(x::Container, url, part, partNumber, uploadId; kw...)
4848
blockid = base64encode(lpad(partNumber - 1, 64, '0'))
49-
Azure.put(url, [], part; query=Dict("comp" => "block", "blockid" => blockid), kw...)
50-
return blockid
49+
resp = Azure.put(url, [], part; query=Dict("comp" => "block", "blockid" => blockid), kw...)
50+
return (blockid, Base.get(resp.request.context, :nbytes_written, 0))
5151
end
5252

5353
function API.completeMultipartUpload(x::Container, url, eTags, uploadId; kw...)

src/get.jl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ function getObjectImpl(x::AbstractStore, key::String, out::ResponseBodyType=noth
6363
allowMultipart::Bool=true,
6464
decompress::Bool=false,
6565
zlibng::Bool=false,
66-
headers=HTTP.Headers(), kw...)
66+
headers=HTTP.Headers(),
67+
lograte::Bool=false, kw...)
6768

6869
# keyword arg handling
6970
if (out isa AbstractVector{UInt8} && length(out) <= multipartThreshold)
@@ -170,6 +171,6 @@ function getObjectImpl(x::AbstractStore, key::String, out::ResponseBodyType=noth
170171
end_time = time()
171172
bytes = nbytes[]
172173
gbits_per_second = bytes == 0 ? 0 : (((8 * bytes) / 1e9) / (end_time - start_time))
173-
@debug "CloudStore.get complete with bandwidth: $(gbits_per_second) Gbps"
174+
lograte && @info "CloudStore.get complete with bandwidth: $(gbits_per_second) Gbps"
174175
return res
175176
end

src/put.jl

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,18 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
5252
batchSize::Int=defaultBatchSize(),
5353
allowMultipart::Bool=true,
5454
zlibng::Bool=false,
55-
compress::Bool=false, credentials=nothing, kw...)
55+
compress::Bool=false, credentials=nothing,
56+
lograte::Bool=false, kw...)
5657

58+
start_time = time()
5759
N = nbytes(in)
60+
wbytes = Threads.Atomic{Int}(0)
5861
if N <= multipartThreshold || !allowMultipart
5962
body = prepBody(in, compress, zlibng)
6063
resp = putObject(x, key, body; credentials, kw...)
61-
return Object(x, credentials, key, N, etag(HTTP.header(resp, "ETag")))
64+
wbytes[] = get(resp.request.context, :nbytes_written, 0)
65+
obj = Object(x, credentials, key, N, etag(HTTP.header(resp, "ETag")))
66+
@goto done
6267
end
6368
# multipart upload
6469
uploadState = startMultipartUpload(x, key; credentials, kw...)
@@ -76,15 +81,13 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
7681
eof(body) && break
7782
n = (j - 1) * batchSize + i
7883
part = _read(body, partSize)
79-
let n=n, part=part
80-
Threads.@spawn begin
81-
eTag = uploadPart(x, url, part, n, uploadState; credentials, kw...)
82-
let eTag=eTag
83-
# we synchronize the eTags here because the order matters
84-
# for the final call to completeMultipartUpload
85-
put!(() -> push!(eTags, eTag), sync, n)
86-
end
87-
end
84+
Threads.@spawn begin
85+
_n = $n
86+
parteTag, wb = uploadPart(x, url, $part, _n, uploadState; credentials, kw...)
87+
Threads.atomic_add!(wbytes, wb)
88+
# we synchronize the eTags here because the order matters
89+
# for the final call to completeMultipartUpload
90+
put!(() -> push!(eTags, parteTag), sync, _n)
8891
end
8992
end
9093
eof(body) && break
@@ -97,5 +100,11 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
97100
close(body)
98101
end
99102
eTag = completeMultipartUpload(x, url, eTags, uploadState; credentials, kw...)
100-
return Object(x, credentials, key, N, eTag)
103+
obj = Object(x, credentials, key, N, eTag)
104+
@label done
105+
end_time = time()
106+
bytes = wbytes[]
107+
gbits_per_second = bytes == 0 ? 0 : (((8 * bytes) / 1e9) / (end_time - start_time))
108+
lograte && @info "CloudStore.put complete with bandwidth: $(gbits_per_second) Gbps"
109+
return obj
101110
end

src/s3.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ end
4848

4949
function API.uploadPart(x::Bucket, url, part, partNumber, uploadId; kw...)
5050
resp = AWS.put(url, [], part; query=Dict("partNumber" => string(partNumber), "uploadId" => uploadId), service="s3", kw...)
51-
return HTTP.header(resp, "ETag")
51+
return (HTTP.header(resp, "ETag"), Base.get(resp.request.context, :nbytes_written, 0))
5252
end
5353

5454
function API.completeMultipartUpload(x::Bucket, url, eTags, uploadId; kw...)

0 commit comments

Comments
 (0)