-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfraud_model_pipeline.py
More file actions
133 lines (110 loc) · 6.01 KB
/
fraud_model_pipeline.py
File metadata and controls
133 lines (110 loc) · 6.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Utility script for running the fraud pipeline outside the notebook.
This mirrors the notebook logic at a high level and can be extended into a
production-style module later.
"""
from __future__ import annotations
import argparse
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.ensemble import IsolationForest
from sklearn.linear_model import LogisticRegression
# Categorical feature columns; score_dataset feeds these to the OneHotEncoder
# branch of its ColumnTransformer.
CAT_COLS = ["Merchant_Category", "Device_Type"]
# Numeric feature columns; score_dataset feeds these to the StandardScaler
# branch of its ColumnTransformer.
# "Amount" through "Is_Night_Transaction" are not created anywhere in this
# file, so they must already be present in the input data; "hour" through
# "cust_dev_rate" are the columns added by engineer_features.
NUM_COLS = [
    "Amount", "Distance_from_Home", "IP_Risk_Score", "Avg_Spending_Habit",
    "Is_Weekend", "Is_Night_Transaction", "hour", "dayofweek", "month",
    "log_amount", "amount_to_habit", "cust_avg_amount", "cust_std_amount",
    "cust_txn_count", "amount_z_cust", "cust_cat_rate", "cust_dev_rate",
]
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` extended with the engineered model features.

    The input frame is not modified. Columns read: ``Timestamp``, ``Amount``,
    ``Avg_Spending_Habit``, ``Customer_ID``, ``Merchant_Category`` and
    ``Device_Type``.

    Columns added, in this order:
      * ``Timestamp_parsed`` (day-first parsing, unparseable values -> NaT),
        ``hour``, ``dayofweek``, ``month``
      * ``log_amount`` = log1p(Amount)
      * ``amount_to_habit`` = Amount / Avg_Spending_Habit, falling back to
        Amount where the ratio is undefined (habit of 0 or missing)
      * ``cust_avg_amount``, ``cust_std_amount`` (NaN -> 0), ``cust_txn_count``
        computed per ``Customer_ID``
      * ``amount_z_cust`` = per-customer z-score of Amount (0 where undefined)
      * ``cust_cat_cnt`` / ``cust_cat_rate`` and ``cust_dev_cnt`` /
        ``cust_dev_rate``: how often the customer used this merchant category /
        device, as a count and as a share of the customer's transactions

    Returns:
        The extended frame sorted by ``Timestamp_parsed`` with a fresh
        0..n-1 index (so row order generally differs from the input).
    """
    frame = df.copy()

    # --- calendar features -------------------------------------------------
    parsed = pd.to_datetime(frame["Timestamp"], dayfirst=True, errors="coerce")
    frame["Timestamp_parsed"] = parsed
    frame["hour"] = parsed.dt.hour
    frame["dayofweek"] = parsed.dt.dayofweek
    frame["month"] = parsed.dt.month

    # --- amount features ---------------------------------------------------
    amount = frame["Amount"]
    frame["log_amount"] = np.log1p(amount)
    # A zero habit would divide by zero; turn it into NaN and fall back to the
    # raw amount instead.
    safe_habit = frame["Avg_Spending_Habit"].replace(0, np.nan)
    frame["amount_to_habit"] = (amount / safe_habit).fillna(amount)

    # --- per-customer spending profile --------------------------------------
    per_customer = frame.groupby("Customer_ID")["Amount"]
    frame["cust_avg_amount"] = per_customer.transform("mean")
    # std is NaN for single-transaction customers; treat that as no spread.
    frame["cust_std_amount"] = per_customer.transform("std").fillna(0)
    frame["cust_txn_count"] = per_customer.transform("count")

    deviation = frame["Amount"] - frame["cust_avg_amount"]
    spread = frame["cust_std_amount"].replace(0, np.nan)
    frame["amount_z_cust"] = (deviation / spread).fillna(0)

    # --- how typical is this category / device for the customer -------------
    usage_specs = (
        ("Merchant_Category", "cust_cat_cnt", "cust_cat_rate"),
        ("Device_Type", "cust_dev_cnt", "cust_dev_rate"),
    )
    for value_col, count_col, rate_col in usage_specs:
        keys = ["Customer_ID", value_col]
        counts = frame.groupby(keys).size().reset_index(name=count_col)
        frame = frame.merge(counts, on=keys, how="left")
        frame[rate_col] = frame[count_col] / frame["cust_txn_count"]

    ordered = frame.sort_values(by="Timestamp_parsed")
    return ordered.reset_index(drop=True)
