Skip to content

Commit 7b68b7d

Browse files
authored
Merge pull request #72 from AutomatedProcessImprovement/multitasking
Las Version Multitasking - Updated After Second Round Journal
2 parents ae42c0f + adde413 commit 7b68b7d

File tree

86 files changed

+41078
-187054
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+41078
-187054
lines changed

prosimos/control_flow_manager.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ def __init__(self):
162162
self.gateway_execution_limit = 1000
163163
self.simulation_execution_stats = SimulationExecutionStats()
164164

165+
def set_element_probabilities(self, element_probability, task_resource_probability):
166+
self.element_probability = element_probability
167+
self.task_resource_probability = task_resource_probability
168+
165169
def set_additional_fields_from_json(self, element_probability, task_resource_probability,
166170
event_distribution, batch_processing, gateway_conditions,
167171
gateway_execution_limit):
@@ -361,9 +365,13 @@ def update_process_state(self, case_id, e_id, p_state, completed_datetime_prev_e
361365
return enabled_tasks, visited_at
362366

363367
def get_all_attributes(self, case_id):
368+
if self.all_attributes is None:
369+
return {}
364370
all_current_attributes = {}
365-
all_current_attributes.update(self.all_attributes["global"])
366-
all_current_attributes.update(self.all_attributes[case_id])
371+
if "global" in self.all_attributes:
372+
all_current_attributes.update(self.all_attributes["global"])
373+
if case_id in self.all_attributes:
374+
all_current_attributes.update(self.all_attributes[case_id])
367375
return all_current_attributes
368376

369377
def is_task_batched(self, task_id):
@@ -876,7 +884,9 @@ def _find_next(self, f_arc_and_duration, case_id, p_state, enabled_tasks, to_exe
876884
to_execute.append(next_e)
877885

878886
def _check_and_update_enabling_time(self, p_case, e_id, enabled_at: CustomDatetimeAndSeconds):
879-
if self.last_datetime[e_id][p_case] is None or self.last_datetime[e_id][p_case].datetime < enabled_at.datetime:
887+
if p_case not in self.last_datetime[e_id]:
888+
self.last_datetime[e_id][p_case] = enabled_at
889+
elif self.last_datetime[e_id][p_case] is None or self.last_datetime[e_id][p_case].datetime < enabled_at.datetime:
880890
self.last_datetime[e_id][p_case] = enabled_at
881891
elif self.last_datetime[e_id][p_case].datetime > enabled_at.datetime:
882892
enabled_at = self.last_datetime[e_id][p_case]

prosimos/multitasking/multitasking_parser.py

Whitespace-only changes.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import random
2+
from datetime import datetime
3+
4+
from pix_framework.discovery.probabilistic_multitasking.discovery import MultiType
5+
from prosimos.simulation_queues_ds import PriorityQueue
6+
7+
8+
class MultiTaskDS:
9+
def __init__(self, mt_type: str, g_size: int = 0):
10+
self.mt_type = MultiTaskDS._extract_type(mt_type)
11+
self.g_size = g_size
12+
self.total_granules = 1440 // g_size if g_size > 0 else 1
13+
self.res_multitask_info = dict()
14+
self.allocated_tasks = dict()
15+
self.active_datetimes = dict()
16+
self.relative_workload = dict()
17+
self.expected_workload = dict()
18+
self.executed_tasks = dict()
19+
self.total_tasks = dict()
20+
21+
def init_relative_workload(self, task_res_distr: dict):
22+
for t_id in task_res_distr:
23+
total_workload = 0.0
24+
for r_id in task_res_distr[t_id]:
25+
if r_id not in self.relative_workload:
26+
self.relative_workload[r_id] = dict()
27+
self.executed_tasks[r_id] = dict()
28+
total_workload += self.expected_workload[r_id]
29+
for r_id in task_res_distr[t_id]:
30+
if total_workload > 0:
31+
self.relative_workload[r_id][t_id] = self.expected_workload[r_id] / total_workload
32+
else:
33+
self.relative_workload[r_id][t_id] = 0.0
34+
self.executed_tasks[r_id][t_id] = 0
35+
self.total_tasks[t_id] = 0
36+
37+
def update_expected_workload(self, r_id, workload):
38+
self.expected_workload[r_id] = workload
39+
40+
def resource_workload(self, r_id, t_id):
41+
if r_id in self.executed_tasks and self.total_tasks[t_id] > 0:
42+
return self.executed_tasks[r_id][t_id] / self.total_tasks[t_id]
43+
return 0.0
44+
45+
def workload_diff(self, r_id, t_id):
46+
if r_id in self.executed_tasks:
47+
return abs(self.resource_workload(r_id, t_id) - self.relative_workload[r_id][t_id])
48+
return 0.0
49+
50+
def allocate_task_to(self, r_id: str, t_id: str, completed_at: datetime):
51+
if r_id in self.allocated_tasks:
52+
self.allocated_tasks[r_id] += 1
53+
if self.active_datetimes[r_id] is None:
54+
self.active_datetimes[r_id] = completed_at
55+
else:
56+
self.active_datetimes[r_id] = max(self.active_datetimes[r_id], completed_at)
57+
self.executed_tasks[r_id][t_id] += 1
58+
self.total_tasks[t_id] += 1
59+
60+
def release_tasks_from(self, r_id: str, completed_at: datetime):
61+
if r_id in self.allocated_tasks and self.active_datetimes[r_id] is not None:
62+
completed_at = max(self.active_datetimes[r_id], completed_at)
63+
self.active_datetimes[r_id] = None
64+
self.allocated_tasks[r_id] = 0
65+
return completed_at
66+
67+
def can_get_new_tasks(self, r_id: str, at_datetime: datetime):
68+
if r_id in self.allocated_tasks and self.allocated_tasks[r_id] > 0:
69+
if self.mt_type is MultiType.GLOBAL:
70+
if self.allocated_tasks[r_id] >= len(self.res_multitask_info[r_id]):
71+
return False
72+
return random.random() <= self.res_multitask_info[r_id][self.allocated_tasks[r_id]]
73+
else:
74+
wd = at_datetime.weekday()
75+
gr = self.interval_index(at_datetime)
76+
if self.allocated_tasks[r_id] >= len(self.res_multitask_info[r_id][wd][gr]):
77+
return False
78+
return random.random() <= self.res_multitask_info[r_id][wd][gr][self.allocated_tasks[r_id]]
79+
return True
80+
81+
def register_resource(self, r_id: str):
82+
if r_id not in self.res_multitask_info:
83+
self.allocated_tasks[r_id] = 0
84+
self.active_datetimes[r_id] = None
85+
if self.mt_type is MultiType.GLOBAL:
86+
self.res_multitask_info[r_id] = [0.0]
87+
else:
88+
total_granules = self.total_granules
89+
self.res_multitask_info[r_id] = [[[0.0] for _ in range(total_granules)] for _ in range(7)]
90+
91+
def register_multitasks(self, r_id: str, task_freq: int, prob: float, week_day: int = None, granule: int = None):
92+
if not 0 <= prob <= 1.0:
93+
raise ValueError("Probability 'prob' must be between 0 and 1.0 inclusive")
94+
if self.mt_type is MultiType.GLOBAL:
95+
self.res_multitask_info[r_id].extend([prob] * (task_freq - len(self.res_multitask_info[r_id])))
96+
self.res_multitask_info[r_id].append(prob)
97+
else:
98+
self.res_multitask_info[r_id][week_day][granule].extend(
99+
[prob] * (task_freq - len(self.res_multitask_info[r_id][week_day][granule]))
100+
)
101+
self.res_multitask_info[r_id][week_day][granule].append(prob)
102+
103+
def register_local_multitasks(self, r_id: str, week_day: int, from_dt: datetime, to_dt: datetime, task_freq: int,
104+
prob: float):
105+
if week_day is None or not 0 <= week_day <= 6:
106+
raise ValueError("Weekdays must be between 0 and 6 inclusive - from 0: MONDAY to 6: SUNDAY")
107+
from_i = self.interval_index(from_dt)
108+
to_i = self.interval_index(to_dt) - 1
109+
if to_i < 0:
110+
to_i = self.total_granules - 1
111+
112+
while from_i <= to_i:
113+
self.register_multitasks(r_id, task_freq, prob, week_day, from_i)
114+
from_i += 1
115+
116+
def interval_index(self, current_date: datetime):
117+
return (current_date.hour * 60 + current_date.minute) // self.g_size
118+
119+
@staticmethod
120+
def _extract_type(type_str: str):
121+
upp = type_str.upper()
122+
if upp == 'GLOBAL':
123+
return MultiType.GLOBAL
124+
elif upp == 'LOCAL':
125+
return MultiType.LOCAL
126+
raise ValueError("Multitasking Type (mt_type) must be GLOBAL or LOCAL")

prosimos/simulation_engine.py

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import numpy as np
55
from datetime import timedelta
66
from typing import List
7+
import random
78

89
import pytz
910

@@ -211,13 +212,15 @@ def pop_and_allocate_resource(self, task_id: str, num_allocated_tasks: int):
211212
return r_id, r_avail_at
212213

213214
def execute_task(self, c_event: EnabledEvent):
214-
r_id, r_avail_at = self.pop_and_allocate_resource(c_event.task_id, 1)
215-
216-
r_avail_at = max(c_event.enabled_at, r_avail_at)
217-
avail_datetime = self._datetime_from(r_avail_at)
218-
is_working, _ = self.sim_setup.get_resource_calendar(r_id).is_working_datetime(avail_datetime)
219-
if not is_working:
220-
r_avail_at = r_avail_at + self.sim_setup.next_resting_time(r_id, avail_datetime)
215+
if self.sim_setup.multitask_info is None:
216+
r_id, r_avail_at = self.pop_and_allocate_resource(c_event.task_id, 1)
217+
r_avail_at = max(c_event.enabled_at, r_avail_at)
218+
avail_datetime = self._datetime_from(r_avail_at)
219+
is_working, _ = self.sim_setup.get_resource_calendar(r_id).is_working_datetime(avail_datetime)
220+
if not is_working:
221+
r_avail_at = r_avail_at + self.sim_setup.next_resting_time(r_id, avail_datetime)
222+
else:
223+
r_id, r_avail_at = self.allocate_multitasking_resource(c_event)
221224

222225
full_evt = TaskEvent(
223226
c_event.p_case,
@@ -231,13 +234,16 @@ def execute_task(self, c_event: EnabledEvent):
231234

232235
self.log_info.add_event_info(c_event.p_case, full_evt, self.sim_setup.resources_map[r_id].cost_per_hour)
233236

234-
r_next_available = full_evt.completed_at
237+
if self.sim_setup.multitask_info is None:
238+
r_next_available = full_evt.completed_at
235239

236-
if self.sim_resources[r_id].switching_time > 0:
237-
r_next_available += self.sim_setup.next_resting_time(r_id, full_evt.completed_datetime)
240+
if self.sim_resources[r_id].switching_time > 0:
241+
r_next_available += self.sim_setup.next_resting_time(r_id, full_evt.completed_datetime)
238242

239-
self.resource_queue.update_resource_availability(r_id, r_next_available)
240-
self.sim_resources[r_id].worked_time += full_evt.ideal_duration
243+
self.resource_queue.update_resource_availability(r_id, r_next_available)
244+
self.sim_resources[r_id].worked_time += full_evt.ideal_duration
245+
else:
246+
self.release_multitasking_resource(r_id, full_evt, r_avail_at)
241247

242248
self.update_attributes(c_event)
243249
self.log_writer.add_csv_row(self.get_csv_row_data(full_evt))
@@ -247,6 +253,67 @@ def execute_task(self, c_event: EnabledEvent):
247253

248254
return completed_at, completed_datetime
249255

256+
def allocate_multitasking_resource(self, c_event: EnabledEvent):
257+
r_id, r_avail_at = self.resource_queue.pop_resource_for(c_event.task_id)
258+
259+
candidates = [[r_id, r_avail_at]]
260+
while r_avail_at is not None and r_avail_at <= c_event.enabled_at:
261+
r_id, r_avail_at = self.resource_queue.pop_resource_for(c_event.task_id)
262+
if r_id is not None:
263+
candidates.append([r_id, r_avail_at])
264+
265+
if len(candidates) > 1:
266+
i = random.randint(0, len(candidates) - 1)
267+
[r_id, r_avail_at] = candidates[i]
268+
for j in range(0, len(candidates)):
269+
if j != i:
270+
self.resource_queue.update_resource_availability(candidates[j][0], candidates[j][1])
271+
elif r_id is None and len(candidates) == 1:
272+
[r_id, r_avail_at] = candidates[0]
273+
274+
# best_r, best_avail = r_id, r_avail_at
275+
# while r_avail_at is not None and r_avail_at <= c_event.enabled_at:
276+
# c_workload = self.sim_setup.multitask_info.workload_diff(r_id, c_event.task_id)
277+
# if c_workload > max_workload_diff:
278+
# candidates.append([best_r, best_avail])
279+
# best_r, best_avail = r_id, r_avail_at
280+
# max_workload_diff = c_workload
281+
# else:
282+
# candidates.append([r_id, r_avail_at])
283+
# r_id, r_avail_at = self.resource_queue.pop_resource_for(c_event.task_id)
284+
285+
# print(f'Selected: {best_r, best_avail}')
286+
# if 'Loan Officer' in best_r and len(candidates) > 1:
287+
# print("hola")
288+
# if len(candidates) > 0:
289+
# # for r in candidates:
290+
# # print(r)
291+
# # print("-------------------------------------")
292+
# for j in range(0, len(candidates)):
293+
# self.resource_queue.update_resource_availability(candidates[j][0], candidates[j][1])
294+
#
295+
# r_id, r_avail_at = best_r, best_avail
296+
# self.sim_resources[r_id].allocated_tasks += 1
297+
if c_event.enabled_at > r_avail_at:
298+
next_avail_at = c_event.enabled_at
299+
avail_datetime = self._datetime_from(next_avail_at)
300+
is_working, _ = self.sim_setup.get_resource_calendar(r_id).is_working_datetime(avail_datetime)
301+
if not is_working:
302+
r_avail_at = r_avail_at + self.sim_setup.next_resting_time(r_id, avail_datetime)
303+
304+
return r_id, r_avail_at
305+
306+
def release_multitasking_resource(self, r_id: str, full_evt: TaskEvent, r_init_avail):
307+
completed_dt = self._datetime_from(full_evt.completed_at)
308+
self.sim_setup.multitask_info.allocate_task_to(r_id, full_evt.task_id, completed_dt)
309+
r_next_avail = r_init_avail
310+
if not self.sim_setup.multitask_info.can_get_new_tasks(r_id, completed_dt):
311+
last_time = self.sim_setup.multitask_info.release_tasks_from(r_id, completed_dt)
312+
r_next_avail += self.sim_setup.next_resting_time(r_id, last_time)
313+
314+
self.resource_queue.update_resource_availability(r_id, r_next_avail)
315+
self.sim_resources[r_id].worked_time += full_evt.ideal_duration
316+
250317
def update_attributes(self, current_event):
251318
event_attributes = self.sim_setup.all_attributes.event_attributes.attributes
252319
global_event_attributes = self.sim_setup.all_attributes.global_event_attributes.attributes
@@ -623,7 +690,7 @@ def run_simulation(
623690
diffsim_info.set_starting_datetime(starting_at_datetime)
624691

625692
if stat_out_path is None and log_out_path is None:
626-
return run_simpy_simulation(diffsim_info, None, None)
693+
return run_simpy_simulation(diffsim_info, None, None, fixed_arrival_times)
627694

628695
csv_writer_config = {
629696
'delimiter': ',',

prosimos/simulation_properties_parser.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from prosimos.case_attributes import AllCaseAttributes, CaseAttribute
1414
from prosimos.control_flow_manager import BPMN, EVENT_TYPE, BPMNGraph, ElementInfo
1515
from prosimos.histogram_distribution import HistogramDistribution
16+
from prosimos.multitasking.multitasking_struct import MultiTaskDS
1617
from prosimos.prioritisation import AllPriorityRules
1718
from prosimos.prioritisation_parser import PrioritisationParser
1819
from prosimos.probability_distributions import Choice
@@ -41,6 +42,7 @@
4142
EVENT_ATTRIBUTES = "event_attributes"
4243
GLOBAL_ATTRIBUTES = "global_attributes"
4344
GATEWAY_EXECUTION_LIMIT = "gateway_execution_limit"
45+
MULTITASKING_SECTION = "multitask"
4446

4547
DEFAULT_GATEWAY_EXECUTION_LIMIT = 1000
4648

@@ -120,6 +122,10 @@ def parse_json_sim_parameters(json_path):
120122
if GATEWAY_EXECUTION_LIMIT in json_data \
121123
else DEFAULT_GATEWAY_EXECUTION_LIMIT
122124

125+
multitasking_info = parse_multitasking_model(json_data[MULTITASKING_SECTION], task_resource_distribution) \
126+
if MULTITASKING_SECTION in json_data \
127+
else None
128+
123129
return (
124130
resources_map,
125131
calendars_map,
@@ -133,10 +139,59 @@ def parse_json_sim_parameters(json_path):
133139
gateway_conditions,
134140
all_attributes,
135141
gateway_execution_limit,
136-
model_type
142+
model_type,
143+
multitasking_info
137144
)
138145

139146

147+
def parse_multitasking_model(json_data, task_resource_distribution):
148+
if json_data["type"] == "local":
149+
return _parse_local_multitasking(json_data, task_resource_distribution)
150+
elif json_data["type"] == "global":
151+
return _parse_global_multitasking(json_data, task_resource_distribution)
152+
return None
153+
154+
155+
def _parse_global_multitasking(json_data, task_res_distr):
156+
multi_info = MultiTaskDS(json_data["type"])
157+
for res_info in json_data["values"]:
158+
r_id = res_info["resource_id"]
159+
multi_info.update_expected_workload(r_id, res_info["r_workload"])
160+
multi_info.register_resource(r_id)
161+
for mt_info in res_info["multitask_info"]:
162+
multi_info.register_multitasks(r_id, mt_info["parallel_tasks"], mt_info["probability"])
163+
# multi_info.init_relative_workload(task_res_distr)
164+
return multi_info
165+
166+
167+
def _parse_local_multitasking(json_data, task_res_distr):
168+
multi_info = MultiTaskDS(
169+
json_data["type"],
170+
60
171+
)
172+
# multi_info = MultiTaskDS(
173+
# json_data["type"],
174+
# json_data["granule_size"]["value"] * granule_units[(json_data["granule_size"]["time_unit"]).upper()]
175+
# )
176+
for res_info in json_data["values"]:
177+
r_id = res_info["resource_id"]
178+
multi_info.register_resource(r_id)
179+
multi_info.update_expected_workload(r_id, res_info["r_workload"])
180+
for wd_info in res_info["weekly_probability"]:
181+
for gr_info in wd_info:
182+
time_periods = convert_to_fuzzy_time_periods(gr_info)
183+
for p_info in time_periods:
184+
for mt_info in gr_info["multitask_info"]:
185+
multi_info.register_local_multitasks(r_id,
186+
int_week_days[p_info["weekDay"]],
187+
parse_datetime(p_info["beginTime"], False),
188+
parse_datetime(p_info["endTime"], False),
189+
mt_info["parallel_tasks"],
190+
mt_info["probability"])
191+
# multi_info.init_relative_workload(task_res_distr)
192+
return multi_info
193+
194+
140195
def parse_fuzzy_calendar(json_data):
141196
granule_size = json_data["granule_size"]["value"] * granule_units[(json_data["granule_size"]["time_unit"]).upper()]
142197
fuzzy_calendars = dict()
@@ -173,7 +228,7 @@ def convert_to_fuzzy_time_periods(time_period):
173228
"weekDay": week_day,
174229
"beginTime": time_period["beginTime"],
175230
"endTime": time_period["endTime"],
176-
"probability": time_period["probability"],
231+
"probability": time_period["probability"] if "probability" in time_period else 0.0,
177232
}
178233
time_periods.append(time_period)
179234

0 commit comments

Comments
 (0)