1
- use std:: { collections:: HashMap , net:: IpAddr } ;
1
+ use std:: {
2
+ collections:: HashMap ,
3
+ net:: SocketAddr ,
4
+ sync:: { RwLock , Arc } ,
5
+ time:: Duration ,
6
+ } ;
2
7
3
- use crate :: { credentials:: { Credentials , CredentialsKind } , AuthenticationType , error:: Error , permissions:: Permissions } ;
8
+ use crate :: { credentials:: { DeclaredCredentials , DCredentialsResolveGuard , RequestCredentials , CredentialsKind } , AuthenticationType , error:: Error , permissions:: Permissions } ;
4
9
5
10
use super :: { users_storage:: UsersStorage , Authenticator } ;
6
11
7
12
use sha2:: { Digest , Sha512 } ;
8
13
14
+ use tokio:: net:: lookup_host;
15
+
16
+ #[ inline]
17
+ async fn lookup ( hostname : & str ) -> Option < Vec < SocketAddr > > {
18
+ lookup_host ( hostname) . await
19
+ . ok ( )
20
+ . map ( |addr| addr. collect ( ) )
21
+ }
22
+
23
+ type NodesCredentials = HashMap < String , DCredentialsResolveGuard > ;
24
+
9
25
#[ derive( Debug , Default , Clone ) ]
10
26
pub struct Basic < Storage : UsersStorage > {
11
27
users_storage : Storage ,
12
- nodes : HashMap < IpAddr , Vec < Credentials > > ,
28
+ nodes : Arc < RwLock < NodesCredentials > > ,
29
+ resolve_sleep_period_ms : u64 ,
13
30
}
14
31
15
32
impl < Storage : UsersStorage > Basic < Storage > {
16
- pub fn new ( users_storage : Storage ) -> Self {
33
+ pub fn new ( users_storage : Storage , resolve_sleep_period_ms : u64 ) -> Self {
17
34
Self {
18
35
users_storage,
19
- nodes : HashMap :: new ( ) ,
36
+ nodes : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
37
+ resolve_sleep_period_ms,
20
38
}
21
39
}
22
40
41
+ fn node_creds_ok ( creds : & HashMap < String , DeclaredCredentials > ) -> bool {
42
+ creds. values ( )
43
+ . all ( |cred| cred. validate_internode ( ) )
44
+ }
45
+
23
46
pub fn set_nodes_credentials (
24
47
& mut self ,
25
- nodes : HashMap < IpAddr , Vec < Credentials > > ,
48
+ mut nodes : HashMap < String , DeclaredCredentials > ,
26
49
) -> Result < ( ) , Error > {
27
- if nodes
28
- . values ( )
29
- . all ( |creds|
30
- creds
31
- . iter ( )
32
- . all ( |cred|
33
- cred. ip ( ) . is_some ( ) &&
34
- cred. kind ( ) . map ( |k| k. is_internode ( ) ) == Some ( true ) ) )
35
- {
36
- self . nodes = nodes;
50
+ if Self :: node_creds_ok ( & nodes) {
51
+ let mut nodes_creds = self . nodes . write ( ) . expect ( "nodes credentials lock" ) ;
52
+ for ( nodename, cred) in nodes. drain ( ) {
53
+ let mut guard = DCredentialsResolveGuard :: new ( cred. clone ( ) , self . resolve_sleep_period_ms ) ;
54
+ if cred. ip ( ) . is_empty ( ) && cred. hostname ( ) . is_some ( ) {
55
+ guard. set_in_progress ( ) ;
56
+ nodes_creds. insert ( nodename, guard) ;
57
+ self . spawn_resolver ( cred) ;
58
+ } else {
59
+ nodes_creds. insert ( nodename, guard) ;
60
+ }
61
+ }
37
62
Ok ( ( ) )
38
63
} else {
39
64
let message = "nodes credentials missing ip or node name" ;
40
65
Err ( Error :: CredentialsNotProvided ( message. to_string ( ) ) )
41
66
}
42
67
}
43
68
44
- fn check_node_request ( & self , node_name : & String , ip : Option < IpAddr > ) -> Option < bool > {
45
- if self . nodes . is_empty ( ) {
46
- warn ! ( "nodes credentials not set" ) ;
69
+ fn spawn_resolver ( & self , cred : DeclaredCredentials ) {
70
+ tokio:: spawn ( Self :: resolve_worker ( self . nodes . clone ( ) , cred, self . resolve_sleep_period_ms ) ) ;
71
+ }
72
+
73
+ async fn resolve_worker ( nodes : Arc < RwLock < NodesCredentials > > , cred : DeclaredCredentials , sleep_period_ms : u64 ) {
74
+ let hostname = cred. hostname ( ) . as_ref ( ) . expect ( "resolve worker without hostname" ) ;
75
+ let mut addr: Option < Vec < SocketAddr > > = lookup ( hostname) . await ;
76
+ let mut cur_sleep_period_ms = 100 ;
77
+ while addr. is_none ( ) || ( addr. is_some ( ) && addr. as_ref ( ) . unwrap ( ) . len ( ) == 0 ) {
78
+ tokio:: time:: sleep ( Duration :: from_millis ( cur_sleep_period_ms) ) . await ;
79
+
80
+ addr = lookup ( hostname) . await ;
81
+
82
+ cur_sleep_period_ms = sleep_period_ms. min ( cur_sleep_period_ms * 2 ) ;
83
+ }
84
+
85
+ let addr = addr. expect ( "somehow addr is none" ) ;
86
+ if let CredentialsKind :: InterNode ( nodename) = cred. kind ( ) {
87
+ let mut nodes = nodes. write ( ) . expect ( "nodes credentials lock" ) ;
88
+ if let Some ( creds) = nodes. get_mut ( nodename) {
89
+ creds. set_resolved ( addr) ;
90
+ }
91
+ } else {
92
+ error ! ( "resolved credentials are not internode" ) ;
93
+ }
94
+ }
95
+
96
+ fn process_auth_result ( & self , nodename : & str , authenticated : bool ) {
97
+ let mut nodes = self . nodes . write ( ) . expect ( "nodes credentials lock" ) ;
98
+ if let Some ( node) = nodes. get_mut ( nodename) {
99
+ if node. update_resolve_state ( authenticated) {
100
+ node. set_in_progress ( ) ;
101
+ self . spawn_resolver ( node. creds ( ) . clone ( ) ) ;
102
+ }
47
103
}
48
- self . nodes
49
- . get ( ip. as_ref ( ) ?)
50
- . map ( |creds| {
51
- for cred in creds {
52
- if let Some ( CredentialsKind :: InterNode ( other_name) ) = cred. kind ( ) {
53
- if node_name == other_name {
54
- return true ;
55
- }
104
+ }
105
+
106
+ fn check_node_request ( & self , node_name : & String , ip : Option < SocketAddr > ) -> bool {
107
+ let mut authenticated = false ;
108
+ let mut needs_update = false ;
109
+ {
110
+ if ip. is_none ( ) {
111
+ return false ;
112
+ }
113
+ let ip = ip. unwrap ( ) . ip ( ) ;
114
+ let nodes = self . nodes . read ( ) . expect ( "nodes credentials lock" ) ;
115
+ if let Some ( guard) = nodes. get ( node_name) {
116
+ let cred = guard. creds ( ) ;
117
+ if let CredentialsKind :: InterNode ( other_name) = cred. kind ( ) {
118
+ debug_assert ! ( node_name == other_name) ;
119
+ if cred. ip ( ) . iter ( ) . find ( |cred_ip| cred_ip. ip ( ) == ip) . is_some ( ) {
120
+ authenticated = true ;
56
121
}
122
+ needs_update = guard. needs_update ( authenticated) ;
57
123
}
58
- false
59
- } )
124
+ }
125
+ }
126
+ if needs_update {
127
+ self . process_auth_result ( node_name, authenticated) ;
128
+ }
129
+ authenticated
60
130
}
61
131
62
- fn check_credentials_common ( & self , credentials : Credentials ) -> Result < Permissions , Error > {
132
+ fn check_credentials_common ( & self , credentials : RequestCredentials ) -> Result < Permissions , Error > {
63
133
match credentials. kind ( ) {
64
134
Some ( CredentialsKind :: Basic { username, password } ) => {
65
135
debug ! (
@@ -101,11 +171,11 @@ impl<Storage> Authenticator for Basic<Storage>
101
171
where
102
172
Storage : UsersStorage ,
103
173
{
104
- fn check_credentials_grpc ( & self , credentials : Credentials ) -> Result < Permissions , Error > {
174
+ fn check_credentials_grpc ( & self , credentials : RequestCredentials ) -> Result < Permissions , Error > {
105
175
debug ! ( "check {:?}" , credentials) ;
106
176
match credentials. kind ( ) {
107
177
Some ( CredentialsKind :: InterNode ( node_name) ) => {
108
- if let Some ( true ) = self . check_node_request ( node_name, credentials. ip ( ) ) {
178
+ if self . check_node_request ( node_name, credentials. ip ( ) ) {
109
179
debug ! ( "request from node: {:?}" , credentials. ip( ) ) ;
110
180
Ok ( Permissions :: all ( ) )
111
181
} else {
@@ -116,7 +186,7 @@ where
116
186
}
117
187
}
118
188
119
- fn check_credentials_rest ( & self , credentials : Credentials ) -> Result < Permissions , Error > {
189
+ fn check_credentials_rest ( & self , credentials : RequestCredentials ) -> Result < Permissions , Error > {
120
190
debug ! ( "check {:?}" , credentials) ;
121
191
self . check_credentials_common ( credentials)
122
192
}
0 commit comments