# 04_conformance-checking.py
#
# content: (1) Load data and create event log
#          (2) Check against normative Petri Net
#
# input:  results/event_logfiles_2024-02-21_16-07-33.csv
#         results/normative_petrinet.pnml
# output: results/eval_all-miners_complete.csv
#         results/eval_all-miners_clean.csv
#         ../../thesis/figures/petrinet_{normative,heuristics_clean,alpha_clean,inductive_clean,ilp_clean}.png
#         ../../thesis/figures/bpmn_{normative,inductive_clean,ilp_clean,alpha_clean,heuristics_clean}.png
#
# last mod: 2024-03-22

import pm4py
import pandas as pd
import numpy as np

from python_helpers import eval_pm, pn_infos_miner

#--------------- (1) Load data and create event logs ---------------

dat = pd.read_csv("results/event_logfiles_2024-02-21_16-07-33.csv", sep=";")
event_log = pm4py.format_dataframe(dat, case_id="path", activity_key="event",
                                   timestamp_key="date.start")

## Descriptives of log data
# NOTE(review): the bare expressions below are interactive inspections
# (useful in a REPL); they have no effect when the file runs as a script.

# Distribution of events
event_log.event.value_counts()
event_log.event.value_counts(normalize=True)

# Number of paths (cases)
len(event_log.path.unique())

# Number of variants and the 20 most frequent ones
variants = pm4py.get_variants(event_log)
len(variants)
sorted_variants = dict(sorted(variants.items(), key=lambda item: item[1],
                              reverse=True))
{k: sorted_variants[k] for k in list(sorted_variants)[:20]}

# Same, after dropping "move" events
filtered_log = event_log[event_log["event"] != "move"]
variants_no_move = pm4py.get_variants(filtered_log)
len(variants_no_move)
sorted_variants_no_move = dict(sorted(variants_no_move.items(),
                                      key=lambda item: item[1],
                                      reverse=True))
{k: sorted_variants_no_move[k] for k in list(sorted_variants_no_move)[:20]}

#--------------- (2) Check against normative Petri Net ---------------

basenet, initial_marking, final_marking = pm4py.read_pnml(
    "results/normative_petrinet.pnml")

# Token-based replay (TBR) of every trace against the normative net
replayed_traces = pm4py.conformance_diagnostics_token_based_replay(
    event_log, basenet, initial_marking, final_marking)

remaining = [t["remaining_tokens"] for t in replayed_traces]
missing = [t["missing_tokens"] for t in replayed_traces]
reached = [t["reached_marking"] for t in replayed_traces]
problem_trans = [t["transitions_with_problems"] for t in replayed_traces]

set(remaining)

# Non-conforming traces: in this data the only deviation observed is
# exactly one remaining token, hence the == 1 test.
index_broken = np.where(np.array(remaining) == 1)[0].tolist()
len(index_broken)

set(reached)
problem_trans.count([])
[reached[i] for i in index_broken]
[problem_trans[i] for i in index_broken]

broken_traces = [replayed_traces[i] for i in index_broken]

# Inspect the first broken case in the raw log
# (raises IndexError if index_broken is empty, i.e. nothing to clean)
broken_case = event_log[event_log["@@case_index"] == index_broken[0]]
broken_case.event
broken_case.path.unique().tolist()
broken_case.item.unique().tolist()
broken_case["fileId.start"].unique().tolist()
# --> logging error in raw file

## Fitting different miners

EVAL_COLUMNS = ["fitness", "precision", "generalizability", "simplicity",
                "sound", "narcs", "ntrans", "nplaces", "nvariants",
                "mostfreq"]
MINERS = ["normative", "alpha", "heuristics", "inductive", "ilp"]


def _evaluate_miners(log):
    """Return one evaluation row per miner in MINERS for the given log.

    The leading empty frame pins the column order of the resulting CSV,
    matching the original `pd.DataFrame(columns=...)` seed.
    """
    frames = [pd.DataFrame(columns=EVAL_COLUMNS)]
    frames += [pn_infos_miner(log, miner) for miner in MINERS]
    return pd.concat(frames)


# Evaluation on the complete log (renamed from `eval`, which shadowed
# the builtin)
eval_complete = _evaluate_miners(event_log)
eval_complete.to_csv("results/eval_all-miners_complete.csv", sep=";")

## Without broken trace
event_log_clean = event_log[event_log["@@case_index"] != index_broken[0]]
eval_clean = _evaluate_miners(event_log_clean)
eval_clean.to_csv("results/eval_all-miners_clean.csv", sep=";")

## Directly-follows graph
dfg, start_activities, end_activities = pm4py.discover_dfg(event_log_clean)
pm4py.view_dfg(dfg, start_activities, end_activities)

## Export Petri nets and BPMN diagrams for the thesis

FIG_DIR = "../../thesis/figures"

# Normative net comes from the PNML file; the others are discovered
# from the cleaned log.
nets = {
    "normative": (basenet, initial_marking, final_marking),
    "heuristics": pm4py.discover_petri_net_heuristics(event_log_clean),
    "alpha": pm4py.discover_petri_net_alpha(event_log_clean),
    "inductive": pm4py.discover_petri_net_inductive(event_log_clean),
    "ilp": pm4py.discover_petri_net_ilp(event_log_clean),
}

for miner, (net, im, fm) in nets.items():
    # Discovered nets carry a "_clean" suffix in the figure file names.
    suffix = "normative" if miner == "normative" else f"{miner}_clean"
    pm4py.vis.save_vis_petri_net(net, im, fm,
                                 f"{FIG_DIR}/petrinet_{suffix}.png")
    bpmn = pm4py.convert.convert_to_bpmn(net, im, fm)
    pm4py.vis.save_vis_bpmn(bpmn, f"{FIG_DIR}/bpmn_{suffix}.png")