From 65ef58903c665e6c13a0d8280e879b94a21e73cc Mon Sep 17 00:00:00 2001
From: Prasanna Pakkiam
Date: Thu, 8 Aug 2024 14:33:03 +1000
Subject: [PATCH] Refactored WaveformAWG and added a new GENmwSrcAWG HAL.
---
docs/Developer/Experiment Configuration.md | 2 +-
sqdtoolz/HAL/AWG.py | 414 +++++++++++----------
sqdtoolz/HAL/GENmwSrcAWG.py | 96 +++++
3 files changed, 307 insertions(+), 205 deletions(-)
create mode 100644 sqdtoolz/HAL/GENmwSrcAWG.py
diff --git a/docs/Developer/Experiment Configuration.md b/docs/Developer/Experiment Configuration.md
index 60e2cc8..c83ca87 100644
--- a/docs/Developer/Experiment Configuration.md
+++ b/docs/Developer/Experiment Configuration.md
@@ -29,7 +29,7 @@ The `TriggerOutputCompatible` class requires the implementation of:
### Trigger inputs
-HAL objects that can accept an input trigger to synchronise their output waveforms (like acquisition or AWG modules) must implement a similar couple of classes:
+HAL objects that can accept an input trigger to synchronise their output waveforms (like acquisition or AWG modules) must similarly implement a couple of classes:
- The HAL object itself must implement `TriggerInputCompatible` in order both appear in timing diagrams and to pass internal verification checks run on HAL objects by the `ExperimentConfiguration` class when using said HAL object to accept a trigger input signal.
- The individual trigger inputs are either implemented within the object itself or via a list of classes housed within the HAL object (like with AWG markers); said objects must implement `TriggerInput` to ensure that they have the right functions to interface with the `ExperimentConfiguration` class.
diff --git a/sqdtoolz/HAL/AWG.py b/sqdtoolz/HAL/AWG.py
index a57a219..04fdd72 100644
--- a/sqdtoolz/HAL/AWG.py
+++ b/sqdtoolz/HAL/AWG.py
@@ -6,10 +6,10 @@
from sqdtoolz.HAL.WaveformSegments import*
import scipy.signal
-class AWGBase:
- def __init__(self, sample_rate, total_time, global_factor):
+class AWGBase(HALbase):
+ def __init__(self, hal_name,sample_rate, total_time, global_factor):
+ HALbase.__init__(self, hal_name)
self._sample_rate = sample_rate
- self._wfm_segment_list = []
self._auto_comp = 'None'
self._auto_comp_linked = False
self._auto_comp_algos = ['None', 'Basic']
@@ -35,16 +35,7 @@ def AutoCompressionLinkChannels(self, boolVal):
@property
def Duration(self):
- if self._total_time != -1:
- return self._total_time
- full_len = 0
- for cur_seg in self._wfm_segment_list:
- full_len += cur_seg.Duration
- #If there is an elastic time-segment, then negate the -1 and add in the correct elastic time...
- elas_seg_ind, elastic_time = self._get_elastic_time_seg_params()
- if elas_seg_ind != -1:
- full_len = full_len + 1 + elastic_time
- return full_len
+ raise NotImplementedError()
@property
def SampleRate(self):
@@ -86,104 +77,8 @@ def get_valid_length_from_time(self, time_length):
ret_vals += [ num_pts / self.SampleRate ]
return ret_vals
- def _get_marker_waveform_from_segments(self, segments):
- #Temporarily set the Duration of Elastic time-segment...
- elas_seg_ind, elastic_time = self._get_elastic_time_seg_params()
- if elas_seg_ind != -1:
- self._wfm_segment_list[elas_seg_ind].Duration = elastic_time
-
- const_segs = []
- dict_segs = {}
- for cur_seg in segments:
- if type(cur_seg) == list:
- if len(cur_seg) == 0:
- const_segs += [cur_seg] #TODO: Check if is an error condition to end up here?
-
- cur_queue = cur_seg[1:]
- if len(cur_queue) == 1:
- cur_queue = cur_queue[0]
-
- if not cur_seg[0] in dict_segs:
- dict_segs[cur_seg[0]] = [cur_queue]
- else:
- dict_segs[cur_seg[0]] += [cur_queue]
- else:
- const_segs += [cur_seg]
-
- final_wfm = np.zeros(int(np.round(self.NumPts)), dtype=np.ubyte)
- cur_ind = 0
- for cur_seg in self._wfm_segment_list:
- cur_len = cur_seg.NumPts(self._sample_rate)
- if cur_len == 0:
- continue
- if cur_seg.Name in const_segs:
- final_wfm[cur_ind:cur_ind+cur_len] = 1
- elif cur_seg.Name in dict_segs: #i.e. another segment with children like WFS_Group
- final_wfm[cur_ind:cur_ind+cur_len] = cur_seg._get_marker_waveform_from_segments(dict_segs[cur_seg.Name], self._sample_rate)
- cur_ind += cur_len
-
- #Reset segment to be elastic
- if elas_seg_ind != -1:
- self._wfm_segment_list[elas_seg_ind].Duration = -1
-
- return final_wfm
-
- def _get_elastic_time_seg_params(self):
- elastic_segs = []
- for ind_wfm, cur_wfm_seg in enumerate(self._wfm_segment_list):
- if cur_wfm_seg.Duration == -1:
- elastic_segs += [ind_wfm]
- assert len(elastic_segs) <= 1, "There are too many elastic waveform segments (cannot be above 1)."
- if self._total_time == -1:
- assert len(elastic_segs) == 0, f"If the total waveform length for \"{self.Name}\" is unbound, the number of elastic segments must be zero."
- if self._total_time > 0 and len(elastic_segs) == 0:
- assert np.abs(sum([x.Duration for x in self._wfm_segment_list])-self._total_time) < 5e-15, "Sum of waveform segment durations do not match the total specified waveform group time. Consider making one of the segments elastic by setting its duration to be -1."
- #Get the elastic segment index
- if len(elastic_segs) > 0:
- elas_seg_ind = elastic_segs[0]
- fs = self.SampleRate
- #On the rare case where the segments won't fit the overall size by being too little (e.g. 2.4, 2.4, 4.2 adds up to 9, but rounding
- #the sampled segments yields 2, 2, 4 which adds up to 8) or too large (e.g. 2.6, 2.6, 3.8 adds up to 9, but rounding the sampled
- #segments yields 3, 3, 4 which adds up to 10), the elastic-time must be carefully calculated from the total number of sample points
- #rather than the durations!
- elastic_time = self._total_time*fs - (sum([self._wfm_segment_list[x].NumPts(fs) for x in range(len(self._wfm_segment_list)) if x != elas_seg_ind]))
- elastic_time = elastic_time / fs
- else:
- elas_seg_ind = -1
- elastic_time = -1
-
- return (elas_seg_ind, elastic_time)
-
def _assemble_waveform_raw(self):
- #Temporarily set the Duration of Elastic time-segment...
- elas_seg_ind, elastic_time = self._get_elastic_time_seg_params()
- if elas_seg_ind != -1:
- self._wfm_segment_list[elas_seg_ind].Duration = elastic_time
-
- num_chnls = len(self._awg_chan_list)
- final_wfms = [np.array([])]*num_chnls
- #Assemble each channel separately
- for cur_ch in range(len(self._awg_chan_list)):
- #Reset any waveform modulation commands for a new sequence construction...
- for cur_wfm_seg in self._wfm_segment_list:
- cur_wfm_seg.reset_waveform_transforms(self._lab)
- t0 = 0
- #Concatenate the individual waveform segments
- for cur_wfm_seg in self._wfm_segment_list:
- if cur_wfm_seg.NumPts(self.SampleRate) == 0:
- continue
- #TODO: Preallocate - this is a bit inefficient...
- final_wfms[cur_ch] = np.concatenate((final_wfms[cur_ch], cur_wfm_seg.get_waveform(self._lab, self._sample_rate, t0, cur_ch)))
- t0 = final_wfms[cur_ch].size
- #Scale the waveform via the global scale-factor...
- final_wfms[cur_ch] *= self._global_factor
- assert self.NumPts == final_wfms[cur_ch].size, "The sample-rate and segment-lengths yield segment points that exceed the total waveform size. Ensure that there is sufficient freedom in the elastic segment size to compensate."
-
- #Reset segment to be elastic
- if elas_seg_ind != -1:
- self._wfm_segment_list[elas_seg_ind].Duration = -1
-
- return (final_wfms, elas_seg_ind)
+ raise NotImplementedError()
def _check_changes_wfm_data(self, dict_wfm_data, final_wfm, final_mkrs):
#Check waveform equality (works for sequenced/autocompressed version as well)
@@ -303,13 +198,102 @@ def _program_auto_comp_basic_linked(self, minSize, final_wfms, final_mkrs):
seq_mkrs[cur_ch][-1] = cur_mkrs[cur_ch]
return [{'waveforms' : seq_segs[cur_ch], 'markers' : seq_mkrs[cur_ch], 'seq_ids' : seq_ids} for cur_ch in range(num_channels)]
+ def activate(self):
+ for cur_awg_chan in self._awg_chan_list:
+ cur_awg_chan.Output = True
+
+ def deactivate(self):
+ for cur_awg_chan in self._awg_chan_list:
+ cur_awg_chan.Output = False
+
+ def prepare_initial(self):
+ """
+ Method to prepare waveforms and load them into memory of AWG intsrument
+ """
+ #Prepare the waveform
+ final_wfms = self._assemble_waveform_raw()
+
+ #Ensure that the number of points in the waveform satisfies the AWG memory requirements...
+ for ind, cur_awg_chan in enumerate(self._awg_chan_list):
+ mem_params = cur_awg_chan._instr_awg.MemoryRequirements
+ num_pts = final_wfms[ind].size
+ assert num_pts >= mem_params['MinSize'], f"Waveform too short; needs to have at least {mem_params['MinSize']} points."
+ assert num_pts % mem_params['Multiple'] == 0, f"Number of points in waveform needs to be a multiple of {mem_params['Multiple']}."
+ #Assemble markers
+ final_mkrs = []
+ for ind, cur_awg_chan in enumerate(self._awg_chan_list):
+ if len(cur_awg_chan._awg_mark_list) > 0:
+ mkr_list = [x._assemble_marker_raw() for x in cur_awg_chan._awg_mark_list]
+ else:
+ mkr_list = [np.array([])]
+ final_mkrs += [mkr_list]
+ #Check if there are any changes in the waveforms - if not, then there's no need to reprogram...
+ self._dont_reprogram = True
+ for m in range(len(self._cur_prog_waveforms)):
+ if self._cur_prog_waveforms[m] is not None:
+ if not self._check_changes_wfm_data(self._cur_prog_waveforms[m], final_wfms[m], final_mkrs[m]):
+ self._dont_reprogram = False
+ break
+ else:
+ self._dont_reprogram = False
+ break
+ if self._dont_reprogram:
+ return
-class WaveformAWG(HALbase, TriggerOutputCompatible, TriggerInputCompatible, AWGBase):
+ #For the case where the sequencing table must be the same for all channels (e.g. channels on the Agilent N8241A), the sequencing is
+ #done on all the waveforms across all channels
+ if self.AutoCompressionLinkChannels and self.AutoCompression != 'None':
+ assert self.AutoCompression == 'Basic', "Only the \'Basic\' algorithm is currently supported for linked-channel auto-compression."
+ self.cur_wfms_to_commit = []
+ #Check that the memory requirements are the same across all channels (i.e. typically the same AWG)
+ dict_auto_comps = [cur_awg_chan._instr_awg.AutoCompressionSupport for cur_awg_chan in self._awg_chan_list]
+ for cur_key in dict_auto_comps[0]:
+ for cur_dict in dict_auto_comps:
+ assert cur_dict[cur_key] == dict_auto_comps[0][cur_key], f"Linked-channel auto-compression requires all channels to have the same {cur_key}."
+ #Perform BASIC compression on all the channels simultaneously...
+ dict_wfm_datas = self._program_auto_comp_basic_linked(dict_auto_comps[0]['MinSize'], final_wfms, final_mkrs)
+ for ind, cur_awg_chan in enumerate(self._awg_chan_list):
+ dict_wfm_data = dict_wfm_datas[ind]
+ seg_lens = [x.size for x in dict_wfm_data['waveforms']]
+ cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data)
+ self.cur_wfms_to_commit.append(dict_wfm_data)
+ else:
+ self.cur_wfms_to_commit = []
+ for ind, cur_awg_chan in enumerate(self._awg_chan_list):
+ mkr_list = final_mkrs[ind]
+
+ dict_auto_comp = cur_awg_chan._instr_awg.AutoCompressionSupport
+ if self.AutoCompression == 'None' or not dict_auto_comp['Supported'] or final_wfms[0].size < dict_auto_comp['MinSize']*2:
+ #UNCOMPRESSED
+ #Just program the AWG via over a single waveform
+ #Don't compress if disabled, unsupported or if the waveform size is too small to compress
+ dict_wfm_data = {'waveforms' : [final_wfms[ind]], 'markers' : [mkr_list], 'seq_ids' : [0]}
+ elif self.AutoCompression == 'Basic':
+ #BASIC COMPRESSION
+ #The basic compression algorithm is to chop up the waveform into its minimum set of bite-sized pieces and to find repetitive aspects
+ dict_wfm_data = self._program_auto_comp_basic(cur_awg_chan, final_wfms[ind], mkr_list)
+
+ seg_lens = [x.size for x in dict_wfm_data['waveforms']]
+ cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data)
+ self.cur_wfms_to_commit.append(dict_wfm_data)
+
+ def prepare_final(self):
+ """
+ Method that programs waveform onto channel
+ """
+ if not self._dont_reprogram:
+ for ind, cur_awg_chan in enumerate(self._awg_chan_list):
+ cur_awg_chan._instr_awg.program_channel(cur_awg_chan._instr_awg_chan.short_name, self.cur_wfms_to_commit[ind])
+ #Set it AFTER the programming in case there is an error etc...
+ self._cur_prog_waveforms[ind] = self.cur_wfms_to_commit[ind]
+
+
+class WaveformAWG(AWGBase, HALbase, TriggerOutputCompatible, TriggerInputCompatible):
def __init__(self, hal_name, lab, awg_channel_tuples, sample_rate, total_time=-1, global_factor = 1.0):
- HALbase.__init__(self, hal_name)
- AWGBase.__init__(self, sample_rate, total_time, global_factor)
+ AWGBase.__init__(self, hal_name, sample_rate, total_time, global_factor)
+ self._wfm_segment_list = []
if not lab._HAL_exists(hal_name):
#awg_channel_tuples is given as (instr_AWG_name, channel_name)
self._awg_chan_list = []
@@ -340,6 +324,19 @@ def fromConfigDict(cls, config_dict, lab):
config_dict["TotalTime"],
config_dict["global_factor"])
+ @property
+ def Duration(self):
+ if self._total_time != -1:
+ return self._total_time
+ full_len = 0
+ for cur_seg in self._wfm_segment_list:
+ full_len += cur_seg.Duration
+ #If there is an elastic time-segment, then negate the -1 and add in the correct elastic time...
+ elas_seg_ind, elastic_time = self._get_elastic_time_seg_params()
+ if elas_seg_ind != -1:
+ full_len = full_len + 1 + elastic_time
+ return full_len
+
def _get_child(self, tuple_name_group):
cur_name, cur_type = tuple_name_group
if cur_type == 'w':
@@ -435,6 +432,74 @@ def _get_all_trigger_inputs(self):
trig_inp_objs += cur_chan.get_all_software_triggers()
return trig_inp_objs
+ def _get_marker_waveform_from_segments(self, segments):
+ #Temporarily set the Duration of Elastic time-segment...
+ elas_seg_ind, elastic_time = self._get_elastic_time_seg_params()
+ if elas_seg_ind != -1:
+ self._wfm_segment_list[elas_seg_ind].Duration = elastic_time
+
+ const_segs = []
+ dict_segs = {}
+ for cur_seg in segments:
+ if type(cur_seg) == list:
+ if len(cur_seg) == 0:
+ const_segs += [cur_seg] #TODO: Check if is an error condition to end up here?
+
+ cur_queue = cur_seg[1:]
+ if len(cur_queue) == 1:
+ cur_queue = cur_queue[0]
+
+ if not cur_seg[0] in dict_segs:
+ dict_segs[cur_seg[0]] = [cur_queue]
+ else:
+ dict_segs[cur_seg[0]] += [cur_queue]
+ else:
+ const_segs += [cur_seg]
+
+ final_wfm = np.zeros(int(np.round(self.NumPts)), dtype=np.ubyte)
+ cur_ind = 0
+ for cur_seg in self._wfm_segment_list:
+ cur_len = cur_seg.NumPts(self._sample_rate)
+ if cur_len == 0:
+ continue
+ if cur_seg.Name in const_segs:
+ final_wfm[cur_ind:cur_ind+cur_len] = 1
+ elif cur_seg.Name in dict_segs: #i.e. another segment with children like WFS_Group
+ final_wfm[cur_ind:cur_ind+cur_len] = cur_seg._get_marker_waveform_from_segments(dict_segs[cur_seg.Name], self._sample_rate)
+ cur_ind += cur_len
+
+ #Reset segment to be elastic
+ if elas_seg_ind != -1:
+ self._wfm_segment_list[elas_seg_ind].Duration = -1
+
+ return final_wfm
+
+ def _get_elastic_time_seg_params(self):
+ elastic_segs = []
+ for ind_wfm, cur_wfm_seg in enumerate(self._wfm_segment_list):
+ if cur_wfm_seg.Duration == -1:
+ elastic_segs += [ind_wfm]
+ assert len(elastic_segs) <= 1, "There are too many elastic waveform segments (cannot be above 1)."
+ if self._total_time == -1:
+ assert len(elastic_segs) == 0, f"If the total waveform length for \"{self.Name}\" is unbound, the number of elastic segments must be zero."
+ if self._total_time > 0 and len(elastic_segs) == 0:
+ assert np.abs(sum([x.Duration for x in self._wfm_segment_list])-self._total_time) < 5e-15, "Sum of waveform segment durations do not match the total specified waveform group time. Consider making one of the segments elastic by setting its duration to be -1."
+ #Get the elastic segment index
+ if len(elastic_segs) > 0:
+ elas_seg_ind = elastic_segs[0]
+ fs = self.SampleRate
+ #On the rare case where the segments won't fit the overall size by being too little (e.g. 2.4, 2.4, 4.2 adds up to 9, but rounding
+ #the sampled segments yields 2, 2, 4 which adds up to 8) or too large (e.g. 2.6, 2.6, 3.8 adds up to 9, but rounding the sampled
+ #segments yields 3, 3, 4 which adds up to 10), the elastic-time must be carefully calculated from the total number of sample points
+ #rather than the durations!
+ elastic_time = self._total_time*fs - (sum([self._wfm_segment_list[x].NumPts(fs) for x in range(len(self._wfm_segment_list)) if x != elas_seg_ind]))
+ elastic_time = elastic_time / fs
+ else:
+ elas_seg_ind = -1
+ elastic_time = -1
+
+ return (elas_seg_ind, elastic_time)
+
def __str__(self):
ret_str = ""
ret_str += f'Name: {self.Name}\n'
@@ -510,7 +575,7 @@ def _set_current_config_waveforms(self, list_wfm_dict_config):
self._wfm_segment_list.append(new_wfm_seg)
def plot_waveforms(self, overlap=False):
- final_wfms = self._assemble_waveform_raw()[0]
+ final_wfms = self._assemble_waveform_raw()
fig = plt.figure()
if overlap:
fig, axs = plt.subplots(1)
@@ -525,99 +590,40 @@ def plot_waveforms(self, overlap=False):
axs[ind].plot(t_vals, cur_wfm)
return fig
- def activate(self):
- for cur_awg_chan in self._awg_chan_list:
- cur_awg_chan.Output = True
-
- def deactivate(self):
- for cur_awg_chan in self._awg_chan_list:
- cur_awg_chan.Output = False
-
def get_raw_waveforms(self):
- return self._assemble_waveform_raw()[0]
-
- def prepare_initial(self):
- """
- Method to prepare waveforms and load them into memory of AWG intsrument
- """
- #Prepare the waveform
- final_wfms, elastic_ind = self._assemble_waveform_raw()
-
- #Ensure that the number of points in the waveform satisfies the AWG memory requirements...
- for ind, cur_awg_chan in enumerate(self._awg_chan_list):
- mem_params = cur_awg_chan._instr_awg.MemoryRequirements
- num_pts = final_wfms[ind].size
- assert num_pts >= mem_params['MinSize'], f"Waveform too short; needs to have at least {mem_params['MinSize']} points."
- assert num_pts % mem_params['Multiple'] == 0, f"Number of points in waveform needs to be a multiple of {mem_params['Multiple']}."
-
- #Assemble markers
- final_mkrs = []
- for ind, cur_awg_chan in enumerate(self._awg_chan_list):
- if len(cur_awg_chan._awg_mark_list) > 0:
- mkr_list = [x._assemble_marker_raw() for x in cur_awg_chan._awg_mark_list]
- else:
- mkr_list = [np.array([])]
- final_mkrs += [mkr_list]
-
- #Check if there are any changes in the waveforms - if not, then there's no need to reprogram...
- self._dont_reprogram = True
- for m in range(len(self._cur_prog_waveforms)):
- if self._cur_prog_waveforms[m] is not None:
- if not self._check_changes_wfm_data(self._cur_prog_waveforms[m], final_wfms[m], final_mkrs[m]):
- self._dont_reprogram = False
- break
- else:
- self._dont_reprogram = False
- break
- if self._dont_reprogram:
- return
+ return self._assemble_waveform_raw()
+
+ def _assemble_waveform_raw(self):
+ #Temporarily set the Duration of Elastic time-segment...
+ elas_seg_ind, elastic_time = self._get_elastic_time_seg_params()
+ if elas_seg_ind != -1:
+ self._wfm_segment_list[elas_seg_ind].Duration = elastic_time
- #For the case where the sequencing table must be the same for all channels (e.g. channels on the Agilent N8241A), the sequencing is
- #done on all the waveforms across all channels
- if self.AutoCompressionLinkChannels and self.AutoCompression != 'None':
- assert self.AutoCompression == 'Basic', "Only the \'Basic\' algorithm is currently supported for linked-channel auto-compression."
- self.cur_wfms_to_commit = []
- #Check that the memory requirements are the same across all channels (i.e. typically the same AWG)
- dict_auto_comps = [cur_awg_chan._instr_awg.AutoCompressionSupport for cur_awg_chan in self._awg_chan_list]
- for cur_key in dict_auto_comps[0]:
- for cur_dict in dict_auto_comps:
- assert cur_dict[cur_key] == dict_auto_comps[0][cur_key], f"Linked-channel auto-compression requires all channels to have the same {cur_key}."
- #Perform BASIC compression on all the channels simultaneously...
- dict_wfm_datas = self._program_auto_comp_basic_linked(dict_auto_comps[0]['MinSize'], final_wfms, final_mkrs)
- for ind, cur_awg_chan in enumerate(self._awg_chan_list):
- dict_wfm_data = dict_wfm_datas[ind]
- seg_lens = [x.size for x in dict_wfm_data['waveforms']]
- cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data)
- self.cur_wfms_to_commit.append(dict_wfm_data)
- else:
- self.cur_wfms_to_commit = []
- for ind, cur_awg_chan in enumerate(self._awg_chan_list):
- mkr_list = final_mkrs[ind]
-
- dict_auto_comp = cur_awg_chan._instr_awg.AutoCompressionSupport
- if self.AutoCompression == 'None' or not dict_auto_comp['Supported'] or final_wfms[0].size < dict_auto_comp['MinSize']*2:
- #UNCOMPRESSED
- #Just program the AWG via over a single waveform
- #Don't compress if disabled, unsupported or if the waveform size is too small to compress
- dict_wfm_data = {'waveforms' : [final_wfms[ind]], 'markers' : [mkr_list], 'seq_ids' : [0]}
- elif self.AutoCompression == 'Basic':
- #BASIC COMPRESSION
- #The basic compression algorithm is to chop up the waveform into its minimum set of bite-sized pieces and to find repetitive aspects
- dict_wfm_data = self._program_auto_comp_basic(cur_awg_chan, final_wfms[ind], mkr_list)
-
- seg_lens = [x.size for x in dict_wfm_data['waveforms']]
- cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data)
- self.cur_wfms_to_commit.append(dict_wfm_data)
+ num_chnls = len(self._awg_chan_list)
+ final_wfms = [np.array([])]*num_chnls
+ #Assemble each channel separately
+ for cur_ch in range(len(self._awg_chan_list)):
+ #Reset any waveform modulation commands for a new sequence construction...
+ for cur_wfm_seg in self._wfm_segment_list:
+ cur_wfm_seg.reset_waveform_transforms(self._lab)
+ t0 = 0
+ #Concatenate the individual waveform segments
+ for cur_wfm_seg in self._wfm_segment_list:
+ if cur_wfm_seg.NumPts(self.SampleRate) == 0:
+ continue
+ #TODO: Preallocate - this is a bit inefficient...
+ final_wfms[cur_ch] = np.concatenate((final_wfms[cur_ch], cur_wfm_seg.get_waveform(self._lab, self._sample_rate, t0, cur_ch)))
+ t0 = final_wfms[cur_ch].size
+ #Scale the waveform via the global scale-factor...
+ final_wfms[cur_ch] *= self._global_factor
+ assert self.NumPts == final_wfms[cur_ch].size, "The sample-rate and segment-lengths yield segment points that exceed the total waveform size. Ensure that there is sufficient freedom in the elastic segment size to compensate."
+
+ #Reset segment to be elastic
+ if elas_seg_ind != -1:
+ self._wfm_segment_list[elas_seg_ind].Duration = -1
+
+ return final_wfms
- def prepare_final(self):
- """
- Method that programs waveform onto channel
- """
- if not self._dont_reprogram:
- for ind, cur_awg_chan in enumerate(self._awg_chan_list):
- cur_awg_chan._instr_awg.program_channel(cur_awg_chan._instr_awg_chan.short_name, self.cur_wfms_to_commit[ind])
- #Set it AFTER the programming in case there is an error etc...
- self._cur_prog_waveforms[ind] = self.cur_wfms_to_commit[ind]
class AWGOutputChannel(TriggerInput, LockableProperties):
def __init__(self, lab, instr_awg_name, channel_name, ch_index, parent_awg_waveform, sample_rate):
diff --git a/sqdtoolz/HAL/GENmwSrcAWG.py b/sqdtoolz/HAL/GENmwSrcAWG.py
new file mode 100644
index 0000000..8e67041
--- /dev/null
+++ b/sqdtoolz/HAL/GENmwSrcAWG.py
@@ -0,0 +1,96 @@
+from sqdtoolz.HAL.AWG import AWGBase, AWGOutputChannel
+from sqdtoolz.HAL.HALbase import HALbase
+from sqdtoolz.HAL.TriggerPulse import TriggerInput, TriggerInputCompatible
+import numpy as np
+
+class GENmwSrcAWGchannel(TriggerInput):
+ def __init__(self, parent):
+ self._frequency = 100e9
+ self._power = 0
+ self._parent = parent
+ self._trig_src_pol = 1
+
+ @property
+ def Frequency(self):
+ return self._frequency
+ @Frequency.setter
+ def Frequency(self, val):
+ self._frequency = val
+
+ @property
+ def Power(self):
+ return self._power
+ @Power.setter
+ def Power(self, val):
+ self._power = val
+
+ @property
+ def InputTriggerEdge(self):
+ return self._trig_src_pol
+ @InputTriggerEdge.setter
+ def InputTriggerEdge(self, pol):
+ self._trig_src_pol = pol
+
+ def get_trigger_source(self):
+ '''
+ Get the Trigger object corresponding to the trigger source.
+ '''
+ return self._trig_src_obj
+
+ def _get_instr_trig_src(self):
+ return self.get_trigger_source()
+
+ def _get_instr_input_trig_edge(self):
+ raise self.InputTriggerEdge
+
+ def _get_timing_diagram_info(self):
+ if self.Mode == 'PulseModulated':
+ return {'Type' : 'BlockShaded', 'TriggerType' : 'Gated'}
+ else:
+ return {'Type' : 'None'}
+
+
+
+class GENmwSrcAWG(AWGBase, TriggerInputCompatible):
+ def __init__(self, hal_name, lab, awg_channel_tuple, num_tones, sample_rate, total_time=0):
+ AWGBase.__init__(self, hal_name, sample_rate, total_time, global_factor=1.0)
+ assert len(awg_channel_tuple) == 2, "The argument awg_channel_tuple must be a tuple of form (instr_AWG_name, channel_name)."
+ cur_awg_name, cur_ch_name = awg_channel_tuple
+ self._awg_chan_list.append(AWGOutputChannel(lab, cur_awg_name, cur_ch_name, 0, self, sample_rate))
+
+ assert num_tones > 0, "The argument num_tones must be greater than 0."
+ self._rf_channels = [GENmwSrcAWGchannel(self) for x in range(num_tones)]
+
+ def get_rf_channel(self, chan_id):
+ assert chan_id >= 0 and chan_id < len(self._rf_channels), f"chan_id must be a valid zero-based index for the number of RF channels ({len(self._rf_channels)} in this case)."
+ return self._rf_channels[chan_id]
+
+ def _get_all_trigger_inputs(self):
+ return self._rf_channels[:]
+
+ @property
+ def Duration(self):
+ return self._total_time
+
+ def _check_triggers_same_source(self):
+ prev_trig_src = None
+ for m, cur_rf_tone in enumerate(self._rf_channels):
+ cur_trig_src = cur_rf_tone._get_instr_trig_src()
+ assert cur_trig_src != None, "The trigger sources must be set for all RF channels in a GENmwSrcAWG HAL."
+ if m == 0:
+ prev_trig_src = cur_trig_src._get_instr_trig_src()
+ assert prev_trig_src == cur_trig_src._get_instr_trig_src(), "The trigger sources for the trigger pulses driving the RF channels must be the same in a GENmwSrcAWG HAL."
+
+ def _assemble_waveform_raw(self):
+ final_wfm_raw = np.zeros(self.NumPts)
+
+ for cur_rf_ch in self._rf_channels:
+ volt_amplitude = np.sqrt(0.001 * 10**(cur_rf_ch.Power/10) * 50 * 2) #sqrt(2) for RMS...
+ edges, gates = cur_rf_ch.get_trigger_source().get_trigger_times()
+ for cur_gate in gates:
+ start_ind, end_ind = cur_gate
+ final_wfm_raw[start_ind:end_ind+1] += volt_amplitude * np.cos(2*np.pi*cur_rf_ch.Frequency * np.arange(end_ind - start_ind + 1)/self.SampleRate)
+
+ return np.array([final_wfm_raw])
+
+