1
+ use bollard:: Docker ;
2
+ use bollard:: models:: EventMessage ;
3
+ use bollard:: errors:: Error as DockerError ;
4
+ use dotenv:: dotenv;
5
+ use futures_util:: StreamExt ;
6
+ use std:: env;
7
+ use std:: sync:: Arc ;
8
+ use tokio:: sync:: Mutex ;
9
+ use log:: { info, debug, error, warn} ;
10
+ use tokio:: time:: { sleep, Duration } ;
11
+
12
+ #[ tokio:: main]
13
+ async fn main ( ) {
14
+ // Lese Umgebungsvariablen (z.B. für LABEL_KEY und LABEL_VALUE)
15
+ dotenv ( ) . ok ( ) ;
16
+
17
+ // Setze das Standard-Log-Level auf "info", wenn keine Umgebungsvariable RUST_LOG gesetzt ist
18
+ if env:: var ( "RUST_LOG" ) . is_err ( ) {
19
+ env:: set_var ( "RUST_LOG" , "info" ) ;
20
+ }
21
+
22
+ // Initialisiere den Logger ohne Zeitstempel
23
+ env_logger:: Builder :: from_default_env ( )
24
+ . format_timestamp ( None )
25
+ . init ( ) ;
26
+
27
+ info ! ( "Listening on container start and stop events..." ) ;
28
+
29
+ let label_key = env:: var ( "LABEL_KEY" ) . unwrap_or_else ( |_| "ofelia.restart" . to_string ( ) ) ;
30
+ let label_value = env:: var ( "LABEL_VALUE" ) . unwrap_or_else ( |_| "true" . to_string ( ) ) ;
31
+ let container_name_to_restart = env:: var ( "CRON_CONTAINER" ) . unwrap_or_else ( |_| "cronjobs-cron-1" . to_string ( ) ) ;
32
+
33
+ let docker = Docker :: connect_with_local_defaults ( ) . expect ( "Failed to connect to Docker" ) ;
34
+
35
+ // Mutex für die Steuerung des Neustart-Timers
36
+ let restart_timer = Arc :: new ( Mutex :: new ( None ) ) ;
37
+
38
+ let mut events_stream = docker. events :: < String > ( None ) . fuse ( ) ;
39
+
40
+ while let Some ( event) = events_stream. next ( ) . await {
41
+ match event {
42
+ Ok ( event_message) => {
43
+ debug ! ( "Received event: {:?}" , event_message) ;
44
+ let docker = docker. clone ( ) ;
45
+ let restart_timer = restart_timer. clone ( ) ;
46
+ let label_key = label_key. clone ( ) ;
47
+ let label_value = label_value. clone ( ) ;
48
+ let container_name_to_restart = container_name_to_restart. clone ( ) ;
49
+
50
+ tokio:: spawn ( async move {
51
+ handle_event ( & docker, event_message, & label_key, & label_value, & container_name_to_restart, restart_timer) . await ;
52
+ } ) ;
53
+ } ,
54
+ Err ( e) => error ! ( "Error receiving event: {:?}" , e) ,
55
+ }
56
+ }
57
+ }
58
+
59
+ async fn handle_event (
60
+ docker : & Docker ,
61
+ event : EventMessage ,
62
+ label_key : & str ,
63
+ label_value : & str ,
64
+ container_name_to_restart : & str ,
65
+ restart_timer : Arc < Mutex < Option < tokio:: task:: JoinHandle < ( ) > > > >
66
+ ) {
67
+ if let Some ( action) = event. action {
68
+ if action == "start" || action == "stop" {
69
+ if let Some ( actor) = event. actor {
70
+ if let Some ( container_id) = actor. id {
71
+ // Hole den Container-Namen
72
+ let container_info = docker. inspect_container ( & container_id, None ) . await ;
73
+ if let Ok ( container_info) = container_info {
74
+ if let Some ( container_name) = container_info. name {
75
+ // Ignoriere Events des Cron-Containers selbst
76
+ if container_name == format ! ( "/{}" , container_name_to_restart) {
77
+ debug ! ( "Ignoring event for the cron container itself: {}" , container_name) ;
78
+ return ;
79
+ }
80
+ }
81
+ }
82
+
83
+ match docker. inspect_container ( & container_id, None ) . await {
84
+ Ok ( container_info) => {
85
+ if let Some ( labels) = container_info. config . and_then ( |c| c. labels ) {
86
+ if labels. get ( label_key) . map_or ( false , |v| v == label_value) {
87
+ info ! ( "Container {} {}ed" , container_info. name. unwrap_or_default( ) , action) ;
88
+
89
+ // Setze den Timer zurück, wenn ein neuer Container startet oder stoppt
90
+ let mut timer_guard = restart_timer. lock ( ) . await ;
91
+ if let Some ( existing_timer) = timer_guard. take ( ) {
92
+ existing_timer. abort ( ) ; // Abbrechen des bestehenden Timers
93
+ info ! ( "Timer reset for restarting the cron container" ) ;
94
+ }
95
+
96
+ let docker = docker. clone ( ) ;
97
+ let container_name_to_restart = container_name_to_restart. to_string ( ) ;
98
+
99
+ // Starte einen neuen Timer (z.B. 60 Sekunden)
100
+ * timer_guard = Some ( tokio:: spawn ( async move {
101
+ info ! ( "Timer set for restarting the cron container in 60 seconds" ) ;
102
+ sleep ( Duration :: from_secs ( 60 ) ) . await ;
103
+ if let Err ( e) = restart_container ( & docker, & container_name_to_restart) . await {
104
+ error ! ( "Error restarting container {}: {:?}" , container_name_to_restart, e) ;
105
+ }
106
+ } ) ) ;
107
+ }
108
+ }
109
+ } ,
110
+ Err ( DockerError :: DockerResponseServerError { status_code, .. } ) if status_code == 404 => {
111
+ warn ! ( "Container {} not found" , container_id) ;
112
+ } ,
113
+ Err ( e) => {
114
+ error ! ( "API error when getting container {}: {:?}" , container_id, e) ;
115
+ }
116
+ }
117
+ }
118
+ }
119
+ }
120
+ }
121
+ }
122
+
123
+ async fn restart_container ( docker : & Docker , container_name : & str ) -> Result < ( ) , DockerError > {
124
+ match docker. inspect_container ( container_name, None ) . await {
125
+ Ok ( _) => {
126
+ docker. restart_container ( container_name, None ) . await ?;
127
+ info ! ( "Container {} was restarted." , container_name) ;
128
+ Ok ( ( ) )
129
+ } ,
130
+ Err ( DockerError :: DockerResponseServerError { status_code, .. } ) if status_code == 404 => {
131
+ warn ! ( "Container {} not found." , container_name) ;
132
+ Ok ( ( ) )
133
+ } ,
134
+ Err ( e) => {
135
+ error ! ( "API error when restarting container {}: {:?}" , container_name, e) ;
136
+ Err ( e)
137
+ }
138
+ }
139
+ }
0 commit comments