-
Notifications
You must be signed in to change notification settings - Fork 0
/
redpitaya_scpi.py
1045 lines (839 loc) · 35.7 KB
/
redpitaya_scpi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""SCPI access to Red Pitaya."""
import socket
import struct
import numpy as np
__author__ = "Luka Golinar, Iztok Jeras, Miha Gjura"
__copyright__ = "Copyright 2023, Red Pitaya"
class scpi (object):
"""SCPI class used to access Red Pitaya over an IP network."""
# Line terminator: appended to every outgoing text command and expected at the
# end of every incoming text reply.
delimiter = '\r\n'
def __init__(self, host, timeout=None, port=5000):
    """Initialize object and open IP connection.

    Parameters
    ----------
    host (str) :
        Host name or IP address given as a quoted string, like '192.168.1.100'.
    timeout (float, optional) :
        Socket timeout in seconds. ``None`` leaves the socket in blocking mode.
        Defaults to None.
    port (int, optional) :
        TCP port to connect to.
        Defaults to 5000.

    A failure to create or connect the socket is reported on stdout and is
    not raised to the caller.
    """
    self.host = host
    self.port = port
    self.timeout = timeout
    # Define the attribute before anything can fail, so that close() and
    # __del__ never hit an AttributeError when socket creation itself fails.
    self._socket = None

    try:
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        if timeout is not None:
            self._socket.settimeout(timeout)

        self._socket.connect((host, port))

    except socket.error as e:
        print('SCPI >> connect({!s:s}:{:d}) failed: {!s:s}'.format(host, port, e))
def __del__(self):
    """Close the socket, if one is still held, when the object is destroyed."""
    # getattr: __init__ may have failed before ``_socket`` was ever assigned,
    # and __del__ still runs for such half-constructed objects.
    if getattr(self, "_socket", None) is not None:
        self._socket.close()
        self._socket = None
def close(self):
    """Close IP connection.

    Safe to call more than once, and safe to call on an object whose
    connection was never opened.
    """
    sock = getattr(self, "_socket", None)
    if sock is not None:
        sock.close()
    # Always leave the attribute defined and cleared so later calls are no-ops.
    self._socket = None
def rx_txt(self, chunksize = 4096):
    """Receive text string and return it after removing the delimiter.

    Data is read from the socket in pieces of at most ``chunksize`` bytes
    (a power of two is preferable) until the accumulated reply ends with
    ``self.delimiter``.

    Returns
    -------
    str
        The reply decoded as UTF-8, without the trailing delimiter.

    Raises
    ------
    ConnectionError
        If the peer closes the connection before a complete reply arrived.
    """
    terminator = self.delimiter.encode('utf-8')
    received = bytearray()
    while True:
        chunk = self._socket.recv(chunksize)
        if not chunk:
            # recv() returning no data means the peer closed the connection;
            # looping again would spin forever.
            raise ConnectionError("SCPI connection closed before a complete reply was received")
        received += chunk
        if received.endswith(terminator):
            # Decode once at the end so a multi-byte character split across
            # two chunks cannot break decoding.
            return received[:-len(terminator)].decode('utf-8')
def rx_txt_check_error(self, chunksize = 4096, stop = True):
    """Receive a text reply, then run ``check_error(stop)``; return the reply."""
    reply = self.rx_txt(chunksize)
    # The error check runs after the reply has been read in full.
    self.check_error(stop)
    return reply
def rx_arb(self):
    """Receive binary data from the SCPI server.

    The reply is parsed as ``#<d><length><payload>``: a ``#`` marker, one
    digit ``d`` giving how many length digits follow, the decimal payload
    length, and then exactly that many payload bytes.

    Returns
    -------
    bytes or bool
        The payload bytes, or ``False`` when the first byte is not ``#`` or
        the digit count is not positive.

    Raises
    ------
    ConnectionError
        If the peer closes the connection before the block is complete.
    ValueError
        If the digit count or the length field is not a decimal number.
    """
    def recv_exact(count):
        # Read exactly ``count`` bytes, at most 4096 per recv() call.
        collected = bytearray()
        while len(collected) < count:
            part = self._socket.recv(min(count - len(collected), 4096))
            if not part:
                # Empty read == connection closed; retrying would never end.
                raise ConnectionError("SCPI connection closed while receiving binary data")
            collected += part
        return bytes(collected)

    if recv_exact(1) != b'#':
        return False

    num_len_digits = int(recv_exact(1))
    if num_len_digits <= 0:
        return False

    num_bytes = int(recv_exact(num_len_digits))
    return recv_exact(num_bytes)
def rx_arb_check_error(self, stop = True):
    """Receive a binary reply, then run ``check_error(stop)``; return the reply."""
    payload = self.rx_arb()
    # The error check runs after the binary block has been read.
    self.check_error(stop)
    return payload
def tx_txt(self, msg):
    """Append the delimiter to the text string and send it, UTF-8 encoded."""
    line = msg + self.delimiter
    payload = line.encode('utf-8')
    # sendall (rather than send) keeps writing until the whole payload is out.
    return self._socket.sendall(payload)
def tx_txt_check_error(self, msg, stop = True):
    """Send a text command, then run ``check_error(stop)``."""
    self.tx_txt(msg)
    # Query the device for errors once the command has been sent.
    self.check_error(stop)
def txrx_txt(self, msg):
    """Send a text string and return the text reply that follows."""
    self.tx_txt(msg)
    reply = self.rx_txt()
    return reply
def check_error(self, stop = True):
    """Print pending device errors and optionally terminate on severe ones.

    Reads the status value through ``self.stb_q()``. When bit ``0x4`` of it is
    set, entries are fetched with ``self.err_n()`` and printed until one that
    starts with ``'0,'`` is returned.
    NOTE(review): bit 0x4 is presumably the "error queue not empty" flag and
    '0,' the "no error" entry - confirm against the server documentation.

    Parameters
    ----------
    stop (bool, optional) :
        When True, an error whose numeric code (the text before the first
        comma) is greater than 9500 ends the program with exit status 1.
        Defaults to True.

    Raises
    ------
    SystemExit
        For an error code above 9500 while ``stop`` is True.
    """
    status = int(self.stb_q())
    if not status & 0x4:
        return

    while True:
        err = self.err_n()
        if err.startswith('0,'):
            break
        print(err)
        code = err.split(",")[0]
        if stop and int(code) > 9500:
            # Same effect as exit(1), without depending on the ``exit`` helper
            # that only exists when the ``site`` module has been loaded.
            raise SystemExit(1)
# SCPI command functions
def sour_set(
    self,
    chan: int,
    func: str = "sine",
    volt: float = 1,
    freq: float = 1000,
    offset: float = 0,
    phase: float = 0,
    dcyc: float = 0.5,
    data: np.ndarray = None,
    burst: bool = False,
    ncyc: int = 1,
    nor: int = 1,
    period: int = None,
    trig: str = "int",
    sdrlab: bool = False,
    siglab: bool = False,
) -> None:
    """
    Set the parameters for signal generator on one channel.

    All arguments are validated first; commands are only sent once every
    check has passed.

    Parameters
    -----------
        chan (int) :
            Output channel (either 1 or 2).
        func (str, optional) :
            Waveform of the signal (SINE, SQUARE, TRIANGLE, SAWU,
            SAWD, PWM, ARBITRARY, DC, DC_NEG). Case-insensitive.
            Defaults to `sine`.
        volt (float, optional) :
            Amplitude of signal {-1, 1} Volts. {-5, 5} for SIGNALlab 250-12.
            Defaults to 1.
        freq (float, optional) :
            Frequency of signal in Hz. Always range-checked, but not sent
            if 'func' is "DC" or "DC_NEG".
            Defaults to 1000.
        offset (float, optional) :
            Signal offset {-1, 1} Volts. {-5, 5} for SIGNALlab 250-12.
            Defaults to 0.
        phase (float, optional) :
            Phase of signal {-360, 360} degrees.
            Defaults to 0.
        dcyc (float, optional) :
            Duty cycle, where 1 corresponds to 100%. Only sent for "PWM".
            Defaults to 0.5.
        data (ndarray, optional) :
            Numpy ``ndarray`` of max 16384 values, floats in range {-1,1}
            (or {-5,5} for SIGNALlab). The value range is not checked here.
            Defines the custom waveform; only sent if "func" is "ARBITRARY".
            Defaults to `None`.
        burst (bool, optional) :
            Enable/disable Burst mode. (`True` - BURST, `False` - CONTINUOUS)
            Generate "nor" number of "ncyc" periods with total time "period".
            Defaults to `False`.
        ncyc (int, optional) :
            Number of periods in one burst.
            Defaults to 1.
        nor (int, optional) :
            Number of repeated bursts.
            Defaults to 1.
        period (int, optional) :
            Total time of one burst in µs {1, 5e8}. Includes the signal and delay.
            Only the lower bound is checked here.
            Defaults to `None`.
        trig (str, optional) :
            Trigger source (EXT_PE, EXT_NE, INT, GATED). Case-insensitive.
            Defaults to `int` (internal).
        sdrlab (bool, optional) :
            `True` if operating with SDRlab 122-16 (raises the lower
            frequency limit to 300 kHz).
            Defaults to `False`.
        siglab (bool, optional) :
            `True` if operating with SIGNALlab 250-12 (widens the amplitude
            and offset limits to 5 V).
            Defaults to `False`.

    The settings will work on any Red Pitaya board. If operating on a board
    other than STEMlab 125-14, change the bool value of the appropriate
    parameter to true (sdrlab, siglab)

    Raises
    ------
        ValueError
            If an input parameter is out of range or not a defined option.
    """

    ### Constants ###
    waveform_list = ["SINE","SQUARE","TRIANGLE","SAWU","SAWD","PWM","ARBITRARY","DC","DC_NEG"]
    trigger_list = ["EXT_PE","EXT_NE","INT","GATED"]
    buff_size = 16384

    ### Limits ###
    volt_lim = 1
    offs_lim = 1
    phase_lim = 360
    freq_up_lim = 50e6          # 50 MHz
    freq_down_lim = 0

    if siglab:
        volt_lim = 5
        offs_lim = 5
    elif sdrlab:
        freq_down_lim = 300e3   # 300 kHz

    waveform = func.upper()
    trigger = trig.upper()

    ### CHECK FOR ERRORS ###
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    # Each test is written as ``not (valid)`` so NaN inputs are rejected too.
    if chan not in (1, 2):
        raise ValueError("Channel needs to be either 1 or 2")
    if waveform not in waveform_list:
        raise ValueError(f"{waveform} is not a defined waveform")
    if not freq_down_lim < freq <= freq_up_lim:
        raise ValueError(f"Frequency is out of range {freq_down_lim, freq_up_lim} Hz")
    if not abs(volt) <= volt_lim:
        raise ValueError(f"Amplitude is out of range {-volt_lim, volt_lim} V")
    if not abs(offset) <= offs_lim:
        raise ValueError(f"Offset is out of range {-offs_lim, offs_lim} V")
    if not 0 <= dcyc <= 1:
        raise ValueError(f"Duty Cycle is out of range {0, 1}")
    if not abs(phase) <= phase_lim:
        raise ValueError(f"Phase is out of range {-phase_lim, phase_lim} deg")
    if data is not None and not data.shape[0] <= buff_size:
        raise ValueError(f"Data array is too long. Max length is {buff_size}")
    if not ncyc >= 1:
        raise ValueError("NCYC minimum is 1")
    if not nor >= 1:
        raise ValueError("NOR minimum is 1")
    if period is not None and not period >= 1:
        raise ValueError("Minimal burst period 1 µs")
    if trigger not in trigger_list:
        raise ValueError(f"{trigger} is not a defined trigger source")
    if siglab and sdrlab:
        raise ValueError("Please select only one board option. 'siglab' and 'sdrlab' cannot be true at the same time.")

    ### SEND COMMANDS TO RP ###
    self.tx_txt(f"SOUR{chan}:FUNC {waveform}")
    self.tx_txt(f"SOUR{chan}:VOLT {volt}")

    # DC and DC_NEG have no frequency to set.
    if waveform not in ("DC", "DC_NEG"):
        self.tx_txt(f"SOUR{chan}:FREQ:FIX {freq}")

    self.tx_txt(f"SOUR{chan}:VOLT:OFFS {offset}")
    self.tx_txt(f"SOUR{chan}:PHAS {phase}")

    if waveform == "PWM":
        self.tx_txt(f"SOUR{chan}:DCYC {dcyc}")

    if (data is not None) and (waveform == "ARBITRARY"):
        # Samples are sent as text with five decimal places.
        cust_wf = ", ".join(f"{n:.5f}" for n in data)
        self.tx_txt(f"SOUR{chan}:TRAC:DATA:DATA {cust_wf}")

    if burst:
        self.tx_txt(f"SOUR{chan}:BURS:STAT BURST")
        self.tx_txt(f"SOUR{chan}:BURS:NCYC {ncyc}")
        self.tx_txt(f"SOUR{chan}:BURS:NOR {nor}")
        if period is not None:
            self.tx_txt(f"SOUR{chan}:BURS:INT:PER {period}")
    else:
        self.tx_txt(f"SOUR{chan}:BURS:STAT CONTINUOUS")

    self.tx_txt(f"SOUR{chan}:TRIG:SOUR {trigger}")
def acq_set(
    self,
    dec: int = 1,
    trig_lvl: float = 0,
    trig_delay: int = 0,
    trig_delay_ns: bool = False,
    units: str = None,
    sample_format: str = None,
    averaging: bool = True,
    gain: list = None,              # 2 channels (double the length if 4-input)
    coupling: list = None,          # 2 channels
    ext_trig_lvl: float = 0,
    siglab: bool = False,
    input4: bool = False
) -> None:
    """
    Set the parameters for signal acquisition.

    All arguments are validated first; commands are only sent once every
    check has passed.

    Parameters
    -----------
        dec (int, optional) :
            Decimation (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048,
            4096, 8192, 16384, 32768, 65536). Not validated here.
            Defaults to 1.
        trig_lvl (float, optional) :
            Trigger level in Volts. {-1, 1} Volts on LV gain or {-20, 20} Volts
            on HV gain (HV limits apply as soon as any entry of `gain` is HV).
            Defaults to 0.
        trig_delay (int, optional) :
            Trigger delay in samples (if trig_delay_ns = True, then the delay is in ns)
            Defaults to 0.
        trig_delay_ns (bool, optional) :
            Change the trigger delay to nanoseconds instead of samples.
            Defaults to False.
        units (str, optional) :
            The units in which the acquired data will be returned (RAW, VOLTS).
            Not sent when None.
        sample_format (str, optional) :
            The format in which the acquired data will be returned (BIN, ASCII).
            Not sent when None.
        averaging (bool, optional) :
            Enable/disable averaging. When True, if decimation is higher than 1,
            each returned sample is the average of the taken samples. For example,
            if dec = 4, the returned sample will be the average of the 4 decimated
            samples.
            Defaults to True.
        gain (list(str), optional) :
            HV / LV - (High (1:20) or Low (1:1 attenuation))
            One entry per input channel, in channel order: 2 entries, or 4 if
            input4 = True.
            Refers to jumper settings on Red Pitaya fast analog inputs.
            (1:20 and 1:1 attenuator for SIGNALlab 250-12)
            Not sent when None.
        coupling (list(str), optional) :
            AC / DC - coupling mode for fast analog inputs, one entry for each
            of the two channels.
            (Only used when siglab = True)
            Not sent when None.
        ext_trig_lvl (float, optional) :
            Set trigger external level in V.
            (Only used when siglab = True and `coupling` is given)
            Defaults to 0.
        siglab (bool, optional) :
            Set to True if operating with SIGNALlab 250-12.
            Defaults to False.
        input4 (bool, optional) :
            Set to True if operating with STEMlab 125-14 4-Input.
            Defaults to False.

    The settings will work on any Red Pitaya board. If operating on SIGNALlab 250-12
    or STEMlab 125-14 4-Input change the bool value of the appropriate parameter to
    true (siglab, input4). This will change the available range of input parameters.

    Raises
    ------
        ValueError
            If an input parameter is out of range, not a defined option, or a
            list has too few entries.
    """

    ### Constants ###
    gain_list = ["LV","HV"]
    coupling_list = ["DC","AC"]
    units_list = ["RAW","VOLTS"]
    format_list = ["BIN", "ASCII"]

    ### Limits ###
    n = 4 if input4 else 2      # Number of input channels

    trig_lvl_lim = 1.0
    gain_lvl = "LV"
    if gain is not None and any(g.upper() == "HV" for g in gain):
        trig_lvl_lim = 20.0
        gain_lvl = "HV"

    ### CHECK FOR ERRORS ###
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if not abs(trig_lvl) <= trig_lvl_lim:
        raise ValueError(f"Trigger level out of range {-trig_lvl_lim, trig_lvl_lim} V "
                         f"for gain {gain_lvl}")
    if not trig_delay >= 0:
        raise ValueError("Trigger delay cannot be less that 0")
    if units is not None and units.upper() not in units_list:
        raise ValueError(f"{units.upper()} is not a defined unit")
    if sample_format is not None and sample_format.upper() not in format_list:
        raise ValueError(f"{sample_format.upper()} is not a defined format")

    if gain is not None:
        # Every channel that will be configured needs an entry; without this
        # check a short list fails with IndexError half-way through sending.
        if len(gain) < n:
            raise ValueError(f"Expected {n} gain values, got {len(gain)}")
        if any(g.upper() not in gain_list for g in gain[:n]):
            listed = " or ".join(g.upper() for g in gain[:n])
            raise ValueError(f"{listed} is not a defined gain")

    if siglab and coupling is not None:
        if len(coupling) < 2:
            raise ValueError(f"Expected 2 coupling values, got {len(coupling)}")
        if any(c.upper() not in coupling_list for c in coupling[:2]):
            raise ValueError(f"{coupling[0].upper()} or {coupling[1].upper()} "
                             "is not a defined coupling")
        # The external trigger level is only sent together with the coupling,
        # so it is only checked here.
        if not abs(ext_trig_lvl) <= trig_lvl_lim:
            raise ValueError("External trigger level out of range "
                             f"{-trig_lvl_lim, trig_lvl_lim} V")

    if siglab and input4:
        raise ValueError("Please select only one board option. "
                         "'siglab' and 'input4' cannot be true at the same time.")

    ### SEND COMMANDS TO RP ###
    self.tx_txt(f"ACQ:DEC {dec}")
    self.tx_txt("ACQ:AVG ON" if averaging else "ACQ:AVG OFF")

    if trig_delay_ns:
        self.tx_txt(f"ACQ:TRIG:DLY:NS {trig_delay}")
    else:
        self.tx_txt(f"ACQ:TRIG:DLY {trig_delay}")

    if units is not None:
        self.tx_txt(f"ACQ:DATA:UNITS {units.upper()}")

    if sample_format is not None:
        self.tx_txt(f"ACQ:DATA:FORMAT {sample_format.upper()}")

    if gain is not None:
        for i in range(n):
            self.tx_txt(f"ACQ:SOUR{i+1}:GAIN {gain[i].upper()}")

    self.tx_txt(f"ACQ:TRIG:LEV {trig_lvl}")

    if siglab and coupling is not None:
        # siglab excludes input4 (checked above), so there are two channels.
        for i in range(2):
            self.tx_txt(f"ACQ:SOUR{i+1}:COUP {coupling[i].upper()}")
        self.tx_txt(f"ACQ:TRIG:EXT:LEV {ext_trig_lvl}")
def get_settings(
    self,
    siglab: bool = False,
    input4: bool = False
) -> list:
    """
    Retrieves the settings from Red Pitaya, prints them in console and returns
    them as a list of reply strings with the following sequence:
    [decimation, average, trig_dly, trig_dly_ns, trig_lvl, buf_size, gain_ch1, gain_ch2]
    followed by [gain_ch3, gain_ch4] if input4 = True,
    or by [coup_ch1, coup_ch2, ext_trig_lvl] if siglab = True.

    Decimation      - Current decimation
    Average         - Current averaging status (ON/OFF)
    Trig_dly        - Current trigger delay in samples
    Trig_dly_ns     - Current trigger delay in nanoseconds
    Trig_lvl        - Current trigger level in Volts
    Buf_size        - Buffer size
    Gain_ch1-4      - Current gain on channels (CH3 and CH4 STEMlab 125-14 4-Input only)
    Coup_ch1/2      - Current coupling mode for both channels (AC/DC) (SIGNALlab only)
    Ext_trig_lvl    - Current external trigger level in Volts (SIGNALlab only)

    Parameters
    ----------
        siglab (bool, optional):
            Set to True if operating with SIGNALlab 250-12.
            Defaults to False.
        input4 (bool, optional):
            Set to True if operating with STEMlab 125-14 4-Input.
            Defaults to False.

    Raises
    ------
        ValueError
            If both siglab and input4 are set.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if siglab and input4:
        raise ValueError("Please select only one board option. 'siglab' and 'input4' cannot be true at the same time.")

    n = 4 if input4 else 2      # Number of input channels

    queries = [
        "ACQ:DEC?",
        "ACQ:AVG?",
        "ACQ:TRIG:DLY?",
        "ACQ:TRIG:DLY:NS?",
        "ACQ:TRIG:LEV?",
        "ACQ:BUF:SIZE?",
    ]
    queries += [f"ACQ:SOUR{i+1}:GAIN?" for i in range(n)]
    if siglab:
        queries += [f"ACQ:SOUR{i+1}:COUP?" for i in range(2)]
        queries.append("ACQ:TRIG:EXT:LEV?")

    # One query/reply round trip per setting, in the documented order.
    settings = [self.txrx_txt(query) for query in queries]

    print(f"Decimation: {settings[0]}")
    print(f"Averaging: {settings[1]}")
    print(f"Trigger delay (samples): {settings[2]}")
    print(f"Trigger delay (ns): {settings[3]}")
    print(f"Trigger level (V): {settings[4]}")
    print(f"Buffer size: {settings[5]}")

    if input4:
        print(f"Gain CH1/CH2/CH3/CH4: {settings[6]}, {settings[7]}, {settings[8]}, {settings[9]}")
    else:
        print(f"Gain CH1/CH2: {settings[6]}, {settings[7]}")

    if siglab:
        print(f"Coupling CH1/CH2: {settings[8]}, {settings[9]}")
        print(f"External trigger level (V): {settings[10]}")

    return settings
