forked from kianhean/AWSBuildMonitor
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
118 lines (84 loc) · 3.31 KB
/
app.py
File metadata and controls
118 lines (84 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from flask import Flask
from flask import render_template
import datetime
import arrow
import boto3
import json
import configparser
def loop_pipines(pipelines):
    """Fetch and parse the state of every pipeline named in *pipelines*.

    Each name is passed to ``get_pipeline_status`` and the response is
    handed straight to ``parse_pipeline_status``.

    Returns a list holding one parsed result per name, in input order.
    """
    return [
        parse_pipeline_status(get_pipeline_status(pipeline_name))
        for pipeline_name in pipelines
    ]
def get_pipeline_status(pipelinename):
    """Fetch the current state of a single pipeline from AWS CodePipeline.

    Creates a boto3 ``codepipeline`` client and calls its
    ``get_pipeline_state`` method with ``name=pipelinename``.
    http://boto3.readthedocs.io/en/latest/reference/services/codepipeline.html#CodePipeline.Client.get_pipeline_state

    Returns : the client's response, unchanged (a dictionary per the
    boto3 documentation linked above).
    """
    codepipeline = boto3.client('codepipeline')
    state = codepipeline.get_pipeline_state(name=pipelinename)
    return state
def parse_pipeline_status(dict_data):
    """Flatten a pipeline-state dictionary into display-ready stage summaries.

    Only the first entry of each stage's ``actionStates`` list is inspected.

    Args:
        dict_data: Mapping with a ``pipelineName`` entry and a ``stageStates``
            list, as passed in by ``loop_pipines`` and ``dashboard_test``.

    Returns:
        ``{'Name': <pipeline name>, 'Stages': [<stage dict>, ...]}``. Every
        stage dict has the keys ``name``, ``status``, ``percentage`` and
        ``last``; ``error_msg`` is present only when the status is
        ``'Failed'``.

    Raises:
        KeyError, IndexError: If ``pipelineName``, ``stageStates``, a stage's
            first action, or that action's ``actionName`` is missing.
    """
    pipeline_name = dict_data['pipelineName']
    stages = []

    for stage in dict_data['stageStates']:
        action = stage['actionStates'][0]

        latest = action.get('latestExecution')
        if not isinstance(latest, dict):
            # No recorded execution for this action: use the defaults below.
            latest = {}

        # Build a fresh dict for every stage. Reusing one dict across
        # iterations let a failed stage's 'error_msg' leak into the stages
        # that followed it.
        summary = {
            'name': action['actionName'],
            'status': latest.get('status', 'InProgress'),
            'percentage': latest.get('percentComplete', 'Unknown'),
        }

        # Human-readable "time since last change".
        if 'lastStatusChange' in latest:
            try:
                summary['last'] = arrow.get(latest['lastStatusChange']).humanize()
            except Exception:
                # Deliberate best effort: the value is purely cosmetic and the
                # exception raised for unparseable input depends on the arrow
                # version. Unlike a bare ``except:``, this still lets
                # KeyboardInterrupt / SystemExit propagate.
                summary['last'] = 'Never'
        else:
            summary['last'] = 'Never'

        if summary['status'] == 'Failed':
            details = latest.get('errorDetails')
            if not isinstance(details, dict):
                details = {}
            # A failure without details should not crash the whole dashboard.
            summary['error_msg'] = details.get(
                'message', 'No error details available')

        stages.append(summary)

    return {'Name': pipeline_name, 'Stages': stages}
# Flask application object; the @app.route decorators in this file register
# their view functions on it.
app = Flask(__name__)
@app.route("/")
def dashboard():
    """Render the live dashboard page.

    Reads the ``development`` section of ``enviroment.ini`` for
    ``pipelinename`` (a comma-separated list), ``projectname`` and
    ``refresh``, collects the stages of every listed pipeline through
    ``loop_pipines`` and renders ``pipeline.html`` with the result.

    Raises:
        KeyError: If the ``development`` section or one of its options is
            missing. This is also what an unreadable config file produces,
            because ``ConfigParser.read`` silently skips files it cannot
            open.
    """
    # Set Keys from Config
    config = configparser.ConfigParser()
    config.read('enviroment.ini')
    settings = config['development']

    # Accept "a, b" and trailing commas: strip whitespace around each name
    # and drop empty entries so only usable pipeline names are sent to AWS.
    pipelines = [
        name.strip()
        for name in settings['pipelinename'].split(",")
        if name.strip()
    ]
    projectname = settings['projectname']
    refresh = settings['refresh']

    # Build List of Pipelines
    pipes = loop_pipines(pipelines)

    return render_template('pipeline.html', pipes=pipes, projectname=projectname, refresh=refresh)
@app.route("/local")
def dashboard_test():
    """Render the dashboard from local JSON fixtures instead of AWS.

    Reads the ``development`` section of ``enviroment.ini.sample`` for
    ``pipelinename`` (a comma-separated list), ``projectname`` and
    ``refresh``. For every listed pipeline it loads
    ``fixtures/<name>.json``, parses it with ``parse_pipeline_status`` and
    renders ``pipeline.html`` with the results.

    Raises:
        KeyError: If the ``development`` section or one of its options is
            missing (``ConfigParser.read`` silently skips unreadable files),
            or if a fixture lacks ``stageStates`` / ``actionStates``.
        FileNotFoundError: If a fixture file for a listed pipeline is absent.
    """
    # Set Keys from Config
    config = configparser.ConfigParser()
    config.read('enviroment.ini.sample')
    settings = config['development']

    # Accept "a, b" and trailing commas: strip whitespace around each name
    # and drop empty entries so fixture paths are built from clean names.
    pipelines = [
        name.strip()
        for name in settings['pipelinename'].split(",")
        if name.strip()
    ]
    projectname = settings['projectname']
    refresh = settings['refresh']

    # Gather Data for Local Page
    pipes = []
    for pipe in pipelines:
        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open("fixtures/" + pipe + ".json", encoding="utf-8") as json_data:
            # Load JSON File and Mock Output
            data = json.load(json_data)

        # Overwrite each stage's timestamp with a Python date object (the
        # original comment describes this as simulating boto3's output).
        # Stages without a recorded execution are left alone so the parser
        # can report them with its defaults instead of this loop raising.
        for stage in data['stageStates']:
            latest = stage['actionStates'][0].get('latestExecution')
            if isinstance(latest, dict):
                latest['lastStatusChange'] = datetime.date.today()

        pipes.append(parse_pipeline_status(data))

    return render_template('pipeline.html', pipes=pipes, projectname=projectname, refresh=refresh)