Skip to content

Commit 13b34ae

Browse files
rkentishRobert Kentishtimo-reymann
authored
feat: Added client_secret support and refresh token functionality (#90)
* Added client_secret and refresh token function * Update oauth2_cli_auth/code_grant.py Co-authored-by: Timo Reymann <mail@timo-reymann.de> * Update oauth2_cli_auth/code_grant.py Co-authored-by: Timo Reymann <mail@timo-reymann.de> * Update oauth2_cli_auth/code_grant.py Co-authored-by: Timo Reymann <mail@timo-reymann.de> * Update oauth2_cli_auth/code_grant.py Co-authored-by: Timo Reymann <mail@timo-reymann.de> * Implement suggestions from PR review and added unit tests for token refresh * Fixed SonarCloud warnings --------- Co-authored-by: Robert Kentish <robert@jakarisolutions.com> Co-authored-by: Timo Reymann <mail@timo-reymann.de>
1 parent 3c33c14 commit 13b34ae

File tree

5 files changed

+68
-13
lines changed

5 files changed

+68
-13
lines changed

oauth2_cli_auth/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
"""
44
from oauth2_cli_auth.__version__ import __version__
55
from oauth2_cli_auth.http_server import OAuthCallbackHttpServer
6-
from oauth2_cli_auth.code_grant import OAuth2ClientInfo, exchange_code_for_access_token, get_auth_url, open_browser, load_oidc_config
6+
from oauth2_cli_auth.code_grant import OAuth2ClientInfo, refresh_access_token, exchange_code_for_access_token, exchange_code_for_response, get_auth_url, open_browser, load_oidc_config
77
from oauth2_cli_auth.simplified_flow import get_access_token_with_browser_open

oauth2_cli_auth/code_grant.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ class OAuth2ClientInfo:
2626
client_id: str
2727
"""Id of the client to request for"""
2828

29+
client_secret: str | None
30+
"""Secret of the client to request for"""
31+
2932
scopes: list[str]
3033
"""List of scopes to request"""
3134

3235
@staticmethod
33-
def from_oidc_endpoint(oidc_config_endpoint: str, client_id: str, scopes: list[str]) -> "OAuth2ClientInfo":
36+
def from_oidc_endpoint(oidc_config_endpoint: str, client_id: str, scopes: list[str] = [], client_secret: str | None = None) -> "OAuth2ClientInfo":
3437
"""
3538
Create client information object from well known endpoint in format as specified in
3639
https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata
@@ -45,6 +48,7 @@ def from_oidc_endpoint(oidc_config_endpoint: str, client_id: str, scopes: list[s
4548
authorization_url=config.get("authorization_endpoint"),
4649
token_url=config.get("token_endpoint"),
4750
client_id=client_id,
51+
client_secret=client_secret,
4852
scopes=scopes,
4953
)
5054

@@ -102,7 +106,7 @@ def exchange_code_for_response(
102106
"""
103107
headers = {
104108
"Content-Type": "application/x-www-form-urlencoded",
105-
"Authorization": "Basic " + base64.b64encode(f"{client_info.client_id}:".encode()).decode(),
109+
"Authorization": "Basic " + base64.b64encode(f"{client_info.client_id}:{client_info.client_secret if client_info.client_secret is not None else ''}".encode()).decode(),
106110
}
107111

108112
data = {
@@ -117,6 +121,32 @@ def exchange_code_for_response(
117121

118122
return json_response
119123

124+
def refresh_access_token(
125+
client_info: OAuth2ClientInfo,
126+
refresh_token: str,
127+
) -> dict:
128+
"""
129+
Refresh an access token using the endpoints from client info
130+
131+
:param client_info: Info about oauth2 client
132+
:param refresh_token: Refresh token to use
133+
:return: Response from OAuth2 endpoint
134+
"""
135+
headers = {
136+
"Content-Type": "application/x-www-form-urlencoded",
137+
"Authorization": "Basic " + base64.b64encode(f"{client_info.client_id}:{client_info.client_secret if client_info.client_secret is not None else ''}".encode()).decode(),
138+
}
139+
140+
data = {
141+
"refresh_token": refresh_token,
142+
"grant_type": "refresh_token",
143+
}
144+
encoded_data = urllib.parse.urlencode(data).encode('utf-8')
145+
146+
request = urllib.request.Request(client_info.token_url, data=encoded_data, headers=headers)
147+
json_response = _load_json(request)
148+
149+
return json_response
120150

121151
def exchange_code_for_access_token(
122152
client_info: OAuth2ClientInfo,

oauth2_cli_auth/code_grant_test.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,46 @@
33
from unittest.mock import patch
44

55
from oauth2_cli_auth import OAuth2ClientInfo, get_auth_url, exchange_code_for_access_token, load_oidc_config
6+
from oauth2_cli_auth.code_grant import exchange_code_for_response, refresh_access_token
7+
8+
REDIRECT_URI = "http://localhost:123"
69

710
client_info = OAuth2ClientInfo(
811
client_id="dummy",
12+
client_secret=None,
913
authorization_url="https://auth.com/oauth/authorize",
1014
token_url="https://auth.com/oauth/token",
11-
scopes=["openid", "profile"]
15+
scopes=["openid", "profile"],
1216
)
1317

1418

1519
def test_get_auth_url():
16-
auth_url = get_auth_url(client_info, "http://localhost:123")
20+
auth_url = get_auth_url(client_info, REDIRECT_URI)
1721
assert auth_url == (
18-
'https://auth.com/oauth/authorize?client_id=dummy&redirect_uri=http://localhost:123&scope=openid+'
22+
f'https://auth.com/oauth/authorize?client_id=dummy&redirect_uri={REDIRECT_URI}&scope=openid+'
1923
'profile&response_type=code'
2024
)
2125

26+
def test_exchange_code_for_response(create_urlopen_mock):
27+
with create_urlopen_mock(io.BytesIO(b'{"access_token":"the_token","token_type":"Bearer","expires_in":3600,"refresh_token":"the_refresh_token","scope":"create"}')):
28+
response = exchange_code_for_response(client_info, REDIRECT_URI, "code")
29+
assert "the_token" == response.get("access_token")
30+
assert "Bearer" == response.get("token_type")
31+
assert 3600 == response.get("expires_in")
32+
assert "the_refresh_token" == response.get("refresh_token")
33+
assert "create" == response.get("scope")
34+
35+
def test_refresh_access_token(create_urlopen_mock):
36+
with create_urlopen_mock(io.BytesIO(b'{"access_token":"the_token","token_type":"Bearer","expires_in":3600,"refresh_token":"the_refresh_token"}')):
37+
response = refresh_access_token(client_info, "the_refresh_token")
38+
assert "the_token" == response.get("access_token")
39+
assert "Bearer" == response.get("token_type")
40+
assert 3600 == response.get("expires_in")
41+
assert "the_refresh_token" == response.get("refresh_token")
2242

2343
def test_exchange_code_for_access_token(create_urlopen_mock):
2444
with create_urlopen_mock(io.BytesIO(b'{"access_token": "the_token"}')):
25-
assert "the_token" == exchange_code_for_access_token(client_info, "http://localhost:123", "code")
26-
45+
assert "the_token" == exchange_code_for_access_token(client_info, REDIRECT_URI, "code")
2746

2847
def test_load_oidc_config(create_urlopen_mock):
2948
with create_urlopen_mock(io.BytesIO(b'{"token_endpoint": "https://gitlab.com/oauth/token","authorization_endpoint": "https://gitlab.com/oauth/authorize"}')):
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
import threading
23
import urllib
34
from urllib.error import HTTPError
@@ -6,21 +7,22 @@
67

78
from oauth2_cli_auth import OAuthCallbackHttpServer
89

10+
PORT = 49152 + random.randrange(15000)
911

1012
def test_http_server_ok():
11-
server = OAuthCallbackHttpServer(5000)
13+
server = OAuthCallbackHttpServer(PORT)
1214

1315
threading.Thread(target=server.handle_request).start()
14-
with urllib.request.urlopen("http://localhost:5000?code=foo") as response:
16+
with urllib.request.urlopen(f"http://localhost:{PORT}?code=foo") as response:
1517
content = response.read().decode("utf-8")
1618
assert content is not None
1719

1820

1921
def test_http_server_bad_request():
20-
server = OAuthCallbackHttpServer(5000)
22+
server = OAuthCallbackHttpServer(PORT)
2123

2224
threading.Thread(target=server.handle_request).start()
2325
with pytest.raises(HTTPError, match="HTTP Error 400: Bad Request") as e:
24-
with urllib.request.urlopen("http://localhost:5000") as response:
26+
with urllib.request.urlopen(f"http://localhost:{PORT}") as response:
2527
content = response.read().decode("utf-8")
2628
assert "Oh snap" in content

oauth2_cli_auth/simplified_flow_test.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import io
2+
import random
23
import urllib
34
import webbrowser
45
from unittest.mock import patch
56

67
from oauth2_cli_auth import get_access_token_with_browser_open, OAuth2ClientInfo
78

9+
PORT = 49152 + random.randrange(15000)
10+
811
client_info = OAuth2ClientInfo(
912
client_id="dummy",
13+
client_secret=None,
1014
authorization_url="http://auth.com/oauth/authorize",
1115
token_url="http://auth.com/oauth/token",
1216
scopes=["openid", "profile"]
@@ -19,7 +23,7 @@
1923
def test_get_access_token_with_browser_open(webbrowser_open, get_code, handle_request):
2024
with patch.object(urllib.request, 'urlopen', return_value=io.BytesIO(b'{"access_token": "the_token"}')):
2125
get_code.return_value = "code"
22-
assert "the_token" == get_access_token_with_browser_open(client_info, 8080)
26+
assert "the_token" == get_access_token_with_browser_open(client_info, PORT)
2327

2428
assert get_code.call_count == 2
2529
assert handle_request.call_count == 1

0 commit comments

Comments
 (0)