-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from npeard/working
Add functionality to automate data collection for arbitrary drive waveform
- Loading branch information
Showing
5 changed files
with
741 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import os | ||
import sys | ||
import time | ||
import matplotlib.pyplot as plt | ||
import redpitaya_scpi as scpi | ||
import numpy as np | ||
from scipy.fftpack import fft | ||
import math | ||
import util | ||
|
||
# Hostname of the Red Pitaya on the local network; the SCPI server must be
# running on the device for this connection to succeed.
IP = 'rp-f0c04a.local'
rp_s = scpi.scpi(IP)  # open the SCPI socket connection used by all commands below
print('Connected to ' + IP)
|
||
def run_one_shot(start_freq=1, end_freq=1000, decimation=8192, store_data=False, plot_data=False):
    """Runs one shot of driving the speaker with a waveform and collecting the relevant data.

    Generates a band-limited random waveform, drives output 1 of the Red Pitaya
    with it, acquires the photodiode (CH1) and speaker-monitor (CH2) signals,
    and optionally plots and/or stores the results.

    Args:
        start_freq (int, optional): the lower bound of the valid frequency range. Defaults to 1.
        end_freq (int, optional): the upper bound of the valid frequency range. Defaults to 1000.
        decimation (int, optional): Decimation that determines sample rate, should be power of 2. Defaults to 8192.
        store_data (bool, optional): Whether to store data in h5py file. Defaults to False.
        plot_data (bool, optional): Whether to plot data after acquisition. Defaults to False.
    """
    ##### Create Waveform #####
    N = 16384  # Number of samples in buffer
    SMPL_RATE_DEC1 = 125e6  # sample rate for decimation=1 in Samples/s (Hz)
    smpl_rate = SMPL_RATE_DEC1 // decimation
    burst_time = N / smpl_rate  # duration (s) of one full acquisition buffer

    wave_form = 'ARBITRARY'
    freq = 1 / burst_time  # one arbitrary-waveform period spans the whole buffer
    ampl = 0.1  # good range 0-0.6V

    t, y = util.bounded_frequency_waveform(start_freq, end_freq, length=N, sample_rate=smpl_rate)
    y = util.linear_convert(y)  # convert range of waveform to [-1, 1] to properly set ampl
    if plot_data:
        plt.plot(t, y)
        plt.show()

    ##### Reset Generation and Acquisition #####
    rp_s.tx_txt('GEN:RST')
    rp_s.tx_txt('ACQ:RST')

    ##### Generation #####
    # Function for configuring Source
    rp_s.sour_set(1, wave_form, ampl, freq, data=y)

    # Enable output
    rp_s.tx_txt('OUTPUT1:STATE ON')
    rp_s.tx_txt('SOUR1:TRig:INT')

    ##### Acquisition #####
    # Function for configuring Acquisition
    rp_s.acq_set(dec=decimation, trig_delay=0)
    rp_s.tx_txt('ACQ:START')
    time.sleep(1)
    rp_s.tx_txt('ACQ:TRig CH2_PE')  # trigger on channel 2 positive edge
    time.sleep(1)

    # Wait for trigger; short sleep keeps the poll loop from hammering the SCPI server
    while True:
        rp_s.tx_txt('ACQ:TRig:STAT?')  # Get Trigger Status
        if rp_s.rx_txt() == 'TD':  # Triggered?
            break
        time.sleep(0.01)
    print('triggered')

    # Wait until the acquisition buffer is completely filled.
    ## ! OS 2.00 or higher only ! ##
    while True:
        rp_s.tx_txt('ACQ:TRig:FILL?')
        if rp_s.rx_txt() == '1':
            break
        time.sleep(0.01)

    ##### Analysis #####
    # Read data and plot function for Data Acquisition
    pd_data = np.array(rp_s.acq_data(chan=1, convert=True))  # Volts
    speaker_data = np.array(rp_s.acq_data(chan=2, convert=True))  # Volts
    velocity_data, converted_signal, freq = util.velocity_waveform(speaker_data, smpl_rate)
    displ_data, _, _ = util.displacement_waveform(speaker_data, smpl_rate)
    y_vel, y_converted, _ = util.velocity_waveform(ampl * y, smpl_rate)
    time_data = np.linspace(0, N - 1, num=N) / smpl_rate

    if plot_data:
        fig, ax = plt.subplots(nrows=3)

        ax[0].plot(time_data, pd_data, color='blue', label='Observed PD')
        ax[0].plot(time_data, speaker_data, color='black', label='Observed Drive')
        ax[0].plot(time_data, ampl * y, label='Drive Output')
        ax[0].legend()
        ax[0].set_ylabel('Amplitude (V)')
        ax[0].set_xlabel('Time (s)')

        ax[1].plot(freq, np.abs(fft(speaker_data)), color='black', label='Observed Drive')
        ax[1].plot(freq, np.abs(converted_signal), color='green', label='Expected Observed Vel')
        ax[1].plot(freq, np.abs(fft(ampl * y)), color='blue', label='Expected Drive')
        ax[1].plot(freq, np.abs(y_converted), color='orange', label='Expected Ideal Vel')
        ax[1].loglog()
        ax[1].set_xlabel('Frequency (Hz)')
        # Raw string: '\^' is an invalid escape sequence in a plain string literal
        # (SyntaxWarning on modern Python); the rendered label is unchanged.
        ax[1].set_ylabel(r'$|\^{V}|$')
        ax[1].legend()

        ax[2].plot(time_data, velocity_data, color='black', label='Observed Drive')
        ax[2].plot(time_data, y_vel, label='Drive Output')
        ax[2].set_ylabel('Expected Vel (Microns/s)')
        ax[2].set_xlabel('Time (s)')
        ax[2].legend()
        plt.tight_layout()
        plt.show()

    if store_data:
        # Store data in h5py file
        path = "/Users/angelajia/Code/College/SMI/data/"
        filename = "data.h5py"
        file_path = os.path.join(path, filename)

        entries = {
            'Time (s)': time_data,
            'Speaker (V)': speaker_data,
            'Speaker (Microns/s)': velocity_data,
            'PD (V)': pd_data,
            'Speaker (Microns)': displ_data
        }

        util.write_data(file_path, entries)

    ##### Reset when closing program #####
    rp_s.tx_txt('GEN:RST')
    rp_s.tx_txt('ACQ:RST')
