1
1
"""
2
- [Internal use only ] Encapsulates update state management.
2
+ [API ] Encapsulates update state management.
3
3
"""
4
4
import asyncio
5
5
import datetime
6
6
import logging
7
+ from abc import ABCMeta , abstractmethod
7
8
from asyncio import Future
8
9
from typing import Dict , Awaitable , Union
9
10
10
11
from memoize .entry import CacheKey , CacheEntry
11
12
12
13
13
- class UpdateStatuses :
14
+ class UpdateStatuses (metaclass = ABCMeta ):
15
+ @abstractmethod
16
+ def is_being_updated (self , key : CacheKey ) -> bool :
17
+ """Checks if update for given key is in progress. Obtained info is valid until control gets back to IO-loop."""
18
+ raise NotImplementedError ()
19
+
20
+ @abstractmethod
21
+ def mark_being_updated (self , key : CacheKey ) -> None :
22
+ """Informs that update has been started.
23
+ Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost)..
24
+ Calls to 'is_being_updated' will return True until 'mark_updated' will be called."""
25
+ raise NotImplementedError ()
26
+
27
+ def mark_updated (self , key : CacheKey , entry : CacheEntry ) -> None :
28
+ """Informs that update has been finished.
29
+ Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called."""
30
+ raise NotImplementedError ()
31
+
32
+ @abstractmethod
33
+ def mark_update_aborted (self , key : CacheKey , exception : Exception ) -> None :
34
+ """Informs that update failed to complete.
35
+ Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.
36
+ Accepts exception to propagate it across all clients awaiting an update."""
37
+ raise NotImplementedError ()
38
+
39
+ @abstractmethod
40
+ def await_updated (self , key : CacheKey ) -> Awaitable [Union [CacheEntry , Exception ]]:
41
+ """Waits (asynchronously) until update in progress has benn finished.
42
+ Returns awaitable with the updated entry
43
+ (or awaitable with an exception if update failed/timed-out).
44
+ Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost)."""
45
+ raise NotImplementedError ()
46
+
47
+
48
+ class InMemoryLocks (UpdateStatuses ):
49
+ """Manages in-memory locks (for each updated key) to prevent dog-piling. """
14
50
def __init__ (self , update_lock_timeout : datetime .timedelta = datetime .timedelta (minutes = 5 )) -> None :
15
51
self .logger = logging .getLogger (__name__ )
16
52
self ._update_lock_timeout = update_lock_timeout
17
53
self ._updates_in_progress : Dict [CacheKey , Future ] = {}
18
54
19
55
def is_being_updated (self , key : CacheKey ) -> bool :
20
- """Checks if update for given key is in progress. Obtained info is valid until control gets back to IO-loop."""
21
56
return key in self ._updates_in_progress
22
57
23
58
def mark_being_updated (self , key : CacheKey ) -> None :
24
- """Informs that update has been started.
25
- Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost)..
26
- Calls to 'is_being_updated' will return True until 'mark_updated' will be called."""
27
59
if key in self ._updates_in_progress :
28
60
raise ValueError ('Key {} is already being updated' .format (key ))
29
61
@@ -42,27 +74,18 @@ def complete_on_timeout_passed():
42
74
callback = complete_on_timeout_passed )
43
75
44
76
def mark_updated (self , key : CacheKey , entry : CacheEntry ) -> None :
45
- """Informs that update has been finished.
46
- Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called."""
47
77
if key not in self ._updates_in_progress :
48
78
raise ValueError ('Key {} is not being updated' .format (key ))
49
79
update = self ._updates_in_progress .pop (key )
50
80
update .set_result (entry )
51
81
52
82
def mark_update_aborted (self , key : CacheKey , exception : Exception ) -> None :
53
- """Informs that update failed to complete.
54
- Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.
55
- Accepts exception to propagate it across all clients awaiting an update."""
56
83
if key not in self ._updates_in_progress :
57
84
raise ValueError ('Key {} is not being updated' .format (key ))
58
85
update = self ._updates_in_progress .pop (key )
59
86
update .set_result (exception )
60
87
61
88
def await_updated (self , key : CacheKey ) -> Awaitable [Union [CacheEntry , Exception ]]:
62
- """Waits (asynchronously) until update in progress has benn finished.
63
- Returns awaitable with the updated entry
64
- (or awaitable with an exception if update failed/timed-out).
65
- Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost)."""
66
89
if not self .is_being_updated (key ):
67
90
raise ValueError ('Key {} is not being updated' .format (key ))
68
91
return self ._updates_in_progress [key ]
0 commit comments