# mtt_haum/code/pm_conformance-checking.py
%reset  # IPython magic: clears the workspace (run this script interactively)
import pm4py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
###### Load data and create event logs ######
dat = pd.read_csv("results/haum/event_logfiles_2024-01-18_09-58-52.csv", sep = ";")
dat = dat[dat["date.start"] < "2020-03-13"]
# --> only pre-corona data (before the artworks were updated)
event_log = pm4py.format_dataframe(dat, case_id='path', activity_key='event',
                                   timestamp_key='date.start')
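# format_dataframe() adds the standard pm4py columns (case:concept:name,
# concept:name, time:timestamp) plus the @@case_index column used for
# trace-level filtering further below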
###### Descriptives of log data ######
# Distribution of events
event_log.event.value_counts()
event_log.event.value_counts(normalize=True)
# Number of paths
len(event_log.path.unique())
# Number of variants
variants = pm4py.get_variants(event_log)
len(variants)
sorted_variants = dict(sorted(variants.items(), key=lambda item: item[1], reverse = True))
{k: sorted_variants[k] for k in list(sorted_variants)[:20]}
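# Repeat the variant count without "move" events to see how the remaining
# event types combine into variants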
filtered_log = event_log[event_log["event"] != "move"]
variants_no_move = pm4py.get_variants(filtered_log)
len(variants_no_move)
sorted_variants_no_move = dict(sorted(variants_no_move.items(), key=lambda item: item[1], reverse = True))
{k: sorted_variants_no_move[k] for k in list(sorted_variants_no_move)[:20]}
###### Read "conformative" Petri Net ######
basenet, initial_marking, final_marking = pm4py.read_pnml("results/conformative_petrinet_con.pnml")
def eval_pm(data, net, initial_marking, final_marking):
    """Calculate fitness, precision, generalizability, and simplicity for a Petri net"""
    fitness = pm4py.fitness_token_based_replay(data, net, initial_marking, final_marking)
    precision = pm4py.precision_token_based_replay(data, net, initial_marking, final_marking)
    generalizability = pm4py.algo.evaluation.generalization.algorithm.apply(data, net,
                                                                            initial_marking, final_marking)
    simplicity = pm4py.algo.evaluation.simplicity.algorithm.apply(net)
    return [fitness['average_trace_fitness'], precision, generalizability, simplicity]
baseline_eval = eval_pm(event_log, basenet, initial_marking, final_marking)
# Token-based replay (TBR) diagnostics
replayed_traces = pm4py.conformance_diagnostics_token_based_replay(event_log, basenet, initial_marking, final_marking)
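# Each replayed trace is a dict with diagnostics such as "remaining_tokens",
# "missing_tokens", "reached_marking", and "transitions_with_problems"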
l1 = [t["remaining_tokens"] for t in replayed_traces]
l2 = [t["missing_tokens"] for t in replayed_traces]
l3 = [t["reached_marking"] for t in replayed_traces]
l4 = [t["transitions_with_problems"] for t in replayed_traces]
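# Inspect the diagnostics: traces with remaining tokens did not replay cleanly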
set(l1)
x1 = np.array(l1)
index_broken = np.where(x1 == 1)[0].tolist()
set(l3)
l4.count([])
[l3[i] for i in index_broken]
[l4[i] for i in index_broken]
broken_traces = [replayed_traces[i] for i in index_broken]
event_log[event_log['@@case_index'].isin(index_broken)].event
event_log[event_log['@@case_index'].isin(index_broken)].path.unique().tolist()
event_log[event_log['@@case_index'].isin(index_broken)].item.unique().tolist()
event_log[event_log['@@case_index'].isin(index_broken)]["fileId.start"].unique().tolist()
# --> logging error in raw file
# Footprints
from pm4py.algo.discovery.footprints import algorithm as footprints_discovery
from pm4py.visualization.footprints import visualizer as fp_visualizer
fp_log = footprints_discovery.apply(event_log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG)
fp_net = footprints_discovery.apply(basenet, initial_marking, final_marking)
gviz = fp_visualizer.apply(fp_net, parameters={fp_visualizer.Variants.SINGLE.value.Parameters.FORMAT: "svg"})
fp_visualizer.view(gviz)
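# fp_log and fp_net can be compared to see which directly-follows relations in
# the log are (not) permitted by the conformative model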
pm4py.vis.view_petri_net(basenet, initial_marking, final_marking)
is_sound = pm4py.check_soundness(basenet, initial_marking, final_marking)
baseline_eval.append(is_sound[0])
baseline_eval.append(len(basenet.arcs))
baseline_eval.append(len(basenet.transitions))
baseline_eval.append(len(basenet.places))
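# Soundness and net size (arcs/transitions/places) are appended so the export
# below has one complete row per net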
efg_graph = pm4py.discover_eventually_follows_graph(event_log)
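# (eventually-follows graph computed for interactive inspection; not exported)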
## Directly-follows graph
dfg, start_activities, end_activities = pm4py.discover_dfg(event_log)
pm4py.view_dfg(dfg, start_activities, end_activities)
pm4py.save_vis_dfg(dfg, start_activities, end_activities, '../figures/processmaps/dfg_complete.png')
## Fitting different miners
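# For each miner: discover a Petri net on the full log, compute the four
# quality metrics via eval_pm(), and append soundness plus net size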
### Heuristics Miner
h_net, im, fm = pm4py.discover_petri_net_heuristics(event_log)
h_eval = eval_pm(event_log, h_net, im, fm)
is_sound = pm4py.check_soundness(h_net, im, fm)
h_eval.append(is_sound[0])
h_eval.append(len(h_net.arcs))
h_eval.append(len(h_net.transitions))
h_eval.append(len(h_net.places))
## Alpha Miner
a_net, im, fm = pm4py.discover_petri_net_alpha(event_log)
a_eval = eval_pm(event_log, a_net, im, fm)
is_sound = pm4py.check_soundness(a_net, im, fm)
a_eval.append(is_sound[0])
a_eval.append(len(a_net.arcs))
a_eval.append(len(a_net.transitions))
a_eval.append(len(a_net.places))
## Inductive Miner
i_net, im, fm = pm4py.discover_petri_net_inductive(event_log)
i_eval = eval_pm(event_log, i_net, im, fm)
is_sound = pm4py.check_soundness(i_net, im, fm)
i_eval.append(is_sound[0])
i_eval.append(len(i_net.arcs))
i_eval.append(len(i_net.transitions))
i_eval.append(len(i_net.places))
## ILP Miner
ilp_net, im, fm = pm4py.discover_petri_net_ilp(event_log)
ilp_eval = eval_pm(event_log, ilp_net, im, fm)
is_sound = pm4py.check_soundness(ilp_net, im, fm)
ilp_eval.append(is_sound[0])
ilp_eval.append(len(ilp_net.arcs))
ilp_eval.append(len(ilp_net.transitions))
ilp_eval.append(len(ilp_net.places))
## Export for all miners
eval_df = pd.DataFrame(np.vstack([baseline_eval, h_eval, a_eval, i_eval, ilp_eval]))
eval_df.columns = ["fitness", "precision", "generalizability", "simplicity",
                   "sound", "narcs", "ntrans", "nplaces"]
eval_df.index = ["conformative", "heuristics", "alpha", "inductive", "ilp"]
eval_df
eval_df.to_csv("results/eval_all-miners_complete.csv", sep=" ")
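# Optional: a minimal sketch comparing the four quality metrics across miners
# (added for illustration; it assumes all four metrics lie in [0, 1], which
# holds for token-based fitness/precision, generalization, and simplicity)
eval_df[["fitness", "precision", "generalizability", "simplicity"]].plot.bar()
plt.ylim(0, 1)
plt.ylabel("metric value")
plt.tight_layout()
plt.show()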
## Without broken trace
event_log_clean = event_log[~event_log['@@case_index'].isin(index_broken)]
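# Re-run discovery on the cleaned log and repeat the evaluation for all five nets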
h_net, h_im, h_fm = pm4py.discover_petri_net_heuristics(event_log_clean)
a_net, a_im, a_fm = pm4py.discover_petri_net_alpha(event_log_clean)
i_net, i_im, i_fm = pm4py.discover_petri_net_inductive(event_log_clean)
ilp_net, ilp_im, ilp_fm = pm4py.discover_petri_net_ilp(event_log_clean)
baseline_eval = eval_pm(event_log_clean, basenet, initial_marking, final_marking)
is_sound = pm4py.check_soundness(basenet, initial_marking, final_marking)
baseline_eval.append(is_sound[0])
baseline_eval.append(len(basenet.arcs))
baseline_eval.append(len(basenet.transitions))
baseline_eval.append(len(basenet.places))
h_eval = eval_pm(event_log_clean, h_net, h_im, h_fm)
is_sound = pm4py.check_soundness(h_net, h_im, h_fm)
h_eval.append(is_sound[0])
h_eval.append(len(h_net.arcs))
h_eval.append(len(h_net.transitions))
h_eval.append(len(h_net.places))
a_eval = eval_pm(event_log_clean, a_net, a_im, a_fm)
is_sound = pm4py.check_soundness(a_net, a_im, a_fm)
a_eval.append(is_sound[0])
a_eval.append(len(a_net.arcs))
a_eval.append(len(a_net.transitions))
a_eval.append(len(a_net.places))
i_eval = eval_pm(event_log_clean, i_net, i_im, i_fm)
is_sound = pm4py.check_soundness(i_net, i_im, i_fm)
i_eval.append(is_sound[0])
i_eval.append(len(i_net.arcs))
i_eval.append(len(i_net.transitions))
i_eval.append(len(i_net.places))
ilp_eval = eval_pm(event_log_clean, ilp_net, ilp_im, ilp_fm)
is_sound = pm4py.check_soundness(ilp_net, ilp_im, ilp_fm)
ilp_eval.append(is_sound[0])
ilp_eval.append(len(ilp_net.arcs))
ilp_eval.append(len(ilp_net.transitions))
ilp_eval.append(len(ilp_net.places))
eval_df = pd.DataFrame(np.vstack([baseline_eval, h_eval, a_eval, i_eval, ilp_eval]))
eval_df.columns = ["fitness", "precision", "generalizability", "simplicity",
                   "sound", "narcs", "ntrans", "nplaces"]
eval_df.index = ["conformative", "heuristics", "alpha", "inductive", "ilp"]
eval_df
eval_df.to_csv("results/eval_all-miners_clean.csv", sep=" ")
# Export Petri nets
pm4py.vis.save_vis_petri_net(h_net, h_im, h_fm, "results/processmaps/petrinet_heuristics_clean.png")
pm4py.vis.save_vis_petri_net(a_net, a_im, a_fm, "results/processmaps/petrinet_alpha_clean.png")
pm4py.vis.save_vis_petri_net(i_net, i_im, i_fm, "results/processmaps/petrinet_inductive_clean.png")
pm4py.vis.save_vis_petri_net(ilp_net, ilp_im, ilp_fm, "results/processmaps/petrinet_ilp_clean.png")
pm4py.vis.save_vis_petri_net(basenet, initial_marking, final_marking, "results/processmaps/petrinet_conformative.png")
# Convert to BPMN
base_bpmn = pm4py.convert.convert_to_bpmn(basenet, initial_marking, final_marking)
pm4py.vis.save_vis_bpmn(base_bpmn, "results/processmaps/bpmn_conformative.png")
i_bpmn = pm4py.convert.convert_to_bpmn(i_net, i_im, i_fm)
pm4py.vis.save_vis_bpmn(i_bpmn, "results/processmaps/bpmn_inductive_clean.png")
ilp_bpmn = pm4py.convert.convert_to_bpmn(ilp_net, ilp_im, ilp_fm)
pm4py.vis.save_vis_bpmn(ilp_bpmn, "results/processmaps/bpmn_ilp_clean.png")
a_bpmn = pm4py.convert.convert_to_bpmn(a_net, a_im, a_fm)
pm4py.vis.save_vis_bpmn(a_bpmn, "results/processmaps/bpmn_alpha_clean.png")
h_bpmn = pm4py.convert.convert_to_bpmn(h_net, h_im, h_fm)
pm4py.vis.save_vis_bpmn(h_bpmn, "results/processmaps/bpmn_heuristics_clean.png")
###### Process Mining - individual artworks ######
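# For each artwork, evaluate (1) the conformative net ("alldata") and (2) a net
# discovered on that artwork's traces only ("subdata") against the per-artwork log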
def pm_artworks(miner):
    items = event_log["item"].unique()
    retval1 = np.empty((len(items), 4))
    retval2 = np.empty((len(items), 4))
    for i in range(len(items)):
        artwork = items[i]
        subdata = pm4py.filter_event_attribute_values(event_log, "item",
                                                      [artwork],
                                                      level="case", retain=True)
        if miner == "heuristics":
            subnet, subim, subfm = pm4py.discover_petri_net_heuristics(subdata)
        elif miner == "inductive":
            subnet, subim, subfm = pm4py.discover_petri_net_inductive(subdata)
        elif miner == "alpha":
            subnet, subim, subfm = pm4py.discover_petri_net_alpha(subdata)
        elif miner == "ilp":
            subnet, subim, subfm = pm4py.discover_petri_net_ilp(subdata)
        #pm4py.save_vis_petri_net(subnet, subim, subfm,
        #    "results/processmaps/artworks/petrinet_" + miner + "_" + str(artwork).zfill(3) + ".png")
        retval1[i] = eval_pm(subdata, basenet, initial_marking, final_marking)
        retval2[i] = eval_pm(subdata, subnet, subim, subfm)
    retval1 = pd.DataFrame(retval1)
    retval1.columns = ["fitness", "precision", "generalizability", "simplicity"]
    retval1.index = items
    retval1.insert(0, "nettype", "alldata")
    retval2 = pd.DataFrame(retval2)
    retval2.columns = ["fitness", "precision", "generalizability", "simplicity"]
    retval2.index = items
    retval2.insert(0, "nettype", "subdata")
    return pd.concat([retval1, retval2])
for miner in ["heuristics", "inductive", "alpha", "ilp"]:
    eval_art = pm_artworks(miner=miner)
    eval_art.to_csv("results/eval_artworks_" + miner + ".csv", sep=";")