def acq_data(
    self,
    chan: int,
    start: int = None,
    end: int = None,
    num_samples: int = None,
    old: bool = False,
    lat: bool = False,
    binary: bool = False,
    convert: bool = False,
    input4: bool = False
) -> list:
    """
    Returns the acquired data on a channel from the Red Pitaya, with the following options (for a specific channel):
        - only channel              => returns the whole buffer
        - start and end             => returns the samples between them
        - start and num_samples     => returns 'num_samples' samples from the start position
        - old and num_samples       => returns 'num_samples' oldest samples in the buffer
        - lat and num_samples       => returns 'num_samples' latest samples in the buffer
    Any other combination falls back to reading the whole buffer.

    Parameters
    ----------
        chan (int) :
            Input channel (either 1 or 2).
            (1-4 for STEMlab 125-14 4-Input)
        start (int, optional):
            Start position of acquired data in the buffer {0,1,...16384}
            Defaults to None.
        end (int, optional):
            End position of acquired data in the buffer {0,1,...16384}
            Defaults to None.
        num_samples (int, optional):
            Number of samples read {0,1,...16384}
            Defaults to None.
        old (bool, optional):
            Read oldest samples in the buffer.
        lat (bool, optional):
            Read latest samples in the buffer.
        binary (bool, optional):
            Set to True if working with Binary data.
            Defaults to False.
        convert (bool, optional):
            Set to True to convert the reply to a list of numbers: binary data
            is unpacked as big-endian 32-bit floats (VOLTS) or 16-bit signed
            integers (RAW); text data is parsed as floats.
            Otherwise the reply is returned unchanged (str for text, bytes for binary).
            Defaults to False.
        input4 (bool, optional) :
            Set to True if operating with STEMlab 125-14 4-Input.
            Defaults to False.

    Raises
    ------
        ValueError
            If an input parameter is out of range, if 'old' and 'lat' are both
            set, or if binary data cannot be converted (no valid data block
            received, or units other than VOLTS/RAW).
    """

    low_lim = 0
    up_lim = 16384

    # Check input data for errors.
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if input4:
        if chan not in (1, 2, 3, 4):
            raise ValueError("Channel needs to be either 1, 2, 3 or 4")
    elif chan not in (1, 2):
        raise ValueError("Channel needs to be either 1 or 2")

    if old and lat:
        raise ValueError("Please select only one. 'old' and 'lat' cannot be True at the same time.")
    if start is not None and not low_lim <= start <= up_lim:
        raise ValueError(f"Start position out of range {low_lim, up_lim}")
    if end is not None and not low_lim <= end <= up_lim:
        raise ValueError(f"End position out of range {low_lim, up_lim}")
    if num_samples is not None and not low_lim <= num_samples <= up_lim:
        raise ValueError(f"Sample number out of range {low_lim, up_lim}")

    # Get data type from Red Pitaya
    units = self.txrx_txt('ACQ:DATA:UNITS?')

    # Determine the output data
    if (start is not None) and (end is not None):
        self.tx_txt(f"ACQ:SOUR{chan}:DATA:STA:END? {start},{end}")
    elif (start is not None) and (num_samples is not None):
        self.tx_txt(f"ACQ:SOUR{chan}:DATA:STA:N? {start},{num_samples}")
    elif old and (num_samples is not None):
        self.tx_txt(f"ACQ:SOUR{chan}:DATA:OLD:N? {num_samples}")
    elif lat and (num_samples is not None):
        self.tx_txt(f"ACQ:SOUR{chan}:DATA:LAT:N? {num_samples}")
    else:
        self.tx_txt(f"ACQ:SOUR{chan}:DATA?")

    # Convert data
    if binary:
        buff_byte = self.rx_arb()
        if not convert:
            return buff_byte
        # rx_arb() hands back False instead of bytes when the reply header is
        # malformed; there is nothing to unpack in that case.
        if buff_byte is False:
            raise ValueError("No valid binary data block was received")
        if units == "VOLTS":
            return [value for (value,) in struct.iter_unpack('!f', buff_byte)]
        if units == "RAW":
            return [value for (value,) in struct.iter_unpack('!h', buff_byte)]
        raise ValueError(f"Cannot convert binary data for units {units}")

    buff_string = self.rx_txt()
    if not convert:
        return buff_string
    samples = buff_string.strip('{}\n\r').replace(" ", "").split(',')
    return [float(sample) for sample in samples]
