forked from pjkundert/cpppo
-
Notifications
You must be signed in to change notification settings - Fork 2
/
modbus_test.py
246 lines (212 loc) · 9.55 KB
/
modbus_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
from __future__ import absolute_import, print_function, division
try:
from future_builtins import zip, map # Use Python 3 "lazy" zip, map
except ImportError:
pass
import atexit
import errno
import logging
import os
import random
import re
import signal
import subprocess
import sys
import time
import traceback
log = logging.getLogger( 'mbs_test' )

# The fcntl module is used below to switch a subprocess' stdout to non-blocking mode.  Record in
# has_o_nonblock whether it could be imported.  Only a failed import is tolerated here; any other
# error raised while importing is a genuine problem and is allowed to propagate.
try:
    import fcntl
except ImportError:
    has_o_nonblock = False
    log.warning( "Failed to import fcntl; skipping simulated Modbus/TCP PLC tests" )
else:
    has_o_nonblock = True
from . import misc
from .tools.waits import waitfor
RTU_WAIT = 2.0 # Seconds to wait for the simulator to report its address before giving up
RTU_LATENCY = 0.05 # Seconds to sleep between reads of the command's output when none was available
class nonblocking_command( object ):
    """Run a command as a subprocess whose stdout can be read without blocking.

    Read the output using:

        collect = ''
        while True:
            if command is None:
                # Restarts command on failure, for example
                command = nonblocking_command( ... )
            try:
                data = command.stdout.read()
                log.debug( "Received %d bytes from command", len( data ))
                collect += data
            except IOError as exc:
                if exc.errno != errno.EAGAIN:
                    log.warning( "I/O Error reading data: %s" % traceback.format_exc() )
                    command = None
                # else: Data not presently available; ignore
            except Exception:
                log.warning( "Exception reading data: %s", traceback.format_exc() )
                command = None
            # do other stuff in loop...

    Pass a list to execute the command directly, or anything else (eg. a string) to run it via the
    shell.  Pass a file-like object for stderr if desired; None would cause it to share the
    enclosing interpreter's stderr.  Unless blocking is true, the stdout pipe is put in
    O_NONBLOCK mode (this requires the fcntl module).

    The command is started in its own session (os.setsid), and kill() sends SIGTERM to its
    whole process group.  kill() is also bound as __del__, and is registered with atexit as a
    safety mechanism to terminate the command (if it isn't already dead).  NOTE: the atexit
    registration holds a reference to this object, so call kill() explicitly if the command must
    be stopped before the interpreter exits.
    """
    def __init__( self, command, stderr=subprocess.STDOUT, stdin=None, bufsize=0, blocking=None ):
        # Ensure kill() (and hence __del__) is safe even if starting the subprocess fails below.
        self.process = None
        self._reaped = False

        shell = not isinstance( command, list )
        self.command = command if shell else ' '.join( command )
        log.info( "Starting command: %s", self.command )
        self.process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=stderr, stdin=stdin,
            bufsize=bufsize, preexec_fn=os.setsid, shell=shell )
        log.normal( 'Started Server PID [%d]: %s', self.process.pid, self.command )

        # Really, really ensure we get terminated.  Registered as soon as the subprocess exists,
        # so a failure in the remaining set-up cannot leave it running unattended.
        atexit.register( self.kill )

        if not blocking:
            fd = self.process.stdout.fileno()
            fl = fcntl.fcntl( fd, fcntl.F_GETFL )
            fcntl.fcntl( fd, fcntl.F_SETFL, fl | os.O_NONBLOCK )

    @property
    def stdout( self ):
        """The subprocess' stdout pipe (non-blocking, unless blocking was requested)."""
        return self.process.stdout

    def kill( self ):
        """Send SIGTERM to the command's process group, and wait for the command to exit.

        Safe to invoke repeatedly (it is reachable explicitly, via __del__ and via atexit): it does
        nothing if the subprocess was never started, or once it has been terminated and waited
        for here.  That avoids signalling a process group ID that may since have been recycled.
        """
        process = getattr( self, 'process', None )
        if process is None or getattr( self, '_reaped', False ):
            return

        log.normal( 'Sending SIGTERM to PID [%d]: %s, via: %s', process.pid, self.command,
                    ''.join( traceback.format_stack() ) if log.isEnabledFor( logging.INFO ) else '' )
        try:
            os.killpg( process.pid, signal.SIGTERM )
        except OSError as exc:
            log.info( 'Failed to send SIGTERM to PID [%d]: %s', process.pid, exc )
        else:
            log.info( "Waiting for command (PID [%d]) to terminate", process.pid )
            process.wait()
            self._reaped = True

        # Process may exit with a non-numeric returncode (eg. None)
        log.info( "Command (PID [%d]) finished with status %r: %s",
                  process.pid, process.returncode, self.command )

    __del__ = kill
