mtt_haum/code/04_conformance-checking.py

145 lines
5.8 KiB
Python

# 04_conformance-checking.py
#
# content: (1) Load data and create event log
#          (2) Check against normative Petri Net
#
# input:  results/event_logfiles_2024-02-21_16-07-33.csv
#         results/normative_petrinet.pnml
# output: results/eval_all-miners_complete.csv
#         results/eval_all-miners_clean.csv
#         ../../thesis/figures/petrinet_normative.png
#         ../../thesis/figures/petrinet_heuristics_clean.png
#         ../../thesis/figures/petrinet_alpha_clean.png
#         ../../thesis/figures/petrinet_inductive_clean.png
#         ../../thesis/figures/petrinet_ilp_clean.png
#         ../../thesis/figures/bpmn_normative.png
#         ../../thesis/figures/bpmn_inductive_clean.png
#         ../../thesis/figures/bpmn_ilp_clean.png
#         ../../thesis/figures/bpmn_alpha_clean.png
#         ../../thesis/figures/bpmn_heuristics_clean.png
#
# last mod: 2024-03-22
import pm4py
import pandas as pd
import numpy as np
from python_helpers import eval_pm, pn_infos_miner
#--------------- (1) Load data and create event logs ---------------

# The raw log file is semicolon-separated.
dat = pd.read_csv("results/event_logfiles_2024-02-21_16-07-33.csv", sep = ";")

# Declare which columns pm4py should treat as case id, activity and timestamp.
event_log = pm4py.format_dataframe(
    dat,
    case_id = "path",
    activity_key = "event",
    timestamp_key = "date.start",
)

## Descriptives of log data
# NOTE: the bare expressions below are not assigned or printed; they only
# show a value when the script is stepped through interactively.

# Distribution of events (absolute counts, then proportions)
event_log["event"].value_counts()
event_log["event"].value_counts(normalize = True)

# Number of paths
len(event_log["path"].unique())

# Number of variants
variants = pm4py.get_variants(event_log)
len(variants)

# Variants ordered by their value in descending order; look at the first 20
sorted_variants = dict(
    sorted(variants.items(), key = lambda kv: kv[1], reverse = True)
)
dict(list(sorted_variants.items())[:20])

# Same variant overview after dropping all "move" events
filtered_log = event_log[event_log["event"] != "move"]
variants_no_move = pm4py.get_variants(filtered_log)
len(variants_no_move)

sorted_variants_no_move = dict(
    sorted(variants_no_move.items(), key = lambda kv: kv[1], reverse = True)
)
dict(list(sorted_variants_no_move.items())[:20])
#--------------- (2) Check against normative Petri Net ---------------

# Normative model plus its initial and final marking
basenet, initial_marking, final_marking = pm4py.read_pnml("results/normative_petrinet.pnml")

# TBR (token-based replay) of the complete log on the normative net;
# the result holds one diagnostics entry per replayed trace.
replayed_traces = pm4py.conformance_diagnostics_token_based_replay(event_log, basenet, initial_marking, final_marking)

# Pull the diagnostics of interest into parallel lists (same order as
# replayed_traces).
l1 = [trace["remaining_tokens"] for trace in replayed_traces]
l2 = [trace["missing_tokens"] for trace in replayed_traces]
l3 = [trace["reached_marking"] for trace in replayed_traces]
l4 = [trace["transitions_with_problems"] for trace in replayed_traces]

# Which numbers of remaining tokens occur at all?
set(l1)

# Positions of the traces that end the replay with exactly one remaining
# token; these are the ones treated as broken in the rest of the script.
x1 = np.array(l1)
index_broken = np.where(x1 == 1)[0].tolist()
len(index_broken)

# Distinct reached markings and number of traces without problematic
# transitions
set(l3)
l4.count([])

# Diagnostics of the broken traces only
[l3[i] for i in index_broken]
[l4[i] for i in index_broken]
broken_traces = [replayed_traces[i] for i in index_broken]

# Look at the raw events of the first broken trace. The lookup assumes that
# position i in replayed_traces corresponds to "@@case_index" == i.
# NOTE(review): confirm this ordering for the pm4py version in use.
# The guard avoids an IndexError when the replay flags no trace at all.
if index_broken:
    # Filter once and reuse instead of re-filtering the whole log per query.
    broken_case = event_log[event_log["@@case_index"] == index_broken[0]]
    broken_case["event"]
    broken_case["path"].unique().tolist()
    broken_case["item"].unique().tolist()
    broken_case["fileId.start"].unique().tolist()
# --> logging error in raw file
## Fitting different miners

# Columns of the evaluation table and the miners to compare; shared by the
# evaluation on the complete and on the cleaned log.
eval_columns = ["fitness", "precision", "generalizability",
                "simplicity", "sound", "narcs", "ntrans",
                "nplaces", "nvariants", "mostfreq"]
