Skip to content
This repository was archived by the owner on May 20, 2025. It is now read-only.

Commit 70cc038

Browse files
committed
pyrefcnt: reimplement QueuedObjects with static table
This is a simpler version of the "QueuedObjects" list for biased reference counting with a statically allocated hashtable and a per-bucket lock. The design is similar to parking_lot.c. This also addresses the bug where an object could be deallocated while still in the queue. Fixes #66
1 parent 7609c9d commit 70cc038

File tree

7 files changed

+131
-160
lines changed

7 files changed

+131
-160
lines changed

Include/internal/pycore_pystate.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ extern "C" {
99
#endif
1010

1111
#include "pycore_gil.h" /* struct _gil_runtime_state */
12+
#include "pycore_llist.h"
1213
#include "pycore_pymem.h" /* struct _gc_runtime_state */
1314
#include "pycore_warnings.h" /* struct _warnings_runtime_state */
1415
#include "lock.h"
@@ -127,9 +128,6 @@ struct _is {
127128

128129
PyObject *dict; /* Stores per-interpreter state */
129130

130-
struct _Py_hashtable_t *object_queues;
131-
pthread_rwlock_t object_queues_lk;
132-
133131
PyObject *builtins_copy;
134132
PyObject *import_func;
135133
/* Initialized to PyEval_EvalFrameDefault(). */
@@ -364,6 +362,8 @@ struct PyThreadStateWaiter {
364362
int64_t time_to_be_fair;
365363
};
366364

365+
struct brc_queued_object;
366+
367367
struct PyThreadStateOS {
368368
struct PyThreadStateWaiter waiter;
369369

@@ -373,6 +373,12 @@ struct PyThreadStateOS {
373373
PyCOND_T waiter_cond;
374374
int waiter_counter;
375375

376+
struct _PyBrcState {
377+
struct llist_node node;
378+
uintptr_t thread_id;
379+
struct brc_queued_object *queue;
380+
} brc;
381+
376382
/* DEBUG info */
377383
PyThreadState *last_notifier;
378384
const char *last_notifier_msg;

Include/internal/pycore_refcnt.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ void _Py_queue_object(PyObject *ob);
1515
void _Py_queue_process(PyThreadState *tstate);
1616
void _Py_queue_create(PyThreadState *tstate);
1717
void _Py_queue_destroy(PyThreadState *tstate);
18+
void _Py_queue_after_fork(PyThreadState *tstate);
1819

1920
#ifdef __cplusplus
2021
}

Include/object.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ PyAPI_FUNC(void) _Py_NO_INLINE _Py_IncRefShared(PyObject *);
418418
PyAPI_FUNC(void) _Py_NO_INLINE _Py_DecRefShared(PyObject *);
419419
PyAPI_FUNC(int) _Py_TryIncRefShared(PyObject *);
420420
PyAPI_FUNC(void) _Py_MergeZeroRefcount(PyObject *);
421-
void _Py_ExplicitMergeRefcount(PyObject *);
421+
Py_ssize_t _Py_ExplicitMergeRefcount(PyObject *);
422422

423423
static inline uintptr_t
424424
_Py_ThreadId(void)

Objects/object.c

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,27 +2216,29 @@ _Py_Dealloc(PyObject *op)
22162216
void
22172217
_Py_MergeZeroRefcount(PyObject *op)
22182218
{
2219-
Py_ssize_t refcount;
2220-
int queued, merged;
2221-
int ok;
2219+
assert(_Py_atomic_load_uint32_relaxed(&op->ob_ref_local) == 0);
22222220

2223-
assert(!_Py_REF_IS_IMMORTAL(op->ob_ref_local));
2221+
_Py_atomic_store_uintptr_relaxed(&op->ob_tid, 0);
22242222

2225-
do {
2226-
uint32_t shared;
2227-
shared = _Py_atomic_load_uint32_relaxed(&op->ob_ref_shared);
2223+
Py_ssize_t refcount;
2224+
for (;;) {
2225+
uint32_t shared = _Py_atomic_load_uint32_relaxed(&op->ob_ref_shared);
2226+
2227+
int queued, merged;
22282228
_PyRef_UnpackShared(shared, &refcount, &queued, &merged);
2229+
2230+
// The object can't have negative shared reference count
2231+
// at this point because that would imply a negative total reference
2232+
// count (since the local refcount is zero).
2233+
assert(refcount >= 0);
22292234
assert(!merged);
22302235

22312236
if (queued) {
2232-
/* Note that the object can't have negative shared reference count
2233-
* at this point because that would imply a negative total reference
2234-
* count (since the local refcount is zero). However, it may still
2235-
* be queued, if the shared reference count was temporarily negative
2236-
* and hasn't been proceessed yet. We need to fix this case, because
2237-
* currently the queue points to garbage objects. FIXME(sgross).
2238-
*/
2239-
Py_FatalError("UH OH OBJECT IS QUEUED");
2237+
// Note that the object may still be queued, if the shared reference
2238+
// count was temporarily negative and hasn't been processed yet.
2239+
// We don't want to merge it yet because that might result in the
2240+
// object being freed while it's still in the queue.
2241+
break;
22402242
}
22412243

22422244
if (refcount == 0 &&
@@ -2253,21 +2255,22 @@ _Py_MergeZeroRefcount(PyObject *op)
22532255
return;
22542256
}
22552257

2256-
ok = _Py_atomic_compare_exchange_uint32(
2258+
int ok = _Py_atomic_compare_exchange_uint32(
22572259
&op->ob_ref_shared,
22582260
shared,
22592261
shared | _Py_REF_MERGED_MASK);
2260-
} while (!ok);
22612262

2262-
_Py_atomic_store_uintptr_relaxed(&op->ob_tid, 0);
2263-
_Py_atomic_store_uint32_relaxed(&op->ob_ref_local, 0);
2263+
if (ok) {
2264+
break;
2265+
}
2266+
}
22642267

22652268
if (refcount == 0) {
22662269
_Py_Dealloc(op);
22672270
}
22682271
}
22692272

2270-
void
2273+
Py_ssize_t
22712274
_Py_ExplicitMergeRefcount(PyObject *op)
22722275
{
22732276
uint32_t old_shared;
@@ -2283,7 +2286,8 @@ _Py_ExplicitMergeRefcount(PyObject *op)
22832286
_PyRef_UnpackShared(old_shared, &refcount, &queued, &merged);
22842287

22852288
if (merged) {
2286-
return;
2289+
assert(refcount > 0);
2290+
return refcount;
22872291
}
22882292

22892293
// TODO(sgross): implementation defined behavior!
@@ -2304,10 +2308,7 @@ _Py_ExplicitMergeRefcount(PyObject *op)
23042308

23052309
_Py_atomic_store_uint32_relaxed(&op->ob_ref_local, 0);
23062310
_Py_atomic_store_uintptr_relaxed(&op->ob_tid, 0);
2307-
2308-
if (refcount == 0) {
2309-
_Py_Dealloc(op);
2310-
}
2311+
return refcount;
23112312
}
23122313

23132314
void
@@ -2375,7 +2376,10 @@ _Py_DecRefShared(PyObject *op)
23752376
if (_Py_REF_IS_QUEUED(new_shared) != _Py_REF_IS_QUEUED(old_shared)) {
23762377
PyThreadState *tstate = _PyThreadState_GET();
23772378
if (tstate->interp->gc.collecting) {
2378-
_Py_ExplicitMergeRefcount(op);
2379+
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(op);
2380+
if (refcount == 0) {
2381+
_Py_Dealloc(op);
2382+
}
23792383
}
23802384
else {
23812385
_Py_queue_object(op);

Python/ceval.c

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,11 +312,6 @@ _PyEval_ReInitThreads(_PyRuntimeState *runtime)
312312
Py_FatalError("Can't initialize threads for pending calls");
313313
}
314314

315-
PyInterpreterState *interp = current_tstate->interp;
316-
if (pthread_rwlock_init(&interp->object_queues_lk, NULL) != 0) {
317-
Py_FatalError("Can't initialize object queues mutex");
318-
}
319-
320315
/* Destroy all threads except the current one */
321316
_PyThreadState_DeleteExcept(runtime, current_tstate);
322317
}

0 commit comments

Comments
 (0)