def _parse_simulator_address( text ):
    """Convert the address text reported by the simulator into a usable address.

    Returns a (host, port) tuple when the text ends in ":<integer>" (Modbus/TCP); the host is
    everything before the last colon, so hosts that themselves contain colons are accepted.
    Otherwise, the text must begin with '/' (a device path, for Modbus/RTU) and is returned
    unchanged; an AssertionError is raised if it does not.
    """
    host,colon,port = text.rpartition( ':' )
    if colon:
        try:
            return host,int( port )
        except ValueError:
            pass # Not a numeric port; treat the whole text as a device path
    assert text.startswith( '/' ), \
        "Unrecognized simulator address: %r" % ( text, )
    return text


def start_modbus_simulator( options ):
    """Start bin/modbus_sim.py; assumes it flushes stdout when printing bindings so we can parse it
    here.

    The simulator is run using the current interpreter, with the given sequence of command-line
    options appended.  Its output is read (without blocking) for up to RTU_WAIT seconds, looking
    for a line containing "address = ...".

    Returns (command, address): the nonblocking_command running the simulator, and either a
    (host, port) tuple, a device path string, or None if no address was seen in time.
    """
    command = nonblocking_command( [
        sys.executable,
        os.path.join( os.path.dirname( os.path.abspath( __file__ )), 'bin', 'modbus_sim.py' ),
    ] + list( options ), stderr=None, bufsize=0 )

    begun = misc.timer()
    address = None
    data = ''
    while address is None and misc.timer() - begun < RTU_WAIT:
        # On Python2, socket will raise IOError/EAGAIN; on Python3 may return None 'til command started.
        raw = None
        try:
            raw = command.stdout.read()
            log.debug( "Socket received: %r", raw)
            if raw:
                data += raw.decode( 'utf-8', 'backslashreplace' )
        except IOError as exc:
            log.debug( "Socket blocking...")
            assert exc.errno == errno.EAGAIN, "Expected only Non-blocking IOError"
        except Exception as exc:
            log.warning("Socket read return Exception: %s", exc)
        if not raw:
            # Nothing available yet; don't spin
            time.sleep( RTU_LATENCY )

        # Process every complete line received so far; retain any partial trailing line in data
        while '\n' in data:
            line,data = data.split( '\n', 1 )
            log.info( "%s", line )
            m = re.search( "address = (.*)", line )
            if not m:
                continue
            address = _parse_simulator_address( m.group(1) )
            if isinstance( address, tuple ):
                log.normal( "Modbus/TCP Simulator started after %7.3fs on %s:%d",
                            misc.timer() - begun, address[0], address[1] )
            else:
                log.normal( "Modbus/RTU Simulator started after %7.3fs on %s",
                            misc.timer() - begun, address )
            break
    return command,address