def uart_set(
    self,
    speed: int = 9600,
    bits: str = "CS8",
    parity: str = "NONE",
    stop: int = 1,
    timeout: int = 0
) -> None:
    """
    Configures the provided settings for UART.

    All arguments are validated first; commands are only sent once every
    check has passed.

    Args:
        speed (int, optional): Baud rate/speed of UART connection (bits per second). Defaults to 9600.
        bits (str, optional): Character size in bits (CS6, CS7, CS8), case-insensitive. Defaults to "CS8".
        parity (str, optional): Parity (NONE, EVEN, ODD, MARK, SPACE), case-insensitive. Defaults to "NONE".
        stop (int, optional): Number of stop bits (1 or 2). Defaults to 1.
        timeout (int, optional): Timeout for reading from UART (in 1/10 of seconds) {0,...255}. Defaults to 0.

    Raises:
        ValueError: If an argument is not one of the defined options or is out of range.
    """

    # Constants
    speed_list = [1200,2400,4800,9600,19200,38400,57600,115200,230400,576000,921000,1000000,1152000,1500000,2000000,2500000,3000000,3500000,4000000]
    database_list = ["CS6","CS7","CS8"]
    parity_list = ["NONE","EVEN","ODD","MARK","SPACE"]

    # The values are sent upper-cased, so they are validated upper-cased too.
    bits_upper = bits.upper()
    parity_upper = parity.upper()

    # Input Limits Check
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if speed not in speed_list:
        raise ValueError(f"{speed} is not a defined speed for UART connection. Please check the speed table.")
    if bits_upper not in database_list:
        raise ValueError(f"{bits} is not a defined character size.")
    if parity_upper not in parity_list:
        raise ValueError(f"{parity} is not a defined parity.")
    if stop not in (1, 2):
        raise ValueError("The number of stop bits can only be 1 or 2")
    if not 0 <= timeout <= 255:
        raise ValueError(f"Timeout {timeout} is out of range [0, 255]")

    # Configuring UART
    self.tx_txt("UART:INIT")
    self.tx_txt(f"UART:SPEED {speed}")
    self.tx_txt(f"UART:BITS {bits_upper}")
    self.tx_txt(f"UART:STOPB STOP{stop}")
    self.tx_txt(f"UART:PARITY {parity_upper}")
    self.tx_txt(f"UART:TIMEOUT {timeout}")
    self.tx_txt("UART:SETUP")
    print("UART is configured")
