@@ -9,64 +9,74 @@ import (
9
9
10
10
const maxDatagramSize = 8192
11
11
12
- type MulticastReceiver struct {
13
- activeIfis map [string ]bool
14
- connections []* net.UDPConn
12
+ type MulticastServer struct {
13
+ connection * net.UDPConn
15
14
running bool
16
15
consumer func ([]byte )
17
16
mutex sync.Mutex
18
17
SkipInterfaces []string
18
+ Verbose bool
19
19
}
20
20
21
- func NewMulticastReceiver (consumer func ([]byte )) (r * MulticastReceiver ) {
22
- r = new (MulticastReceiver )
23
- r .activeIfis = map [string ]bool {}
21
+ func NewMulticastServer (consumer func ([]byte )) (r * MulticastServer ) {
22
+ r = new (MulticastServer )
24
23
r .consumer = consumer
25
24
return
26
25
}
27
26
28
- func (r * MulticastReceiver ) Start (multicastAddress string ) {
27
+ func (r * MulticastServer ) Start (multicastAddress string ) {
29
28
r .running = true
30
29
go r .receive (multicastAddress )
31
30
}
32
31
33
- func (r * MulticastReceiver ) Stop () {
32
+ func (r * MulticastServer ) Stop () {
34
33
r .mutex .Lock ()
35
34
defer r .mutex .Unlock ()
36
35
r .running = false
37
- for _ , c := range r .connections {
38
- if err := c .Close (); err != nil {
39
- log .Println ("Could not close connection: " , err )
40
- }
36
+ if err := r .connection .Close (); err != nil {
37
+ log .Println ("Could not close connection: " , err )
41
38
}
42
39
}
43
40
44
- func (r * MulticastReceiver ) receive (multicastAddress string ) {
41
+ func (r * MulticastServer ) receive (multicastAddress string ) {
42
+ log .Printf ("Receiving on %v" , multicastAddress )
43
+ var currentIfiIdx = 0
45
44
for r .isRunning () {
46
- ifis , _ := net .Interfaces ()
47
- for _ , ifi := range ifis {
48
- if ifi .Flags & net .FlagMulticast == 0 || // No multicast support
49
- r .skipInterface (ifi .Name ) {
50
- continue
51
- }
52
- r .mutex .Lock ()
53
- if _ , ok := r .activeIfis [ifi .Name ]; ! ok {
54
- // interface not active, (re-)start receiving
55
- go r .receiveOnInterface (multicastAddress , ifi )
56
- }
57
- r .mutex .Unlock ()
45
+ ifis := r .interfaces ()
46
+ currentIfiIdx = currentIfiIdx % len (ifis )
47
+ ifi := ifis [currentIfiIdx ]
48
+ r .receiveOnInterface (multicastAddress , ifi )
49
+ currentIfiIdx ++
50
+ if currentIfiIdx >= len (ifis ) {
51
+ // cycled though all interfaces once, make a short break to avoid producing endless log messages
52
+ time .Sleep (1 * time .Second )
58
53
}
59
- time .Sleep (1 * time .Second )
60
54
}
61
55
}
62
56
63
- func (r * MulticastReceiver ) isRunning () bool {
57
+ func (r * MulticastServer ) isRunning () bool {
64
58
r .mutex .Lock ()
65
59
defer r .mutex .Unlock ()
66
60
return r .running
67
61
}
68
62
69
- func (r * MulticastReceiver ) skipInterface (ifiName string ) bool {
63
+ func (r * MulticastServer ) interfaces () (interfaces []net.Interface ) {
64
+ interfaces = []net.Interface {}
65
+ ifis , err := net .Interfaces ()
66
+ if err != nil {
67
+ log .Println ("Could not get available interfaces: " , err )
68
+ }
69
+ for _ , ifi := range ifis {
70
+ if ifi .Flags & net .FlagMulticast == 0 || // No multicast support
71
+ r .skipInterface (ifi .Name ) {
72
+ continue
73
+ }
74
+ interfaces = append (interfaces , ifi )
75
+ }
76
+ return
77
+ }
78
+
79
+ func (r * MulticastServer ) skipInterface (ifiName string ) bool {
70
80
for _ , skipIfi := range r .SkipInterfaces {
71
81
if skipIfi == ifiName {
72
82
return true
@@ -75,52 +85,55 @@ func (r *MulticastReceiver) skipInterface(ifiName string) bool {
75
85
return false
76
86
}
77
87
78
- func (r * MulticastReceiver ) receiveOnInterface (multicastAddress string , ifi net.Interface ) {
88
+ func (r * MulticastServer ) receiveOnInterface (multicastAddress string , ifi net.Interface ) {
79
89
addr , err := net .ResolveUDPAddr ("udp" , multicastAddress )
80
90
if err != nil {
81
91
log .Printf ("Could resolve multicast address %v: %v" , multicastAddress , err )
82
92
return
83
93
}
84
94
85
- listener , err : = net .ListenMulticastUDP ("udp" , & ifi , addr )
95
+ r . connection , err = net .ListenMulticastUDP ("udp" , & ifi , addr )
86
96
if err != nil {
87
97
log .Printf ("Could not listen at %v: %v" , multicastAddress , err )
88
98
return
89
99
}
90
100
91
- if err := listener .SetReadBuffer (maxDatagramSize ); err != nil {
101
+ if err := r . connection .SetReadBuffer (maxDatagramSize ); err != nil {
92
102
log .Println ("Could not set read buffer: " , err )
93
103
}
94
104
95
- r .mutex .Lock ()
96
- r .connections = append (r .connections , listener )
97
- r .activeIfis [ifi .Name ] = true
98
- r .mutex .Unlock ()
99
-
100
- log .Printf ("Listening on %s (%s)" , multicastAddress , ifi .Name )
105
+ if r .Verbose {
106
+ log .Printf ("Listening on %s (%s)" , multicastAddress , ifi .Name )
107
+ }
101
108
109
+ first := true
102
110
data := make ([]byte , maxDatagramSize )
103
111
for {
104
- n , _ , err := listener .ReadFrom (data )
112
+ if err := r .connection .SetDeadline (time .Now ().Add (300 * time .Millisecond )); err != nil {
113
+ log .Println ("Could not set deadline on connection: " , err )
114
+ }
115
+ n , _ , err := r .connection .ReadFromUDP (data )
105
116
if err != nil {
106
- log .Println ("ReadFromUDP failed:" , err )
117
+ if r .Verbose {
118
+ log .Println ("ReadFromUDP failed:" , err )
119
+ }
107
120
break
108
121
}
109
122
123
+ if first {
124
+ log .Printf ("Got first data packets from %s (%s)" , multicastAddress , ifi .Name )
125
+ first = false
126
+ }
127
+
110
128
r .consumer (data [:n ])
111
129
}
112
130
113
- log .Printf ("Stop listening on %s (%s)" , multicastAddress , ifi .Name )
131
+ if r .Verbose {
132
+ log .Printf ("Stop listening on %s (%s)" , multicastAddress , ifi .Name )
133
+ }
114
134
115
- if err := listener .Close (); err != nil {
135
+ if err := r . connection .Close (); err != nil {
116
136
log .Println ("Could not close listener: " , err )
117
137
}
118
- r .mutex .Lock ()
119
- delete (r .activeIfis , ifi .Name )
120
- for i , c := range r .connections {
121
- if c == listener {
122
- r .connections = append (r .connections [:i ], r .connections [i + 1 :]... )
123
- }
124
- }
125
- r .mutex .Unlock ()
138
+ return
126
139
}
0 commit comments