def run_plc_modbus_polls( plc ):
    """Exercise the polling behaviour of the supplied plc object, asserting on the results.

    As used here, plc must provide write( address, value ), read( address ) (expected to yield
    None for an address with no polled value yet), poll( address[, rate=...] ), and the
    attributes reach, polling, duration, rate and load (a sequence of 3 values, each possibly
    None).

    First confirms that polling 40001 also makes 40002 and 1 available, each w/in 1 second.
    Then keeps adding randomly chosen registers to the poll set, checking that each newly
    written value is polled back, until the rolling average poll time drops below plc.rate.
    """
    # Initial conditions (in case PLC is persistent between tests)
    plc.write( 1, 0 )
    plc.write( 40001, 0 )

    rate = 1.0
    timeout = 2 * rate # Nyquist
    intervals = timeout / .05 # w/ fixed .05s intervals
    wfkw = dict( timeout=timeout, intervals=intervals )

    plc.poll( 40001, rate=rate )

    success,elapsed = waitfor( lambda: plc.read( 40001 ) is not None, "40001 polled", **wfkw )
    assert success
    assert elapsed < 1.0
    assert plc.read( 40001 ) == 0

    assert plc.read( 1 ) is None
    assert plc.read( 40002 ) is None
    success,elapsed = waitfor( lambda: plc.read( 40002 ) is not None, "40002 polled", **wfkw )
    assert success
    assert elapsed < 1.0
    assert plc.read( 40002 ) == 0
    success,elapsed = waitfor( lambda: plc.read( 1 ) is not None, "00001 polled", **wfkw )
    assert success
    assert elapsed < 1.0
    assert plc.read( 1 ) == 0

    # Now add a bunch of new stuff to poll, and ensure polling occurs.  As we add registers the
    # number of distinct poll ranges will increase, and then decrease as we in-fill and the
    # inter-register range drops below the merge reach 10, allowing the polling to merge ranges.
    # Thus, keep track of the number of registers added, and allow the loop to exit once the
    # rolling average poll time has fallen back below the target poll rate.
    #
    #     avg.
    #     poll
    #     time
    #
    #       |
    #       |
    #     4s|         ..
    #     3s|        .  .
    #     2s|     ...    ...
    #     1s|.....          .......
    #      -+----------------------------------
    #       |  10  20  30  40   regs

    # We'll be overwhelming the poller, so it won't be able to poll w/in the target rate, so we'll
    # need to more than double the Nyquist-rate timeout
    wfkw['timeout'] *= 2.5
    wfkw['intervals'] *= 2.5

    regs = {}
    extent = 100 # how many each of coil/holding registers
    total = extent*2 # total registers in play
    elapsed = None
    rolling = None
    rolling_factor = 1.0/5 # Rolling exponential moving average over last ~8 samples

    # Keep increasing the number of registers polled, up to 1/2 of all registers
    while len( regs ) < total * 50 // 100:
        # Always select a previously unpolled register; however, it might
        # have already been in a merge range; if so, get its current value
        # so we mutate it (forcing it to be re-polled)
        base = 40001 if random.randint( 0, 1 ) else 1
        r = None
        while r is None or r in regs:
            r = random.randint( base, base + extent )
        v = plc.read( r )
        if v is not None:
            log.detail( "New reg %5d was already polled due to reach=%d", r, plc.reach )
            regs[r] = v ^ 1 # flip the low bit, so the written value differs
        elif base > 40000:
            regs[r] = random.randint( 0, 65535 ) # holding register
        else:
            regs[r] = random.randint( 0, 1 ) # coil

        plc.write( r, regs[r] )
        plc.poll( r )
        if len( regs ) > total * 10 // 100:
            # skip to the good parts...  After 10% of all registers are being polled, start
            # calculating.  See how long it takes, on average, to get the newly written register
            # value polled back.
            success,elapsed = waitfor( lambda: plc.read( r ) == regs[r], "polled %5d == %5d" % ( r, regs[r] ), **wfkw )
            assert success
            rolling = misc.exponential_moving_average( rolling, elapsed, rolling_factor )

        log.normal( "%3d/%3d regs: polled %3d ranges w/in %7.3fs. Polled %5d == %5d w/in %7.3fs: avg. %7.3fs (load %3.2f, %3.2f, %3.2f)",
                    len( regs ), total, len( plc.polling ), plc.duration,
                    r, regs[r], elapsed or 0.0, rolling or 0.0, *[misc.nan if load is None else load for load in plc.load] )

        if len( regs ) > total * 20 // 100:
            # after 20%, start looking for the exit (ranges should merge, poll rate fall )
            if rolling < plc.rate:
                break

    assert rolling < plc.rate, \
        "Rolling average poll cycle %7.3fs should have fallen below target poll rate %7.3fs" % ( rolling, plc.rate )

    # Every register we wrote must still read back the value last written
    for r,v in regs.items():
        assert plc.read( r ) == v