Skip to content

Commit e3b5eb9

Browse files
committed
sometimes, priority_sem is overkill
1 parent df08edc commit e3b5eb9

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

src/async_utils/_simple_lock.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2020-present Michael Hall
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import concurrent.futures as cf
17+
import threading
18+
from collections import deque
19+
20+
from . import _typings as t
21+
22+
# TODO: pick what public namespace to re-export this from.
23+
24+
25+
class AsyncLock:
26+
"""An async lock that doesn't bind to an event loop."""
27+
28+
def __init_subclass__(cls) -> t.Never:
29+
msg = "Don't subclass this"
30+
raise RuntimeError(msg)
31+
32+
__final__ = True
33+
34+
def __init__(self) -> None:
35+
self._waiters: deque[cf.Future[None]] = deque()
36+
self._internal_lock: threading.RLock = threading.RLock()
37+
self._locked: bool = False
38+
39+
async def __aenter__(self, /) -> None:
40+
with self._internal_lock:
41+
if not self._locked and (all(w.cancelled() for w in self._waiters)):
42+
self._locked = True
43+
return
44+
45+
fut: cf.Future[None] = cf.Future()
46+
47+
with self._internal_lock:
48+
self._waiters.append(fut)
49+
50+
try:
51+
await asyncio.wrap_future(fut)
52+
except (asyncio.CancelledError, cf.CancelledError):
53+
with self._internal_lock:
54+
if self._locked:
55+
self._maybe_wake()
56+
finally:
57+
self._waiters.remove(fut)
58+
59+
async def __aexit__(self, *_dont_care: object) -> t.Literal[False]:
60+
with self._internal_lock:
61+
if self._locked:
62+
self._locked = False
63+
self._maybe_wake()
64+
65+
return False
66+
67+
def _maybe_wake(self) -> None:
68+
with self._internal_lock:
69+
if self._waiters:
70+
try:
71+
fut = next(iter(self._waiters))
72+
except StopIteration:
73+
return
74+
if not fut.done:
75+
fut.set_result(None)

0 commit comments

Comments
 (0)