@@ -49,12 +49,19 @@ def setUp(self):
49
49
)
50
50
self .introspect_url_patcher .start ()
51
51
52
+ self .required_audience_patcher = mock .patch (
53
+ 'netbox_diode_plugin.api.authentication.get_required_token_audience' ,
54
+ return_value = []
55
+ )
56
+ self .required_audience_mock = self .required_audience_patcher .start ()
57
+
52
58
def tearDown (self ):
53
59
"""Clean up after tests."""
54
60
self .cache_patcher .stop ()
55
61
self .cache_set_patcher .stop ()
56
62
self .requests_patcher .stop ()
57
63
self .introspect_url_patcher .stop ()
64
+ self .required_audience_patcher .stop ()
58
65
59
66
def test_authenticate_no_auth_header (self ):
60
67
"""Test authentication with no Authorization header."""
@@ -103,6 +110,42 @@ def test_authenticate_token_with_required_scope(self):
103
110
self .assertEqual (user , self .diode_user .user )
104
111
self .cache_set_mock .assert_called_once ()
105
112
113
+ def test_authenticate_token_with_required_audience (self ):
114
+ """Test authentication with token having required audience."""
115
+ self .cache_get_mock .return_value = None
116
+ self .requests_mock .return_value .json .return_value = {
117
+ 'active' : True ,
118
+ 'scope' : 'netbox:read netbox:write' ,
119
+ 'exp' : 1000 ,
120
+ 'iat' : 500
121
+ }
122
+
123
+ request = self .factory .get ('/' , HTTP_AUTHORIZATION = f'Bearer { self .token_with_scope } ' )
124
+
125
+ self .cache_get_mock .return_value = None
126
+ self .required_audience_mock .return_value = ['netbox' ]
127
+ try :
128
+ # should fail if the token does not have the required audience
129
+ with self .assertRaises (AuthenticationFailed ):
130
+ self .auth .authenticate (request )
131
+ self .required_audience_mock .assert_called_once ()
132
+ self .cache_set_mock .assert_not_called ()
133
+
134
+ # should succeed if the token has the required audience
135
+ self .requests_mock .return_value .json .return_value = {
136
+ 'active' : True ,
137
+ 'aud' : ['netbox' , 'api' , 'other' ],
138
+ 'scope' : 'netbox:read netbox:write' ,
139
+ 'exp' : 1000 ,
140
+ 'iat' : 500
141
+ }
142
+
143
+ user , _ = self .auth .authenticate (request )
144
+ self .assertEqual (user , self .diode_user .user )
145
+ self .cache_set_mock .assert_called_once ()
146
+ finally :
147
+ self .required_audience_patcher .return_value = []
148
+
106
149
def test_authenticate_token_introspection_failure (self ):
107
150
"""Test authentication when token introspection fails."""
108
151
self .cache_get_mock .return_value = None
0 commit comments