11// SPDX-License-Identifier: Apache-2.0
22// Copyright Open Network Fabric Authors
33
4+ use crate :: processor:: k8s_client:: K8sClientError ;
5+ use crate :: processor:: k8s_client:: k8s_start_client;
46use crate :: processor:: proc:: ConfigChannelRequest ;
57use crate :: processor:: proc:: ConfigProcessor ;
6- use args :: GrpcAddress ;
8+
79use std:: fmt:: Display ;
8- use std:: io:: Error ;
910use std:: net:: SocketAddr ;
1011use std:: os:: unix:: fs:: PermissionsExt ;
1112use std:: path:: { Path , PathBuf } ;
1213use std:: pin:: Pin ;
1314use std:: task:: { Context , Poll } ;
15+
1416use tokio:: io;
1517use tokio:: net:: UnixListener ;
1618use tokio:: sync:: mpsc:: Sender ;
1719use tokio_stream:: Stream ;
20+ use tonic:: transport:: Server ;
21+
22+ use args:: GrpcAddress ;
23+ use tracing:: { debug, error, info, warn} ;
1824
1925use crate :: grpc:: server:: create_config_service;
2026use crate :: processor:: proc:: ConfigProcessorParams ;
21- use tonic:: transport:: Server ;
22- use tracing:: { debug, error, info, warn} ;
27+
28+ #[ derive( Debug , thiserror:: Error ) ]
29+ pub enum LaunchError {
30+ #[ error( "GRPC server error: {0}" ) ]
31+ GrpcServerError ( tonic:: transport:: Error ) ,
32+ #[ error( "IO error: {0}" ) ]
33+ IoError ( std:: io:: Error ) ,
34+ #[ error( "Error in K8s client task: {0}" ) ]
35+ K8sClientError ( K8sClientError ) ,
36+ #[ error( "Error starting/waiting for K8s client task: {0}" ) ]
37+ K8sClientJoinError ( tokio:: task:: JoinError ) ,
38+ #[ error( "K8s client exited prematurely" ) ]
39+ PrematureK8sClientExit ,
40+ #[ error( "Grpc server exited prematurely" ) ]
41+ PrematureGrpcExit ,
42+ #[ error( "Config processor exited prematurely" ) ]
43+ PrematureProcessorExit ,
44+
45+ #[ error( "Error in Config Processor task: {0}" ) ]
46+ ProcessorError ( std:: io:: Error ) ,
47+ #[ error( "Error starting/waiting for Config Processor task: {0}" ) ]
48+ ProcessorJoinError ( tokio:: task:: JoinError ) ,
49+ }
2350
2451/// Start the gRPC server on TCP
2552async fn start_grpc_server_tcp (
2653 addr : SocketAddr ,
2754 channel_tx : Sender < ConfigChannelRequest > ,
28- ) -> Result < ( ) , Error > {
55+ ) -> Result < ( ) , LaunchError > {
2956 info ! ( "Starting gRPC server on TCP address: {addr}" ) ;
3057 let config_service = create_config_service ( channel_tx) ;
3158
@@ -35,7 +62,7 @@ async fn start_grpc_server_tcp(
3562 . await
3663 . map_err ( |e| {
3764 error ! ( "Failed to start gRPC server" ) ;
38- Error :: other ( e . to_string ( ) )
65+ LaunchError :: GrpcServerError ( e )
3966 } )
4067}
4168
@@ -69,7 +96,7 @@ impl Stream for UnixAcceptor {
6996async fn start_grpc_server_unix (
7097 socket_path : & Path ,
7198 channel_tx : Sender < ConfigChannelRequest > ,
72- ) -> Result < ( ) , Error > {
99+ ) -> Result < ( ) , LaunchError > {
73100 info ! (
74101 "Starting gRPC server on UNIX socket: {}" ,
75102 socket_path. display( )
@@ -89,7 +116,7 @@ async fn start_grpc_server_unix(
89116 if !parent. exists ( ) {
90117 if let Err ( e) = std:: fs:: create_dir_all ( parent) {
91118 error ! ( "Failed to create parent directory: {e}" ) ;
92- return Err ( e ) ;
119+ return Err ( LaunchError :: IoError ( e ) ) ;
93120 }
94121 }
95122 }
@@ -102,7 +129,7 @@ async fn start_grpc_server_unix(
102129 }
103130 Err ( e) => {
104131 error ! ( "Failed to bind UNIX socket: {e}" ) ;
105- return Err ( e ) ;
132+ return Err ( LaunchError :: IoError ( e ) ) ;
106133 }
107134 } ;
108135
@@ -125,7 +152,7 @@ async fn start_grpc_server_unix(
125152 . await
126153 . map_err ( |e| {
127154 error ! ( "Failed to start gRPC server" ) ;
128- Error :: other ( e . to_string ( ) )
155+ LaunchError :: GrpcServerError ( e )
129156 } ) ?;
130157
131158 // Clean up the socket file after server shutdown
@@ -155,19 +182,14 @@ impl Display for ServerAddress {
155182}
156183
157184pub struct MgmtParams {
158- pub grpc_addr : GrpcAddress ,
185+ pub grpc_addr : Option < GrpcAddress > ,
159186 pub processor_params : ConfigProcessorParams ,
160187}
161188
162189/// Start the mgmt service with either type of socket
163- pub fn start_mgmt ( params : MgmtParams ) -> Result < std:: thread:: JoinHandle < ( ) > , Error > {
164- /* build server address from provided grpc address */
165- let server_address = match params. grpc_addr {
166- GrpcAddress :: Tcp ( addr) => ServerAddress :: Tcp ( addr) ,
167- GrpcAddress :: UnixSocket ( path) => ServerAddress :: Unix ( path. into ( ) ) ,
168- } ;
169- debug ! ( "Will start gRPC listening on {server_address}" ) ;
170-
190+ pub fn start_mgmt (
191+ params : MgmtParams ,
192+ ) -> Result < std:: thread:: JoinHandle < Result < ( ) , LaunchError > > , std:: io:: Error > {
171193 std:: thread:: Builder :: new ( )
172194 . name ( "mgmt" . to_string ( ) )
173195 . spawn ( move || {
@@ -180,19 +202,60 @@ pub fn start_mgmt(params: MgmtParams) -> Result<std::thread::JoinHandle<()>, Err
180202 . build ( )
181203 . expect ( "Tokio runtime creation failed" ) ;
182204
183- /* block thread to run gRPC and configuration processor */
184- rt. block_on ( async {
185- let ( processor, tx) = ConfigProcessor :: new ( params. processor_params ) ;
186- tokio:: spawn ( async { processor. run ( ) . await } ) ;
187-
188- // Start the appropriate server based on address type
189- let result = match server_address {
190- ServerAddress :: Tcp ( sock_addr) => start_grpc_server_tcp ( sock_addr, tx) . await ,
191- ServerAddress :: Unix ( path) => start_grpc_server_unix ( & path, tx) . await ,
205+ if let Some ( grpc_addr) = params. grpc_addr {
206+ /* build server address from provided grpc address */
207+ let server_address = match grpc_addr {
208+ GrpcAddress :: Tcp ( addr) => ServerAddress :: Tcp ( addr) ,
209+ GrpcAddress :: UnixSocket ( path) => ServerAddress :: Unix ( path. into ( ) ) ,
192210 } ;
193- if let Err ( e) = result {
194- error ! ( "Failed to start gRPC server: {e}" ) ;
195- }
196- } ) ;
211+ debug ! ( "Will start gRPC listening on {server_address}" ) ;
212+
213+ /* block thread to run gRPC and configuration processor */
214+ rt. block_on ( async {
215+ let ( processor, tx) = ConfigProcessor :: new ( params. processor_params ) ;
216+ tokio:: spawn ( async { processor. run ( ) . await } ) ;
217+
218+ // Start the appropriate server based on address type
219+ let result = match server_address {
220+ ServerAddress :: Tcp ( sock_addr) => start_grpc_server_tcp ( sock_addr, tx) . await ,
221+ ServerAddress :: Unix ( path) => start_grpc_server_unix ( & path, tx) . await ,
222+ } ;
223+ if let Err ( e) = result {
224+ error ! ( "Failed to start gRPC server: {e}" ) ;
225+ Err ( e)
226+ } else {
227+ error ! ( "GRPC server exited prematurely" ) ;
228+ Err ( LaunchError :: PrematureGrpcExit )
229+ }
230+ } )
231+ } else {
232+ debug ! ( "Will start watching k8s for configuration changes" ) ;
233+ rt. block_on ( async {
234+ let ( processor, tx) = ConfigProcessor :: new ( params. processor_params ) ;
235+ let processor_handle = tokio:: spawn ( async { processor. run ( ) . await } ) ;
236+ let k8s_handle = tokio:: spawn ( async move { k8s_start_client ( tx. clone ( ) ) . await } ) ;
237+ tokio:: select! {
238+ result = processor_handle => {
239+ match result {
240+ Ok ( _) => {
241+ error!( "Configuration processor task exited unexpectedly" ) ;
242+ Err ( LaunchError :: PrematureProcessorExit ) ?
243+ }
244+ Err ( e) => { Err :: <( ) , LaunchError >( LaunchError :: ProcessorJoinError ( e) ) }
245+ }
246+ }
247+ result = k8s_handle => {
248+ match result {
249+ Ok ( result) => { result. inspect_err( |e| error!( "K8s client task failed: {e}" ) ) . map_err( LaunchError :: K8sClientError ) ?;
250+ error!( "Kubernetes client task exited unexpectedly" ) ;
251+ Err ( LaunchError :: PrematureK8sClientExit ) ?
252+ }
253+ Err ( e) => { Err ( LaunchError :: K8sClientJoinError ( e) ) ? }
254+ }
255+ }
256+ } ?;
257+ Ok :: < ( ) , LaunchError > ( ( ) )
258+ } )
259+ }
197260 } )
198261}
0 commit comments