def uart_get_settings(
    self
) -> list:
    """
    Retrieves the UART settings from Red Pitaya, prints them in console and returns
    them as a list of strings with the following sequence:
    [speed, databits, stopbits, parity, timeout]

    The stop-bit reply "STOP1"/"STOP2" is returned as "1"/"2"; any other reply
    is passed through unchanged so the list always has five entries.
    """

    speed = self.txrx_txt("UART:SPEED?")
    databits = self.txrx_txt("UART:BITS?")

    stop = self.txrx_txt("UART:STOPB?")
    # Fall back to the raw reply: appending nothing would shift every later
    # entry by one position and make settings[4] fail with IndexError.
    stopbits = {"STOP1": "1", "STOP2": "2"}.get(stop, stop)

    parity = self.txrx_txt("UART:PARITY?")
    timeout = self.txrx_txt("UART:TIMEOUT?")

    settings = [speed, databits, stopbits, parity, timeout]

    print(f"Baudrate/Speed: {settings[0]}")
    print(f"Databits: {settings[1]}")
    print(f"Stopbits: {settings[2]}")
    print(f"Parity: {settings[3]}")
    print(f"Timeout (0.1 sec): {settings[4]}")

    return settings
def uart_write_string(
    self,
    string: str,
    word_length: bool = False
) -> None:
    """
    Sends a string of characters through UART.

    The string is encoded to bytes and each byte is sent as a ``#H``-prefixed
    hexadecimal value in a single ``UART:WRITE<n>`` command, where ``n`` is
    the number of encoded bytes.

    Args:
        string (str): String that will be sent.
        word_length (bool, optional): Set to True if UART word length is set to 7 (ASCII) or
                                      False if UART word length is set to 8 (UTF-8). Defaults to False.

    Raises:
        UnicodeEncodeError: If word_length is True and the string contains non-ASCII characters.
    """

    # word length 7 / ASCII, word length 8 / UTF-8
    code = "ascii" if word_length else "utf-8"
    payload = string.encode(code)

    # transforming and writing to UART
    arr = ',#H'.join(format(byte, 'X') for byte in payload)
    # Count the encoded bytes, not the characters: a non-ASCII character
    # becomes several bytes in UTF-8, and every byte is listed in the command.
    self.tx_txt(f"UART:WRITE{len(payload)} #H{arr}")
    print("String sent")