def add_rules_flags(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with a hand-written rule score and flag added.

    Each rule contributes points to ``rule_risk_score``:

      * IP risk:            >= 0.95 -> 2, >= 0.90 -> 1
      * unusual amount:     amount_to_habit >= 5 or amount_z_cust >= 3 -> 2,
                            amount_to_habit >= 3 or amount_z_cust >= 2 -> 1
      * Distance_from_Home > 1.0             -> 1
      * Is_Night_Transaction == 1            -> 1
      * Is_Weekend == 1                      -> 0.5
      * cust_cat_rate < 0.10                 -> 1
      * cust_dev_rate < 0.10                 -> 1

    ``rule_flag`` is 1 when the total reaches 4.0, otherwise 0. The input
    frame is not modified.
    """
    scored = df.copy()

    ip_risk = scored["IP_Risk_Score"]
    habit_ratio = scored["amount_to_habit"]
    cust_z = scored["amount_z_cust"]

    # Tiered rules: the first matching tier wins, no match scores 0.
    ip_points = np.select([ip_risk >= 0.95, ip_risk >= 0.90], [2.0, 1.0], default=0.0)
    amount_points = np.select(
        [
            (habit_ratio >= 5) | (cust_z >= 3),
            (habit_ratio >= 3) | (cust_z >= 2),
        ],
        [2.0, 1.0],
        default=0.0,
    )

    components = [
        ip_points,
        amount_points,
        np.where(scored["Distance_from_Home"] > 1.0, 1.0, 0.0),
        np.where(scored["Is_Night_Transaction"] == 1, 1.0, 0.0),
        np.where(scored["Is_Weekend"] == 1, 0.5, 0.0),
        np.where(scored["cust_cat_rate"] < 0.10, 1.0, 0.0),
        np.where(scored["cust_dev_rate"] < 0.10, 1.0, 0.0),
    ]

    total = np.zeros(len(scored), dtype=float)
    for points in components:
        total += points

    scored["rule_risk_score"] = total
    scored["rule_flag"] = (scored["rule_risk_score"] >= 4.0).astype(int)
    return scored
def score_dataset(input_csv: Path, output_csv: Path, contamination: float = 0.02, block_rate: float = 0.035) -> None:
    """Score every transaction in ``input_csv`` and write the result to ``output_csv``.

    Pipeline:
      1. Read the CSV and build features with ``engineer_features``.
      2. Split chronologically at the 80th-percentile timestamp; apply
         ``add_rules_flags`` to both parts.
      3. Fit the preprocessing and an IsolationForest on the earlier part,
         score both parts, and flag rows whose anomaly score reaches the
         ``1 - contamination`` quantile of the earlier part.
      4. Pseudo-label a row as fraud when the rule flag or the isolation flag
         is set, then fit a class-balanced logistic regression on all
         pseudo-labelled rows (probabilities are therefore in-sample).
      5. Mark rows whose probability reaches the ``1 - block_rate`` quantile
         as ``is_fraud``.

    The output file contains the original input columns, in the original row
    order, plus ``fraud_probability`` and ``is_fraud``.

    Args:
        input_csv: Path of the CSV file to read.
        output_csv: Path of the CSV file to write; missing parent directories
            are created.
        contamination: ``contamination`` passed to IsolationForest, also used
            as the tail share for the isolation flag threshold.
        block_rate: Share of scored rows to mark as ``is_fraud``.

    Note:
        Rows whose ``Timestamp`` cannot be parsed fall on neither side of the
        chronological split and are not scored; they are written with empty
        ``fraud_probability`` / ``is_fraud`` cells.
    """
    df = pd.read_csv(input_csv)

    # engineer_features and the concat below both re-sort rows by timestamp,
    # so scores must be mapped back by an explicit row id rather than by
    # position; otherwise they land on the wrong transactions whenever the
    # input is not already in timestamp order.
    row_id_col = "__row_id__"
    tagged = df.copy()
    tagged[row_id_col] = np.arange(len(tagged))

    d = engineer_features(tagged)
    cutoff_date = d["Timestamp_parsed"].quantile(0.80)
    train_df = add_rules_flags(d[d["Timestamp_parsed"] <= cutoff_date].copy())
    test_df = add_rules_flags(d[d["Timestamp_parsed"] > cutoff_date].copy())

    preprocess = ColumnTransformer(
        transformers=[
            ("cat", OneHotEncoder(handle_unknown="ignore"), CAT_COLS),
            ("num", StandardScaler(), NUM_COLS),
        ],
        remainder="drop",
    )
    X_train_raw = train_df[CAT_COLS + NUM_COLS].copy()
    X_test_raw = test_df[CAT_COLS + NUM_COLS].copy()
    # Fit the encoders/scalers on the earlier part only, then apply to both.
    preprocess.fit(X_train_raw)
    X_train_trans = preprocess.transform(X_train_raw)
    X_test_trans = preprocess.transform(X_test_raw)

    iso = IsolationForest(n_estimators=300, contamination=contamination, random_state=42)
    iso.fit(X_train_trans)
    # decision_function is lower for more abnormal points; negate it so that a
    # larger iso_anom_score means "more anomalous".
    train_df["iso_anom_score"] = -iso.decision_function(X_train_trans)
    test_df["iso_anom_score"] = -iso.decision_function(X_test_trans)
    thr_iso = np.quantile(train_df["iso_anom_score"], 1 - contamination)
    train_df["iso_flag"] = (train_df["iso_anom_score"] >= thr_iso).astype(int)
    test_df["iso_flag"] = (test_df["iso_anom_score"] >= thr_iso).astype(int)

    # Pseudo-label: fraud if either the rules or the isolation forest fired.
    train_df["pseudo_is_fraud"] = ((train_df["rule_flag"] == 1) | (train_df["iso_flag"] == 1)).astype(int)
    test_df["pseudo_is_fraud"] = ((test_df["rule_flag"] == 1) | (test_df["iso_flag"] == 1)).astype(int)

    d2 = pd.concat([train_df, test_df], axis=0).sort_values("Timestamp_parsed").reset_index(drop=True)
    X_all = d2[CAT_COLS + NUM_COLS].copy()
    y_all = d2["pseudo_is_fraud"].copy()

    # Pipeline.fit refits ``preprocess`` on all rows before fitting the model.
    clf = Pipeline(steps=[
        ("preprocess", preprocess),
        ("model", LogisticRegression(max_iter=2000, class_weight="balanced")),
    ])
    clf.fit(X_all, y_all)
    d2["fraud_probability"] = clf.predict_proba(X_all)[:, 1]
    thr_full = np.quantile(d2["fraud_probability"], 1 - block_rate)
    d2["is_fraud"] = (d2["fraud_probability"] >= thr_full).astype(int)

    # Realign the scores with the input rows. Row ids absent from d2 (rows
    # dropped by the split) become missing values after the reindex.
    scored = (
        d2.set_index(row_id_col)[["fraud_probability", "is_fraud"]]
        .reindex(np.arange(len(df)))
    )
    out = df.copy()
    out["fraud_probability"] = scored["fraud_probability"].to_numpy()
    # Nullable integer keeps 0/1 in the CSV while allowing empty cells for
    # unscored rows.
    out["is_fraud"] = scored["is_fraud"].astype("Int64").array

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(output_csv, index=False)
def main() -> None:
    """Command-line entry point: parse the options and run ``score_dataset``.

    Options: ``--infile`` and ``--outfile`` (both required paths),
    ``--contamination`` (default 0.02) and ``--block-rate`` (default 0.035).
    """
    cli = argparse.ArgumentParser(description="Score transactions for suspected fraud.")

    # (flag, keyword arguments for add_argument), registered in this order.
    option_table = (
        ("--infile", {"type": Path, "required": True, "help": "Path to input CSV file"}),
        ("--outfile", {"type": Path, "required": True, "help": "Path to output CSV file"}),
        ("--contamination", {"type": float, "default": 0.02, "help": "IsolationForest contamination"}),
        ("--block-rate", {"type": float, "default": 0.035, "help": "Final fraud block rate"}),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()
    score_dataset(
        input_csv=opts.infile,
        output_csv=opts.outfile,
        contamination=opts.contamination,
        block_rate=opts.block_rate,
    )


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()