Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/workshop/core/pipelines/adf/adf_pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"name": "Azure_SQL_ML_Pipeline",
"properties": {
"activities": [
{
"name": "copy data from sql",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "@concat('select * from green_taxi WHERE lpepPickupDatetime >','''',formatDateTime(adddays(utcnow(),-3190), 'yyyy-MM-dd'),'''')\n",
"type": "Expression"
},
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobStorageWriteSettings"
},
"formatSettings": {
"type": "ParquetWriteSettings"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "AzureSqlDemo",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "parquetdata",
"type": "DatasetReference"
}
]
},
{
"name": "Machine Learning Execute Pipeline",
"type": "AzureMLExecutePipeline",
"dependsOn": [
{
"activity": "copy data from sql",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"mlPipelineEndpointId": "3337b14a-4a0a-47d3-817b-e88e1e7c68e6"
},
"linkedServiceName": {
"referenceName": "amlws01ent",
"type": "LinkedServiceReference"
}
}
],
"annotations": [],
"lastPublishTime": "2022-10-05T21:24:10Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
54 changes: 54 additions & 0 deletions src/workshop/core/pipelines/batch_scoring_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Azure ML pipeline job with two steps: "data_engineering" (a command step)
# writes an mltable folder that "scoring" (a parallel step) consumes.
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: MLOps-Batch-Scoring-Pipeline
compute: azureml:cpu-cluster
settings:
force_rerun: true
jobs:
data_engineering:
type: command
# NOTE(review): ".components/" (leading dot, no slash) looks like a typo for
# "./components/" — confirm this path resolves to the component file.
component: .components/data_engineering_comp.yml
inputs:
input_folder:
type: uri_folder
mode: ro_mount
path: azureml://datastores/workspaceblobstore/paths/mlops_workshop_data/batch_scoring/inputs
outputs:
output_folder:
type: mltable
mode: rw_mount
scoring:
type: parallel
mini_batch_size: "1"
# NOTE(review): presumably -1 disables the failed-mini-batch threshold —
# confirm against the parallel job schema.
mini_batch_error_threshold: -1
# NOTE(review): batch_score.py writes every result to a fixed
# "<date>/prediction.csv"; with more than one worker, concurrent mini-batches
# would write the same file — verify this is intended.
max_concurrency_per_instance: 2
retry_settings:
max_retries: 1
timeout: 60
resources:
instance_count: 2
inputs:
scoring_data_folder:
type: mltable
mode: eval_mount
# Output of the data_engineering step above.
path: ${{parent.jobs.data_engineering.outputs.output_folder}}
outputs:
predictions_data_folder:
type: uri_folder
mode: rw_mount
path: azureml://datastores/workspaceblobstore/paths/mlops_workshop_data/batch_scoring/predictions
prediction_log:
type: uri_file
mode: rw_mount
input_data: ${{inputs.scoring_data_folder}}
task:
type: function
code: ../scoring/batch_scoring
entry_script: batch_score.py
environment:
name: mlops_batchscoring
# NOTE(review): this points at scoring/conda.yml, while this change set adds
# conda.yml under scoring/batch_scoring/ — confirm which file is intended.
conda_file: ../scoring/conda.yml
# NOTE(review): ":latest" is an unpinned image tag — consider pinning.
image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest
# Parsed by init() in batch_score.py via its --predictions_data_folder argument.
program_arguments: --predictions_data_folder ${{outputs.predictions_data_folder}}
append_row_to: ${{outputs.prediction_log}}

