Skip to content

Commit eb4abb2

Browse files
dthainbtovar
andauthored
Vine: Send cache-update on all file types. (#4128)
* - Manager now puts replicas of all types into PENDING state when sent. - Worker now responds with cache-update for all file types. * format --------- Co-authored-by: Benjamin Tovar <[email protected]>
1 parent 922e731 commit eb4abb2

File tree

7 files changed

+27
-35
lines changed

7 files changed

+27
-35
lines changed

taskvine/src/manager/vine_manager_put.c

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -441,24 +441,12 @@ static vine_result_code_t vine_manager_put_input_file_if_needed(struct vine_mana
441441
/* Now send the actual file. */
442442
vine_result_code_t result = vine_manager_put_input_file(q, w, t, m, file_to_send);
443443

444-
/* If the send succeeded, then record it in the worker */
445-
if (result == VINE_SUCCESS) {
446-
struct vine_file_replica *replica = vine_file_replica_table_get_or_create(q, w, f->cached_name, f->type, f->cache_level, f->size, f->mtime);
444+
/* If the send succeeded, then note that we have a PENDING replica */
445+
/* If will be marked as READY when a cache-update message comes back. */
447446

448-
switch (file_to_send->type) {
449-
case VINE_URL:
450-
case VINE_TEMP:
451-
/* For these types, a cache-update will arrive when the replica actually exists. */
452-
replica->state = VINE_FILE_REPLICA_STATE_CREATING;
453-
break;
454-
case VINE_FILE:
455-
case VINE_MINI_TASK:
456-
case VINE_BUFFER:
457-
/* For these types, we sent the data, so we know it exists. */
458-
replica->state = VINE_FILE_REPLICA_STATE_READY;
459-
f->state = VINE_FILE_STATE_CREATED;
460-
break;
461-
}
447+
if (result == VINE_SUCCESS) {
448+
struct vine_file_replica *replica = vine_file_replica_create(f->type, f->cache_level, f->size, f->mtime);
449+
vine_file_replica_table_insert(q, w, f->cached_name, replica);
462450
}
463451

464452
return result;

taskvine/src/manager/vine_protocol.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ worker, and catalog, but should not be visible to the public user API.
1313
#ifndef VINE_PROTOCOL_H
1414
#define VINE_PROTOCOL_H
1515

16-
#define VINE_PROTOCOL_VERSION 12
16+
#define VINE_PROTOCOL_VERSION 13
1717

1818
#define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */
1919

taskvine/src/worker/vine_cache.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ void vine_cache_scan(struct vine_cache *c, struct link *manager)
122122
char *cachename;
123123
HASH_TABLE_ITERATE(c->table, cachename, f)
124124
{
125-
vine_worker_send_cache_update(manager, cachename, f->original_type, f->cache_level, f->size, f->mode, f->transfer_time, f->start_time);
125+
vine_worker_send_cache_update(manager, cachename, f);
126126
}
127127
}
128128

@@ -262,7 +262,7 @@ and writing out the metadata to the proper location.
262262
*/
263263

264264
int vine_cache_add_file(
265-
struct vine_cache *c, const char *cachename, const char *transfer_path, vine_cache_level_t level, int mode, uint64_t size, time_t mtime, timestamp_t transfer_time)
265+
struct vine_cache *c, const char *cachename, const char *transfer_path, vine_cache_level_t level, int mode, uint64_t size, time_t mtime, timestamp_t start_time, timestamp_t transfer_time, struct link *manager)
266266
{
267267
char *data_path = vine_cache_data_path(c, cachename);
268268
char *meta_path = vine_cache_meta_path(c, cachename);
@@ -285,12 +285,16 @@ int vine_cache_add_file(
285285
f->size = size;
286286
f->mtime = mtime;
287287
f->transfer_time = transfer_time;
288+
f->start_time = start_time;
288289

289290
/* File has data and is ready to use. */
290291
f->status = VINE_CACHE_STATUS_READY;
291292

292293
vine_cache_file_save_metadata(f, meta_path);
293294

295+
/* Inform the manager that we now have the file */
296+
vine_worker_send_cache_update(manager, cachename, f);
297+
294298
result = 1;
295299
} else {
296300
result = 0;
@@ -829,7 +833,7 @@ static void vine_cache_check_outputs(struct vine_cache *c, struct vine_cache_fil
829833
debug(D_VINE, "cache: measuring %s", transfer_path);
830834
if (vine_cache_file_measure_metadata(transfer_path, &mode, &size, &mtime)) {
831835
debug(D_VINE, "cache: created %s with size %lld in %lld usec", cachename, (long long)size, (long long)transfer_time);
832-
if (vine_cache_add_file(c, cachename, transfer_path, f->cache_level, mode, size, mtime, transfer_time)) {
836+
if (vine_cache_add_file(c, cachename, transfer_path, f->cache_level, mode, size, mtime, f->start_time, transfer_time, manager)) {
833837
f->status = VINE_CACHE_STATUS_READY;
834838
} else {
835839
debug(D_VINE, "cache: unable to move %s to %s: %s\n", transfer_path, cache_path, strerror(errno));
@@ -849,7 +853,7 @@ static void vine_cache_check_outputs(struct vine_cache *c, struct vine_cache_fil
849853

850854
if (manager) {
851855
if (f->status == VINE_CACHE_STATUS_READY) {
852-
vine_worker_send_cache_update(manager, cachename, f->original_type, f->cache_level, f->size, f->mode, transfer_time, f->start_time);
856+
/* a positive cache-update message was sent by vine_cache_add_file */
853857
} else {
854858
char *error_path = vine_cache_error_path(c, cachename);
855859
char *error_message = NULL;

taskvine/src/worker/vine_cache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ char *vine_cache_meta_path( struct vine_cache *c, const char *cachename );
5555
char *vine_cache_transfer_path( struct vine_cache *c, const char *cachename );
5656
char *vine_cache_error_path( struct vine_cache *c, const char *cachename );
5757

58-
int vine_cache_add_file( struct vine_cache *c, const char *cachename, const char *transfer_path, vine_cache_level_t level, int mode, uint64_t size, time_t mtime, timestamp_t transfer_time );
58+
int vine_cache_add_file( struct vine_cache *c, const char *cachename, const char *transfer_path, vine_cache_level_t level, int mode, uint64_t size, time_t mtime, timestamp_t start_time, timestamp_t transfer_time, struct link *manager );
5959
int vine_cache_add_transfer( struct vine_cache *c, const char *cachename, const char *source, vine_cache_level_t level, int mode, uint64_t size, vine_cache_flags_t flags );
6060
int vine_cache_add_mini_task( struct vine_cache *c, const char *cachename, const char *source, struct vine_task *mini_task, vine_cache_level_t level, int mode, uint64_t size );
6161

taskvine/src/worker/vine_sandbox.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,8 @@ static int stage_output_file(struct vine_process *p, struct vine_mount *m, struc
201201
debug(D_VINE, "output: measuring %s", sandbox_path);
202202
if (vine_cache_file_measure_metadata(sandbox_path, &mode, &size, &mtime)) {
203203
debug(D_VINE, "output: moving %s to %s", sandbox_path, cache_path);
204-
if (vine_cache_add_file(cache, f->cached_name, sandbox_path, f->cache_level, mode, size, mtime, transfer_time)) {
204+
if (vine_cache_add_file(cache, f->cached_name, sandbox_path, f->cache_level, mode, size, mtime, p->execution_start, transfer_time, manager)) {
205205
f->size = size;
206-
vine_worker_send_cache_update(manager, f->cached_name, f->type, f->cache_level, f->size, mode, transfer_time, p->execution_start);
207206
result = 1;
208207
} else {
209208
debug(D_VINE, "output: unable to move %s to %s: %s\n", sandbox_path, cache_path, strerror(errno));

taskvine/src/worker/vine_worker.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ See the file COPYING for details.
55
*/
66

77
#include "vine_cache.h"
8+
#include "vine_cache_file.h"
89
#include "vine_catalog.h"
910
#include "vine_file.h"
1011
#include "vine_gpus.h"
@@ -495,8 +496,7 @@ Send an asynchronmous message to the manager indicating that an item was success
495496
its size in bytes and transfer time in usec.
496497
*/
497498

498-
void vine_worker_send_cache_update(struct link *manager, const char *cachename, vine_file_type_t type, vine_cache_level_t cache_level, int64_t size, time_t mtime,
499-
timestamp_t transfer_time, timestamp_t transfer_start)
499+
void vine_worker_send_cache_update(struct link *manager, const char *cachename, struct vine_cache_file *f)
500500
{
501501
char *transfer_id = hash_table_remove(current_transfers, cachename);
502502
if (!transfer_id) {
@@ -506,12 +506,12 @@ void vine_worker_send_cache_update(struct link *manager, const char *cachename,
506506
send_async_message(manager,
507507
"cache-update %s %d %d %lld %lld %lld %lld %s\n",
508508
cachename,
509-
type,
510-
cache_level,
511-
(long long)size,
512-
(long long)mtime,
513-
(long long)transfer_time,
514-
(long long)transfer_start,
509+
f->original_type,
510+
f->cache_level,
511+
(long long)f->size,
512+
(long long)f->mtime,
513+
(long long)f->transfer_time,
514+
(long long)f->start_time,
515515
transfer_id);
516516

517517
free(transfer_id);
@@ -986,7 +986,7 @@ static int do_put(struct link *manager, const char *cachename, vine_cache_level_
986986
/* XXX actual_size should equal expected size, but only for a simple file, not a dir. */
987987

988988
if (r) {
989-
vine_cache_add_file(cache_manager, cachename, transfer_path, cache_level, mode, actual_size, mtime, stop - start);
989+
vine_cache_add_file(cache_manager, cachename, transfer_path, cache_level, mode, actual_size, mtime, start, stop - start, manager);
990990
} else {
991991
trash_file(transfer_path);
992992
}

taskvine/src/worker/vine_worker.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55

66
#include "vine_workspace.h"
77
#include "vine_worker_options.h"
8+
#include "vine_cache_file.h"
89

910
#include "timestamp.h"
1011
#include "link.h"
1112

12-
void vine_worker_send_cache_update( struct link *manager, const char *cachename, vine_file_type_t type, vine_cache_level_t cache_level, int64_t size, time_t mtime, timestamp_t transfer_time, timestamp_t transfer_start );
13+
void vine_worker_send_cache_update( struct link *manager, const char *cachename, struct vine_cache_file *f );
1314
void vine_worker_send_cache_invalid( struct link *manager, const char *cachename, const char *message );
1415

1516
extern struct vine_workspace *workspace;

0 commit comments

Comments
 (0)