def uart_read_string(
    self,
    length: int
) -> str:
    """
    Reads a string of data from UART and decodes it from character codes to a string.

    The reply is expected to be a comma-separated list of integer character
    codes, optionally wrapped in braces.

    Args:
        length (int): Length of data to read from UART.

    Returns:
        str: Read data in string format. Empty if the reply holds no values.

    Raises:
        ValueError: If length is not greater than 0, or a reply entry is not an integer.
    """

    # Check for errors.
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if not length > 0:
        raise ValueError("Length must be greater than 0.")

    self.tx_txt(f"UART:READ{length}")
    res = self.rx_txt()

    codes = res.strip('{}\n\r').replace(" ", "").split(',')
    # Splitting an empty reply yields [''], which int() cannot parse; skip
    # empty entries so an empty reply gives an empty string.
    return "".join(chr(int(code)) for code in codes if code)
def spi_set(
    self,
    spi_mode: str = None,
    cs_mode: str = None,
    speed: int = None,
    word_len: int = None
) -> None:
    """
    Configures the provided settings for SPI.

    Only the arguments that are given (not None) are validated and sent; the
    final ``SPI:SET:SET`` command is always sent. All checks run before the
    first command is sent.

    Args:
        spi_mode (str, optional): Sets the mode for SPI (case-insensitive);
                                    - LISL (Low Idle level, Sample Leading edge)
                                    - LIST (Low Idle level, Sample Trailing edge)
                                    - HISL (High Idle level, Sample Leading edge)
                                    - HIST (High Idle level, Sample Trailing edge)
                                  Not sent when None.
        cs_mode (str, optional): Sets the mode for CS (case-insensitive);
                                    - NORMAL (After message transmission, CS => HIGH)
                                    - HIGH (After message transmission, CS => LOW)
                                  Not sent when None.
        speed (int, optional): Sets the speed of the SPI connection, range [1, 100e6]. Not sent when None.
        word_len (int, optional): Word length in bits, 7 or more. Not sent when None.

    Raises:
        ValueError: If a given argument is not a defined option or is out of range.
    """

    # Constants
    speed_max_limit = 100e6
    speed_min_limit = 1
    cs_mode_list = ["NORMAL","HIGH"]
    spi_mode_list = ["LISL","LIST","HISL","HIST"]
    bits_min_limit = 7

    # Input Limits Check
    # Every parameter defaults to None, so each one is only checked when given.
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if spi_mode is not None and spi_mode.upper() not in spi_mode_list:
        raise ValueError(f"{spi_mode} is not a defined SPI mode.")
    if cs_mode is not None and cs_mode.upper() not in cs_mode_list:
        raise ValueError(f"{cs_mode} is not a defined CS mode.")
    if speed is not None and not speed_min_limit <= speed <= speed_max_limit:
        raise ValueError(f"{speed} is out of range [{speed_min_limit},{speed_max_limit}].")
    if word_len is not None and not word_len >= bits_min_limit:
        raise ValueError(f"Word length must be greater than {bits_min_limit}. Current word length: {word_len}")

    # Configuring SPI
    if spi_mode is not None:
        self.tx_txt(f"SPI:SET:MODE {spi_mode.upper()}")
    if cs_mode is not None:
        self.tx_txt(f"SPI:SET:CSMODE {cs_mode.upper()}")
    if speed is not None:
        self.tx_txt(f"SPI:SET:SPEED {speed}")
    if word_len is not None:
        self.tx_txt(f"SPI:SET:WORD {word_len}")

    self.tx_txt("SPI:SET:SET")
    print("SPI is configured")