19 changes: 19 additions & 0 deletions src/workshop/core/pipelines/components/data_engineering_comp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Azure ML command component that runs data_engineering.py, reading
# "input_folder" (uri_folder) and writing "output_folder" (mltable).
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: data_engineering
display_name: data engineering
version: 10
type: command
# NOTE(review): if relative paths resolve from this file's directory
# (pipelines/components/), "../scoring/batch_scoring" points at
# pipelines/scoring/batch_scoring, whereas the scripts in this change set live
# under core/scoring/batch_scoring — confirm the path.
code: ../scoring/batch_scoring
command: >-
python data_engineering.py --input_folder ${{inputs.input_folder}} --output_folder ${{outputs.output_folder}};
inputs:
input_folder:
type: uri_folder
outputs:
output_folder:
type: mltable
is_deterministic: false
environment:
name: mlops_batchscoring
# NOTE(review): same relative-path question as "code" above — verify.
conda_file: ../scoring/conda.yml
# NOTE(review): ":latest" is an unpinned image tag — consider pinning.
image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest
34 changes: 0 additions & 34 deletions src/workshop/core/scoring/batch_score.py

This file was deleted.

48 changes: 48 additions & 0 deletions src/workshop/core/scoring/batch_scoring/batch_score.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

import os
import tempfile
import logging
from azureml.core.model import Model
import pickle
import pandas as pd
from azureml.core import Run
import os
import mlflow
import argparse,os,datetime

def init():
    """Parse the scoring arguments and load the model into module globals.

    Sets ``predictions_data_folder`` from ``--predictions_data_folder`` and
    ``model`` to the MLflow scikit-learn model named by ``--model_name``
    (default ``nyc_fare_prediction``); both globals are read by ``run``.
    """
    global model, predictions_data_folder

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--predictions_data_folder", type=str)
    arg_parser.add_argument(
        "--model_name",
        default='nyc_fare_prediction',
        type=str,
        help="Name of the model in workspace",
    )
    # parse_known_args tolerates extra arguments that are not declared here.
    known_args, _ = arg_parser.parse_known_args()

    predictions_data_folder = known_args.predictions_data_folder
    print("predictions_data_folder", predictions_data_folder)

    # Look the model up in the workspace of the current run and download it.
    workspace = Run.get_context().experiment.workspace
    model = Model(workspace, known_args.model_name)
    model.download(exist_ok=True)

    # The downloaded model is loaded using the model name as a relative path.
    model = mlflow.sklearn.load_model(known_args.model_name)

def run(mini_batch):
    """Score every parquet file in ``mini_batch`` and save the predictions.

    Each file is read into a DataFrame, scored with the module-level ``model``
    (set by ``init``), extended with a ``prediction`` column and written as CSV
    to ``<predictions_data_folder>/<MM-DD-YYYY>/prediction.csv``.

    Args:
        mini_batch: Iterable of parquet file paths to score.

    Returns:
        A list holding one ``1`` for every file that was processed.
    """
    print(f'run method start: {__file__}, run({mini_batch})')

    # The dated folder name does not depend on the file, so compute it once;
    # this also keeps one mini-batch in a single folder if it spans midnight.
    folder = datetime.datetime.today().strftime("%m-%d-%Y")

    results = []
    for file in mini_batch:
        data = pd.read_parquet(file)
        print("data shape ", data.shape)
        data["prediction"] = model.predict(data)

        output_dir = os.path.join(predictions_data_folder, folder)
        os.makedirs(output_dir, exist_ok=True)
        # NOTE(review): the output name is fixed, so each file scored on the
        # same day overwrites the previous prediction.csv. Kept as-is because
        # downstream consumers may rely on this file name — verify.
        data.to_csv(os.path.join(output_dir, "prediction.csv"))
        results.append(1)

    return results
11 changes: 11 additions & 0 deletions src/workshop/core/scoring/batch_scoring/conda.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: workshop-online-scoring
channels:
- conda-forge
dependencies:
- python=3.8.12
- pip=21.3.1
- pip:
- azureml-mlflow==1.38.0
- azureml-defaults==1.38.0
- pandas==1.3.5
- scikit-learn==1.0.2
121 changes: 121 additions & 0 deletions src/workshop/core/scoring/batch_scoring/data_engineering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import pandas as pd
import numpy as np
from datetime import datetime
import argparse
import os

import argparse,os
import pandas as pd
import datetime
# data engineering

