-
-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathcloudflare.py
More file actions
97 lines (81 loc) · 3.82 KB
/
cloudflare.py
File metadata and controls
97 lines (81 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import json
from urllib.parse import urljoin
import requests
class Cloudflare:
    """Small client for the Cloudflare v4 REST API.

    Every call is authenticated with the ``X-AUTH-EMAIL`` / ``X-AUTH-KEY``
    headers built from the constructor arguments. Paths given to the methods
    are resolved relative to ``base_url``.
    """

    base_url = "https://api.cloudflare.com/client/v4/"

    # Seconds handed to ``requests`` as ``timeout``. Without a timeout a
    # stalled connection blocks forever; override on the class or instance
    # (``None`` disables it) when a different limit is needed.
    request_timeout = 30

    # HTTP methods whose ``data`` payload is serialized to JSON, matching the
    # ``Content-Type: application/json`` header sent with every request.
    _json_body_methods = ("post", "put", "patch", "delete")

    def __init__(self, email, key):
        """Store the credentials and precompute the authentication headers."""
        self.__auth_email = email
        self.__auth_key = key
        self.__auth_headers = {
            "X-AUTH-KEY": key,
            "X-AUTH-EMAIL": email,
        }

    def get_http_headers(self):
        """Return a fresh dict with the auth headers plus a JSON content type.

        A copy is returned so callers can modify it without touching the
        headers stored on the instance.
        """
        headers = self.__auth_headers.copy()
        headers["Content-Type"] = "application/json"
        return headers

    def request(self, path, query_string=None, data=None, headers=None, method="get"):
        """Perform one API call and return the decoded JSON body.

        Args:
            path: Path joined onto ``base_url`` with ``urljoin``.
            query_string: Optional mapping sent as URL query parameters.
            data: Optional payload. For POST/PUT/PATCH/DELETE a truthy value
                is serialized with ``json.dumps``; otherwise it is passed to
                ``requests`` unchanged.
            headers: Optional extra headers; they override the defaults.
            method: HTTP method name (case-insensitive); it must match a
                function exposed by the ``requests`` module.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If the response's ``success`` field is missing or falsy.
        """
        request_headers = self.get_http_headers()
        if headers is not None:
            request_headers.update(headers)

        method = method.lower()
        if method in self._json_body_methods and data:
            data = json.dumps(data)

        http_request_func = getattr(requests, method)
        url = urljoin(self.base_url, path)
        response = http_request_func(
            url,
            params=query_string,
            headers=request_headers,
            data=data,
            timeout=self.request_timeout,
        )

        payload = response.json()
        if not payload.get("success", False):
            raise ValueError(f"Error on response: {payload}")
        return payload

    def paginate(self, path, query_string=None, data=None, headers=None, method="get", result_class=dict):
        """Yield every item of a listing, following pagination as needed.

        The first request asks for ``page=1``. The ``result_info`` of each
        response then decides how to continue: a ``page`` key selects
        page-number pagination, a ``cursors`` key selects cursor pagination,
        and a response without ``result_info`` ends the iteration.

        Args:
            path, data, headers, method: Forwarded to :meth:`request`.
            query_string: Optional base query parameters. The mapping is
                copied, so the caller's object is never modified.
            result_class: Callable applied to each raw item before yielding.

        Yields:
            ``result_class(item)`` for each item in every page's ``result``.

        Raises:
            RuntimeError: If ``result_info`` has neither ``page`` nor
                ``cursors``.
        """
        # Work on a private copy: ``page``/``cursor`` keys are added and
        # removed below and must not leak into the caller's dict.
        params = dict(query_string or {})
        page, cursor = 1, None

        while True:
            if page is not None:
                params["page"] = page
            else:
                params.pop("page", None)
                params["cursor"] = cursor

            response = self.request(path, query_string=params, data=data, headers=headers, method=method)
            # ``result`` may be null; treat that as an empty page.
            for result in response["result"] or ():
                yield result_class(result)

            pagination_info = response.get("result_info", None)
            if pagination_info is None:  # No pagination
                return

            if "page" in pagination_info:
                # ``>=`` rather than ``==`` so a listing that reports
                # ``total_pages: 0`` (or fewer pages than the current one)
                # terminates instead of looping forever.
                if pagination_info["page"] >= pagination_info["total_pages"]:
                    return
                page += 1
            elif "cursors" in pagination_info:
                page = None
                cursor = pagination_info["cursors"].get("after")
                if not cursor:
                    # A missing or empty "after" cursor means no further page.
                    return
            else:
                raise RuntimeError("Received unrecognized pagination information")

    def accounts(self):
        """Yield the items returned by the ``accounts`` endpoint."""
        yield from self.paginate("accounts")

    def rules_list(self, account_id):
        """Yield the items of ``accounts/{account_id}/rules/lists``."""
        yield from self.paginate(f"accounts/{account_id}/rules/lists")

    def rules_list_items(self, account_id, list_id):
        """Yield the items of ``accounts/{account_id}/rules/lists/{list_id}/items``."""
        yield from self.paginate(f"accounts/{account_id}/rules/lists/{list_id}/items")

    def add_rule_list_items(self, account_id, list_id, ips, comment=None):
        """POST one ``{"ip", "comment"}`` entry per element of ``ips``.

        ``comment`` is stripped of surrounding whitespace (``None`` becomes
        an empty string) and attached to every entry.

        Returns:
            The ``result`` field of the API response.
        """
        # docs: https://api.cloudflare.com/#rules-lists-create-list-items
        path = f"accounts/{account_id}/rules/lists/{list_id}/items"
        comment = (comment or "").strip()
        data = [{"ip": ip, "comment": comment} for ip in ips]
        response = self.request(path, data=data, method="POST")
        return response["result"]

    def get_operation_status(self, account_id, operation_id):
        """Return the full API response for a bulk operation lookup."""
        # docs: https://api.cloudflare.com/#rules-lists-get-bulk-operation
        path = f"accounts/{account_id}/rules/lists/bulk_operations/{operation_id}"
        return self.request(path)

    def delete_rule_list_items(self, account_id, list_id, list_items_ids):
        """Send a DELETE naming each id in ``list_items_ids``.

        Returns:
            The ``result`` field of the API response.
        """
        # docs: https://api.cloudflare.com/#rules-lists-delete-list-items
        path = f"accounts/{account_id}/rules/lists/{list_id}/items"
        data = {"items": [{"id": id_} for id_ in list_items_ids]}
        response = self.request(path, data=data, method="DELETE")
        return response["result"]