@@ -1006,11 +1006,6 @@ def test_idle_process_reuse_multiple(self):
1006
1006
ProcessPoolForkserverMixin ,
1007
1007
ProcessPoolSpawnMixin ))
1008
1008
1009
- def hide_process_stderr ():
1010
- import io
1011
- sys .stderr = io .StringIO ()
1012
-
1013
-
1014
1009
def _crash (delay = None ):
1015
1010
"""Induces a segfault."""
1016
1011
if delay :
@@ -1027,13 +1022,18 @@ def _exit():
1027
1022
1028
1023
def _raise_error (Err ):
1029
1024
"""Function that raises an Exception in process."""
1030
- hide_process_stderr ()
1025
+ raise Err ()
1026
+
1027
+
1028
+ def _raise_error_ignore_stderr (Err ):
1029
+ """Function that raises an Exception in process and ignores stderr."""
1030
+ import io
1031
+ sys .stderr = io .StringIO ()
1031
1032
raise Err ()
1032
1033
1033
1034
1034
1035
def _return_instance (cls ):
1035
1036
"""Function that returns a instance of cls."""
1036
- hide_process_stderr ()
1037
1037
return cls ()
1038
1038
1039
1039
@@ -1072,17 +1072,12 @@ class ErrorAtUnpickle(object):
1072
1072
"""Bad object that triggers an error at unpickling time."""
1073
1073
def __reduce__ (self ):
1074
1074
from pickle import UnpicklingError
1075
- return _raise_error , (UnpicklingError , )
1075
+ return _raise_error_ignore_stderr , (UnpicklingError , )
1076
1076
1077
1077
1078
1078
class ExecutorDeadlockTest :
1079
1079
TIMEOUT = support .SHORT_TIMEOUT
1080
1080
1081
- @classmethod
1082
- def _sleep_id (cls , x , delay ):
1083
- time .sleep (delay )
1084
- return x
1085
-
1086
1081
def _fail_on_deadlock (self , executor ):
1087
1082
# If we did not recover before TIMEOUT seconds, consider that the
1088
1083
# executor is in a deadlock state and forcefully clean all its
@@ -1102,57 +1097,84 @@ def _fail_on_deadlock(self, executor):
1102
1097
self .fail (f"Executor deadlock:\n \n { tb } " )
1103
1098
1104
1099
1105
- def test_crash (self ):
1106
- # extensive testing for deadlock caused by crashes in a pool.
1100
+ def _check_crash (self , error , func , * args , ignore_stderr = False ):
1101
+ # test for deadlock caused by crashes in a pool
1107
1102
self .executor .shutdown (wait = True )
1108
- crash_cases = [
1109
- # Check problem occurring while pickling a task in
1110
- # the task_handler thread
1111
- (id , (ErrorAtPickle (),), PicklingError , "error at task pickle" ),
1112
- # Check problem occurring while unpickling a task on workers
1113
- (id , (ExitAtUnpickle (),), BrokenProcessPool ,
1114
- "exit at task unpickle" ),
1115
- (id , (ErrorAtUnpickle (),), BrokenProcessPool ,
1116
- "error at task unpickle" ),
1117
- (id , (CrashAtUnpickle (),), BrokenProcessPool ,
1118
- "crash at task unpickle" ),
1119
- # Check problem occurring during func execution on workers
1120
- (_crash , (), BrokenProcessPool ,
1121
- "crash during func execution on worker" ),
1122
- (_exit , (), SystemExit ,
1123
- "exit during func execution on worker" ),
1124
- (_raise_error , (RuntimeError , ), RuntimeError ,
1125
- "error during func execution on worker" ),
1126
- # Check problem occurring while pickling a task result
1127
- # on workers
1128
- (_return_instance , (CrashAtPickle ,), BrokenProcessPool ,
1129
- "crash during result pickle on worker" ),
1130
- (_return_instance , (ExitAtPickle ,), SystemExit ,
1131
- "exit during result pickle on worker" ),
1132
- (_return_instance , (ErrorAtPickle ,), PicklingError ,
1133
- "error during result pickle on worker" ),
1134
- # Check problem occurring while unpickling a task in
1135
- # the result_handler thread
1136
- (_return_instance , (ErrorAtUnpickle ,), BrokenProcessPool ,
1137
- "error during result unpickle in result_handler" ),
1138
- (_return_instance , (ExitAtUnpickle ,), BrokenProcessPool ,
1139
- "exit during result unpickle in result_handler" )
1140
- ]
1141
- for func , args , error , name in crash_cases :
1142
- with self .subTest (name ):
1143
- # The captured_stderr reduces the noise in the test report
1144
- with support .captured_stderr ():
1145
- executor = self .executor_type (
1146
- max_workers = 2 , mp_context = get_context (self .ctx ))
1147
- res = executor .submit (func , * args )
1148
- with self .assertRaises (error ):
1149
- try :
1150
- res .result (timeout = self .TIMEOUT )
1151
- except futures .TimeoutError :
1152
- # If we did not recover before TIMEOUT seconds,
1153
- # consider that the executor is in a deadlock state
1154
- self ._fail_on_deadlock (executor )
1155
- executor .shutdown (wait = True )
1103
+
1104
+ executor = self .executor_type (
1105
+ max_workers = 2 , mp_context = get_context (self .ctx ))
1106
+ res = executor .submit (func , * args )
1107
+
1108
+ if ignore_stderr :
1109
+ cm = support .captured_stderr ()
1110
+ else :
1111
+ cm = contextlib .nullcontext ()
1112
+
1113
+ try :
1114
+ with self .assertRaises (error ):
1115
+ with cm :
1116
+ res .result (timeout = self .TIMEOUT )
1117
+ except futures .TimeoutError :
1118
+ # If we did not recover before TIMEOUT seconds,
1119
+ # consider that the executor is in a deadlock state
1120
+ self ._fail_on_deadlock (executor )
1121
+ executor .shutdown (wait = True )
1122
+
1123
+ def test_error_at_task_pickle (self ):
1124
+ # Check problem occurring while pickling a task in
1125
+ # the task_handler thread
1126
+ self ._check_crash (PicklingError , id , ErrorAtPickle ())
1127
+
1128
+ def test_exit_at_task_unpickle (self ):
1129
+ # Check problem occurring while unpickling a task on workers
1130
+ self ._check_crash (BrokenProcessPool , id , ExitAtUnpickle ())
1131
+
1132
+ def test_error_at_task_unpickle (self ):
1133
+ # Check problem occurring while unpickling a task on workers
1134
+ self ._check_crash (BrokenProcessPool , id , ErrorAtUnpickle ())
1135
+
1136
+ def test_crash_at_task_unpickle (self ):
1137
+ # Check problem occurring while unpickling a task on workers
1138
+ self ._check_crash (BrokenProcessPool , id , CrashAtUnpickle ())
1139
+
1140
+ def test_crash_during_func_exec_on_worker (self ):
1141
+ # Check problem occurring during func execution on workers
1142
+ self ._check_crash (BrokenProcessPool , _crash )
1143
+
1144
+ def test_exit_during_func_exec_on_worker (self ):
1145
+ # Check problem occurring during func execution on workers
1146
+ self ._check_crash (SystemExit , _exit )
1147
+
1148
+ def test_error_during_func_exec_on_worker (self ):
1149
+ # Check problem occurring during func execution on workers
1150
+ self ._check_crash (RuntimeError , _raise_error , RuntimeError )
1151
+
1152
+ def test_crash_during_result_pickle_on_worker (self ):
1153
+ # Check problem occurring while pickling a task result
1154
+ # on workers
1155
+ self ._check_crash (BrokenProcessPool , _return_instance , CrashAtPickle )
1156
+
1157
+ def test_exit_during_result_pickle_on_worker (self ):
1158
+ # Check problem occurring while pickling a task result
1159
+ # on workers
1160
+ self ._check_crash (SystemExit , _return_instance , ExitAtPickle )
1161
+
1162
+ def test_error_during_result_pickle_on_worker (self ):
1163
+ # Check problem occurring while pickling a task result
1164
+ # on workers
1165
+ self ._check_crash (PicklingError , _return_instance , ErrorAtPickle )
1166
+
1167
+ def test_error_during_result_unpickle_in_result_handler (self ):
1168
+ # Check problem occurring while unpickling a task in
1169
+ # the result_handler thread
1170
+ self ._check_crash (BrokenProcessPool ,
1171
+ _return_instance , ErrorAtUnpickle ,
1172
+ ignore_stderr = True )
1173
+
1174
+ def test_exit_during_result_unpickle_in_result_handler (self ):
1175
+ # Check problem occurring while unpickling a task in
1176
+ # the result_handler thread
1177
+ self ._check_crash (BrokenProcessPool , _return_instance , ExitAtUnpickle )
1156
1178
1157
1179
def test_shutdown_deadlock (self ):
1158
1180
# Test that the pool calling shutdown do not cause deadlock
0 commit comments