# read arguments

def parse_args():
    """Build the command-line parser and return the parsed arguments.

    The three input file names have parquet defaults; ``--input_folder`` and
    ``--output_folder`` have no default and are ``None`` when omitted.
    """
    file_name_defaults = (
        ("--nyc_file_name", "green_taxi.parquet"),
        ("--public_holiday_file_name", "holidays.parquet"),
        ("--weather_file_name", "weather.parquet"),
    )

    parser = argparse.ArgumentParser()
    for flag, default_name in file_name_defaults:
        parser.add_argument(flag, type=str, default=default_name)
    for flag in ('--input_folder', '--output_folder'):
        parser.add_argument(flag, type=str)

    return parser.parse_args()


def build_time_features(vector):
    """Derive calendar and cyclical time features from a pickup timestamp.

    Args:
        vector: Row-like object whose first element is the pickup datetime,
            e.g. a row passed by ``DataFrame.apply(axis=1)`` or a list/tuple.

    Returns:
        pandas.Series holding, in order: month number, day of month, day of
        week (``weekday()``, Monday is 0), hour of day, the constant country
        code ``"US"``, sine and cosine of the hour over a 24-hour period, and
        sine and cosine of the weekday over a 7-day period.
    """
    # Read the first element by position. A row Series is indexed by column
    # label, and integer lookup via [] on such a Series relies on a deprecated
    # positional fallback; plain sequences have no .iloc and keep using [0].
    pickup_datetime = vector.iloc[0] if hasattr(vector, "iloc") else vector[0]

    month_num = pickup_datetime.month
    day_of_month = pickup_datetime.day
    day_of_week = pickup_datetime.weekday()
    hour_of_day = pickup_datetime.hour
    country_code = "US"

    # Encode the hour and weekday as points on a circle so that the end of
    # one cycle is adjacent to the start of the next.
    hour_angle = hour_of_day * (2. * np.pi / 24)
    day_angle = day_of_week * (2. * np.pi / 7)
    hr_sin = np.sin(hour_angle)
    hr_cos = np.cos(hour_angle)
    dy_sin = np.sin(day_angle)
    dy_cos = np.cos(day_angle)

    return pd.Series((month_num, day_of_month, day_of_week, hour_of_day,
                      country_code, hr_sin, hr_cos, dy_sin, dy_cos))