def spi_get_settings(
    self
) -> list:
    """
    Retrieves the SPI settings from Red Pitaya, prints them in console and returns
    them as a list of reply strings with the following sequence:
    [mode, csmode, speed, word_len, msg_size]

    ``SPI:SET:GET`` is sent before the individual settings are queried.
    """

    self.tx_txt("SPI:SET:GET")

    queries = (
        "SPI:SET:MODE?",
        "SPI:SET:CSMODE?",
        "SPI:SET:SPEED?",
        "SPI:SET:WORD?",
        "SPI:MSG:SIZE?",
    )
    # One query/reply round trip per setting, in the documented order.
    settings = [self.txrx_txt(query) for query in queries]

    print(f"SPI mode: {settings[0]}")
    print(f"CS mode: {settings[1]}")
    print(f"Speed: {settings[2]}")
    print(f"Word length: {settings[3]}")
    print(f"Message queue length: {settings[4]}")

    return settings
# IEEE Mandated Commands
def cls(self):
    """Clear Status Command: send ``*CLS`` and return what ``tx_txt`` returns."""
    command = '*CLS'
    return self.tx_txt(command)
def ese(self, value: int):
    """Standard Event Status Enable Command: send ``*ESE <value>``."""
    command = '*ESE {}'.format(value)
    return self.tx_txt(command)