From 2ac0db880ac93672f2fbbed891bb4f7c284a14ff Mon Sep 17 00:00:00 2001 From: npeard Date: Thu, 16 May 2024 16:39:13 -0700 Subject: [PATCH] tested scope acquisition from Red Pitaya --- acquire.py | 53 +++ blink.py | 25 ++ redpitaya_scpi.py | 1045 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1123 insertions(+) create mode 100644 acquire.py create mode 100644 blink.py create mode 100644 redpitaya_scpi.py diff --git a/acquire.py b/acquire.py new file mode 100644 index 0000000..699b3ef --- /dev/null +++ b/acquire.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 + +import sys +import time +import matplotlib.pyplot as plt +import redpitaya_scpi as scpi + +IP = 'rp-f0c04a.local' +rp_s = scpi.scpi(IP) +print('Connected to ' + IP) + +wave_form = 'sine' +freq = 1000000 +ampl = 1 + +# Reset Generation and Acquisition +rp_s.tx_txt('GEN:RST') +rp_s.tx_txt('ACQ:RST') + +##### Generation ##### +# Function for configuring Source +rp_s.sour_set(1, wave_form, ampl, freq, burst=True, ncyc=3) + +##### Acqusition ##### +# Function for configuring Acquisition +rp_s.acq_set(dec=1, trig_lvl=0, trig_delay=0) + +rp_s.tx_txt('ACQ:START') +time.sleep(1) +rp_s.tx_txt('ACQ:TRig AWG_PE') +rp_s.tx_txt('OUTPUT1:STATE ON') +time.sleep(1) + +rp_s.tx_txt('SOUR1:TRig:INT') + +# Wait for trigger +while 1: + rp_s.tx_txt('ACQ:TRig:STAT?') # Get Trigger Status + if rp_s.rx_txt() == 'TD': # Triggerd? + break + +## ! OS 2.00 or higher only ! ## +while 1: + rp_s.tx_txt('ACQ:TRig:FILL?') + if rp_s.rx_txt() == '1': + break + +# Read data and plot +# function for Data Acquisition +data = rp_s.acq_data(1, convert= True) + +plt.plot(data) +plt.show() \ No newline at end of file diff --git a/blink.py b/blink.py new file mode 100644 index 0000000..4d21810 --- /dev/null +++ b/blink.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + +import sys +import time +import redpitaya_scpi as scpi + +IP = 'rp-f0c04a.local' +rp_s = scpi.scpi(IP) + +if (len(sys.argv) > 2): + led = int(sys.argv[2]) +else: + led = 0 + +print ("Blinking LED["+str(led)+"]") + +period = 1 # seconds + +while 1: + time.sleep(period/2.0) + rp_s.tx_txt('DIG:PIN LED' + str(led) + ',' + str(1)) + time.sleep(period/2.0) + rp_s.tx_txt('DIG:PIN LED' + str(led) + ',' + str(0)) + +rp_s.close() \ No newline at end of file diff --git a/redpitaya_scpi.py b/redpitaya_scpi.py new file mode 100644 index 0000000..43027bc --- /dev/null +++ b/redpitaya_scpi.py @@ -0,0 +1,1045 @@ +"""SCPI access to Red Pitaya.""" + +import socket +import struct +import numpy as np + +__author__ = "Luka Golinar, Iztok Jeras, Miha Gjura" +__copyright__ = "Copyright 2023, Red Pitaya" + +class scpi (object): + """SCPI class used to access Red Pitaya over an IP network.""" + delimiter = '\r\n' + + def __init__(self, host, timeout=None, port=5000): + """Initialize object and open IP connection. + Host IP should be a string in parentheses, like '192.168.1.100'. + """ + self.host = host + self.port = port + self.timeout = timeout + + try: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + if timeout is not None: + self._socket.settimeout(timeout) + + self._socket.connect((host, port)) + + except socket.error as e: + print('SCPI >> connect({!s:s}:{:d}) failed: {!s:s}'.format(host, port, e)) + + def __del__(self): + if self._socket is not None: + self._socket.close() + self._socket = None + + def close(self): + """Close IP connection.""" + self.__del__() + + def rx_txt(self, chunksize = 4096): + """Receive text string and return it after removing the delimiter.""" + msg = '' + while 1: + chunk = self._socket.recv(chunksize).decode('utf-8') # Receive chunk size of 2^n preferably + msg += chunk + if (len(msg) >= 2 and msg[-2:] == self.delimiter): + return msg[:-2] + + def rx_txt_check_error(self, chunksize = 4096,stop = True): + msg = self.rx_txt(chunksize) + self.check_error(stop) + return msg + + def rx_arb(self): + """ Recieve binary data from scpi server""" + numOfBytes = 0 + data=b'' + while len(data) != 1: + data = self._socket.recv(1) + if data != b'#': + return False + data=b'' + + while len(data) != 1: + data = self._socket.recv(1) + numOfNumBytes = int(data) + if numOfNumBytes <= 0: + return False + data=b'' + + while len(data) != numOfNumBytes: + data += (self._socket.recv(1)) + numOfBytes = int(data) + data=b'' + + while len(data) < numOfBytes: + r_size = min(numOfBytes - len(data),4096) + data += (self._socket.recv(r_size)) + return data + + def rx_arb_check_error(self,stop = True): + data = self.rx_arb() + self.check_error(stop) + return data + + def tx_txt(self, msg): + """Send text string ending and append delimiter.""" + return self._socket.sendall((msg + self.delimiter).encode('utf-8')) # was send(().encode('utf-8')) + + def tx_txt_check_error(self, msg,stop = True): + self.tx_txt(msg) + self.check_error(stop) + + def txrx_txt(self, msg): + """Send/receive text string.""" + self.tx_txt(msg) + return self.rx_txt() + + def check_error(self,stop = True): + res = int(self.stb_q()) + if (res & 0x4): + while 1: + err = self.err_n() + if (err.startswith('0,')): + break + print(err) + n = err.split(",") + if (len(n) > 0 and stop and int(n[0]) > 9500): + exit(1) + +# SCPI command functions + + def sour_set( + self, + chan: int, + func: str = "sine", + volt: float = 1, + freq: float = 1000, + offset: float = 0, + phase: float = 0, + dcyc: float = 0.5, + data: np.ndarray = None, + burst: bool = False, + ncyc: int = 1, + nor: int = 1, + period: int = None, + trig: str = "int", + sdrlab: bool = False, + siglab: bool = False, + ) -> None: + + """ + Set the parameters for signal generator on one channel. + + Parameters + ----------- + chan (int) : + Output channel (either 1 or 2). + func (str, optional) : + Waveform of the signal (SINE, SQUARE, TRIANGLE, SAWU, + SAWD, PWM, ARBITRARY, DC, DC_NEG). + Defaults to `sine`. + volt (int, optional) : + Amplitude of signal {-1, 1} Volts. {-5, 5} for SIGNALlab 250-12. + Defaults to 1. + freq (int, optional) : + Frequency of signal. Not relevant if 'func' is "DC" or "DC_NEG". + Defaults to 1000. + offset (int, optional) : + Signal offset {-1, 1} Volts. {-5, 5} for SIGNALlab 250-12. + Defaults to 0. + phase (int, optional) : + Phase of signal {-360, 360} degrees. + Defaults to 0. + dcyc (float, optional) : + Duty cycle, where 1 corresponds to 100%. + Defaults to 0.5. + data (ndarray, optional) : + Numpy ``ndarray`` of max 16384 values, floats in range {-1,1} + (or {-5,5} for SIGNALlab). + Define the custom waveform if "func" is "ARBITRARY". + Defaults to `None`. + burst (bool, optional) : + Enable/disable Burst mode. (`True` - BURST, `False` - CONINUOUS) + Generate "nor" number of "ncyc" periods with total time "period". + Defaults to `False`. + ncyc (int, optional) : + Number of periods in one burst. + Defaults to 1. + nor (int, optional) : + Number of repeated bursts. + Defaults to 1. + period (_type_, optional) : + Total time of one burst in µs {1, 5e8}. Includes the signal and delay. + Defaults to `None`. + trig (str, optional): + Trigger source (EXT_PE, EXT_NE, INT, GATED). + Defaults to `int` (internal). + sdrlab (bool, optional): + `True` if operating with SDRlab 122-16. + Defaults to `False`. + siglab (bool, optional): + `True` if operating with SIGNALlab 250-12. + Defaults to `False`. + + The settings will work on any Red Pitaya board. If operating on a board + other than STEMlab 125-14, change the bool value of the appropriate + parameter to true (sdrlab, siglab) + + Raises + ------ + + Raises errors if the input parameters are out of range. + + """ + + ### Constants ### + waveform_list = ["SINE","SQUARE","TRIANGLE","SAWU","SAWD","PWM","ARBITRARY","DC","DC_NEG"] + trigger_list = ["EXT_PE","EXT_NE","INT","GATED"] + buff_size = 16384 + + ### Limits ### + volt_lim = 1 + offs_lim = 1 + phase_lim = 360 + freq_up_lim = 50e6 # 50 MHz + freq_down_lim = 0 + + if siglab: + volt_lim = 5 + offs_lim = 5 + elif sdrlab: + freq_down_lim = 300e3 # 300 kHz + + + + ### CHECK FOR ERRORS ### + + try: + assert chan in (1,2) + except AssertionError as channel_err: + raise ValueError("Channel needs to be either 1 or 2") from channel_err + + try: + assert func.upper() in waveform_list + except AssertionError as waveform_err: + raise ValueError(f"{func.upper()} is not a defined waveform") from waveform_err + + try: + assert freq_down_lim < freq <= freq_up_lim + except AssertionError as freq_err: + raise ValueError(f"Frequency is out of range {freq_down_lim, freq_up_lim} Hz") from freq_err + + try: + assert abs(volt) <= volt_lim + except AssertionError as ampl_err: + raise ValueError(f"Amplitude is out of range {-volt_lim, volt_lim} V") from ampl_err + + try: + assert abs(offset) <= offs_lim + except AssertionError as offs_err: + raise ValueError(f"Offset is out of range {-offs_lim, offs_lim} V") from offs_err + + try: + assert 0 <= dcyc <= 1 + except AssertionError as dcyc_err: + raise ValueError(f"Duty Cycle is out of range {0, 1}") from dcyc_err + + try: + assert abs(phase) <= phase_lim + except AssertionError as phase_err: + raise ValueError(f"Phase is out of range {-phase_lim, phase_lim} deg") from phase_err + + if data is not None: + + try: + assert data.shape[0] <= buff_size + except AssertionError as data_err: + raise ValueError(f"Data array is too long. Max length is {buff_size}") from data_err + + #try: + # assert max(absolute(data)) <= volt_lim + #except AssertionError: + # raise ValueError(f"Amplitude of data is out of range {-volt_lim, volt_lim}") + + try: + assert ncyc >= 1 + except AssertionError as ncyc_err: + raise ValueError("NCYC minimum is 1") from ncyc_err + + try: + assert nor >= 1 + except AssertionError as nor_err: + raise ValueError("NOR minimum is 1") from nor_err + + if period is not None: + try: + assert period >= 1 + except AssertionError as period_err: + raise ValueError("Minimal burst period 1 µs") from period_err + + try: + assert trig.upper() in trigger_list + except AssertionError as trig_err: + raise ValueError(f"{trig.upper()} is not a defined trigger source") from trig_err + + try: + assert not((siglab is True) and (sdrlab is True)) + except AssertionError as board_err: + raise ValueError("Please select only one board option. 'siglab' and 'sdrlab' cannot be true at the same time.") from board_err + + + + ### Variables ### + wf_data = [] + + + ### SEND COMMANDS TO RP ### + self.tx_txt(f"SOUR{chan}:FUNC {func.upper()}") + self.tx_txt(f"SOUR{chan}:VOLT {volt}") + + if func.upper() not in waveform_list[7:9]: + self.tx_txt(f"SOUR{chan}:FREQ:FIX {freq}") + + self.tx_txt(f"SOUR{chan}:VOLT:OFFS {offset}") + self.tx_txt(f"SOUR{chan}:PHAS {phase}") + + if func.upper() == "PWM": + self.tx_txt(f"SOUR{chan}:DCYC {dcyc}") + + if (data is not None) and (func.upper() == "ARBITRARY"): + for n in data: + wf_data.append(f"{n:.5f}") + cust_wf = ", ".join(map(str, wf_data)) + + self.tx_txt(f"SOUR{chan}:TRAC:DATA:DATA {cust_wf}") + + if burst: + self.tx_txt(f"SOUR{chan}:BURS:STAT BURST") + self.tx_txt(f"SOUR{chan}:BURS:NCYC {ncyc}") + self.tx_txt(f"SOUR{chan}:BURS:NOR {nor}") + + if period is not None: + self.tx_txt(f"SOUR{chan}:BURS:INT:PER {period}") + else: + self.tx_txt(f"SOUR{chan}:BURS:STAT CONTINUOUS") + + self.tx_txt(f"SOUR{chan}:TRIG:SOUR {trig.upper()}") + + #print(f"SOUR{chan} set successfully") + + def acq_set( + self, + dec: int = 1, + trig_lvl: float = 0, + trig_delay: int = 0, + trig_delay_ns: bool = False, + units: str = None, + sample_format: str = None, + averaging: bool = True, + gain: list = None, # 2 channels (double the length if 4-input) + coupling: list = None, # 2 channels + ext_trig_lvl: float = 0, + siglab: bool = False, + input4: bool = False + ) -> None: + + """ + + Set the parameters for signal acquisition. + + Parameters + ----------- + + dec (int, optional) : + Decimation (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, + 4096, 8192, 16384, 32768, 65536) + Defaults to 1. + trig_lvl (float, optional) : + Trigger level in Volts. {-1, 1} Volts on LV gain or {-20, 20} Volts on HV gain. + Defaults to 0. + trig_delay (int, optional) : + Trigger delay in samples (if trig_delay_ns = True, then the delay is in ns) + Defaults to 0. + trig_delay_ns (bool, optional) : + Change the trigger delay to nanoseconds instead of samples. + Defaults to False. + units (str, optional) : + The units in which the acquired data will be returned. + Defaults to "VOLTS". + sample_format (str, optional) : + The format in which the acquired data will be returned. + Defaults to "ASCII". + averaging (bool, optional) : + Enable/disable averaging. When True, if decimation is higher than 1, + each returned sample is the average of the taken samples. For example, + if dec = 4, the returned sample will be the average of the 4 decimated + samples. + Defaults to True. + gain (list(str), optional) : + HV / LV - (High (1:20) or Low (1:1 attenuation)) + The first element in list applies to the SOUR1 and the second to SOUR2. + Refers to jumper settings on Red Pitaya fast analog inputs. + (1:20 and 1:1 attenuator for SIGNALlab 250-12) + Defaults to ["LV","LV"]. + coupling (list(str), optional) : + AC / DC - coupling mode for fast analog inputs. + The first element in list applies to the SOUR1 and the second to SOUR2. + (Only SIGNALlab 250-12) + Defaults to ["DC","DC"]. + ext_trig_lvl (float, optional) : + Set trigger external level in V. + (Only SIGNALlab 250-12) + Defaults to 0. + siglab (bool, optional) : + Set to True if operating with SIGNALlab 250-12. + Defaults to False. + input4 (bool, optional) : + Set to True if operating with STEMlab 125-14 4-Input. + Defaults to False. + + The settings will work on any Red Pitaya board. If operating on SIGNALlab 250-12 + or STEMlab 125-14 4-Input change the bool value of the appropriate parameter to + true (siglab, input4). This will change the available range of input parameters. + + Raises + ------ + + Raises errors if the input parameters are out of range. + + """ + + ### Constants ### + #decimation_list = [1,2,4,8,16,32,64,128,256,512,1024,2048,4096,8192,16384,32768,65536] + gain_list = ["LV","HV"] + coupling_list = ["DC","AC"] + units_list = ["RAW","VOLTS"] + format_list = ["BIN", "ASCII"] + + ### Limits ### + if input4: # Set number of channels + n = 4 + else: + n = 2 + trig_lvl_lim = 1.0 + gain_lvl = "LV" + + if gain is not None: + for i in gain: + if i.upper() == "HV": + trig_lvl_lim = 20.0 + gain_lvl = "HV" + + ### CHECK FOR ERRORS ### + #try: + # assert dec in decimation_list + #except AssertionError as dec_err: + # raise ValueError(f"Decimation needs to be a power of 2 {1, 65536}") + + try: + assert abs(trig_lvl) <= trig_lvl_lim + except AssertionError as trig_err: + raise ValueError(f"Trigger level out of range {-trig_lvl_lim, trig_lvl_lim} V", + f"for gain {gain_lvl}") from trig_err + + try: + assert trig_delay >= 0 + except AssertionError as trig_dly_err: + raise ValueError("Trigger delay cannot be less that 0") from trig_dly_err + + if units is not None: + try: + assert units.upper() in units_list + except AssertionError as unit_err: + raise ValueError(f"{units.upper()} is not a defined unit") from unit_err + + if sample_format is not None: + try: + assert sample_format.upper() in format_list + except AssertionError as format_err: + raise ValueError(f"{sample_format.upper()} is not a defined format") from format_err + + if gain is not None: + try: + assert (gain[0].upper() in gain_list) and (gain[1].upper() in gain_list) + except AssertionError as gain_err: + raise ValueError(f"{gain[0].upper()} or {gain[1].upper()} is not a defined gain") from gain_err + + if siglab and coupling is not None: + try: + assert (coupling[0].upper() in coupling_list) and (coupling[1].upper() in coupling_list) + except AssertionError as coupling_err: + raise ValueError(f"{coupling[0].upper()} or {coupling[1].upper()}", + "is not a defined coupling") from coupling_err + try: + assert abs(ext_trig_lvl) <= trig_lvl_lim + except AssertionError as ext_trig_err: + raise ValueError("External trigger level out of range", + f"{-trig_lvl_lim, trig_lvl_lim} V") from ext_trig_err + + try: + assert not((siglab is True) and (input4 is True)) + except AssertionError as board_err: + raise ValueError("Please select only one board option.", + "'siglab' and 'input4' cannot be true at the same time.") from board_err + + + ### SEND COMMANDS TO RP ### + self.tx_txt(f"ACQ:DEC {dec}") + + if averaging: + self.tx_txt("ACQ:AVG ON") + else: + self.tx_txt("ACQ:AVG OFF") + + if trig_delay_ns: + self.tx_txt(f"ACQ:TRIG:DLY:NS {trig_delay}") + else: + self.tx_txt(f"ACQ:TRIG:DLY {trig_delay}") + + if units is not None: + self.tx_txt(f"ACQ:DATA:UNITS {units.upper()}") + if sample_format is not None: + self.tx_txt(f"ACQ:DATA:FORMAT {sample_format.upper()}") + + if gain is not None: + for i in range(n): + self.tx_txt(f"ACQ:SOUR{i+1}:GAIN {gain[i].upper()}") + + self.tx_txt(f"ACQ:TRIG:LEV {trig_lvl}") + + if siglab and coupling is not None: + for i in range(n): + self.tx_txt(f"ACQ:SOUR{i+1}:COUP {coupling[i].upper()}") + + self.tx_txt(f"ACQ:TRIG:EXT:LEV {ext_trig_lvl}") + + #print("ACQ set successfully") + + def get_settings( + self, + siglab: bool = False, + input4: bool = False + ) -> str: + """ + + Retrieves the settings from Red Pitaya, prints them in console and returns + them as an array with the following sequence: + [decimation, avearge, trig_dly, trig_dly_ns, trig_lvl, buf_size, gain_ch1, gain_ch2, coup_ch1, coup_ch2, ext_trig_lvl] + , gain_ch3, gain_ch4 + Decimation - Current decimation + Average - Current averaging status (ON/OFF) + Trig_dly - Current trigger delay in samples + Trig_dly_ns - Current trigger delay in nanoseconds + Trig_lvl - Current triger level in Volts + Buf_size - Buffer size + Gain_ch1-4 - Current gain on channels (CH3 and CH4 STEMlab 125-14 4-Input only) + Coup_ch1/2 - Current coupling mode for both channels (AC/DC) (SIGNALlab only) + Ext_trig_lvl - Current external trigger level in Volts (SIGNALlab only) + + Note: The last three array elements won't exist if siglab = False + Gain of channels 3 and 4 only if input4 = True + + Parameters + ---------- + siglab (bool, optional): + Set to True if operating with SIGNALlab 250-12. + Defaults to False. + input4 (bool, optional): + Set to True if operating with STEMlab 125-14 4-Input. + Defaults to False. + + """ + + try: + assert not((siglab is True) and (input4 is True)) + except AssertionError as board_err: + raise ValueError("Please select only one board option. 'siglab' and 'input4' cannot be true at the same time.") from board_err + + + settings = [] + + if input4: # Set number of channels + n = 4 + else: + n = 2 + + settings.append(self.txrx_txt("ACQ:DEC?")) + settings.append(self.txrx_txt("ACQ:AVG?")) + settings.append(self.txrx_txt("ACQ:TRIG:DLY?")) + settings.append(self.txrx_txt("ACQ:TRIG:DLY:NS?")) + settings.append(self.txrx_txt("ACQ:TRIG:LEV?")) + settings.append(self.txrx_txt("ACQ:BUF:SIZE?")) + + for i in range(n): + settings.append(self.txrx_txt(f"ACQ:SOUR{i+1}:GAIN?")) + + if siglab: + for i in range(2): + settings.append(self.txrx_txt(f"ACQ:SOUR{i+1}:COUP?")) + + settings.append(self.txrx_txt("ACQ:TRIG:EXT:LEV?")) + + + print(f"Decimation: {settings[0]}") + print(f"Averaging: {settings[1]}") + print(f"Trigger delay (samples): {settings[2]}") + print(f"Trigger delay (ns): {settings[3]}") + print(f"Trigger level (V): {settings[4]}") + print(f"Buffer size: {settings[5]}") + + if input4: + print(f"Gain CH1/CH2/CH3/CH4: {settings[6]}, {settings[7]}, {settings[8]}, {settings[9]}") + else: + print(f"Gain CH1/CH2: {settings[6]}, {settings[7]}") + + if siglab: + print(f"Coupling CH1/CH2: {settings[8]}, {settings[9]}") + print(f"External trigger level (V): {settings[10]}") + + return settings + + def acq_data( + self, + chan: int, + start: int = None, + end: int = None, + num_samples: int = None, + old: bool = False, + lat: bool = False, + binary: bool = False, + convert: bool = False, + input4: bool = False + ) -> list: + """ + Returns the acquired data on a channel from the Red Pitaya, with the following options (for a specific channel): + - only channel => returns the whole buffer + - start and end => returns the samples between them + - start and n => returns 'n' samples from the start position + - old and n => returns 'n' oldest samples in the buffer + - lat and n => returns 'n' latest samples in the buffer + + Parameters + ---------- + chan (int) : + Input channel (either 1 or 2). + (1-4 for STEMlab 125-14 4-Input) + start (int, optional): + Start position of acquired data in the buffer {0,1,...16384} + Defaults to None. + end (int, optional): + End position of acquired data in the buffer {0,1,...16384} + Defaults to None. + n (int, optional): + Number of samples read. + old (bool, optional): + Read oldest samples in the buffer. + lat (bool, optional): + Read latest samples in the buffer. + bin (bool, optional): + Set to True if working with Binary data. + Defaults to False. + convert (bool, optional): + Set to True to convert data to a list of floats (VOLTS) or integers (RAW). + Otherwise returns a list of str (VOLTS) or int (RAW). + Defaults to False. + input4 (bool, optional) : + Set to True if operating with STEMlab 125-14 4-Input. + Defaults to False. + + + Raises + ------ + + Raises errors if the input parameters do not match one of the options. + + """ + + low_lim = 0 + up_lim = 16384 + + # Check input data for errors + if input4: + try: + assert chan in (1,2,3,4) + except AssertionError as chanel_err: + raise ValueError("Channel needs to be either 1, 2, 3 or 4") from chanel_err + else: + try: + assert chan in (1,2) + except AssertionError as chanel_err: + raise ValueError("Channel needs to be either 1 or 2") from chanel_err + + try: + assert not((old is True) and (lat is True)) + except AssertionError as arg_err: + raise ValueError("Please select only one. 'old' and 'lat' cannot be True at the same time.") from arg_err + + if start is not None: + try: + assert 16384 >= start >= 0 + except AssertionError as start_err: + raise ValueError(f"Start position out of range {low_lim, up_lim}") from start_err + + if end is not None: + try: + assert 16384 >= end >= 0 + except AssertionError as end_err: + raise ValueError(f"End position out of range {low_lim, up_lim}") from end_err + + if num_samples is not None: + try: + assert 16384 >= num_samples >= 0 + except AssertionError as sample_err: + raise ValueError(f"Sample number out of range {low_lim, up_lim}") from sample_err + + # Get data type from Red Pitaya + units = self.txrx_txt('ACQ:DATA:UNITS?') + # format = self.txrx_txt("ACQ:DATA:FORMAT?") + + + # Determine the output data + if(start is not None) and (end is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:STA:END? {start},{end}") + + elif(start is not None) and (num_samples is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:STA:N? {start},{num_samples}") + + elif old and (num_samples is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:OLD:N? {num_samples}") + + elif lat and (num_samples is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:LAT:N? {num_samples}") + + else: + self.tx_txt(f"ACQ:SOUR{chan}:DATA?") + + # Convert data + if binary: + buff_byte = self.rx_arb() + + if convert: + if units == "VOLTS": + buff = [struct.unpack('!f',bytearray(buff_byte[i:i+4]))[0] for i in range(0, len(buff_byte), 4)] + elif units == "RAW": + buff = [struct.unpack('!h',bytearray(buff_byte[i:i+2]))[0] for i in range(0, len(buff_byte), 2)] + else: + buff = buff_byte + else: + buff_string = self.rx_txt() + + if convert: + buff_string = buff_string.strip('{}\n\r').replace(" ", "").split(',') + buff = list(map(float, buff_string)) + else: + buff = buff_string + + return buff + + + def uart_set( + self, + speed: int = 9600, + bits: str = "CS8", + parity: str = "NONE", + stop: int = 1, + timeout: int = 0 + ) -> None: + """ + Configures the provided settings for UART. + + Args: + speed (int, optional): Baud rate/speed of UART connection (bits per second). Defaults to 9600. + bits (str, optional): Character size in bits (CS6, CS7, CS8). Defaults to "CS8". + parity (str, optional): Parity (NONE, EVEN, ODD, MARK, SPACE). Defaults to "NONE". + stop (int, optional): Number of stop bits (1 or 2). Defaults to 1. + timeout (int, optional): Timeout for reading from UART (in 1/10 of seconds) {0,...255}. Defaults to 0. + """ + + # Constants + speed_list = [1200,2400,4800,9600,19200,38400,57600,115200,230400,576000,921000,1000000,1152000,1500000,2000000,2500000,3000000,3500000,4000000] + database_list = ["CS6","CS7","CS8"] + parity_list = ["NONE","EVEN","ODD","MARK","SPACE"] + + + # Input Limits Check + try: + assert speed in speed_list + except AssertionError as speed_err: + raise ValueError(f"{speed} is not a defined speed for UART connection. Please check the speed table.") from speed_err + + try: + assert bits in database_list + except AssertionError as bits_err: + raise ValueError(f"{bits} is not a defined character size.") from bits_err + + try: + assert parity in parity_list + except AssertionError as parity_err: + raise ValueError(f"{parity} is not a defined parity.") from parity_err + + try: + assert stop in (1,2) + except AssertionError as stop_err: + raise ValueError("The number of stop bits can only be 1 or 2") from stop_err + + try: + assert 0 <= timeout <= 255 + except AssertionError as timeout_err: + raise ValueError(f"Timeout {timeout} is out of range [0, 255]") from timeout_err + + # Configuring UART + + self.tx_txt("UART:INIT") + self.tx_txt(f"UART:SPEED {speed}") + self.tx_txt(f"UART:BITS {bits.upper()}") + self.tx_txt(f"UART:STOPB STOP{stop}") + self.tx_txt(f"UART:PARITY {parity.upper()}") + self.tx_txt(f"UART:TIMEOUT {timeout}") + + self.tx_txt("UART:SETUP") + print("UART is configured") + + def uart_get_settings( + self + ) -> str: + """ + Retrieves the settings from Red Pitaya, prints them in console and returns + them as an array with the following sequence: + [speed, databits, stopbits, parity, timeout] + + """ + + # Configuring UART + + settings = [] + + settings.append(self.txrx_txt("UART:SPEED?")) + settings.append(self.txrx_txt("UART:BITS?")) + + stop = self.txrx_txt("UART:STOPB?") + if stop == "STOP1": + settings.append("1") + elif stop == "STOP2": + settings.append("2") + + settings.append(self.txrx_txt("UART:PARITY?")) + settings.append(self.txrx_txt("UART:TIMEOUT?")) + + print(f"Baudrate/Speed: {settings[0]}") + print(f"Databits: {settings[1]}") + print(f"Stopbits: {settings[2]}") + print(f"Parity: {settings[3]}") + print(f"Timeout (0.1 sec): {settings[4]}") + + return settings + + def uart_write_string( + self, + string: str, + word_length: bool = False + ) -> None: + """ + Sends a string of characters through UART. + + Args: + string (str, optional): String that will be sent. + word_length (bool, optional): Set to True if UART word lenght is set to 7 (ASCII) or + False if UART word length is set to 8 (UTF-8). Defaults to False. + """ + + if word_length: + # word length 7 / ASCII + code = "ascii" + else: + # word length 8 / UTF-8 + code = "utf-8" + + + # transforming and writing to UART + arr = ',#H'.join(format(x, 'X') for x in bytearray(string, f"{code}")) + self.tx_txt(f"UART:WRITE{len(string)} #H{arr}") + + print("String sent") + + def uart_read_string( + self, + length: int + ) -> str: + """ + Reads a string of data from UART and decodes it from ASCII to string. + + Args: + length (int): Length of data to read from UART. + + Returns: + str: Read data in string format. + """ + + # Check for errors + try: + assert length > 0 + except AssertionError as length_err: + raise ValueError("Length must be greater than 0.") from length_err + + self.tx_txt(f"UART:READ{length}") + res = self.rx_txt() + res = res.strip('{}\n\r').replace(" ", "").split(',') + string = "".join(chr(int(x)) for x in res) # int(x).decode("utf8") + + return string + + + def spi_set( + self, + spi_mode: str = None, + cs_mode: str = None, + speed: int = None, + word_len: int = None + ) -> None: + """ + Configures the provided settings for SPI. + + Args: + spi_mode (str, optional): Sets the mode for SPI; - LISL (Low Idle level, Sample Leading edge) + - LIST (Low Idle level, Sample Trailing edge) + - HISL (High Idle level, Sample Leading edge) + - HIST (High Idle level, Sample Trailing edge) + Defaults to LISL. + cs_mode (str, optional): Sets the mode for CS: - NORMAL (After message transmission, CS => HIGH) + - HIGH (After message transmission, CS => LOW) + Defaults to NORMAL. + speed (int, optional): Sets the speed of the SPI connection. Defaults to 5e7. + word_len (int, optional): Character size in bits (CS6, CS7, CS8). Defaults to "CS8". + """ + + # Constants + speed_max_limit = 100e6 + speed_min_limit = 1 + cs_mode_list = ["NORMAL","HIGH"] + order_list = ["MSB","LSB"] + spi_mode_list = ["LISL","LIST","HISL","HIST"] + bits_min_limit = 7 + + + # Input Limits Check + + try: + assert spi_mode.upper() in spi_mode_list + except AssertionError as spi_mode_err: + raise ValueError(f"{spi_mode} is not a defined SPI mode.") from spi_mode_err + + try: + assert cs_mode.upper() in cs_mode_list + except AssertionError as cs_err: + raise ValueError(f"{cs_mode} is not a defined CS mode.") from cs_err + + try: + assert speed_min_limit <= speed <= speed_max_limit + except AssertionError as speed_err: + raise ValueError(f"{speed} is out of range [{speed_min_limit},{speed_max_limit}].") from speed_err + + try: + assert word_len >= bits_min_limit + except AssertionError as bits_err: + raise ValueError(f"Word length must be greater than {bits_min_limit}. Current word length: {word_len}") from bits_err + + + # Configuring SPI + + self.tx_txt(f"SPI:SET:MODE {spi_mode.upper()}") + self.tx_txt(f"SPI:SET:CSMODE {cs_mode.upper()}") + self.tx_txt(f"SPI:SET:SPEED {speed}") + self.tx_txt(f"SPI:SET:WORD {word_len}") + + self.tx_txt("SPI:SET:SET") + print("SPI is configured") + + def spi_get_settings( + self + ) -> str: + """ + Retrieves the SPI settings from Red Pitaya, prints them in console and returns + them as an array with the following sequence: + [mode, csmode, speed, word_len, msg_size] + + """ + + # Configuring SPI + + self.tx_txt("SPI:SET:GET") + settings = [] + + settings.append(self.txrx_txt("SPI:SET:MODE?")) + settings.append(self.txrx_txt("SPI:SET:CSMODE?")) + settings.append(self.txrx_txt("SPI:SET:SPEED?")) + settings.append(self.txrx_txt("SPI:SET:WORD?")) + settings.append(self.txrx_txt("SPI:MSG:SIZE?")) + + print(f"SPI mode: {settings[0]}") + print(f"CS mode: {settings[1]}") + print(f"Speed: {settings[2]}") + print(f"Word length: {settings[3]}") + print(f"Message queue length: {settings[4]}") + + return settings + + +# IEEE Mandated Commands + + def cls(self): + """Clear Status Command""" + return self.tx_txt('*CLS') + + def ese(self, value: int): + """Standard Event Status Enable Command""" + return self.tx_txt(f'*ESE {value}') + + def ese_q(self): + """Standard Event Status Enable Query""" + return self.txrx_txt('*ESE?') + + def esr_q(self): + """Standard Event Status Register Query""" + return self.txrx_txt('*ESR?') + + def idn_q(self): + """Identification Query""" + return self.txrx_txt('*IDN?') + + def opc(self): + """Operation Complete Command""" + return self.tx_txt('*OPC') + + def opc_q(self): + """Operation Complete Query""" + return self.txrx_txt('*OPC?') + + def rst(self): + """Reset Command""" + return self.tx_txt('*RST') + + def sre(self): + """Service Request Enable Command""" + return self.tx_txt('*SRE') + + def sre_q(self): + """Service Request Enable Query""" + return self.txrx_txt('*SRE?') + + def stb_q(self): + """Read Status Byte Query""" + return self.txrx_txt('*STB?') + +# :SYSTem + + def err_c(self): + """Error count.""" + return self.txrx_txt('SYST:ERR:COUN?') + + def err_n(self): + """Error next.""" + return self.txrx_txt('SYST:ERR:NEXT?')