def engineer_features(green_taxi_df, holidays_df, weather_df):
    """Join taxi trips with holiday and daily weather data and add time features.

    The input frames are not modified; a new DataFrame is returned.

    Args:
        green_taxi_df: Trip records with a datetime ``lpepPickupDatetime``
            column, the columns listed in ``columns_to_remove`` and the
            columns used by the final filter.
        holidays_df: Holiday records with ``countryRegionCode``, ``date``,
            ``countryOrRegion`` and ``holidayName`` columns.
        weather_df: Weather records with ``datetime``, ``precipTime``,
            ``temperature`` and ``precipDepth`` columns.

    Returns:
        Trips left-joined with holidays (on day and country code) and with
        per-day weather aggregates, restricted to rows inside the pickup
        latitude/longitude, trip distance and passenger count bounds.
    """
    columns_to_remove = ["lpepDropoffDatetime", "puLocationId", "doLocationId", "extra", "mtaTax",
                         "improvementSurcharge", "tollsAmount", "ehailFee", "tripType", "rateCodeID",
                         "storeAndFwdFlag", "paymentType", "fareAmount", "tipAmount"]

    # drop() returns a new frame, so the caller's DataFrame stays untouched.
    taxi_df = green_taxi_df.drop(columns=columns_to_remove)

    # Vectorized equivalent of applying build_time_features row by row: same
    # columns, same order, same formulas, without a Python call per row.
    pickup = taxi_df["lpepPickupDatetime"]
    hour_of_day = pickup.dt.hour
    day_of_week = pickup.dt.dayofweek  # Monday is 0, like datetime.weekday()
    taxi_df["month_num"] = pickup.dt.month
    taxi_df["day_of_month"] = pickup.dt.day
    taxi_df["day_of_week"] = day_of_week
    taxi_df["hour_of_day"] = hour_of_day
    taxi_df["country_code"] = "US"
    taxi_df["hr_sin"] = np.sin(hour_of_day * (2. * np.pi / 24))
    taxi_df["hr_cos"] = np.cos(hour_of_day * (2. * np.pi / 24))
    taxi_df["dy_sin"] = np.sin(day_of_week * (2. * np.pi / 7))
    taxi_df["dy_cos"] = np.cos(day_of_week * (2. * np.pi / 7))

    # Day-level join key shared by the holiday and weather tables.
    taxi_df["datetime"] = pickup.dt.normalize()

    # rename() returns a copy, so adding the join key does not touch the input.
    holidays = holidays_df.rename(columns={"countryRegionCode": "country_code"})
    holidays["datetime"] = holidays["date"].dt.normalize()
    holidays = holidays.drop(columns=["countryOrRegion", "holidayName", "date"])

    taxi_holidays_df = pd.merge(taxi_df, holidays, how="left", on=["datetime", "country_code"])
    # A leftover expression that filtered on "normalizeHolidayName" and threw
    # the result away used to sit here; it had no effect and was removed.

    # One weather row per day: max precipitation time/depth, mean temperature.
    aggregations = {"precipTime": "max", "temperature": "mean", "precipDepth": "max"}
    weather_df_grouped = (
        weather_df.assign(datetime=weather_df["datetime"].dt.normalize())
        .groupby("datetime")
        .agg(aggregations)
    )

    taxi_holidays_weather_df = pd.merge(taxi_holidays_df, weather_df_grouped, how="left", on=["datetime"])

    # Keep only rows inside the pickup bounding box with a trip distance and
    # passenger count inside the accepted ranges.
    final_df = taxi_holidays_weather_df.query(
        "pickupLatitude>=40.53 and pickupLatitude<=40.88 and "
        "pickupLongitude>=-74.09 and pickupLongitude<=-73.72 and "
        "tripDistance>0 and tripDistance<75 and "
        "passengerCount>0 and passengerCount<100"
    )
    return final_df

def main(args):
    """Read today's input files, engineer features and write them as an MLTable.

    Inputs are read from ``<input_folder>/<MM-DD-YYYY>/`` using today's date.
    The result is written to ``<output_folder>/data.parquet`` next to an
    ``MLTable`` file whose pattern matches the parquet files in that folder.

    Args:
        args: Parsed arguments providing ``input_folder``, ``output_folder``,
            ``nyc_file_name``, ``public_holiday_file_name`` and
            ``weather_file_name``.
    """
    # The sub-folder name is today's date, formatted month-day-year.
    folder = datetime.datetime.today().strftime("%m-%d-%Y")
    input_dir = os.path.join(args.input_folder, folder)

    # read in data
    green_taxi_df = pd.read_parquet(os.path.join(input_dir, args.nyc_file_name))
    holidays_df = pd.read_parquet(os.path.join(input_dir, args.public_holiday_file_name))
    weather_df = pd.read_parquet(os.path.join(input_dir, args.weather_file_name))

    final_df = engineer_features(green_taxi_df, holidays_df, weather_df)

    # Make sure the output folder exists before writing into it.
    os.makedirs(args.output_folder, exist_ok=True)
    final_df.to_parquet(os.path.join(args.output_folder, "data.parquet"))
    print("done writing data")

    # MLTable definition that points at every parquet file in the folder.
    ml_table_content = "\npaths:\n- pattern: ./*.parquet\n"
    with open(os.path.join(args.output_folder, "MLTable"), 'w', encoding="utf-8") as mltable_file:
        mltable_file.write(ml_table_content)



# Script entry point: parse the command-line arguments and hand them to main().
if __name__ == "__main__":
    main(parse_args())