|
||
|
||
# Script driver: run the requested number of consecutive shots with plotting
# enabled and storage disabled.
num_shots = 1
for _ in range(num_shots):
    run_one_shot(store_data=False, plot_data=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import sys | ||
import time | ||
import matplotlib.pyplot as plt | ||
import redpitaya_scpi as scpi | ||
import numpy as np | ||
import matplotlib.ticker as ticker | ||
|
||
# Hostname of the Red Pitaya on the local network; the SCPI server must be
# running on the device.
IP = 'rp-f0c04a.local'
rp_s = scpi.scpi(IP)
print('Connected to ' + IP)

# Drive-waveform parameters for output 1
wave_form = "SINE"
freq = 1000
ampl = 0.1

N = 16384 # Number of samples in buffer
SMPL_RATE_DEC1 = 125e6 # sample rate for decimation=1 in Samples/s (Hz)
decimation = 32
smpl_rate = SMPL_RATE_DEC1//decimation

# Reset Generation and Acquisition
rp_s.tx_txt('GEN:RST')
rp_s.tx_txt('ACQ:RST')

##### Generation #####
# Function for configuring Source
rp_s.sour_set(1, wave_form, ampl, freq)

# Enable output
rp_s.tx_txt('OUTPUT1:STATE ON')
rp_s.tx_txt('SOUR1:TRig:INT')

##### Acquisition #####
# Function for configuring Acquisition
rp_s.acq_set(dec=decimation, trig_delay=0)
rp_s.tx_txt('ACQ:START')
time.sleep(1)
rp_s.tx_txt('ACQ:TRig CH2_PE') # trigger on channel 2 positive edge
time.sleep(1)

# Wait for trigger
while 1:
    rp_s.tx_txt('ACQ:TRig:STAT?') # Get Trigger Status
    print('got status')
    if rp_s.rx_txt() == 'TD': # Triggered?
        break
print('triggered')
## ! OS 2.00 or higher only ! ##
# Wait until the acquisition buffer is completely filled
while 1:
    rp_s.tx_txt('ACQ:TRig:FILL?')
    if rp_s.rx_txt() == '1':
        break

# Read data and plot
# function for Data Acquisition
# NOTE(review): time axis is centered on the trigger sample — confirm
# trig_delay=0 actually places the trigger mid-buffer on this OS version.
time_data = np.linspace(-(N-1)/2, (N-1)/2, num=N) / smpl_rate
pd_data = rp_s.acq_data(chan=1, convert=True)
speaker_data = rp_s.acq_data(chan=2, convert=True)

# Peak-to-peak voltage of the observed drive signal
print(f"vpp: {np.max(speaker_data) - np.min(speaker_data)}")

fig, ax = plt.subplots(nrows=1)
ax.plot(time_data, pd_data, color='blue', label='Observed PD')
ax.plot(time_data, speaker_data, color='black', label='Observed Drive')
ax.xaxis.set_major_locator(ticker.MultipleLocator(0.001))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(0.0001))

