diff --git a/docs/Developer/Experiment Configuration.md b/docs/Developer/Experiment Configuration.md index 60e2cc8..c83ca87 100644 --- a/docs/Developer/Experiment Configuration.md +++ b/docs/Developer/Experiment Configuration.md @@ -29,7 +29,7 @@ The `TriggerOutputCompatible` class requires the implementation of: ### Trigger inputs -HAL objects that can accept an input trigger to synchronise their output waveforms (like acquisition or AWG modules) must implement a similar couple of classes: +HAL objects that can accept an input trigger to synchronise their output waveforms (like acquisition or AWG modules) must similarly implement a couple of classes: - The HAL object itself must implement `TriggerInputCompatible` in order both appear in timing diagrams and to pass internal verification checks run on HAL objects by the `ExperimentConfiguration` class when using said HAL object to accept a trigger input signal. - The individual trigger inputs are either implemented within the object itself or via a list of classes housed within the HAL object (like with AWG markers); said objects must implement `TriggerInput` to ensure that they have the right functions to interface with the `ExperimentConfiguration` class. diff --git a/sqdtoolz/HAL/AWG.py b/sqdtoolz/HAL/AWG.py index a57a219..04fdd72 100644 --- a/sqdtoolz/HAL/AWG.py +++ b/sqdtoolz/HAL/AWG.py @@ -6,10 +6,10 @@ from sqdtoolz.HAL.WaveformSegments import* import scipy.signal -class AWGBase: - def __init__(self, sample_rate, total_time, global_factor): +class AWGBase(HALbase): + def __init__(self, hal_name,sample_rate, total_time, global_factor): + HALbase.__init__(self, hal_name) self._sample_rate = sample_rate - self._wfm_segment_list = [] self._auto_comp = 'None' self._auto_comp_linked = False self._auto_comp_algos = ['None', 'Basic'] @@ -35,16 +35,7 @@ def AutoCompressionLinkChannels(self, boolVal): @property def Duration(self): - if self._total_time != -1: - return self._total_time - full_len = 0 - for cur_seg in self._wfm_segment_list: - full_len += cur_seg.Duration - #If there is an elastic time-segment, then negate the -1 and add in the correct elastic time... - elas_seg_ind, elastic_time = self._get_elastic_time_seg_params() - if elas_seg_ind != -1: - full_len = full_len + 1 + elastic_time - return full_len + raise NotImplementedError() @property def SampleRate(self): @@ -86,104 +77,8 @@ def get_valid_length_from_time(self, time_length): ret_vals += [ num_pts / self.SampleRate ] return ret_vals - def _get_marker_waveform_from_segments(self, segments): - #Temporarily set the Duration of Elastic time-segment... - elas_seg_ind, elastic_time = self._get_elastic_time_seg_params() - if elas_seg_ind != -1: - self._wfm_segment_list[elas_seg_ind].Duration = elastic_time - - const_segs = [] - dict_segs = {} - for cur_seg in segments: - if type(cur_seg) == list: - if len(cur_seg) == 0: - const_segs += [cur_seg] #TODO: Check if is an error condition to end up here? - - cur_queue = cur_seg[1:] - if len(cur_queue) == 1: - cur_queue = cur_queue[0] - - if not cur_seg[0] in dict_segs: - dict_segs[cur_seg[0]] = [cur_queue] - else: - dict_segs[cur_seg[0]] += [cur_queue] - else: - const_segs += [cur_seg] - - final_wfm = np.zeros(int(np.round(self.NumPts)), dtype=np.ubyte) - cur_ind = 0 - for cur_seg in self._wfm_segment_list: - cur_len = cur_seg.NumPts(self._sample_rate) - if cur_len == 0: - continue - if cur_seg.Name in const_segs: - final_wfm[cur_ind:cur_ind+cur_len] = 1 - elif cur_seg.Name in dict_segs: #i.e. another segment with children like WFS_Group - final_wfm[cur_ind:cur_ind+cur_len] = cur_seg._get_marker_waveform_from_segments(dict_segs[cur_seg.Name], self._sample_rate) - cur_ind += cur_len - - #Reset segment to be elastic - if elas_seg_ind != -1: - self._wfm_segment_list[elas_seg_ind].Duration = -1 - - return final_wfm - - def _get_elastic_time_seg_params(self): - elastic_segs = [] - for ind_wfm, cur_wfm_seg in enumerate(self._wfm_segment_list): - if cur_wfm_seg.Duration == -1: - elastic_segs += [ind_wfm] - assert len(elastic_segs) <= 1, "There are too many elastic waveform segments (cannot be above 1)." - if self._total_time == -1: - assert len(elastic_segs) == 0, f"If the total waveform length for \"{self.Name}\" is unbound, the number of elastic segments must be zero." - if self._total_time > 0 and len(elastic_segs) == 0: - assert np.abs(sum([x.Duration for x in self._wfm_segment_list])-self._total_time) < 5e-15, "Sum of waveform segment durations do not match the total specified waveform group time. Consider making one of the segments elastic by setting its duration to be -1." - #Get the elastic segment index - if len(elastic_segs) > 0: - elas_seg_ind = elastic_segs[0] - fs = self.SampleRate - #On the rare case where the segments won't fit the overall size by being too little (e.g. 2.4, 2.4, 4.2 adds up to 9, but rounding - #the sampled segments yields 2, 2, 4 which adds up to 8) or too large (e.g. 2.6, 2.6, 3.8 adds up to 9, but rounding the sampled - #segments yields 3, 3, 4 which adds up to 10), the elastic-time must be carefully calculated from the total number of sample points - #rather than the durations! - elastic_time = self._total_time*fs - (sum([self._wfm_segment_list[x].NumPts(fs) for x in range(len(self._wfm_segment_list)) if x != elas_seg_ind])) - elastic_time = elastic_time / fs - else: - elas_seg_ind = -1 - elastic_time = -1 - - return (elas_seg_ind, elastic_time) - def _assemble_waveform_raw(self): - #Temporarily set the Duration of Elastic time-segment... - elas_seg_ind, elastic_time = self._get_elastic_time_seg_params() - if elas_seg_ind != -1: - self._wfm_segment_list[elas_seg_ind].Duration = elastic_time - - num_chnls = len(self._awg_chan_list) - final_wfms = [np.array([])]*num_chnls - #Assemble each channel separately - for cur_ch in range(len(self._awg_chan_list)): - #Reset any waveform modulation commands for a new sequence construction... - for cur_wfm_seg in self._wfm_segment_list: - cur_wfm_seg.reset_waveform_transforms(self._lab) - t0 = 0 - #Concatenate the individual waveform segments - for cur_wfm_seg in self._wfm_segment_list: - if cur_wfm_seg.NumPts(self.SampleRate) == 0: - continue - #TODO: Preallocate - this is a bit inefficient... - final_wfms[cur_ch] = np.concatenate((final_wfms[cur_ch], cur_wfm_seg.get_waveform(self._lab, self._sample_rate, t0, cur_ch))) - t0 = final_wfms[cur_ch].size - #Scale the waveform via the global scale-factor... - final_wfms[cur_ch] *= self._global_factor - assert self.NumPts == final_wfms[cur_ch].size, "The sample-rate and segment-lengths yield segment points that exceed the total waveform size. Ensure that there is sufficient freedom in the elastic segment size to compensate." - - #Reset segment to be elastic - if elas_seg_ind != -1: - self._wfm_segment_list[elas_seg_ind].Duration = -1 - - return (final_wfms, elas_seg_ind) + raise NotImplementedError() def _check_changes_wfm_data(self, dict_wfm_data, final_wfm, final_mkrs): #Check waveform equality (works for sequenced/autocompressed version as well) @@ -303,13 +198,102 @@ def _program_auto_comp_basic_linked(self, minSize, final_wfms, final_mkrs): seq_mkrs[cur_ch][-1] = cur_mkrs[cur_ch] return [{'waveforms' : seq_segs[cur_ch], 'markers' : seq_mkrs[cur_ch], 'seq_ids' : seq_ids} for cur_ch in range(num_channels)] + def activate(self): + for cur_awg_chan in self._awg_chan_list: + cur_awg_chan.Output = True + + def deactivate(self): + for cur_awg_chan in self._awg_chan_list: + cur_awg_chan.Output = False + + def prepare_initial(self): + """ + Method to prepare waveforms and load them into memory of AWG intsrument + """ + #Prepare the waveform + final_wfms = self._assemble_waveform_raw() + + #Ensure that the number of points in the waveform satisfies the AWG memory requirements... + for ind, cur_awg_chan in enumerate(self._awg_chan_list): + mem_params = cur_awg_chan._instr_awg.MemoryRequirements + num_pts = final_wfms[ind].size + assert num_pts >= mem_params['MinSize'], f"Waveform too short; needs to have at least {mem_params['MinSize']} points." + assert num_pts % mem_params['Multiple'] == 0, f"Number of points in waveform needs to be a multiple of {mem_params['Multiple']}." + #Assemble markers + final_mkrs = [] + for ind, cur_awg_chan in enumerate(self._awg_chan_list): + if len(cur_awg_chan._awg_mark_list) > 0: + mkr_list = [x._assemble_marker_raw() for x in cur_awg_chan._awg_mark_list] + else: + mkr_list = [np.array([])] + final_mkrs += [mkr_list] + #Check if there are any changes in the waveforms - if not, then there's no need to reprogram... + self._dont_reprogram = True + for m in range(len(self._cur_prog_waveforms)): + if self._cur_prog_waveforms[m] is not None: + if not self._check_changes_wfm_data(self._cur_prog_waveforms[m], final_wfms[m], final_mkrs[m]): + self._dont_reprogram = False + break + else: + self._dont_reprogram = False + break + if self._dont_reprogram: + return -class WaveformAWG(HALbase, TriggerOutputCompatible, TriggerInputCompatible, AWGBase): + #For the case where the sequencing table must be the same for all channels (e.g. channels on the Agilent N8241A), the sequencing is + #done on all the waveforms across all channels + if self.AutoCompressionLinkChannels and self.AutoCompression != 'None': + assert self.AutoCompression == 'Basic', "Only the \'Basic\' algorithm is currently supported for linked-channel auto-compression." + self.cur_wfms_to_commit = [] + #Check that the memory requirements are the same across all channels (i.e. typically the same AWG) + dict_auto_comps = [cur_awg_chan._instr_awg.AutoCompressionSupport for cur_awg_chan in self._awg_chan_list] + for cur_key in dict_auto_comps[0]: + for cur_dict in dict_auto_comps: + assert cur_dict[cur_key] == dict_auto_comps[0][cur_key], f"Linked-channel auto-compression requires all channels to have the same {cur_key}." + #Perform BASIC compression on all the channels simultaneously... + dict_wfm_datas = self._program_auto_comp_basic_linked(dict_auto_comps[0]['MinSize'], final_wfms, final_mkrs) + for ind, cur_awg_chan in enumerate(self._awg_chan_list): + dict_wfm_data = dict_wfm_datas[ind] + seg_lens = [x.size for x in dict_wfm_data['waveforms']] + cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data) + self.cur_wfms_to_commit.append(dict_wfm_data) + else: + self.cur_wfms_to_commit = [] + for ind, cur_awg_chan in enumerate(self._awg_chan_list): + mkr_list = final_mkrs[ind] + + dict_auto_comp = cur_awg_chan._instr_awg.AutoCompressionSupport + if self.AutoCompression == 'None' or not dict_auto_comp['Supported'] or final_wfms[0].size < dict_auto_comp['MinSize']*2: + #UNCOMPRESSED + #Just program the AWG via over a single waveform + #Don't compress if disabled, unsupported or if the waveform size is too small to compress + dict_wfm_data = {'waveforms' : [final_wfms[ind]], 'markers' : [mkr_list], 'seq_ids' : [0]} + elif self.AutoCompression == 'Basic': + #BASIC COMPRESSION + #The basic compression algorithm is to chop up the waveform into its minimum set of bite-sized pieces and to find repetitive aspects + dict_wfm_data = self._program_auto_comp_basic(cur_awg_chan, final_wfms[ind], mkr_list) + + seg_lens = [x.size for x in dict_wfm_data['waveforms']] + cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data) + self.cur_wfms_to_commit.append(dict_wfm_data) + + def prepare_final(self): + """ + Method that programs waveform onto channel + """ + if not self._dont_reprogram: + for ind, cur_awg_chan in enumerate(self._awg_chan_list): + cur_awg_chan._instr_awg.program_channel(cur_awg_chan._instr_awg_chan.short_name, self.cur_wfms_to_commit[ind]) + #Set it AFTER the programming in case there is an error etc... + self._cur_prog_waveforms[ind] = self.cur_wfms_to_commit[ind] + + +class WaveformAWG(AWGBase, HALbase, TriggerOutputCompatible, TriggerInputCompatible): def __init__(self, hal_name, lab, awg_channel_tuples, sample_rate, total_time=-1, global_factor = 1.0): - HALbase.__init__(self, hal_name) - AWGBase.__init__(self, sample_rate, total_time, global_factor) + AWGBase.__init__(self, hal_name, sample_rate, total_time, global_factor) + self._wfm_segment_list = [] if not lab._HAL_exists(hal_name): #awg_channel_tuples is given as (instr_AWG_name, channel_name) self._awg_chan_list = [] @@ -340,6 +324,19 @@ def fromConfigDict(cls, config_dict, lab): config_dict["TotalTime"], config_dict["global_factor"]) + @property + def Duration(self): + if self._total_time != -1: + return self._total_time + full_len = 0 + for cur_seg in self._wfm_segment_list: + full_len += cur_seg.Duration + #If there is an elastic time-segment, then negate the -1 and add in the correct elastic time... + elas_seg_ind, elastic_time = self._get_elastic_time_seg_params() + if elas_seg_ind != -1: + full_len = full_len + 1 + elastic_time + return full_len + def _get_child(self, tuple_name_group): cur_name, cur_type = tuple_name_group if cur_type == 'w': @@ -435,6 +432,74 @@ def _get_all_trigger_inputs(self): trig_inp_objs += cur_chan.get_all_software_triggers() return trig_inp_objs + def _get_marker_waveform_from_segments(self, segments): + #Temporarily set the Duration of Elastic time-segment... + elas_seg_ind, elastic_time = self._get_elastic_time_seg_params() + if elas_seg_ind != -1: + self._wfm_segment_list[elas_seg_ind].Duration = elastic_time + + const_segs = [] + dict_segs = {} + for cur_seg in segments: + if type(cur_seg) == list: + if len(cur_seg) == 0: + const_segs += [cur_seg] #TODO: Check if is an error condition to end up here? + + cur_queue = cur_seg[1:] + if len(cur_queue) == 1: + cur_queue = cur_queue[0] + + if not cur_seg[0] in dict_segs: + dict_segs[cur_seg[0]] = [cur_queue] + else: + dict_segs[cur_seg[0]] += [cur_queue] + else: + const_segs += [cur_seg] + + final_wfm = np.zeros(int(np.round(self.NumPts)), dtype=np.ubyte) + cur_ind = 0 + for cur_seg in self._wfm_segment_list: + cur_len = cur_seg.NumPts(self._sample_rate) + if cur_len == 0: + continue + if cur_seg.Name in const_segs: + final_wfm[cur_ind:cur_ind+cur_len] = 1 + elif cur_seg.Name in dict_segs: #i.e. another segment with children like WFS_Group + final_wfm[cur_ind:cur_ind+cur_len] = cur_seg._get_marker_waveform_from_segments(dict_segs[cur_seg.Name], self._sample_rate) + cur_ind += cur_len + + #Reset segment to be elastic + if elas_seg_ind != -1: + self._wfm_segment_list[elas_seg_ind].Duration = -1 + + return final_wfm + + def _get_elastic_time_seg_params(self): + elastic_segs = [] + for ind_wfm, cur_wfm_seg in enumerate(self._wfm_segment_list): + if cur_wfm_seg.Duration == -1: + elastic_segs += [ind_wfm] + assert len(elastic_segs) <= 1, "There are too many elastic waveform segments (cannot be above 1)." + if self._total_time == -1: + assert len(elastic_segs) == 0, f"If the total waveform length for \"{self.Name}\" is unbound, the number of elastic segments must be zero." + if self._total_time > 0 and len(elastic_segs) == 0: + assert np.abs(sum([x.Duration for x in self._wfm_segment_list])-self._total_time) < 5e-15, "Sum of waveform segment durations do not match the total specified waveform group time. Consider making one of the segments elastic by setting its duration to be -1." + #Get the elastic segment index + if len(elastic_segs) > 0: + elas_seg_ind = elastic_segs[0] + fs = self.SampleRate + #On the rare case where the segments won't fit the overall size by being too little (e.g. 2.4, 2.4, 4.2 adds up to 9, but rounding + #the sampled segments yields 2, 2, 4 which adds up to 8) or too large (e.g. 2.6, 2.6, 3.8 adds up to 9, but rounding the sampled + #segments yields 3, 3, 4 which adds up to 10), the elastic-time must be carefully calculated from the total number of sample points + #rather than the durations! + elastic_time = self._total_time*fs - (sum([self._wfm_segment_list[x].NumPts(fs) for x in range(len(self._wfm_segment_list)) if x != elas_seg_ind])) + elastic_time = elastic_time / fs + else: + elas_seg_ind = -1 + elastic_time = -1 + + return (elas_seg_ind, elastic_time) + def __str__(self): ret_str = "" ret_str += f'Name: {self.Name}\n' @@ -510,7 +575,7 @@ def _set_current_config_waveforms(self, list_wfm_dict_config): self._wfm_segment_list.append(new_wfm_seg) def plot_waveforms(self, overlap=False): - final_wfms = self._assemble_waveform_raw()[0] + final_wfms = self._assemble_waveform_raw() fig = plt.figure() if overlap: fig, axs = plt.subplots(1) @@ -525,99 +590,40 @@ def plot_waveforms(self, overlap=False): axs[ind].plot(t_vals, cur_wfm) return fig - def activate(self): - for cur_awg_chan in self._awg_chan_list: - cur_awg_chan.Output = True - - def deactivate(self): - for cur_awg_chan in self._awg_chan_list: - cur_awg_chan.Output = False - def get_raw_waveforms(self): - return self._assemble_waveform_raw()[0] - - def prepare_initial(self): - """ - Method to prepare waveforms and load them into memory of AWG intsrument - """ - #Prepare the waveform - final_wfms, elastic_ind = self._assemble_waveform_raw() - - #Ensure that the number of points in the waveform satisfies the AWG memory requirements... - for ind, cur_awg_chan in enumerate(self._awg_chan_list): - mem_params = cur_awg_chan._instr_awg.MemoryRequirements - num_pts = final_wfms[ind].size - assert num_pts >= mem_params['MinSize'], f"Waveform too short; needs to have at least {mem_params['MinSize']} points." - assert num_pts % mem_params['Multiple'] == 0, f"Number of points in waveform needs to be a multiple of {mem_params['Multiple']}." - - #Assemble markers - final_mkrs = [] - for ind, cur_awg_chan in enumerate(self._awg_chan_list): - if len(cur_awg_chan._awg_mark_list) > 0: - mkr_list = [x._assemble_marker_raw() for x in cur_awg_chan._awg_mark_list] - else: - mkr_list = [np.array([])] - final_mkrs += [mkr_list] - - #Check if there are any changes in the waveforms - if not, then there's no need to reprogram... - self._dont_reprogram = True - for m in range(len(self._cur_prog_waveforms)): - if self._cur_prog_waveforms[m] is not None: - if not self._check_changes_wfm_data(self._cur_prog_waveforms[m], final_wfms[m], final_mkrs[m]): - self._dont_reprogram = False - break - else: - self._dont_reprogram = False - break - if self._dont_reprogram: - return + return self._assemble_waveform_raw() + + def _assemble_waveform_raw(self): + #Temporarily set the Duration of Elastic time-segment... + elas_seg_ind, elastic_time = self._get_elastic_time_seg_params() + if elas_seg_ind != -1: + self._wfm_segment_list[elas_seg_ind].Duration = elastic_time - #For the case where the sequencing table must be the same for all channels (e.g. channels on the Agilent N8241A), the sequencing is - #done on all the waveforms across all channels - if self.AutoCompressionLinkChannels and self.AutoCompression != 'None': - assert self.AutoCompression == 'Basic', "Only the \'Basic\' algorithm is currently supported for linked-channel auto-compression." - self.cur_wfms_to_commit = [] - #Check that the memory requirements are the same across all channels (i.e. typically the same AWG) - dict_auto_comps = [cur_awg_chan._instr_awg.AutoCompressionSupport for cur_awg_chan in self._awg_chan_list] - for cur_key in dict_auto_comps[0]: - for cur_dict in dict_auto_comps: - assert cur_dict[cur_key] == dict_auto_comps[0][cur_key], f"Linked-channel auto-compression requires all channels to have the same {cur_key}." - #Perform BASIC compression on all the channels simultaneously... - dict_wfm_datas = self._program_auto_comp_basic_linked(dict_auto_comps[0]['MinSize'], final_wfms, final_mkrs) - for ind, cur_awg_chan in enumerate(self._awg_chan_list): - dict_wfm_data = dict_wfm_datas[ind] - seg_lens = [x.size for x in dict_wfm_data['waveforms']] - cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data) - self.cur_wfms_to_commit.append(dict_wfm_data) - else: - self.cur_wfms_to_commit = [] - for ind, cur_awg_chan in enumerate(self._awg_chan_list): - mkr_list = final_mkrs[ind] - - dict_auto_comp = cur_awg_chan._instr_awg.AutoCompressionSupport - if self.AutoCompression == 'None' or not dict_auto_comp['Supported'] or final_wfms[0].size < dict_auto_comp['MinSize']*2: - #UNCOMPRESSED - #Just program the AWG via over a single waveform - #Don't compress if disabled, unsupported or if the waveform size is too small to compress - dict_wfm_data = {'waveforms' : [final_wfms[ind]], 'markers' : [mkr_list], 'seq_ids' : [0]} - elif self.AutoCompression == 'Basic': - #BASIC COMPRESSION - #The basic compression algorithm is to chop up the waveform into its minimum set of bite-sized pieces and to find repetitive aspects - dict_wfm_data = self._program_auto_comp_basic(cur_awg_chan, final_wfms[ind], mkr_list) - - seg_lens = [x.size for x in dict_wfm_data['waveforms']] - cur_awg_chan._instr_awg.prepare_waveform_memory(cur_awg_chan._instr_awg_chan.short_name, seg_lens, raw_data=dict_wfm_data) - self.cur_wfms_to_commit.append(dict_wfm_data) + num_chnls = len(self._awg_chan_list) + final_wfms = [np.array([])]*num_chnls + #Assemble each channel separately + for cur_ch in range(len(self._awg_chan_list)): + #Reset any waveform modulation commands for a new sequence construction... + for cur_wfm_seg in self._wfm_segment_list: + cur_wfm_seg.reset_waveform_transforms(self._lab) + t0 = 0 + #Concatenate the individual waveform segments + for cur_wfm_seg in self._wfm_segment_list: + if cur_wfm_seg.NumPts(self.SampleRate) == 0: + continue + #TODO: Preallocate - this is a bit inefficient... + final_wfms[cur_ch] = np.concatenate((final_wfms[cur_ch], cur_wfm_seg.get_waveform(self._lab, self._sample_rate, t0, cur_ch))) + t0 = final_wfms[cur_ch].size + #Scale the waveform via the global scale-factor... + final_wfms[cur_ch] *= self._global_factor + assert self.NumPts == final_wfms[cur_ch].size, "The sample-rate and segment-lengths yield segment points that exceed the total waveform size. Ensure that there is sufficient freedom in the elastic segment size to compensate." + + #Reset segment to be elastic + if elas_seg_ind != -1: + self._wfm_segment_list[elas_seg_ind].Duration = -1 + + return final_wfms - def prepare_final(self): - """ - Method that programs waveform onto channel - """ - if not self._dont_reprogram: - for ind, cur_awg_chan in enumerate(self._awg_chan_list): - cur_awg_chan._instr_awg.program_channel(cur_awg_chan._instr_awg_chan.short_name, self.cur_wfms_to_commit[ind]) - #Set it AFTER the programming in case there is an error etc... - self._cur_prog_waveforms[ind] = self.cur_wfms_to_commit[ind] class AWGOutputChannel(TriggerInput, LockableProperties): def __init__(self, lab, instr_awg_name, channel_name, ch_index, parent_awg_waveform, sample_rate): diff --git a/sqdtoolz/HAL/GENmwSrcAWG.py b/sqdtoolz/HAL/GENmwSrcAWG.py new file mode 100644 index 0000000..8e67041 --- /dev/null +++ b/sqdtoolz/HAL/GENmwSrcAWG.py @@ -0,0 +1,96 @@ +from sqdtoolz.HAL.AWG import AWGBase, AWGOutputChannel +from sqdtoolz.HAL.HALbase import HALbase +from sqdtoolz.HAL.TriggerPulse import TriggerInput, TriggerInputCompatible +import numpy as np + +class GENmwSrcAWGchannel(TriggerInput): + def __init__(self, parent): + self._frequency = 100e9 + self._power = 0 + self._parent = parent + self._trig_src_pol = 1 + + @property + def Frequency(self): + return self._frequency + @Frequency.setter + def Frequency(self, val): + self._frequency = val + + @property + def Power(self): + return self._power + @Power.setter + def Power(self, val): + self._power = val + + @property + def InputTriggerEdge(self): + return self._trig_src_pol + @InputTriggerEdge.setter + def InputTriggerEdge(self, pol): + self._trig_src_pol = pol + + def get_trigger_source(self): + ''' + Get the Trigger object corresponding to the trigger source. + ''' + return self._trig_src_obj + + def _get_instr_trig_src(self): + return self.get_trigger_source() + + def _get_instr_input_trig_edge(self): + raise self.InputTriggerEdge + + def _get_timing_diagram_info(self): + if self.Mode == 'PulseModulated': + return {'Type' : 'BlockShaded', 'TriggerType' : 'Gated'} + else: + return {'Type' : 'None'} + + + +class GENmwSrcAWG(AWGBase, TriggerInputCompatible): + def __init__(self, hal_name, lab, awg_channel_tuple, num_tones, sample_rate, total_time=0): + AWGBase.__init__(self, hal_name, sample_rate, total_time, global_factor=1.0) + assert len(awg_channel_tuple) == 2, "The argument awg_channel_tuple must be a tuple of form (instr_AWG_name, channel_name)." + cur_awg_name, cur_ch_name = awg_channel_tuple + self._awg_chan_list.append(AWGOutputChannel(lab, cur_awg_name, cur_ch_name, 0, self, sample_rate)) + + assert num_tones > 0, "The argument num_tones must be greater than 0." + self._rf_channels = [GENmwSrcAWGchannel(self) for x in range(num_tones)] + + def get_rf_channel(self, chan_id): + assert chan_id >= 0 and chan_id < len(self._rf_channels), f"chan_id must be a valid zero-based index for the number of RF channels ({len(self._rf_channels)} in this case)." + return self._rf_channels[chan_id] + + def _get_all_trigger_inputs(self): + return self._rf_channels[:] + + @property + def Duration(self): + return self._total_time + + def _check_triggers_same_source(self): + prev_trig_src = None + for m, cur_rf_tone in enumerate(self._rf_channels): + cur_trig_src = cur_rf_tone._get_instr_trig_src() + assert cur_trig_src != None, "The trigger sources must be set for all RF channels in a GENmwSrcAWG HAL." + if m == 0: + prev_trig_src = cur_trig_src._get_instr_trig_src() + assert prev_trig_src == cur_trig_src._get_instr_trig_src(), "The trigger sources for the trigger pulses driving the RF channels must be the same in a GENmwSrcAWG HAL." + + def _assemble_waveform_raw(self): + final_wfm_raw = np.zeros(self.NumPts) + + for cur_rf_ch in self._rf_channels: + volt_amplitude = np.sqrt(0.001 * 10**(cur_rf_ch.Power/10) * 50 * 2) #sqrt(2) for RMS... + edges, gates = cur_rf_ch.get_trigger_source().get_trigger_times() + for cur_gate in gates: + start_ind, end_ind = cur_gate + final_wfm_raw[start_ind:end_ind+1] += volt_amplitude * np.cos(2*np.pi*cur_rf_ch.Frequency * np.arange(end_ind - start_ind + 1)/self.SampleRate) + + return np.array([final_wfm_raw]) + +