3737from hax .common import HaxGlobalState
3838from hax .exception import HAConsistencyException , InterruptedException
3939from hax .types import (ByteCountStats , ConfHaProcess , Fid , FsStatsWithTime ,
40- ObjT , ObjHealth , ObjTMaskMap , Profile , PverInfo ,
41- PverState , m0HaProcessEvent , m0HaProcessType ,
42- KeyDelete , HaNoteStruct , m0HaObjState )
40+ FidTypeToObjT , ObjT , ObjHealth , ObjTMaskMap ,
41+ Profile , PverInfo , PverState , m0HaProcessEvent ,
42+ m0HaProcessType , KeyDelete , HaNoteStruct , m0HaObjState )
4343
4444from hax .consul .cache import (uses_consul_cache , invalidates_consul_cache ,
4545 supports_consul_cache )
@@ -1728,7 +1728,7 @@ def get_process_status(self,
17281728 fid : Fid ,
17291729 proc_node = None ,
17301730 kv_cache = None ) -> MotrConsulProcInfo :
1731- proc_base_fid = self .get_process_base_fid (fid )
1731+ proc_base_fid = self .get_base_fid (fid )
17321732 key = f'processes/{ proc_base_fid } '
17331733 status = self .kv .kv_get (key , kv_cache = kv_cache , allow_null = True )
17341734 if status :
@@ -1742,7 +1742,7 @@ def get_process_status_local(self,
17421742 fid : Fid ,
17431743 proc_node = None ,
17441744 kv_cache = None ) -> MotrConsulProcInfo :
1745- proc_base_fid = self .get_process_base_fid (fid )
1745+ proc_base_fid = self .get_base_fid (fid )
17461746 this_node = self .get_local_nodename ()
17471747 key = f'{ this_node } /processes/{ proc_base_fid } '
17481748 status = self .kv .kv_get (key , kv_cache = kv_cache , allow_null = True )
@@ -1753,12 +1753,12 @@ def get_process_status_local(self,
17531753 return MotrConsulProcInfo ('Unknown' , 'Unknown' )
17541754
17551755 @repeat_if_fails ()
1756- def get_process_full_fid (self , proc_base_fid : Fid ) -> Optional [Fid ]:
1757- proc_fid = self .kv .kv_get (str (proc_base_fid ), recurse = False )
1758- if proc_fid is not None :
1759- pfid : Fid = Fid .parse (json .loads (proc_fid ['Value' ]))
1760- return pfid
1761- return proc_base_fid
1756+ def get_obj_full_fid (self , base_fid : Fid ) -> Optional [Fid ]:
1757+ full_fid = self .kv .kv_get (str (base_fid ), recurse = False )
1758+ if full_fid is not None :
1759+ fid : Fid = Fid .parse (json .loads (full_fid ['Value' ]))
1760+ return fid
1761+ return base_fid
17621762
17631763 def is_proc_local (self , pfid : Fid ) -> bool :
17641764 local_node = self .get_local_nodename ()
@@ -1924,7 +1924,7 @@ def get_service_health(self,
19241924 @uses_consul_cache
19251925 def get_process_node (self , proc_fid : Fid , kv_cache = None ) -> str :
19261926 try :
1927- proc_base_fid = self .get_process_base_fid (proc_fid )
1927+ proc_base_fid = self .get_base_fid (proc_fid )
19281928 fidk = proc_base_fid .key
19291929 # 'node/<node_name>/process/<process_fidk>/service/type'
19301930 # node_items = self.kv.kv_get('m0conf/nodes',
@@ -2171,7 +2171,7 @@ def set_process_state(self,
21712171 ObjHealth .STOPPED : 'stopped' ,
21722172 ObjHealth .RECOVERING : 'dtm_recovering'
21732173 }
2174- proc_base_fid = self .get_process_base_fid (process_fid )
2174+ proc_base_fid = self .get_base_fid (process_fid )
21752175
21762176 # Example key is as follows
21772177 # m0conf/nodes/0x6e00000000000001:0x3/processes/0x7200000000000001:
@@ -2281,62 +2281,75 @@ def set_motr_processes_status(self, fid, status, bAdd=False):
22812281 motr_processes_status )
22822282
22832283 @repeat_if_fails ()
2284- def process_dynamic_fidk_lock (self ) -> bool :
2284+ def obj_dynamic_fidk_lock (self , objt : ObjT ) -> bool :
22852285 # Acquire lock to update last_updated_base_fidk.
22862286 # This will block until lock is acquired.
22872287 # Will break if any other exception than
22882288 # HAConsistencyException occurs.
22892289 try :
2290- while not self .kv .kv_put ('fidk_update_lock' , 'true' , cas = 0 ):
2290+ while not self .kv .kv_put (f'{ objt .name } /fidk_update_lock' ,
2291+ 'true' , cas = 0 ):
22912292 sleep (1 )
22922293 return True
22932294 except Exception :
22942295 return False
22952296
22962297 @repeat_if_fails ()
2297- def process_dynamic_fidk_unlock (self ):
2298+ def obj_dynamic_fidk_unlock (self , objt : ObjT ):
22982299 # Release fidk_update_lock.
22992300 # This will block until released.
23002301 try :
23012302 keys : List [KeyDelete ] = [
2302- KeyDelete (name = ' fidk_update_lock' , recurse = True ),
2303+ KeyDelete (name = f' { objt . name } / fidk_update_lock' , recurse = True ),
23032304 ]
23042305 while not self .kv .kv_delete_in_transaction (keys ):
23052306 sleep (1 )
23062307 except Exception :
23072308 raise RuntimeError ('Unreachable' )
23082309
23092310 @repeat_if_fails ()
2310- def get_process_next_dynamic_fidk_lock (self ) -> int :
2311- if self .process_dynamic_fidk_lock ( ):
2312- fidk = self .kv .kv_get ('last_dynamic_fid_key/process ' ,
2311+ def get_obj_next_dynamic_fidk_lock (self , objt : ObjT ) -> int :
2312+ if self .obj_dynamic_fidk_lock ( objt ):
2313+ fidk = self .kv .kv_get (f 'last_dynamic_fid_key/{ objt . name . lower () } ' ,
23132314 recurse = False )
23142315 new_fidk = int (json .loads (fidk ['Value' ])) + 1
23152316 # Update dynamic fid key.
2316- while not self .kv .kv_put ('last_dynamic_fid_key/process' ,
2317- json .dumps (str (new_fidk ))):
2317+ while not self .kv .kv_put (
2318+ f'last_dynamic_fid_key/{ objt .name .lower ()} ' ,
2319+ json .dumps (str (new_fidk ))):
23182320 sleep (1 )
2319- self .process_dynamic_fidk_unlock ( )
2321+ self .obj_dynamic_fidk_unlock ( objt )
23202322 return new_fidk
23212323
23222324 @repeat_if_fails ()
2323- def alloc_next_process_fid (self , process_fid : Fid ) -> Fid :
2324- next_fidk = self .get_process_next_dynamic_fidk_lock ()
2325- fid_mask : Fid = ObjTMaskMap [ObjT .PROCESS ]
2326- fid_cont = process_fid .container
2327- fid_key = process_fid .key + ((fid_mask .key * next_fidk ) + next_fidk )
2328- new_proc_fid = Fid (fid_cont , fid_key )
2329- base_fid = self .get_process_base_fid (new_proc_fid )
2325+ def alloc_next_obj_fid (self , obj_fid : Fid ) -> Fid :
2326+ objt : ObjT = FidTypeToObjT [obj_fid .container ]
2327+ next_fidk = self .get_obj_next_dynamic_fidk_lock (objt )
2328+ fid_mask : Fid = ObjTMaskMap [objt ]
2329+ fid_cont = obj_fid .container
2330+ fid_key = obj_fid .key + ((fid_mask .key * next_fidk ) + next_fidk )
2331+ new_obj_fid = Fid (fid_cont , fid_key )
2332+ base_fid = self .get_base_fid (new_obj_fid )
23302333 # Save base fid to actualy fid mapping in Consul.
23312334 while not self .kv .kv_put (f'{ base_fid } ' ,
2332- json .dumps (str (new_proc_fid ))):
2335+ json .dumps (str (new_obj_fid ))):
23332336 sleep (1 )
2334- return new_proc_fid
2335-
2336- def get_process_base_fid (self , proc_fid : Fid ) -> Fid :
2337- fid_mask : Fid = ObjTMaskMap [ObjT .PROCESS ]
2338- base_fid = Fid (proc_fid .container ,
2339- (proc_fid .key & fid_mask .key ))
2337+ return new_obj_fid
2338+
2339+ def update_process_fid (self , proc_fid : Fid ) -> List [Fid ]:
2340+ new_fids : List [Fid ] = []
2341+ # First allocate new process fid.
2342+ new_fids .append (self .alloc_next_obj_fid (proc_fid ))
2343+ service_list = self .get_services_by_parent_process (proc_fid )
2344+ for svc in service_list :
2345+ new_fids .append (self .alloc_next_obj_fid (svc .fid ))
2346+ return new_fids
2347+
2348+ def get_base_fid (self , obj_fid : Fid ) -> Fid :
2349+ objt : ObjT = FidTypeToObjT [obj_fid .container ]
2350+ fid_mask : Fid = ObjTMaskMap [objt ]
2351+ base_fid = Fid (obj_fid .container ,
2352+ (obj_fid .key & fid_mask .key ))
23402353 return base_fid
23412354
23422355 @repeat_if_fails ()
0 commit comments