plt.ylabel('Amplitude (V)')
plt.xlabel('Time (s)')
plt.show()

##### Reset when closing program #####
rp_s.tx_txt('GEN:RST')
rp_s.tx_txt('ACQ:RST')
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import numpy as np | ||
from scipy.fftpack import fft, ifft, fftfreq | ||
import h5py | ||
|
||
def bounded_frequency_waveform(start_frequency, end_frequency, length, sample_rate):
    """Generates a random waveform within the given frequency range of a given length.

    Args:
        start_frequency (float): the lower bound of the valid frequency range
        end_frequency (float): the upper bound of the valid frequency range
        length (int): the number of values to generate
        sample_rate (float): the rate at which to sample values

    Returns:
        [1darr, 1darr]: the array of time points and amplitude points in time domain
    """
    # Evenly spaced time points spanning one second
    t = np.linspace(0, 1.0, length, False)

    # Random flat spectrum over the positive frequencies, zeroed outside
    # [start_frequency, end_frequency]
    freq = np.linspace(0, sample_rate / 2, length // 2, False)
    spectrum = np.random.uniform(0, 1, len(freq))
    in_band = (freq >= start_frequency) & (freq <= end_frequency)
    spectrum = np.where(in_band, spectrum, 0)

    # See Phys. Rev. A 107, 042611 (2023) ref 28 for why we use the Rayleigh
    # distribution here. Unless we use this distribution, the random noise will
    # not be Gaussian distributed.
    df = freq[1] - freq[0]
    c = np.random.rayleigh(np.sqrt(spectrum * df))

    # Random phases, then inverse FFT (negative frequencies zero-padded) to get
    # back to the time domain; fftshift centers the burst in the window.
    phase = np.random.uniform(-np.pi, np.pi, len(freq))
    full_spectrum = np.hstack([c * spectrum * np.exp(1j * phase), np.zeros_like(spectrum)])
    y = np.fft.fftshift(np.real(ifft(full_spectrum)))
    return t, y
|
||
def linear_convert(data, new_min=-1, new_max=1):
    """Linearly scales data to a new range. Default is [-1, 1].

    Args:
        data (1darr): data to scale
        new_min (float, optional): new minimum value for data. Defaults to -1.
        new_max (float, optional): new maximum value for data. Defaults to 1.

    Returns:
        1darr: the newly scaled data
    """
    lo = np.min(data)
    hi = np.max(data)
    # Single affine transform mapping [lo, hi] onto [new_min, new_max]
    return new_min + (new_max - new_min) * (data - lo) / (hi - lo)
|
||
def write_data(file_path, entries):
    """Add data to a given dataset in 'file'. Creates dataset if it doesn't exist;
    otherwise, appends.

    Args:
        file_path (string): the name of the output HDF5 file to which to append data
        entries (dict<str, 1darr>): dictionary of column name & corresponding data
    """
    with h5py.File(file_path, 'a') as f:
        for name, values in entries.items():
            # Each shot is stored as one row of a 2-D, row-resizable dataset
            row = np.expand_dims(values, axis=0)
            if name in f:
                # Dataset already exists: grow it by one row, write the new row last
                dset = f[name]
                dset.resize(dset.shape[0] + 1, axis=0)
                dset[-1:] = row
            else:
                # First shot for this column: create the dataset with unlimited rows
                f.create_dataset(name,
                                 data=row,
                                 maxshape=(None, values.shape[0]),
                                 chunks=True)
|
||
# Constants from calibration_rp using RPRPData.csv
f0 = 257.20857316296724  # oscillator resonance frequency (Hz) from the fit
Q = 15.804110908084784   # quality factor from the fit
k = 33.42493417407945    # DC displacement gain, microns/V (A(0) == k)
c = -3.208233068626455   # constant phase offset (rad) added in phase()

def A(f):
    """Calculates the expected displacement of the speaker at an inputted drive amplitude 'ampl' for a given frequency 'f',
    based on the calibration fit at 0.2Vpp.

    Args:
        f (1darr): frequencies at which to calculate expected displacement

    Returns:
        1darr: expected displacement/V_ampl in microns/V
    """
    # Magnitude response of a driven damped harmonic oscillator
    denom = np.sqrt((f0**2 - f**2)**2 + (f0 * f / Q)**2)
    return k * f0**2 / denom

def phase(f):
    """Calculates the phase delay between the speaker voltage waveform and the photodiode response
    at a given frequency 'f'.

    Args:
        f (1darr): frequencies at which to calculate expected displacement

    Returns:
        1darr: phase in radians
    """
    # Oscillator phase lag plus the calibrated constant offset c
    return c + np.arctan2(f * f0 / Q, f**2 - f0**2)
|
||
def displacement_waveform(speaker_data, sample_rate):
    """Calculates the corresponding displacement waveform based on the given voltage waveform
    using calibration.

    Args:
        speaker_data (1darr): voltage waveform for speaker
        sample_rate (float): sample rate used to generate voltage waveform

    Returns:
        [1darr, 1darr, 1darr]: converted displacement waveform (microns) in time domain,
                               converted displacement waveform in frequency domain,
                               frequency array (Hz)
    """
    freq = fftfreq(speaker_data.size, d=1 / sample_rate)  # units: cycles/s = Hz

    # Calibration transfer function: gain A(f), with the phase applied
    # antisymmetrically over negative frequencies so ifft yields a real signal.
    transfer = A(freq) * np.where(freq < 0,
                                  np.exp(-1j * phase(-freq)),
                                  np.exp(1j * phase(freq)))
    converted_signal = fft(speaker_data) * transfer
    y = np.real(ifft(converted_signal))

    return y, converted_signal, freq
|
||
def velocity_waveform(speaker_data, sample_rate):
    """Calculates the corresponding velocity waveform based on the given voltage waveform
    using calibration.

    Args:
        speaker_data (1darr): voltage waveform for speaker
        sample_rate (float): sample rate used to generate voltage waveform

    Returns:
        [1darr, 1darr, 1darr]: converted velocity waveform (microns/s) in time domain,
                               converted velocity waveform in frequency domain,
                               frequency array (Hz)
    """
    speaker_spectrum = fft(speaker_data)
    n = speaker_data.size
    sample_spacing = 1/sample_rate
    freq = fftfreq(n, d=sample_spacing)  # units: cycles/s = Hz

    # Multiply signal by transfer func in freq domain, then return to time domain.
    # Time-domain differentiation corresponds to multiplying by i*2*pi*f in the
    # frequency domain (freq is in Hz, not rad/s); the previous factor of 1j*freq
    # underestimated the velocity by 2*pi.
    converted_signal = 1j*2*np.pi*freq * speaker_spectrum * A(freq) * np.where(freq < 0, np.exp(-1j*phase(-freq)), np.exp(1j*phase(freq)))
    v = np.real(ifft(converted_signal))

    return v, converted_signal, freq