-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
123 lines (102 loc) · 3.18 KB
/
utils.py
File metadata and controls
123 lines (102 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import re
from typing import List
import ldap
# import all models and types
from otypes import ProfileType
# Module-level LDAP client object for ldap://ldap.iiit.ac.in. It is shared by
# ldap_search(), which rebinds this name to a fresh client when a search
# raises ldap.SERVER_DOWN.
LDAP = ldap.initialize("ldap://ldap.iiit.ac.in")
async def ldap_search(filterstr: str) -> List[tuple]:
    """
    Fetches details from the LDAP server of users matching the filter.

    The synchronous ``search_s`` call is handed to the event loop's default
    executor so that it does not run on the event loop itself. If the search
    raises ``ldap.SERVER_DOWN``, the module-level client is re-created and the
    search is retried exactly once; an error from the retry propagates to the
    caller.

    Args:
        filterstr (str): LDAP filter string.

    Returns:
        (List[tuple]): Whatever ``search_s`` returns for the subtree search
            under ``ou=Users,dc=iiit,dc=ac,dc=in``.
    """
    global LDAP

    def run_search() -> List[tuple]:
        # LDAP is resolved when the call executes, so the retry below
        # automatically uses the re-created client.
        return LDAP.search_s(
            "ou=Users,dc=iiit,dc=ac,dc=in",
            ldap.SCOPE_SUBTREE,
            filterstr,
        )

    # Inside a coroutine the running loop is always available; this is the
    # preferred accessor over get_event_loop().
    loop = asyncio.get_running_loop()

    try:
        return await loop.run_in_executor(None, run_search)
    except ldap.SERVER_DOWN:
        # Reconnect to the LDAP server and retry the search once.
        LDAP = ldap.initialize("ldap://ldap.iiit.ac.in")
        return await loop.run_in_executor(None, run_search)
def get_profile(ldap_result: List) -> ProfileType:
    """
    Builds a user's ProfileType from one entry of an LDAP search result.

    Args:
        ldap_result (List): A single ``(dn, details)`` pair. ``dn`` is the
            entry's distinguished name (str) and ``details`` maps attribute
            names to lists of byte strings.

    Returns:
        (otypes.ProfileType): Contains the profile of the user. Attributes
            missing from ``details`` are returned as ``None``, except
            ``email``, which falls back to ``<uid>@iiit.ac.in`` or ``""``.
    """
    dn, details = ldap_result

    def first_value(attr: str):
        """Return the first value of ``attr`` decoded to str, or None."""
        values = details.get(attr)
        return values[0].decode() if values else None

    uid = first_value("uid")

    # Resolve the name, in order of preference: cn, givenName + sn, uid.
    cn_parts = (first_value("cn") or "").split()
    given_name = first_value("givenName")
    surname = first_value("sn")
    if cn_parts:
        first_name = cn_parts[0]
        last_name = " ".join(cn_parts[1:])
    elif given_name is not None and surname is not None:
        first_name = given_name
        last_name = surname
    else:
        # Derive the name from a "first.last" style uid. partition() copes
        # with a uid that has no dot (or several) instead of raising.
        uid_first, _, uid_last = (uid or "").partition(".")
        first_name = uid_first.capitalize()
        last_name = uid_last.capitalize()

    # extract optional attributes
    gender = first_value("gender")

    rollno = first_value("uidNumber")
    if rollno is None:
        rollno = first_value("sambaSID")

    # Names of the OUs the DN belongs to, in the order they appear in the DN.
    ous = re.findall(r"ou=(\w.*?),", dn)

    # The first OU carries the stream code, the second the batch code.
    stream = ous[0] if ous else None
    batch = None
    if len(ous) > 1:
        # remove the 'dual' suffix if it exists
        batch = re.sub(r"dual$", "", ous[1], flags=re.IGNORECASE)

    email = first_value("mail")
    if email is None:
        email = f"{uid}@iiit.ac.in" if uid is not None else ""

    return ProfileType(
        uid=uid,
        firstName=first_name,
        lastName=last_name,
        email=email,
        gender=gender,
        batch=batch,
        stream=stream,
        rollno=rollno,
    )