|
def parse_qbp_simulation_process(qbp_bpmn_path, out_file): |
|
tree = ET.parse(qbp_bpmn_path) |
|
root = tree.getroot() |
|
simod_root = root.find("qbp:processSimulationInfo", simod_ns) |
|
if simod_root is None: |
|
print('PARSING ABORTED: Input BPMN model is not a simulation model, i.e., simulation parameters are missing.') |
|
return |
|
|
|
# 1. Extracting gateway branching probabilities |
|
gateways_branching = dict() |
|
reverse_map = dict() |
|
for process in root.findall('xmlns:process', bpmn_element_ns): |
|
for xmlns_key in ['xmlns:exclusiveGateway', 'xmlns:inclusiveGateway']: |
|
for bpmn_element in process.findall(xmlns_key, bpmn_element_ns): |
|
if bpmn_element.attrib["gatewayDirection"] == "Diverging": |
|
gateways_branching[bpmn_element.attrib["id"]] = dict() |
|
for out_flow in bpmn_element.findall("xmlns:outgoing", bpmn_element_ns): |
|
arc_id = out_flow.text.strip() |
|
gateways_branching[bpmn_element.attrib["id"]][arc_id] = 0 |
|
reverse_map[arc_id] = bpmn_element.attrib["id"] |
|
for flow_prob in simod_root.find("qbp:sequenceFlows", simod_ns).findall("qbp:sequenceFlow", simod_ns): |
|
flow_id = flow_prob.attrib["elementId"] |
|
gateways_branching[reverse_map[flow_id]][flow_id] = flow_prob.attrib["executionProbability"] |
|
|
|
# 2. Extracting Resource Calendars |
|
resource_pools = dict() |
|
calendars_map = dict() |
|
bpmn_calendars = simod_root.find("qbp:timetables", simod_ns) |
|
arrival_calendar_id = None |
|
|
|
for calendar_info in bpmn_calendars: |
|
calendar_id = calendar_info.attrib["id"] |
|
if calendar_id not in calendars_map: |
|
calendars_map[calendar_id] = list() |
|
|
|
time_tables = calendar_info.find("qbp:rules", simod_ns).findall("qbp:rule", simod_ns) |
|
if 'ARRIVAL_CALENDAR' in calendar_id or (arrival_calendar_id is None and 'DEFAULT_TIMETABLE' in calendar_id): |
|
arrival_calendar_id = calendar_id |
|
for time_table in time_tables: |
|
calendars_map[calendar_id].append({"from": time_table.attrib["fromWeekDay"], |
|
"to": time_table.attrib["toWeekDay"], |
|
"beginTime": format_date(time_table.attrib["fromTime"]), |
|
"endTime": format_date(time_table.attrib["toTime"])}) |
|
|
|
# 3. Extracting Arrival time distribution |
|
arrival_time_dist = extract_dist_params(simod_root.find("qbp:arrivalRateDistribution", simod_ns)) |
|
|
|
# 4. Extracting task-resource duration distributions |
|
bpmn_resources = simod_root.find("qbp:resources", simod_ns) |
|
simod_elements = simod_root.find("qbp:elements", simod_ns) |
|
pools_json = dict() |
|
|
|
resource_calendars = dict() |
|
for resource in bpmn_resources: |
|
pools_json[resource.attrib["id"]] = {"name": resource.attrib["name"], "resource_list": list()} |
|
resource_pools[resource.attrib["id"]] = list() |
|
calendar_id = resource.attrib["timetableId"] |
|
for i in range(1, int(resource.attrib["totalAmount"]) + 1): |
|
nr_id = "%s_%d" % (resource.attrib["id"], i) |
|
pools_json[resource.attrib["id"]]["resource_list"].append({ |
|
"id": nr_id, |
|
"name": "%s_%d" % (resource.attrib["name"], i), |
|
"cost_per_hour": resource.attrib["costPerHour"], |
|
"amount": 1 |
|
}) |
|
resource_pools[resource.attrib["id"]].append(nr_id) |
|
resource_calendars[nr_id] = calendars_map[calendar_id] |
|
|
|
task_resource_dist = dict() |
|
for e_inf in simod_elements: |
|
task_id = e_inf.attrib["elementId"] |
|
rpool_id = e_inf.find("qbp:resourceIds", simod_ns).find("qbp:resourceId", simod_ns).text |
|
dist_info = e_inf.find("qbp:durationDistribution", simod_ns) |
|
|
|
t_dist = extract_dist_params(dist_info) |
|
if task_id not in task_resource_dist: |
|
task_resource_dist[task_id] = dict() |
|
for rp_id in resource_pools[rpool_id]: |
|
task_resource_dist[task_id][rp_id] = t_dist |
|
|
|
# 5.Saving all in a single JSON file |
|
|
|
to_save = { |
|
"resource_profiles": pools_json, |
|
"arrival_time_distribution": arrival_time_dist, |
|
"arrival_time_calendar": calendars_map[arrival_calendar_id], |
|
"gateway_branching_probabilities": gateways_branching, |
|
"task_resource_distribution": task_resource_dist, |
|
"resource_calendars": resource_calendars, |
|
} |
|
with open(out_file, 'w') as file_writter: |
|
json.dump(to_save, file_writter) |
Hey, @orlenyslp. It seems like this function produces the outdated JSON format. Is there a new one somewhere I'm missing?
Prosimos/bpdfr_simulation_engine/simulation_properties_parser.py
Lines 151 to 242 in ee59295