-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathPredictiveModel.py
More file actions
107 lines (86 loc) · 4.54 KB
/
PredictiveModel.py
File metadata and controls
107 lines (86 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from SequenceEncoder import SequenceEncoder
from TextTransformers import LDATransformer, PVTransformer, BoNGTransformer, NBLogCountRatioTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import pandas as pd
import time
import numpy as np
class PredictiveModel:
    """Sequence encoder + optional text transformer + classifier, with stage timings.

    The model encodes the input with a ``SequenceEncoder``, optionally replaces
    the text columns (columns whose name starts with ``text_col``) with the
    output of a text transformer, and feeds the result to a scikit-learn
    classifier (logistic regression or random forest).

    Attributes set by ``fit``:
        train_X: feature frame the classifier was trained on.
        preproc_time: seconds spent on encoding and text transformation.
        cls_time: seconds spent on fitting the classifier.
        hardcoded_prediction: the single training label when fewer than two
            classes were present, otherwise ``None``.

    Attributes set by ``predict_proba``:
        test_case_names, test_X, test_y: case ids, features and labels of the
            last scored data set.
        test_encode_time, test_preproc_time, test_time: seconds spent on
            encoding, text transformation and prediction respectively.
        nr_test_cases: number of rows in the last returned probability matrix.
    """

    def __init__(self, nr_events, case_id_col, label_col, encoder_kwargs, transformer_kwargs, cls_kwargs, text_col=None,
                 text_transformer_type=None, cls_method="rf"):
        """Build the encoder, the optional text transformer and the classifier.

        Args:
            nr_events: forwarded to ``SequenceEncoder`` as ``nr_events``.
            case_id_col: name of the case identifier column.
            label_col: name of the label column.
            encoder_kwargs: extra keyword arguments for ``SequenceEncoder``.
            transformer_kwargs: keyword arguments for the text transformer.
            cls_kwargs: keyword arguments for the classifier.
            text_col: prefix identifying the text columns in the encoded data;
                required whenever ``text_transformer_type`` is given.
            text_transformer_type: ``None`` (no text handling) or one of
                ``"LDATransformer"``, ``"BoNGTransformer"``,
                ``"NBLogCountRatioTransformer"``, ``"PVTransformer"``.
            cls_method: ``"rf"`` (random forest) or ``"logit"`` (logistic
                regression).

        Raises:
            ValueError: if ``text_transformer_type`` or ``cls_method`` is not
                one of the supported names, or if a text transformer is
                requested without ``text_col``.
        """
        # Validate the configuration before constructing anything, so that a typo
        # fails here instead of surfacing later as a missing attribute in fit().
        if text_transformer_type is not None and text_col is None:
            raise ValueError("text_col must be given when text_transformer_type is set")

        if text_transformer_type is None:
            transformer_factory = None
        elif text_transformer_type == "LDATransformer":
            transformer_factory = LDATransformer
        elif text_transformer_type == "BoNGTransformer":
            transformer_factory = BoNGTransformer
        elif text_transformer_type == "NBLogCountRatioTransformer":
            transformer_factory = NBLogCountRatioTransformer
        elif text_transformer_type == "PVTransformer":
            transformer_factory = PVTransformer
        else:
            raise ValueError("Transformer type not known: %r" % (text_transformer_type,))

        if cls_method == "logit":
            classifier_factory = LogisticRegression
        elif cls_method == "rf":
            classifier_factory = RandomForestClassifier
        else:
            raise ValueError("Classifier method not known: %r" % (cls_method,))

        self.text_col = text_col
        self.case_id_col = case_id_col
        self.label_col = label_col

        self.encoder = SequenceEncoder(nr_events=nr_events, case_id_col=case_id_col, label_col=label_col,
                                       **encoder_kwargs)
        if transformer_factory is None:
            self.transformer = None
        else:
            self.transformer = transformer_factory(**transformer_kwargs)
        self.cls = classifier_factory(**cls_kwargs)

        self.hardcoded_prediction = None

        # Timings and counters; populated by fit() / predict_proba().
        self.preproc_time = None
        self.cls_time = None
        self.test_encode_time = None
        self.test_preproc_time = None
        self.test_time = None
        self.nr_test_cases = None

    def _split_text_columns(self, features):
        """Return ``(features without text columns, text columns cast to str)``."""
        text_cols = [col for col in features.columns.values if col.startswith(self.text_col)]
        # The text transformers are handed string data, whatever dtype the encoder produced.
        text_part = features[text_cols].astype('str')
        return features.drop(text_cols, axis=1), text_part

    def fit(self, dt_train):
        """Encode ``dt_train``, fit the text transformer (if any) and the classifier.

        When the training labels contain fewer than two distinct values the
        classifier is not fitted; the single label is remembered in
        ``hardcoded_prediction`` and ``cls.classes_`` is set to the unique
        labels instead.

        Args:
            dt_train: training data accepted by the encoder's ``fit_transform``.
        """
        # Forget a single-class shortcut left over from an earlier fit; otherwise
        # predict_proba() would keep bypassing a classifier that is trained below.
        self.hardcoded_prediction = None

        preproc_start_time = time.time()
        train_encoded = self.encoder.fit_transform(dt_train)
        train_X = train_encoded.drop([self.case_id_col, self.label_col], axis=1)
        train_y = train_encoded[self.label_col]

        if self.transformer is not None:
            other_features, text_features = self._split_text_columns(train_X)
            train_text = self.transformer.fit_transform(text_features, train_y)
            train_X = pd.concat([other_features, train_text], axis=1)
        self.train_X = train_X
        self.preproc_time = time.time() - preproc_start_time

        cls_start_time = time.time()
        observed_labels = train_y.unique()
        if len(observed_labels) < 2:  # less than 2 classes are present
            self.hardcoded_prediction = train_y.iloc[0]
            self.cls.classes_ = observed_labels
        else:
            self.cls.fit(train_X, train_y)
        self.cls_time = time.time() - cls_start_time

    def predict_proba(self, dt_test):
        """Return class probabilities for ``dt_test``.

        Also stores the case ids, features and labels of ``dt_test`` on the
        instance, together with the time spent in each stage.

        Args:
            dt_test: data accepted by the encoder's ``transform``; the encoded
                result must contain the case id and label columns.

        Returns:
            The classifier's ``predict_proba`` output, or, when the model was
            fitted on a single class, an ``(n_cases, 2)`` array whose rows are
            all ``[1.0, 0.0]``.
        """
        encode_start_time = time.time()
        test_encoded = self.encoder.transform(dt_test)
        self.test_encode_time = time.time() - encode_start_time

        test_preproc_start_time = time.time()
        test_X = test_encoded.drop([self.case_id_col, self.label_col], axis=1)
        if self.transformer is not None:
            other_features, text_features = self._split_text_columns(test_X)
            test_text = self.transformer.transform(text_features)
            test_X = pd.concat([other_features, test_text], axis=1)

        self.test_case_names = test_encoded[self.case_id_col]
        self.test_X = test_X
        self.test_y = test_encoded[self.label_col]
        self.test_preproc_time = time.time() - test_preproc_start_time

        test_start_time = time.time()
        if self.hardcoded_prediction is not None:  # e.g. model was trained with one class only
            # Column 0 carries probability 1.0 for every case.
            # NOTE(review): cls.classes_ holds a single label in this case, so the
            # second column has no matching class; kept to preserve the return shape.
            predictions_proba = np.tile([1.0, 0.0], (test_X.shape[0], 1))
        else:
            predictions_proba = self.cls.predict_proba(test_X)
        self.test_time = time.time() - test_start_time

        self.nr_test_cases = len(predictions_proba)
        return predictions_proba