forked from indigo-dc/flaat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconftest.py
103 lines (87 loc) · 2.82 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
Setup for all tests.
If the tests are run by CI, we mock user infos here.
"""
import logging
from typing import Optional
from pytest import MonkeyPatch
from flaat import BaseFlaat, test_env
from flaat.access_tokens import AccessTokenInfo
from flaat.exceptions import FlaatUnauthenticated
from flaat.test_env import FLAAT_AT, OIDC_AGENT_ACCOUNT
from flaat.user_infos import UserInfos
# Module-level logger used by the mock helpers in this file.
logger = logging.getLogger(__name__)

# Only let warnings and above through from these third-party loggers.
for _noisy_logger_name in ("requests_cache", "urllib3", "asyncio"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARN)
# Mock data: two fake issuers and the user infos handed out for each of them.
_jwt_issuer = "https://mock.issuer.jwt"
_non_jwt_issuer = "https://mock.issuer.non.jwt"

# NOTE(review): the "[email protected]" values below look like an
# obfuscation placeholder rather than a real address -- confirm the intended
# value before relying on it in assertions.

# Claims of the mock user that belongs to the JWT issuer.
_jwt_claims = {
    "iss": _jwt_issuer,
    "sub": "mock_sub",
    "email": "[email protected]",
    "mock_groups": ["foo", "bar"],
    "mock_entitlements": [
        "urn:mace:egi.eu:group:test:foo",
        "urn:mace:egi.eu:group:test:bar",
    ],
}

# Claims of the mock user that belongs to the non-JWT issuer.
_non_jwt_claims = {
    "iss": _non_jwt_issuer,
    "sub": "non_jwt_mock_sub",
    "mock_groups": ["foo", "bar"],
    "email": "[email protected]",
    "mock_entitlements": [
        "urn:mace:egi.eu:group:test:foo",
        "urn:mace:egi.eu:group:test:bar",
    ],
}

# The JWT user carries an (empty) AccessTokenInfo; the third argument is None.
_jwt_user_infos = UserInfos(
    AccessTokenInfo({}, verification=None),
    _jwt_claims,
    None,
)

# The non-JWT user has no access token info at all.
_non_jwt_user_infos = UserInfos(None, _non_jwt_claims, None)
def _mock_get_user_infos_from_access_token(
    self: BaseFlaat, at, issuer_hint=""
) -> Optional[UserInfos]:
    """Mock replacement for ``BaseFlaat.get_user_infos_from_access_token``.

    Args:
        self: The flaat instance this function is patched onto.
        at: The access token; only the mock token strings are recognised.
        issuer_hint: Issuer hint supplied together with the token.

    Returns:
        The mock user infos matching ``at``, or ``None`` if ``at`` is
        ``"invalid_at"`` or not one of the known mock tokens.

    Raises:
        FlaatUnauthenticated: If ``issuer_hint`` is
            ``"https://invalid.issuer.org"``, or if the issuer of the matched
            user infos is not trusted by ``self``.
    """
    logger.debug("Mock called for access token: %s %s", at, issuer_hint)

    # The invalid issuer hint is rejected before the token is even looked at.
    if issuer_hint == "https://invalid.issuer.org":
        raise FlaatUnauthenticated("mock_unauthenticated")

    if at == "invalid_at":
        return None

    # Map the mock token onto the matching mock user, if there is one.
    if at == "mock_non_jwt_at":
        info = _non_jwt_user_infos
    elif at == "mock_jwt_at":
        info = _jwt_user_infos
    else:
        info = None

    if info is None:
        return None

    # Known token: hand the user out only if its issuer is trusted.
    if self._issuer_is_trusted(info.issuer):
        return info

    raise FlaatUnauthenticated(
        f"Issuer {info.issuer} not trusted (trusted: {self.trusted_op_list} {self.iss})"
    )
def mock_user_for_ci():
    """Patch flaat so the tests can run without a real access token.

    Replaces ``BaseFlaat.get_user_infos_from_access_token`` with the mock
    defined in this module and overrides the settings in ``flaat.test_env``
    with the mock issuers, claim names and tokens. The patches are not
    undone by this function.
    """
    logger.debug("Monkey patching BaseFlaat as we have no access token")

    patcher = MonkeyPatch()
    patcher.setattr(
        BaseFlaat,
        "get_user_infos_from_access_token",
        _mock_get_user_infos_from_access_token,
    )

    # Settings of the test environment that must point at the mock data.
    mock_env = {
        "FLAAT_ISS": _jwt_issuer,
        "NON_JWT_FLAAT_ISS": _non_jwt_issuer,
        "FLAAT_TRUSTED_OPS_LIST": [_jwt_issuer, _non_jwt_issuer],
        "FLAAT_CLAIM_GROUP": "mock_groups",
        "FLAAT_CLAIM_ENTITLEMENT": "mock_entitlements",
        "FLAAT_AT": "mock_jwt_at",
        "NON_JWT_FLAAT_AT": "mock_non_jwt_at",
    }
    for name, value in mock_env.items():
        patcher.setattr(test_env, name, value)
# Fall back to the mocked user when either the oidc-agent account or the
# access token is an empty string.
if "" in (OIDC_AGENT_ACCOUNT, FLAAT_AT):
    mock_user_for_ci()