Skip to content

Commit 54d9d77

Browse files
author
allegroai
committed
Allow services mode to re-register (docker can kill it and not exit gracefully)
1 parent ce02385 commit 54d9d77

File tree

2 files changed

+17
-9
lines changed

2 files changed

+17
-9
lines changed

trains_agent/commands/worker.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -676,16 +676,16 @@ def daemon(self, queues, log_level, foreground=False, docker=False, detached=Fal
676676
# match previous behaviour when we validated queue names before everything else
677677
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
678678

679+
self._standalone_mode = kwargs.get('standalone_mode', False)
680+
self._services_mode = kwargs.get('services_mode', False)
681+
679682
# make sure we only have a single instance,
680683
# also make sure we set worker_id properly and cache folders
681684
self._singleton()
682685

683686
# check if we have the latest version
684687
start_check_update_daemon()
685688

686-
self._standalone_mode = kwargs.get('standalone_mode', False)
687-
self._services_mode = kwargs.get('services_mode', False)
688-
689689
self.check(**kwargs)
690690
self.log.debug("starting resource monitor thread")
691691
print("Worker \"{}\" - ".format(self.worker_id), end='')
@@ -2281,8 +2281,11 @@ def _singleton(self):
22812281
else:
22822282
worker_name = '{}:cpu'.format(worker_name)
22832283

2284+
# if we are running in services mode, we allow double register since
2285+
# docker-compose will kill instances before they cleanup
22842286
self.worker_id, worker_slot = Singleton.register_instance(
2285-
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client)
2287+
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
2288+
allow_double=bool(self._services_mode) and bool(ENV_DOCKER_HOST_MOUNT.get()))
22862289

22872290
if self.worker_id is None:
22882291
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))

trains_agent/helper/singleton.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def update_pid_file(cls):
3838
pass
3939

4040
@classmethod
41-
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
41+
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
4242
"""
4343
# Exit the process if another instance of us is using the same worker_id
4444
@@ -65,8 +65,9 @@ def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=N
6565
f.write(bytes(os.getpid()))
6666
f.flush()
6767
try:
68-
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
69-
api_client=api_client)
68+
ret = cls._register_instance(
69+
unique_worker_id=unique_worker_id, worker_name=worker_name,
70+
api_client=api_client, allow_double=allow_double)
7071
except:
7172
ret = None, None
7273

@@ -78,7 +79,7 @@ def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=N
7879
return ret
7980

8081
@classmethod
81-
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
82+
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
8283
if cls.worker_id:
8384
return cls.worker_id, cls.instance_slot
8485
# make sure we have a unique name
@@ -123,7 +124,11 @@ def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=
123124
continue
124125

125126
if uid == unique_worker_id:
126-
return None, None
127+
if allow_double:
128+
warning('Instance with the same WORKER_ID [{}] was found on this machine. '
129+
'We are ignoring it, make sure this not a mistake.'.format(unique_worker_id))
130+
else:
131+
return None, None
127132

128133
slots[slot] = uid
129134

0 commit comments

Comments
 (0)