"""Utility script for running the fraud pipeline outside the notebook. This mirrors the notebook logic at a high level and can be extended into a production-style module later. """ from __future__ import annotations import argparse from pathlib import Path import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.preprocessing import OneHotEncoder, StandardScaler from sklearn.pipeline import Pipeline from sklearn.ensemble import IsolationForest from sklearn.linear_model import LogisticRegression CAT_COLS = ["Merchant_Category", "Device_Type"] NUM_COLS = [ "Amount", "Distance_from_Home", "IP_Risk_Score", "Avg_Spending_Habit", "Is_Weekend", "Is_Night_Transaction", "hour", "dayofweek", "month", "log_amount", "amount_to_habit", "cust_avg_amount", "cust_std_amount", "cust_txn_count", "amount_z_cust", "cust_cat_rate", "cust_dev_rate", ] def engineer_features(df: pd.DataFrame) -> pd.DataFrame: d = df.copy() d["Timestamp_parsed"] = pd.to_datetime(d["Timestamp"], dayfirst=True, errors="coerce") d["hour"] = d["Timestamp_parsed"].dt.hour d["dayofweek"] = d["Timestamp_parsed"].dt.dayofweek d["month"] = d["Timestamp_parsed"].dt.month d["log_amount"] = np.log1p(d["Amount"]) d["amount_to_habit"] = d["Amount"] / d["Avg_Spending_Habit"].replace(0, np.nan) d["amount_to_habit"] = d["amount_to_habit"].fillna(d["Amount"]) g = d.groupby("Customer_ID")["Amount"] d["cust_avg_amount"] = g.transform("mean") d["cust_std_amount"] = g.transform("std").fillna(0) d["cust_txn_count"] = g.transform("count") d["amount_z_cust"] = (d["Amount"] - d["cust_avg_amount"]) / d["cust_std_amount"].replace(0, np.nan) d["amount_z_cust"] = d["amount_z_cust"].fillna(0) cust_cat_cnt = d.groupby(["Customer_ID", "Merchant_Category"]).size().rename("cust_cat_cnt").reset_index() d = d.merge(cust_cat_cnt, on=["Customer_ID", "Merchant_Category"], how="left") d["cust_cat_rate"] = d["cust_cat_cnt"] / d["cust_txn_count"] cust_dev_cnt = d.groupby(["Customer_ID", "Device_Type"]).size().rename("cust_dev_cnt").reset_index() d = d.merge(cust_dev_cnt, on=["Customer_ID", "Device_Type"], how="left") d["cust_dev_rate"] = d["cust_dev_cnt"] / d["cust_txn_count"] return d.sort_values("Timestamp_parsed").reset_index(drop=True) def add_rules_flags(df: pd.DataFrame) -> pd.DataFrame: x = df.copy() risk = np.zeros(len(x), dtype=float) risk += np.where(x["IP_Risk_Score"] >= 0.95, 2.0, np.where(x["IP_Risk_Score"] >= 0.90, 1.0, 0.0)) risk += np.where((x["amount_to_habit"] >= 5) | (x["amount_z_cust"] >= 3), 2.0, np.where((x["amount_to_habit"] >= 3) | (x["amount_z_cust"] >= 2), 1.0, 0.0)) risk += np.where(x["Distance_from_Home"] > 1.0, 1.0, 0.0) risk += np.where(x["Is_Night_Transaction"] == 1, 1.0, 0.0) risk += np.where(x["Is_Weekend"] == 1, 0.5, 0.0) risk += np.where(x["cust_cat_rate"] < 0.10, 1.0, 0.0) risk += np.where(x["cust_dev_rate"] < 0.10, 1.0, 0.0) x["rule_risk_score"] = risk x["rule_flag"] = (x["rule_risk_score"] >= 4.0).astype(int) return x def score_dataset(input_csv: Path, output_csv: Path, contamination: float = 0.02, block_rate: float = 0.035) -> None: df = pd.read_csv(input_csv) d = engineer_features(df) cutoff_date = d["Timestamp_parsed"].quantile(0.80) train_df = add_rules_flags(d[d["Timestamp_parsed"] <= cutoff_date].copy()) test_df = add_rules_flags(d[d["Timestamp_parsed"] > cutoff_date].copy()) preprocess = ColumnTransformer( transformers=[ ("cat", OneHotEncoder(handle_unknown="ignore"), CAT_COLS), ("num", StandardScaler(), NUM_COLS), ], remainder="drop", ) X_train_raw = train_df[CAT_COLS + NUM_COLS].copy() 

def score_dataset(input_csv: Path, output_csv: Path, contamination: float = 0.02,
                  block_rate: float = 0.035) -> None:
    """Score a CSV of transactions and write it back with fraud columns added."""
    df = pd.read_csv(input_csv)

    # Keep an explicit row id so scores can be re-aligned with the input order
    # after the time-based sorting inside engineer_features. Without this, the
    # positional write-back at the end would misalign (or crash) whenever the
    # input is not already sorted by timestamp.
    df["_row_id"] = np.arange(len(df))
    d = engineer_features(df)

    # Time-based split: the first 80% of transactions train the detectors.
    cutoff_date = d["Timestamp_parsed"].quantile(0.80)
    train_df = add_rules_flags(d[d["Timestamp_parsed"] <= cutoff_date].copy())
    test_df = add_rules_flags(d[d["Timestamp_parsed"] > cutoff_date].copy())

    preprocess = ColumnTransformer(
        transformers=[
            ("cat", OneHotEncoder(handle_unknown="ignore"), CAT_COLS),
            ("num", StandardScaler(), NUM_COLS),
        ],
        remainder="drop",
    )

    X_train_raw = train_df[CAT_COLS + NUM_COLS].copy()
    X_test_raw = test_df[CAT_COLS + NUM_COLS].copy()

    # Fit the preprocessor on the training window only, then transform both splits.
    preprocess.fit(X_train_raw)
    X_train_trans = preprocess.transform(X_train_raw)
    X_test_trans = preprocess.transform(X_test_raw)

    # Unsupervised anomaly scores; negated so that higher means more anomalous.
    iso = IsolationForest(n_estimators=300, contamination=contamination, random_state=42)
    iso.fit(X_train_trans)
    train_df["iso_anom_score"] = -iso.decision_function(X_train_trans)
    test_df["iso_anom_score"] = -iso.decision_function(X_test_trans)

    # Threshold at the training (1 - contamination) quantile, applied to both splits.
    thr_iso = np.quantile(train_df["iso_anom_score"], 1 - contamination)
    train_df["iso_flag"] = (train_df["iso_anom_score"] >= thr_iso).astype(int)
    test_df["iso_flag"] = (test_df["iso_anom_score"] >= thr_iso).astype(int)

    # Pseudo-labels: a transaction is suspect if either the rules or the
    # anomaly detector flags it.
    train_df["pseudo_is_fraud"] = ((train_df["rule_flag"] == 1) | (train_df["iso_flag"] == 1)).astype(int)
    test_df["pseudo_is_fraud"] = ((test_df["rule_flag"] == 1) | (test_df["iso_flag"] == 1)).astype(int)

    d2 = pd.concat([train_df, test_df], axis=0).sort_values("Timestamp_parsed").reset_index(drop=True)
    X_all = d2[CAT_COLS + NUM_COLS].copy()
    y_all = d2["pseudo_is_fraud"].copy()

    # Supervised model trained on the pseudo-labels. Note that fitting the
    # pipeline refits the shared preprocessor, this time on the full dataset.
    clf = Pipeline(steps=[
        ("preprocess", preprocess),
        ("model", LogisticRegression(max_iter=2000, class_weight="balanced")),
    ])
    clf.fit(X_all, y_all)
    d2["fraud_probability"] = clf.predict_proba(X_all)[:, 1]

    # Flag the top block_rate fraction of transactions by probability.
    thr_full = np.quantile(d2["fraud_probability"], 1 - block_rate)
    d2["is_fraud"] = (d2["fraud_probability"] >= thr_full).astype(int)

    # Merge scores back on the row id so the output keeps the input row order.
    # Rows whose timestamps failed to parse fall outside both splits and come
    # back with NaN scores.
    out = df.merge(d2[["_row_id", "fraud_probability", "is_fraud"]], on="_row_id", how="left")
    out = out.drop(columns=["_row_id"])

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(output_csv, index=False)


def main() -> None:
    parser = argparse.ArgumentParser(description="Score transactions for suspected fraud.")
    parser.add_argument("--infile", type=Path, required=True, help="Path to input CSV file")
    parser.add_argument("--outfile", type=Path, required=True, help="Path to output CSV file")
    parser.add_argument("--contamination", type=float, default=0.02, help="IsolationForest contamination")
    parser.add_argument("--block-rate", type=float, default=0.035, help="Final fraud block rate")
    args = parser.parse_args()
    score_dataset(args.infile, args.outfile, args.contamination, args.block_rate)


if __name__ == "__main__":
    main()
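# Example invocation (script and file names here are hypothetical; adjust to
# your own layout):
#
#   python fraud_pipeline.py \
#       --infile data/transactions.csv \
#       --outfile output/scored_transactions.csv \
#       --contamination 0.02 \
#       --block-rate 0.035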