miners = ["normative", "alpha", "heuristics", "inductive", "ilp"]


def _evaluate_miners(log):
    """Return one evaluation table for `log` covering all entries of `miners`.

    Calls `pn_infos_miner(log, miner)` once per miner, in the order given by
    `miners`, and stacks the results below an empty frame that carries
    `eval_columns`, so the column order matches the previous loop-based
    version. All frames are concatenated in a single `pd.concat` call instead
    of growing the table inside the loop.
    """
    seed = pd.DataFrame(columns = eval_columns)
    return pd.concat([seed] + [pn_infos_miner(log, miner) for miner in miners])


# Complete log. The result is named `eval_complete` (was `eval`) so that the
# builtin `eval` is no longer shadowed.
eval_complete = _evaluate_miners(event_log)
eval_complete.to_csv("results/eval_all-miners_complete.csv", sep = ";")

## Without broken traces
# Drop every case flagged in `index_broken`, not only the first one. With
# exactly one flagged case this is identical to the previous
# `!= index_broken[0]` filter; with none it keeps the whole log instead of
# raising an IndexError.
event_log_clean = event_log[~event_log["@@case_index"].isin(index_broken)]

eval_clean = _evaluate_miners(event_log_clean)
eval_clean.to_csv("results/eval_all-miners_clean.csv", sep = ";")
## Directly-follows graph
# Discover the graph on the cleaned log. The call returns three values,
# which are unpacked for later use and passed on to the viewer unchanged.
dfg_model = pm4py.discover_dfg(event_log_clean)
dfg, start_activities, end_activities = dfg_model
pm4py.view_dfg(*dfg_model)
## Export petri nets
# Each net is written as a PNG into the thesis figure folder. The discovered
# nets are all mined from the cleaned log; net and markings stay bound to
# their own names.

# Normative net
pm4py.vis.save_vis_petri_net(
    basenet, initial_marking, final_marking,
    file_path = "../../thesis/figures/petrinet_normative.png",
)

# Heuristics miner
h_net, h_im, h_fm = pm4py.discover_petri_net_heuristics(event_log_clean)
pm4py.vis.save_vis_petri_net(
    h_net, h_im, h_fm,
    file_path = "../../thesis/figures/petrinet_heuristics_clean.png",
)

# Alpha miner
a_net, a_im, a_fm = pm4py.discover_petri_net_alpha(event_log_clean)
pm4py.vis.save_vis_petri_net(
    a_net, a_im, a_fm,
    file_path = "../../thesis/figures/petrinet_alpha_clean.png",
)

# Inductive miner
i_net, i_im, i_fm = pm4py.discover_petri_net_inductive(event_log_clean)
pm4py.vis.save_vis_petri_net(
    i_net, i_im, i_fm,
    file_path = "../../thesis/figures/petrinet_inductive_clean.png",
)

# ILP miner
ilp_net, ilp_im, ilp_fm = pm4py.discover_petri_net_ilp(event_log_clean)
pm4py.vis.save_vis_petri_net(
    ilp_net, ilp_im, ilp_fm,
    file_path = "../../thesis/figures/petrinet_ilp_clean.png",
)
# convert to BPMN
# Every Petri net (with its initial and final marking) is converted to a
# BPMN object and written as a PNG into the thesis figure folder.

# Normative model
base_bpmn = pm4py.convert.convert_to_bpmn(basenet, initial_marking, final_marking)
pm4py.vis.save_vis_bpmn(
    base_bpmn,
    file_path = "../../thesis/figures/bpmn_normative.png",
)

# Inductive miner
i_bpmn = pm4py.convert.convert_to_bpmn(i_net, i_im, i_fm)
pm4py.vis.save_vis_bpmn(
    i_bpmn,
    file_path = "../../thesis/figures/bpmn_inductive_clean.png",
)

# ILP miner
ilp_bpmn = pm4py.convert.convert_to_bpmn(ilp_net, ilp_im, ilp_fm)
pm4py.vis.save_vis_bpmn(
    ilp_bpmn,
    file_path = "../../thesis/figures/bpmn_ilp_clean.png",
)

# Alpha miner
a_bpmn = pm4py.convert.convert_to_bpmn(a_net, a_im, a_fm)
pm4py.vis.save_vis_bpmn(
    a_bpmn,
    file_path = "../../thesis/figures/bpmn_alpha_clean.png",
)

# Heuristics miner
h_bpmn = pm4py.convert.convert_to_bpmn(h_net, h_im, h_fm)
pm4py.vis.save_vis_bpmn(
    h_bpmn,
    file_path = "../../thesis/figures/bpmn_heuristics_clean.png",
)