diff --git a/Doc/library/interpreters.rst b/Doc/library/interpreters.rst new file mode 100644 index 00000000000000..4c12f343033a4a --- /dev/null +++ b/Doc/library/interpreters.rst @@ -0,0 +1,188 @@ +:mod:`interpreters` --- High-level Subinterpreters Module +========================================================== + +.. module:: interpreters + :synopsis: High-level SubInterpreters Module. + +**Source code:** :source:`Lib/interpreters.py` + +-------------- + +This module provides high-level tools for working with sub-interpreters, +such as creating them, running code in them, or sending data between them. +It is a wrapper around the low-level `_interpreters` module. + +.. versionchanged:: added in 3.9 + +Interpreter Objects +------------------- + +The Interpreter object represents a single interpreter. +.. class:: Interpreter(id) + + The class implementing a subinterpreter object. + + .. method:: is_running() + + Return whether or not the identified interpreter is running. + It returns `True` and `False` otherwise. + + .. method:: destroy() + + Destroy the interpreter. Attempting to destroy the current + interpreter results in a `RuntimeError`. + + .. method:: run(self, src_str, /, *, channels=None): + + Run the given source code in the interpreter. This blocks + the current thread until done. `channels` should be in + the form : `(RecvChannel, SendChannel)`. + +RecvChannel Objects +------------------- + +The RecvChannel object represents a recieving channel. + +.. class:: RecvChannel(id) + + This class represents the receiving end of a channel. + + .. method:: recv() + + Get the next object from the channel, and wait if + none have been sent. Associate the interpreter + with the channel. + + .. method:: recv_nowait(default=None) + + Like ``recv()``, but return the default result + instead of waiting. + + .. method:: release() + + Release the channel for the current interpreter. + By default both ends are released. Releasing an already + released end results in a ``ChannelReleasedError`` exception. + + .. method:: close(force=False) + + Close the channel in all interpreters. By default + both ends are closed. closing an already closed end + results in a ``ChannelClosedError`` exception. Without + seeting ``force`` to ``True`` a ``ChannelNotEmptyError`` + will be returned when a channel with data is closed. + + +SendChannel Objects +-------------------- + +The ``SendChannel`` object represents a sending channel. + +.. class:: SendChannel(id) + + This class represents the sending end of a channel. + + .. method:: send(obj) + + Send the object ``obj`` to the receiving end of the channel + and wait. Associate the interpreter with the channel. + + .. method:: send_nowait(obj) + + Like ``send()`` but return ``False`` if not received. + + .. method:: send_buffer(obj) + + Send the object's buffer to the receiving end of the + channel and wait. Associate the interpreter with the + channel. + + .. method:: send_buffer_nowait(obj) + + Like ``send_buffer()`` but return ``False`` if not received. + + .. method:: release() + + Release the channel for the current interpreter. + By default both ends are released. Releasing an already + released end results in a ``ChannelReleasedError`` exception. + + .. method:: close(force=False) + + Close the channel in all interpreters. By default + both ends are closed. closing an already closed end + results in a ``ChannelClosedError`` exception. + + +This module defines the following global functions: + + +.. function:: is_shareable(obj) + + Return ``True`` if the object's data can be shared between + interpreters. + +.. function:: create_channel() + + Create a new channel for passing data between interpreters. + +.. function:: list_all_channels() + + Return all open channels. + +.. function:: create() + + Initialize a new (idle) Python interpreter. Get the currently + running interpreter. This method returns an ``Interpreter`` object. + +.. function:: get_current() + + Get the currently running interpreter. This method returns + an ``Interpreter`` object. + +.. function:: list_all() + + Get all existing interpreters. Returns a list + of ``Interpreter`` objects. + +This module also defines the following exceptions. + +.. exception:: RunFailedError + + This exception, a subclass of :exc:`RuntimeError`, is raised when the + ``Interpreter.run()`` results in an uncaught exception. + +.. exception:: ChannelError + + This exception is a subclass of :exc:`Exception`, and is the base + class for all channel-related exceptions. + +.. exception:: ChannelNotFoundError + + This exception is a subclass of :exc:`ChannelError`, and is raised + when the the identified channel is not found. + +.. exception:: ChannelEmptyError + + This exception is a subclass of :exc:`ChannelError`, and is raised when + the channel is unexpectedly empty. + +.. exception:: ChannelNotEmptyError + + This exception is a subclass of :exc:`ChannelError`, and is raised when + the channel is unexpectedly not empty. + +.. exception:: NotReceivedError + + This exception is a subclass of :exc:`ChannelError`, and is raised when + nothing was waiting to receive a sent object. + +.. exception:: ChannelClosedError + + This exception is a subclass of :exc:`ChannelError`, and is raised when + the channel is closed. + +.. exception:: ChannelReleasedError + + This exception is a subclass of :exc:`ChannelClosedError`, and is raised + when the channel is released (but not yet closed). diff --git a/Lib/interpreters.py b/Lib/interpreters.py new file mode 100644 index 00000000000000..04e9d551a547dc --- /dev/null +++ b/Lib/interpreters.py @@ -0,0 +1,338 @@ +"""Subinterpreters High Level Module.""" + +import _interpreters +import logging + +__all__ = ['Interpreter', 'SendChannel', 'RecvChannel', + 'is_shareable', 'create_channel', + 'list_all_channels', 'get_current', + 'create'] + + +def create(): + """ create() -> Interpreter + + Initialize a new (idle) Python interpreter. + """ + id = _interpreters.create() + return Interpreter(id) + +def list_all(): + """ list_all() -> [Interpreter] + + Get all existing interpreters. + """ + return [Interpreter(id) for id in + _interpreters.list_all()] + +def get_current(): + """ get_current() -> Interpreter + + Get the currently running interpreter. + """ + id = _interpreters.get_current() + return Interpreter(id) + + +class Interpreter: + + def __init__(self, id): + self._id = id + + @property + def id(self): + return self._id + + def is_running(self): + """is_running() -> bool + + Return whether or not the identified + interpreter is running. + """ + return _interpreters.is_running(self._id) + + def destroy(self): + """destroy() + + Destroy the interpreter. + + Attempting to destroy the current + interpreter results in a RuntimeError. + """ + return _interpreters.destroy(self._id) + + def _handle_channels(self, channels): + # Looks like the only way for an interpreter to be associated + # to a channel is through sending or recving data. + if channels: + if channels[0] and channels[1] != None: + _interpreters.channel_recv(channels[0].id) + _interpreters.channel_send(channels[1].id, src_str) + elif channels[0] != None and channels[1] == None: + _interpreters.channel_recv(channels[0].id) + elif channels[0] == None and channels[1] != None: + _interpreters.channel_send(channels[1].id, src_str) + else: + pass + + def run(self, src_str, /, *, channels=None): + """run(src_str, /, *, channels=None) + + Run the given source code in the interpreter. + This blocks the current thread until done. + """ + self._handle_channels(channels) + try: + _interpreters.run_string(self._id, src_str) + except RunFailedError as err: + logger.error(err) + raise + + +def is_shareable(obj): + """ is_shareable(obj) -> Bool + + Return `True` if the object's data can be + shared between interpreters and `False` otherwise. + """ + return _interpreters.is_shareable(obj) + +def create_channel(): + """ create_channel() -> (RecvChannel, SendChannel) + + Create a new channel for passing data between + interpreters. + """ + + cid = _interpreters.channel_create() + return (RecvChannel(cid), SendChannel(cid)) + +def list_all_channels(): + """ list_all_channels() -> [(RecvChannel, SendChannel)] + + Return all open channels. + """ + return [(RecvChannel(cid), SendChannel(cid)) + for cid in _interpreters.channel_list_all()] + +def wait(timeout): + #The implementation for wait + # will be non trivial to be useful + import time + time.sleep(timeout) + +class RecvChannel: + + def __init__(self, id): + self.id = id + self.interpreters = _interpreters.\ + channel_list_interpreters(self.id,\ + send=False) + + def recv(self): + """ channel_recv() -> obj + + Get the next object from the channel, + and wait if none have been sent. + Associate the interpreter with the channel. + """ + try: + obj = _interpreters.channel_recv(self.id) + if obj == None: + wait(2) + obj = _interpreters.channel_recv(self.id) + except _interpreters.ChannelEmptyError: + raise ChannelEmptyError + except _interpreters.ChannelNotFoundError: + raise ChannelNotFoundError + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.RunFailedError: + raise RunFailedError + return obj + + def recv_nowait(self, default=None): + """recv_nowait(default=None) -> object + + Like recv(), but return the default + instead of waiting. + """ + try: + obj = _interpreters.channel_recv(self.id) + if obj == None: + obj = default + except _interpreters.ChannelEmptyError: + raise ChannelEmptyError + except _interpreters.ChannelNotFoundError: + raise ChannelNotFoundError + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.RunFailedError: + raise RunFailedError + return obj + + def release(self): + """ release() + + No longer associate the current interpreter + with the channel (on the receiving end). + """ + try: + _interpreters.channel_release(self.id, recv=True) + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.ChannelNotEmptyError: + raise ChannelNotEmptyError + + def close(self, force=False): + """close(force=False) + + Close the channel in all interpreters. + """ + try: + _interpreters.channel_close(self.id, + force=force, recv=True) + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.ChannelNotEmptyError: + raise ChannelNotEmptyError + + +class SendChannel: + + def __init__(self, id): + self.id = id + self.interpreters = _interpreters.\ + channel_list_interpreters(self.id,\ + send=True) + + def send(self, obj): + """ send(obj) + + Send the object (i.e. its data) to the receiving + end of the channel and wait. Associate the interpreter + with the channel. + """ + try: + _interpreters.channel_send(self.id, obj) + wait(2) + except _interpreters.ChannelNotFoundError: + raise ChannelNotFoundError + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.RunFailedError: + raise RunFailedError + + def send_nowait(self, obj): + """ send_nowait(obj) + + Like send(), but return False if not received. + """ + try: + _interpreters.channel_send(self.id, obj) + except _interpreters.ChannelNotFoundError: + raise ChannelNotFoundError + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.RunFailedError: + raise RunFailedError + + recv_obj = _interpreters.channel_recv(self.id) + if recv_obj: + return obj + else: + return False + + def send_buffer(self, obj): + """ send_buffer(obj) + + Send the object's buffer to the receiving + end of the channel and wait. Associate the interpreter + with the channel. + """ + try: + _interpreters.channel_send_buffer(self.id, obj) + wait(2) + except _interpreters.ChannelNotFoundError: + raise ChannelNotFoundError + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.RunFailedError: + raise RunFailedError + + def send_buffer_nowait(self, obj): + """ send_buffer_nowait(obj) + + Like send(), but return False if not received. + """ + try: + _interpreters.channel_send_buffer(self.id, obj) + except _interpreters.ChannelNotFoundError: + raise ChannelNotFoundError + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.RunFailedError: + raise RunFailedError + recv_obj = _interpreters.channel_recv(self.id) + if recv_obj: + return obj + else: + return False + + def release(self): + """ release() + + No longer associate the current interpreter + with the channel (on the sending end). + """ + try: + _interpreters.channel_release(self.id, send=True) + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.ChannelNotEmptyError: + raise ChannelNotEmptyError + + def close(self, force=False): + """close(force=False) + + Close the channel in all interpreters.. + """ + try: + _interpreters.channel_close(self.id, + force=force, send=False) + except _interpreters.ChannelClosedError: + raise ChannelClosedError + except _interpreters.ChannelNotEmptyError: + raise ChannelNotEmptyError + + +class ChannelError(Exception): + pass + + +class ChannelNotFoundError(ChannelError): + pass + + +class ChannelEmptyError(ChannelError): + pass + + +class ChannelNotEmptyError(ChannelError): + pass + + +class NotReceivedError(ChannelError): + pass + + +class ChannelClosedError(ChannelError): + pass + + +class ChannelReleasedError(ChannelClosedError): + pass + + +class RunFailedError(RuntimeError): + pass diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 30f8f98acc9dd3..1422427217e1f3 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -13,7 +13,7 @@ from test.support import script_helper -interpreters = support.import_module('_xxsubinterpreters') +interpreters = support.import_module('_interpreters') ################################## @@ -446,7 +446,7 @@ def test_subinterpreter(self): main = interpreters.get_main() interp = interpreters.create() out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters cur = _interpreters.get_current() print(cur) assert isinstance(cur, _interpreters.InterpreterID) @@ -469,7 +469,7 @@ def test_from_subinterpreter(self): [expected] = interpreters.list_all() interp = interpreters.create() out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters main = _interpreters.get_main() print(main) assert isinstance(main, _interpreters.InterpreterID) @@ -495,7 +495,7 @@ def test_subinterpreter(self): def test_from_subinterpreter(self): interp = interpreters.create() out = _run_output(interp, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters if _interpreters.is_running({interp}): print(True) else: @@ -614,7 +614,7 @@ def test_in_subinterpreter(self): main, = interpreters.list_all() id1 = interpreters.create() out = _run_output(id1, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters id = _interpreters.create() print(id) assert isinstance(id, _interpreters.InterpreterID) @@ -630,7 +630,7 @@ def test_in_threaded_subinterpreter(self): def f(): nonlocal id2 out = _run_output(id1, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters id = _interpreters.create() print(id) """)) @@ -724,7 +724,7 @@ def test_from_current(self): main, = interpreters.list_all() id = interpreters.create() script = dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters try: _interpreters.destroy({id}) except RuntimeError: @@ -739,7 +739,7 @@ def test_from_sibling(self): id1 = interpreters.create() id2 = interpreters.create() script = dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.destroy({id2}) """) interpreters.run_string(id1, script) @@ -1059,7 +1059,7 @@ def test_still_running_at_exit(self): script = dedent(f""" from textwrap import dedent import threading - import _xxsubinterpreters as _interpreters + import _interpreters id = _interpreters.create() def f(): _interpreters.run_string(id, dedent(''' @@ -1191,7 +1191,7 @@ def test_sequential_ids(self): def test_ids_global(self): id1 = interpreters.create() out = _run_output(id1, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters cid = _interpreters.channel_create() print(cid) """)) @@ -1199,7 +1199,7 @@ def test_ids_global(self): id2 = interpreters.create() out = _run_output(id2, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters cid = _interpreters.channel_create() print(cid) """)) @@ -1207,6 +1207,87 @@ def test_ids_global(self): self.assertEqual(cid2, int(cid1) + 1) + def test_channel_list_interpreters_none(self): + """Test listing interpreters for a channel with no associations.""" + # Test for channel with no associated interpreters. + cid = interpreters.channel_create() + send_interps = interpreters.channel_list_interpreters(cid, send=True) + recv_interps = interpreters.channel_list_interpreters(cid, send=False) + self.assertEqual(send_interps, []) + self.assertEqual(recv_interps, []) + + def test_channel_list_interpreters_basic(self): + """Test basic listing channel interpreters.""" + interp0 = interpreters.get_main() + cid = interpreters.channel_create() + interpreters.channel_send(cid, "send") + # Test for a channel that has one end associated to an interpreter. + send_interps = interpreters.channel_list_interpreters(cid, send=True) + recv_interps = interpreters.channel_list_interpreters(cid, send=False) + self.assertEqual(send_interps, [interp0]) + self.assertEqual(recv_interps, []) + + interp1 = interpreters.create() + _run_output(interp1, dedent(f""" + import _interpreters + obj = _interpreters.channel_recv({cid}) + """)) + # Test for channel that has boths ends associated to an interpreter. + send_interps = interpreters.channel_list_interpreters(cid, send=True) + recv_interps = interpreters.channel_list_interpreters(cid, send=False) + self.assertEqual(send_interps, [interp0]) + self.assertEqual(recv_interps, [interp1]) + + def test_channel_list_interpreters_multiple(self): + """Test listing interpreters for a channel with many associations.""" + interp0 = interpreters.get_main() + interp1 = interpreters.create() + interp2 = interpreters.create() + interp3 = interpreters.create() + cid = interpreters.channel_create() + + interpreters.channel_send(cid, "send") + _run_output(interp1, dedent(f""" + import _interpreters + obj = _interpreters.channel_send({cid}, "send") + """)) + _run_output(interp2, dedent(f""" + import _interpreters + obj = _interpreters.channel_recv({cid}) + """)) + _run_output(interp3, dedent(f""" + import _interpreters + obj = _interpreters.channel_recv({cid}) + """)) + send_interps = interpreters.channel_list_interpreters(cid, send=True) + recv_interps = interpreters.channel_list_interpreters(cid, send=False) + self.assertEqual(set(send_interps), {interp0, interp1}) + self.assertEqual(set(recv_interps), {interp2, interp3}) + + @unittest.skip("Failing due to handling of destroyed interpreters") + def test_channel_list_interpreters_destroyed(self): + """Test listing channel interpreters with a destroyed interpreter.""" + interp0 = interpreters.get_main() + interp1 = interpreters.create() + cid = interpreters.channel_create() + interpreters.channel_send(cid, "send") + _run_output(interp1, dedent(f""" + import _interpreters + obj = _interpreters.channel_recv({cid}) + """)) + # Should be one interpreter associated with each end. + send_interps = interpreters.channel_list_interpreters(cid, send=True) + recv_interps = interpreters.channel_list_interpreters(cid, send=False) + self.assertEqual(send_interps, [interp0]) + self.assertEqual(recv_interps, [interp1]) + + interpreters.destroy(interp1) + # Destroyed interpreter should not be listed. + send_interps = interpreters.channel_list_interpreters(cid, send=True) + recv_interps = interpreters.channel_list_interpreters(cid, send=False) + self.assertEqual(send_interps, [interp0]) + self.assertEqual(recv_interps, []) + #################### def test_send_recv_main(self): @@ -1221,7 +1302,7 @@ def test_send_recv_main(self): def test_send_recv_same_interpreter(self): id1 = interpreters.create() out = _run_output(id1, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters cid = _interpreters.channel_create() orig = b'spam' _interpreters.channel_send(cid, orig) @@ -1234,7 +1315,7 @@ def test_send_recv_different_interpreters(self): cid = interpreters.channel_create() id1 = interpreters.create() out = _run_output(id1, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.channel_send({cid}, b'spam') """)) obj = interpreters.channel_recv(cid) @@ -1270,7 +1351,7 @@ def f(): nonlocal out out = _run_output(id1, dedent(f""" import time - import _xxsubinterpreters as _interpreters + import _interpreters while True: try: obj = _interpreters.channel_recv({cid}) @@ -1307,7 +1388,7 @@ def test_run_string_arg_unresolved(self): interp = interpreters.create() out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters print(cid.end) _interpreters.channel_send(cid, b'spam') """), @@ -1327,7 +1408,7 @@ def test_run_string_arg_resolved(self): interp = interpreters.create() out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters + import _interpreters print(chan.id.end) _interpreters.channel_send(chan.id, b'spam') """), @@ -1355,11 +1436,11 @@ def test_close_multiple_users(self): id1 = interpreters.create() id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.channel_send({cid}, b'spam') """)) interpreters.run_string(id2, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.channel_recv({cid}) """)) interpreters.channel_close(cid) @@ -1498,7 +1579,7 @@ def test_close_by_unassociated_interp(self): interpreters.channel_send(cid, b'spam') interp = interpreters.create() interpreters.run_string(interp, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.channel_close({cid}, force=True) """)) with self.assertRaises(interpreters.ChannelClosedError): @@ -1519,6 +1600,23 @@ def test_close_used_multiple_times_by_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) + def test_channel_list_interpreters_invalid_channel(self): + cid = interpreters.channel_create() + # Test for invalid channel ID. + with self.assertRaises(interpreters.ChannelNotFoundError): + interpreters.channel_list_interpreters(1000, send=True) + + interpreters.channel_close(cid) + # Test for a channel that has been closed. + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_list_interpreters(cid, send=True) + + def test_channel_list_interpreters_invalid_args(self): + # Tests for invalid arguments passed to the API. + cid = interpreters.channel_create() + with self.assertRaises(TypeError): + interpreters.channel_list_interpreters(cid) + class ChannelReleaseTests(TestBase): @@ -1579,11 +1677,11 @@ def test_multiple_users(self): id1 = interpreters.create() id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.channel_send({cid}, b'spam') """)) out = _run_output(id2, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters obj = _interpreters.channel_recv({cid}) _interpreters.channel_release({cid}) print(repr(obj)) @@ -1637,7 +1735,7 @@ def test_by_unassociated_interp(self): interpreters.channel_send(cid, b'spam') interp = interpreters.create() interpreters.run_string(interp, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters _interpreters.channel_release({cid}) """)) obj = interpreters.channel_recv(cid) @@ -1652,7 +1750,7 @@ def test_close_if_unassociated(self): cid = interpreters.channel_create() interp = interpreters.create() interpreters.run_string(interp, dedent(f""" - import _xxsubinterpreters as _interpreters + import _interpreters obj = _interpreters.channel_send({cid}, b'spam') _interpreters.channel_release({cid}) """)) @@ -1756,12 +1854,12 @@ def _new_channel(self, creator): else: ch = interpreters.channel_create() run_interp(creator.id, f""" - import _xxsubinterpreters - cid = _xxsubinterpreters.channel_create() + import _interpreters + cid = _interpreters.channel_create() # We purposefully send back an int to avoid tying the # channel to the other interpreter. - _xxsubinterpreters.channel_send({ch}, int(cid)) - del _xxsubinterpreters + _interpreters.channel_send({ch}, int(cid)) + del _interpreters """) self._cid = interpreters.channel_recv(ch) return self._cid @@ -1788,7 +1886,7 @@ def _prep_interpreter(self, interp): if interp.name == 'main': return run_interp(interp.id, f""" - import _xxsubinterpreters as interpreters + import _interpreters as interpreters import test.test__xxsubinterpreters as helpers ChannelState = helpers.ChannelState try: diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py new file mode 100644 index 00000000000000..a954ff9bd370c3 --- /dev/null +++ b/Lib/test/test_interpreters.py @@ -0,0 +1,727 @@ +import contextlib +import os +import threading +from textwrap import dedent +import unittest +import time +import interpreters +import _interpreters + +def _captured_script(script): + r, w = os.pipe() + indented = script.replace('\n', '\n ') + wrapped = dedent(f""" + import contextlib + with open({w}, 'w') as spipe: + with contextlib.redirect_stdout(spipe): + {indented} + """) + return wrapped, open(r) + +def clean_up_interpreters(): + for interp in interpreters.list_all(): + if interp.id == 0: # main + continue + try: + interp.destroy() + except RuntimeError: + pass # already destroyed + +def _run_output(interp, request, shared=None): + script, rpipe = _captured_script(request) + with rpipe: + interp.run(script) + return rpipe.read() + +@contextlib.contextmanager +def _running(interp): + r, w = os.pipe() + def run(): + interp.run(dedent(f""" + # wait for "signal" + with open({r}) as rpipe: + rpipe.read() + """)) + + t = threading.Thread(target=run) + t.start() + + yield + + with open(w, 'w') as spipe: + spipe.write('done') + t.join() + + +class TestBase(unittest.TestCase): + + def tearDown(self): + clean_up_interpreters() + + +class CreateTests(TestBase): + + def test_in_main(self): + interp = interpreters.create() + lst = interpreters.list_all() + self.assertEqual(interp.id, lst[1].id) + + def test_in_thread(self): + lock = threading.Lock() + id = None + interp = interpreters.create() + lst = interpreters.list_all() + def f(): + nonlocal id + id = interp.id + lock.acquire() + lock.release() + + t = threading.Thread(target=f) + with lock: + t.start() + t.join() + self.assertEqual(interp.id, lst[1].id) + + def test_in_subinterpreter(self): + main, = interpreters.list_all() + interp = interpreters.create() + out = _run_output(interp, dedent(""" + import interpreters + interp = interpreters.create() + print(interp) + """)) + interp2 = out.strip() + + self.assertEqual(len(set(interpreters.list_all())), len({main, interp, interp2})) + + def test_in_threaded_subinterpreter(self): + main, = interpreters.list_all() + interp = interpreters.create() + interp2 = None + def f(): + nonlocal interp2 + out = _run_output(interp, dedent(""" + import interpreters + interp = interpreters.create() + print(interp) + """)) + interp2 = int(out.strip()) + + t = threading.Thread(target=f) + t.start() + t.join() + + self.assertEqual(len(set(interpreters.list_all())), len({main, interp, interp2})) + + def test_after_destroy_all(self): + before = set(interpreters.list_all()) + # Create 3 subinterpreters. + interp_lst = [] + for _ in range(3): + interps = interpreters.create() + interp_lst.append(interps) + # Now destroy them. + for interp in interp_lst: + interp.destroy() + # Finally, create another. + interp = interpreters.create() + self.assertEqual(len(set(interpreters.list_all())), len(before | {interp})) + + def test_after_destroy_some(self): + before = set(interpreters.list_all()) + # Create 3 subinterpreters. + interp1 = interpreters.create() + interp2 = interpreters.create() + interp3 = interpreters.create() + # Now destroy 2 of them. + interp1.destroy() + interp2.destroy() + # Finally, create another. + interp = interpreters.create() + self.assertEqual(len(set(interpreters.list_all())), len(before | {interp3, interp})) + + +class GetCurrentTests(TestBase): + + def test_main(self): + main_interp_id = _interpreters.get_main() + cur_interp_id = interpreters.get_current().id + self.assertEqual(cur_interp_id, main_interp_id) + + def test_subinterpreter(self): + main = _interpreters.get_main() + interp = interpreters.create() + out = _run_output(interp, dedent(""" + import interpreters + cur = interpreters.get_current() + print(cur) + """)) + cur = out.strip() + self.assertNotEqual(cur, main) + + +class ListAllTests(TestBase): + + def test_initial(self): + interps = interpreters.list_all() + self.assertEqual(1, len(interps)) + + def test_after_creating(self): + main = interpreters.get_current() + first = interpreters.create() + second = interpreters.create() + + ids = [] + for interp in interpreters.list_all(): + ids.append(interp.id) + + self.assertEqual(ids, [main.id, first.id, second.id]) + + def test_after_destroying(self): + main = interpreters.get_current() + first = interpreters.create() + second = interpreters.create() + first.destroy() + + ids = [] + for interp in interpreters.list_all(): + ids.append(interp.id) + + self.assertEqual(ids, [main.id, second.id]) + + +class TestInterpreterId(TestBase): + + def test_in_main(self): + main = interpreters.get_current() + self.assertEqual(0, main.id) + + def test_with_custom_num(self): + interp = interpreters.Interpreter(1) + self.assertEqual(1, interp.id) + + def test_for_readonly_property(self): + interp = interpreters.Interpreter(1) + with self.assertRaises(AttributeError): + interp.id = 2 + + +class TestInterpreterIsRunning(TestBase): + + def test_main(self): + main = interpreters.get_current() + self.assertTrue(main.is_running()) + + def test_subinterpreter(self): + interp = interpreters.create() + self.assertFalse(interp.is_running()) + + with _running(interp): + self.assertTrue(interp.is_running()) + self.assertFalse(interp.is_running()) + + def test_from_subinterpreter(self): + interp = interpreters.create() + out = _run_output(interp, dedent(f""" + import _interpreters + if _interpreters.is_running({interp.id}): + print(True) + else: + print(False) + """)) + self.assertEqual(out.strip(), 'True') + + def test_already_destroyed(self): + interp = interpreters.create() + interp.destroy() + with self.assertRaises(RuntimeError): + interp.is_running() + + def test_bad_id(self): + interp = interpreters.Interpreter(-1) + with self.assertRaises(RuntimeError): + interp.is_running() + + +class TestInterpreterDestroy(TestBase): + + def test_basic(self): + interp1 = interpreters.create() + interp2 = interpreters.create() + interp3 = interpreters.create() + self.assertEqual(4, len(interpreters.list_all())) + interp2.destroy() + self.assertEqual(3, len(interpreters.list_all())) + + def test_all(self): + before = set(interpreters.list_all()) + interps = set() + for _ in range(3): + interp = interpreters.create() + interps.add(interp) + self.assertEqual(len(set(interpreters.list_all())), len(before | interps)) + for interp in interps: + interp.destroy() + self.assertEqual(len(set(interpreters.list_all())), len(before)) + + def test_main(self): + main, = interpreters.list_all() + with self.assertRaises(RuntimeError): + main.destroy() + + def f(): + with self.assertRaises(RuntimeError): + main.destroy() + + t = threading.Thread(target=f) + t.start() + t.join() + + def test_already_destroyed(self): + interp = interpreters.create() + interp.destroy() + with self.assertRaises(RuntimeError): + interp.destroy() + + def test_from_current(self): + main, = interpreters.list_all() + interp = interpreters.create() + script = dedent(f""" + import interpreters + try: + main = interpreters.get_current() + main.destroy() + except RuntimeError: + pass + """) + + interp.run(script) + self.assertEqual(len(set(interpreters.list_all())), len({main, interp})) + + def test_from_sibling(self): + main, = interpreters.list_all() + interp1 = interpreters.create() + script = dedent(f""" + import interpreters + interp2 = interpreters.create() + interp2.destroy() + """) + interp1.run(script) + + self.assertEqual(len(set(interpreters.list_all())), len({main, interp1})) + + def test_from_other_thread(self): + interp = interpreters.create() + def f(): + interp.destroy() + + t = threading.Thread(target=f) + t.start() + t.join() + + def test_still_running(self): + main, = interpreters.list_all() + interp = interpreters.create() + with _running(interp): + with self.assertRaises(RuntimeError): + interp.destroy() + self.assertTrue(interp.is_running()) + + +class TestInterpreterRun(TestBase): + + SCRIPT = dedent(""" + with open('{}', 'w') as out: + out.write('{}') + """) + FILENAME = 'spam' + + def setUp(self): + super().setUp() + self.interp = interpreters.create() + self._fs = None + + def tearDown(self): + if self._fs is not None: + self._fs.close() + super().tearDown() + + @property + def fs(self): + if self._fs is None: + self._fs = FSFixture(self) + return self._fs + + def test_success(self): + script, file = _captured_script('print("it worked!", end="")') + with file: + self.interp.run(script) + out = file.read() + + self.assertEqual(out, 'it worked!') + + def test_in_thread(self): + script, file = _captured_script('print("it worked!", end="")') + with file: + def f(): + self.interp.run(script) + + t = threading.Thread(target=f) + t.start() + t.join() + out = file.read() + + self.assertEqual(out, 'it worked!') + + def test_create_thread(self): + script, file = _captured_script(""" + import threading + def f(): + print('it worked!', end='') + + t = threading.Thread(target=f) + t.start() + t.join() + """) + with file: + self.interp.run(script) + out = file.read() + + self.assertEqual(out, 'it worked!') + + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + def test_fork(self): + import tempfile + with tempfile.NamedTemporaryFile('w+') as file: + file.write('') + file.flush() + + expected = 'spam spam spam spam spam' + script = dedent(f""" + import os + try: + os.fork() + except RuntimeError: + with open('{file.name}', 'w') as out: + out.write('{expected}') + """) + self.interp.run(script) + + file.seek(0) + content = file.read() + self.assertEqual(content, expected) + + def test_already_running(self): + with _running(self.interp): + with self.assertRaises(RuntimeError): + self.interp.run('print("spam")') + + def test_bad_script(self): + with self.assertRaises(TypeError): + self.interp.run(10) + + def test_bytes_for_script(self): + with self.assertRaises(TypeError): + self.interp.run(b'print("spam")') + + +class TestIsShareable(TestBase): + + def test_default_shareables(self): + shareables = [ + # singletons + None, + # builtin objects + b'spam', + 'spam', + 10, + -10, + ] + for obj in shareables: + with self.subTest(obj): + self.assertTrue( + interpreters.is_shareable(obj)) + + def test_not_shareable(self): + class Cheese: + def __init__(self, name): + self.name = name + def __str__(self): + return self.name + + class SubBytes(bytes): + """A subclass of a shareable type.""" + + not_shareables = [ + # singletons + True, + False, + NotImplemented, + ..., + # builtin types and objects + type, + object, + object(), + Exception(), + 100.0, + # user-defined types and objects + Cheese, + Cheese('Wensleydale'), + SubBytes(b'spam'), + ] + for obj in not_shareables: + with self.subTest(repr(obj)): + self.assertFalse( + interpreters.is_shareable(obj)) + + +class TestChannel(TestBase): + + def test_create_cid(self): + r, s = interpreters.create_channel() + self.assertIsInstance(r, interpreters.RecvChannel) + self.assertIsInstance(s, interpreters.SendChannel) + + def test_sequential_ids(self): + before = interpreters.list_all_channels() + channels1 = interpreters.create_channel() + channels2 = interpreters.create_channel() + channels3 = interpreters.create_channel() + after = interpreters.list_all_channels() + + self.assertEqual(len(set(after) - set(before)), + len({channels1, channels2, channels3})) + +class TestSendRecv(TestBase): + + def test_fields(self): + r, s = interpreters.create_channel() + self.assertEqual([], r.interpreters) + + def test_send_recv_main(self): + r, s = interpreters.create_channel() + orig = b'spam' + s.send(orig) + obj = r.recv() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + def test_send_recv_same_interpreter(self): + interp = interpreters.create() + out = _run_output(interp, dedent(""" + import interpreters + r, s = interpreters.create_channel() + orig = b'spam' + s.send(orig) + obj = r.recv() + assert obj is not orig + assert obj == orig + """)) + + def test_send_recv_different_threads(self): + r, s = interpreters.create_channel() + + def f(): + while True: + try: + obj = r.recv() + break + except interpreters.ChannelEmptyError: + time.sleep(0.1) + s.send(obj) + t = threading.Thread(target=f) + t.start() + + s.send(b'spam') + t.join() + obj = r.recv() + + self.assertEqual(obj, b'spam') + + def test_recv_empty(self): + r, s = interpreters.create_channel() + with self.assertRaises(interpreters.ChannelEmptyError): + r.recv() + + def test_send_recv_nowait_main(self): + r, s = interpreters.create_channel() + orig = b'spam' + s.send(orig) + obj = r.recv_nowait() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + def test_send_recv_nowait_same_interpreter(self): + interp = interpreters.create() + out = _run_output(interp, dedent(""" + import interpreters + r, s = interpreters.create_channel() + orig = b'spam' + s.send(orig) + obj = r.recv_nowait() + assert obj is not orig + assert obj == orig + """)) + + def test_send_recv_nowait_different_threads(self): + r, s = interpreters.create_channel() + + def f(): + while True: + try: + obj = r.recv_nowait() + break + except interpreters.ChannelEmptyError: + time.sleep(0.1) + s.send(obj) + t = threading.Thread(target=f) + t.start() + + s.send(b'spam') + t.join() + obj = r.recv_nowait() + + self.assertEqual(obj, b'spam') + + def test_recv_nowait_empty(self): + r, s = interpreters.create_channel() + with self.assertRaises(interpreters.ChannelEmptyError): + r.recv_nowait() + + # close + + def test_close_single_user(self): + r, s = interpreters.create_channel() + s.send(b'spam') + r.recv() + s.close() + + with self.assertRaises(interpreters.ChannelClosedError): + s.send(b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + r.recv() + + def test_close_multiple_times(self): + r, s = interpreters.create_channel() + s.send(b'spam') + r.recv() + s.close() + + with self.assertRaises(interpreters.ChannelClosedError): + s.close() + + def test_close_empty(self): + tests = [ + (False, False), + (True, False), + (False, True), + (True, True), + ] + for send, recv in tests: + with self.subTest((send, recv)): + r, s = interpreters.create_channel() + s.send(b'spam') + r.recv() + s.close() + + with self.assertRaises(interpreters.ChannelClosedError): + s.send(b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + r.recv() + + def test_close_defaults_with_unused_items(self): + r, s = interpreters.create_channel() + s.send(b'spam') + s.send(b'ham') + + with self.assertRaises(interpreters.ChannelNotEmptyError): + s.close() + r.recv() + s.send(b'eggs') + + def test_close_never_used(self): + r, s = interpreters.create_channel() + r.close() + + with self.assertRaises(interpreters.ChannelClosedError): + s.send(b'spam') + with self.assertRaises(interpreters.ChannelClosedError): + r.recv() + + def test_close_used_multiple_times_by_single_user(self): + r, s = interpreters.create_channel() + s.send(b'spam') + s.send(b'spam') + s.send(b'spam') + r.recv() + s.close(force=True) + + with self.assertRaises(interpreters.ChannelClosedError): + s.send(b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + r.recv() + + # release + + def test_single_user(self): + r, s = interpreters.create_channel() + s.send(b'spam') + r.recv() + s.release() + + with self.assertRaises(interpreters.ChannelClosedError): + s.send(b'eggs') + + def test_with_unused_items(self): + r, s = interpreters.create_channel() + s.send(b'spam') + s.send(b'ham') + s.release() + + with self.assertRaises(interpreters.ChannelClosedError): + r.recv() + +class TestSendBuffer(TestBase): + def test_send_recv_main(self): + r, s = interpreters.create_channel() + orig = b'spam' + s.send_buffer(orig) + obj = r.recv() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + def test_send_recv_same_interpreter(self): + interp = interpreters.create() + out = _run_output(interp, dedent(""" + import interpreters + r, s = interpreters.create_channel() + orig = b'spam' + s.send_buffer(orig) + obj = r.recv() + assert obj is not orig + assert obj == orig + """)) + + def test_send_recv_different_threads(self): + r, s = interpreters.create_channel() + + def f(): + while True: + try: + obj = r.recv() + break + except interpreters.ChannelEmptyError: + time.sleep(0.1) + s.send_buffer(obj) + t = threading.Thread(target=f) + t.start() + + s.send_buffer(b'spam') + t.join() + obj = r.recv() + + self.assertEqual(obj, b'spam') diff --git a/Misc/NEWS.d/next/Core and Builtins/2020-03-06-22-09-03.bpo-39881.Wh2TTV.rst b/Misc/NEWS.d/next/Core and Builtins/2020-03-06-22-09-03.bpo-39881.Wh2TTV.rst new file mode 100644 index 00000000000000..96cf52e5fc191b --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2020-03-06-22-09-03.bpo-39881.Wh2TTV.rst @@ -0,0 +1,2 @@ +High-level Implementation of PEP 554. +(Patch By Joannah Nanjekye) \ No newline at end of file diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index cc4f5d9e6dc166..23a3fa487b87b7 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -288,6 +288,8 @@ static PyObject *ChannelNotFoundError; static PyObject *ChannelClosedError; static PyObject *ChannelEmptyError; static PyObject *ChannelNotEmptyError; +static PyObject *ChannelReleasedError; +static PyObject *NotReceivedError; static int channel_exceptions_init(PyObject *ns) @@ -295,7 +297,7 @@ channel_exceptions_init(PyObject *ns) // XXX Move the exceptions into per-module memory? // A channel-related operation failed. - ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError", + ChannelError = PyErr_NewException("_interpreters.ChannelError", PyExc_RuntimeError, NULL); if (ChannelError == NULL) { return -1; @@ -306,7 +308,7 @@ channel_exceptions_init(PyObject *ns) // An operation tried to use a channel that doesn't exist. ChannelNotFoundError = PyErr_NewException( - "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL); + "_interpreters.ChannelNotFoundError", ChannelError, NULL); if (ChannelNotFoundError == NULL) { return -1; } @@ -316,7 +318,7 @@ channel_exceptions_init(PyObject *ns) // An operation tried to use a closed channel. ChannelClosedError = PyErr_NewException( - "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL); + "_interpreters.ChannelClosedError", ChannelError, NULL); if (ChannelClosedError == NULL) { return -1; } @@ -324,9 +326,30 @@ channel_exceptions_init(PyObject *ns) return -1; } + // An operation tried to use a released channel. + ChannelReleasedError = PyErr_NewException( + "_interpreters.ChannelReleasedError", ChannelClosedError, NULL); + if (ChannelReleasedError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "ChannelReleasedError", ChannelReleasedError) != 0) { + return -1; + } + + // An operation trying to send an object when Nothing was waiting + // to receive it + NotReceivedError = PyErr_NewException( + "_interpreters.NotReceivedError", ChannelError, NULL); + if (NotReceivedError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "NotReceivedError", NotReceivedError) != 0) { + return -1; + } + // An operation tried to pop from an empty channel. ChannelEmptyError = PyErr_NewException( - "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL); + "_interpreters.ChannelEmptyError", ChannelError, NULL); if (ChannelEmptyError == NULL) { return -1; } @@ -336,7 +359,7 @@ channel_exceptions_init(PyObject *ns) // An operation tried to close a non-empty channel. ChannelNotEmptyError = PyErr_NewException( - "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL); + "_interpreters.ChannelNotEmptyError", ChannelError, NULL); if (ChannelNotEmptyError == NULL) { return -1; } @@ -486,6 +509,7 @@ typedef struct _channelend { struct _channelend *next; int64_t interp; int open; + int release; } _channelend; static _channelend * @@ -620,6 +644,10 @@ _channelends_associate(_channelends *ends, int64_t interp, int send) PyErr_SetString(ChannelClosedError, "channel already closed"); return -1; } + if (end->release && !end->open) { + PyErr_SetString(ChannelReleasedError, "channel released"); + return -1; + } // already associated return 0; } @@ -629,6 +657,27 @@ _channelends_associate(_channelends *ends, int64_t interp, int send) return 0; } +static int64_t * +_channelends_list_interpreters(_channelends *ends, int64_t *count, int send) +{ + int64_t numopen = send ? ends->numsendopen : ends->numrecvopen; + + int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)numopen); + if (ids == NULL) { + PyErr_NoMemory(); + return NULL; + } + + _channelend *ref = send ? ends->send : ends->recv; + for (int64_t i=0; ref != NULL; ref = ref->next, i++) { + ids[i] = ref->interp; + } + + *count = numopen; + + return ids; +} + static int _channelends_is_open(_channelends *ends) { @@ -713,6 +762,7 @@ typedef struct _channel { _channelqueue *queue; _channelends *ends; int open; + int release; struct _channel_closing *closing; } _PyChannelState; @@ -770,6 +820,10 @@ _channel_add(_PyChannelState *chan, int64_t interp, PyErr_SetString(ChannelClosedError, "channel closed"); goto done; } + if (chan->release && !chan->open) { + PyErr_SetString(ChannelReleasedError, "channel released"); + return -1; + } if (_channelends_associate(chan->ends, interp, 1) != 0) { goto done; } @@ -794,6 +848,10 @@ _channel_next(_PyChannelState *chan, int64_t interp) PyErr_SetString(ChannelClosedError, "channel closed"); goto done; } + if (chan->release && !chan->open) { + PyErr_SetString(ChannelReleasedError, "channel released"); + goto done; + } if (_channelends_associate(chan->ends, interp, 0) != 0) { goto done; } @@ -1284,6 +1342,83 @@ _channel_destroy(_channels *channels, int64_t id) return 0; } +static int +_channel_send_buffer(_channels *channels, int64_t id, PyObject *obj) +{ + Py_buffer buffer; + PyObject *bytes; + + if (PyObject_GetBuffer(obj, &buffer, PyBUF_SIMPLE) < 0) { + PyErr_Format(PyExc_TypeError, + "Error creating object buffer, %.80s found", + Py_TYPE(obj)->tp_name); + return -1; + } + + if (buffer.len == 0) { + PyBuffer_Release(&buffer); + return -1; + } + + PyInterpreterState *interp = _get_current(); + if (interp == NULL) { + PyBuffer_Release(&buffer); + return -1; + } + + // Look up the channel. + PyThread_type_lock mutex = NULL; + _PyChannelState *chan = _channels_lookup(channels, id, &mutex); + if (chan == NULL) { + PyBuffer_Release(&buffer); + return -1; + } + // Past this point we are responsible for releasing the mutex. + + if (chan->closing != NULL) { + PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id); + PyThread_release_lock(mutex); + PyBuffer_Release(&buffer); + return -1; + } + + // Convert the buffer to cross-interpreter data. + _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1); + if (data == NULL) { + PyThread_release_lock(mutex); + PyBuffer_Release(&buffer); + return -1; + } + + if (buffer.buf != NULL) + bytes = PyBytes_FromStringAndSize(buffer.buf, buffer.len); + else { + Py_INCREF(Py_None); + bytes = Py_None; + } + + if (_PyObject_GetCrossInterpreterData(bytes, data) != 0) { + PyThread_release_lock(mutex); + PyMem_Free(data); + PyBuffer_Release(&buffer); + return -1; + } + + // Add the data to the channel. + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); + PyThread_release_lock(mutex); + if (res != 0) { + _PyCrossInterpreterData_Release(data); + PyMem_Free(data); + PyBuffer_Release(&buffer); + return -1; + } + + PyBuffer_Release(&buffer); + + return 0; +} + static int _channel_send(_channels *channels, int64_t id, PyObject *obj) { @@ -1383,6 +1518,13 @@ _channel_drop(_channels *channels, int64_t id, int send, int recv) } // Past this point we are responsible for releasing the mutex. + // Release the channel + if (!chan->release) { + PyErr_SetString(ChannelClosedError, "channel already released"); + return -1; + } + chan->release = 1; + // Close one or both of the two ends. int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv); PyThread_release_lock(mutex); @@ -1752,7 +1894,7 @@ PyDoc_STRVAR(channelid_doc, static PyTypeObject ChannelIDtype = { PyVarObject_HEAD_INIT(&PyType_Type, 0) - "_xxsubinterpreters.ChannelID", /* tp_name */ + "_interpreters.ChannelID", /* tp_name */ sizeof(channelid), /* tp_basicsize */ 0, /* tp_itemsize */ (destructor)channelid_dealloc, /* tp_dealloc */ @@ -1808,7 +1950,7 @@ interp_exceptions_init(PyObject *ns) if (RunFailedError == NULL) { // An uncaught exception came out of interp_run_string(). - RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError", + RunFailedError = PyErr_NewException("_interpreters.RunFailedError", PyExc_RuntimeError, NULL); if (RunFailedError == NULL) { return -1; @@ -2325,6 +2467,65 @@ PyDoc_STRVAR(channel_list_all_doc, \n\ Return the list of all IDs for active channels."); + +static PyObject * +channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "send", NULL}; + int64_t cid; /* Channel ID */ + int send = 0; /* Send or receive end? */ + PyObject *ret = NULL; + + if (!PyArg_ParseTupleAndKeywords( + args, kwds, "O&$p:channel_list_interpreters", + kwlist, channel_id_converter, &cid, &send)) { + return NULL; + } + + _PyChannelState *chan = _channels_lookup(&_globals.channels, cid, NULL); + if (chan == NULL) { + return NULL; + } + + int64_t count = 0; /* Number of interpreters */ + int64_t *ids = _channelends_list_interpreters(chan->ends, &count, send); + if (ids == NULL) { + goto except; + } + + ret = PyList_New((Py_ssize_t)count); + if (ret == NULL) { + goto except; + } + + for (int64_t i=0; i < count; i++) { + PyObject *id_obj = _PyInterpreterID_New(ids[i]); + if (id_obj == NULL) { + goto except; + } + PyList_SET_ITEM(ret, i, id_obj); + } + + goto finally; + +except: + Py_XDECREF(ret); + ret = NULL; + +finally: + PyMem_Free(ids); + return ret; +} + +PyDoc_STRVAR(channel_list_interpreters_doc, +"channel_list_interpreters(cid, *, send) -> [id]\n\ +\n\ +Return the list of all interpreter IDs associated with an end of the channel.\n\ +\n\ +The 'send' argument should be a boolean indicating whether to use the send or\n\ +receive end."); + + static PyObject * channel_send(PyObject *self, PyObject *args, PyObject *kwds) { @@ -2347,6 +2548,28 @@ PyDoc_STRVAR(channel_send_doc, \n\ Add the object's data to the channel's queue."); +static PyObject * +channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "obj", NULL}; + int64_t cid; + PyObject *obj; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist, + channel_id_converter, &cid, &obj)) { + return NULL; + } + + if (_channel_send_buffer(&_globals.channels, cid, obj) != 0) { + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(channel_send_buffer_doc, +"channel_send_buffer(cid, obj)\n\ +\n\ +Add the object's buffer to the channel's queue."); + static PyObject * channel_recv(PyObject *self, PyObject *args, PyObject *kwds) { @@ -2478,8 +2701,12 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, channel_destroy_doc}, {"channel_list_all", channel_list_all, METH_NOARGS, channel_list_all_doc}, + {"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters, + METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc}, {"channel_send", (PyCFunction)(void(*)(void))channel_send, METH_VARARGS | METH_KEYWORDS, channel_send_doc}, + {"channel_send_buffer", (PyCFunction)(void(*)(void))channel_send_buffer, + METH_VARARGS | METH_KEYWORDS, channel_send_buffer_doc}, {"channel_recv", (PyCFunction)(void(*)(void))channel_recv, METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, {"channel_close", (PyCFunction)(void(*)(void))channel_close, @@ -2501,7 +2728,7 @@ The 'interpreters' module provides a more convenient interface."); static struct PyModuleDef interpretersmodule = { PyModuleDef_HEAD_INIT, - "_xxsubinterpreters", /* m_name */ + "_interpreters", /* m_name */ module_doc, /* m_doc */ -1, /* m_size */ module_functions, /* m_methods */ @@ -2513,7 +2740,7 @@ static struct PyModuleDef interpretersmodule = { PyMODINIT_FUNC -PyInit__xxsubinterpreters(void) +PyInit__interpreters(void) { if (_init_globals() != 0) { return NULL; diff --git a/PC/config.c b/PC/config.c index 8eaeb31c9b934b..38d9506ffdd23e 100644 --- a/PC/config.c +++ b/PC/config.c @@ -35,7 +35,7 @@ extern PyObject* PyInit__codecs(void); extern PyObject* PyInit__weakref(void); /* XXX: These two should really be extracted to standalone extensions. */ extern PyObject* PyInit_xxsubtype(void); -extern PyObject* PyInit__xxsubinterpreters(void); +extern PyObject* PyInit__interpreters(void); extern PyObject* PyInit__random(void); extern PyObject* PyInit_itertools(void); extern PyObject* PyInit__collections(void); @@ -133,7 +133,7 @@ struct _inittab _PyImport_Inittab[] = { {"_json", PyInit__json}, {"xxsubtype", PyInit_xxsubtype}, - {"_xxsubinterpreters", PyInit__xxsubinterpreters}, + {"_interpreters", PyInit__interpreters}, #ifdef _Py_HAVE_ZLIB {"zlib", PyInit_zlib}, #endif diff --git a/setup.py b/setup.py index a3313158179508..8908fa697f1f38 100644 --- a/setup.py +++ b/setup.py @@ -908,7 +908,7 @@ def detect_simple_extensions(self): self.add(Extension('syslog', ['syslogmodule.c'])) # Python interface to subinterpreter C-API. - self.add(Extension('_xxsubinterpreters', ['_xxsubinterpretersmodule.c'])) + self.add(Extension('_interpreters', ['_xxsubinterpretersmodule.c'])) # # Here ends the simple stuff. From here on, modules need certain