"""Conformance checking and process discovery on the HAUM event log.

The script loads the event log CSV, prints a few descriptives, replays the
log on the "conformative" Petri net, evaluates several discovery algorithms
on the complete log and on the log without the first broken trace, and
exports the resulting Petri nets and BPMN diagrams.

Bare expression statements (e.g. ``len(variants)``) are kept on purpose:
they only display something when the script is run interactively.
"""

import numpy as np
import pandas as pd
import pm4py
from pm4py.algo.discovery.footprints import algorithm as footprints_discovery
from pm4py.visualization.footprints import visualizer as fp_visualizer

from python_helpers import eval_pm, pn_infos_miner

# Columns of the evaluation tables written to results/eval_all-miners_*.csv.
EVAL_COLUMNS = ["fitness", "precision", "generalizability", "simplicity",
                "sound", "narcs", "ntrans", "nplaces", "nvariants", "mostfreq"]

# Miner names handed to `pn_infos_miner`, in evaluation order.
MINERS = ["conformative", "alpha", "heuristics", "inductive", "ilp"]


def _evaluate_miners(log):
    """Return the stacked `pn_infos_miner(log, miner)` results for all MINERS.

    The result is seeded with an empty frame carrying EVAL_COLUMNS and each
    miner's result is concatenated below it, in the order given by MINERS.
    """
    results = pd.DataFrame(columns=EVAL_COLUMNS)
    for miner in MINERS:
        results = pd.concat([results, pn_infos_miner(log, miner)])
    return results


###### Load data and create event logs ######

dat = pd.read_csv("results/haum/event_logfiles_2024-02-21_16-07-33.csv", sep=";")

event_log = pm4py.format_dataframe(dat, case_id="path", activity_key="event",
                                   timestamp_key="date.start")

###### Descriptives of log data ######

# Distribution of events
event_log.event.value_counts()
event_log.event.value_counts(normalize=True)

# Number of paths
len(event_log.path.unique())

# Number of variants
variants = pm4py.get_variants(event_log)
len(variants)

# Variants ordered by descending count; show the 20 most frequent ones
sorted_variants = dict(sorted(variants.items(), key=lambda item: item[1], reverse=True))
{k: sorted_variants[k] for k in list(sorted_variants)[:20]}

# Same descriptives after dropping all "move" events
filtered_log = event_log[event_log["event"] != "move"]
variants_no_move = pm4py.get_variants(filtered_log)
len(variants_no_move)
sorted_variants_no_move = dict(sorted(variants_no_move.items(), key=lambda item: item[1], reverse=True))
{k: sorted_variants_no_move[k] for k in list(sorted_variants_no_move)[:20]}

###### Check against "conformative" Petri Net ######

basenet, initial_marking, final_marking = pm4py.read_pnml("results/haum/conformative_petrinet_con.pnml")

# TBR
replayed_traces = pm4py.conformance_diagnostics_token_based_replay(event_log, basenet, initial_marking, final_marking)

# One entry per replayed trace, in the order returned by the replay
remaining_tokens = [trace["remaining_tokens"] for trace in replayed_traces]
missing_tokens = [trace["missing_tokens"] for trace in replayed_traces]
reached_markings = [trace["reached_marking"] for trace in replayed_traces]
problem_transitions = [trace["transitions_with_problems"] for trace in replayed_traces]

set(remaining_tokens)
# Traces that end the replay with exactly one remaining token count as broken
index_broken = np.where(np.array(remaining_tokens) == 1)[0].tolist()
len(index_broken)

set(reached_markings)
problem_transitions.count([])

[reached_markings[i] for i in index_broken]
[problem_transitions[i] for i in index_broken]

broken_traces = [replayed_traces[i] for i in index_broken]

# NOTE: everything below assumes at least one broken trace exists
# (index_broken[0] raises IndexError otherwise) and inspects/removes only
# the first one.
first_broken_case = index_broken[0]
broken_case_rows = event_log[event_log["@@case_index"] == first_broken_case]

broken_case_rows.event
broken_case_rows.path.unique().tolist()
broken_case_rows.item.unique().tolist()
broken_case_rows["fileId.start"].unique().tolist()
# --> logging error in raw file

## Footprints
fp_log = footprints_discovery.apply(event_log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG)
fp_net = footprints_discovery.apply(basenet, initial_marking, final_marking)
gviz = fp_visualizer.apply(fp_net, parameters={fp_visualizer.Variants.SINGLE.value.Parameters.FORMAT: "svg"})
fp_visualizer.view(gviz)

efg_graph = pm4py.discover_eventually_follows_graph(event_log)

## Directly-follows graph
dfg, start_activities, end_activities = pm4py.discover_dfg(event_log)
pm4py.view_dfg(dfg, start_activities, end_activities)
pm4py.save_vis_dfg(dfg, start_activities, end_activities, "results/processmaps/dfg_complete_python.png")

## Fitting different miners
# Named eval_complete (not `eval`) so the builtin eval() is not shadowed.
eval_complete = _evaluate_miners(event_log)
eval_complete.to_csv("results/eval_all-miners_complete.csv", sep=";")

## Without broken trace
event_log_clean = event_log[event_log["@@case_index"] != first_broken_case]

eval_clean = _evaluate_miners(event_log_clean)
eval_clean.to_csv("results/eval_all-miners_clean.csv", sep=";")

## Export petri nets
pm4py.vis.save_vis_petri_net(basenet, initial_marking, final_marking, "results/processmaps/petrinet_conformative.png")

h_net, h_im, h_fm = pm4py.discover_petri_net_heuristics(event_log_clean)
pm4py.vis.save_vis_petri_net(h_net, h_im, h_fm, "results/processmaps/petrinet_heuristics_clean.png")

a_net, a_im, a_fm = pm4py.discover_petri_net_alpha(event_log_clean)
pm4py.vis.save_vis_petri_net(a_net, a_im, a_fm, "results/processmaps/petrinet_alpha_clean.png")

i_net, i_im, i_fm = pm4py.discover_petri_net_inductive(event_log_clean)
pm4py.vis.save_vis_petri_net(i_net, i_im, i_fm, "results/processmaps/petrinet_inductive_clean.png")

ilp_net, ilp_im, ilp_fm = pm4py.discover_petri_net_ilp(event_log_clean)
pm4py.vis.save_vis_petri_net(ilp_net, ilp_im, ilp_fm, "results/processmaps/petrinet_ilp_clean.png")

# convert to BPMN
base_bpmn = pm4py.convert.convert_to_bpmn(basenet, initial_marking, final_marking)
pm4py.vis.save_vis_bpmn(base_bpmn, "results/processmaps/bpmn_conformative.png")

i_bpmn = pm4py.convert.convert_to_bpmn(i_net, i_im, i_fm)
pm4py.vis.save_vis_bpmn(i_bpmn, "results/processmaps/bpmn_inductive_clean.png")

ilp_bpmn = pm4py.convert.convert_to_bpmn(ilp_net, ilp_im, ilp_fm)
pm4py.vis.save_vis_bpmn(ilp_bpmn, "results/processmaps/bpmn_ilp_clean.png")

a_bpmn = pm4py.convert.convert_to_bpmn(a_net, a_im, a_fm)
pm4py.vis.save_vis_bpmn(a_bpmn, "results/processmaps/bpmn_alpha_clean.png")

h_bpmn = pm4py.convert.convert_to_bpmn(h_net, h_im, h_fm)
pm4py.vis.save_vis_bpmn(h_bpmn, "results/processmaps/bpmn_heuristics_clean.png")