"""
Communicate with oscilloscope via vxi11 protocol over LAN network
"""
import abc
import time
import numpy as np
import pylab as p
import vxi11
import re
import struct
from six import with_metaclass
from .. scpi import commands as cmd
from .. import loggers
from .. import plotting
from .. import tools
from copy import copy
bar_available = False
try:
import pyprind
bar_available = True
except ImportError:
pass
try:
# Python 2
from itertools import izip
except ImportError:
# Python 3
izip = zip
from functools import reduce
# Short module-level aliases for names from the scpi ``commands`` module
# (imported above as ``cmd``); they are used by ``setget`` and by the
# scope classes further down in this file.
aarg = cmd.add_arg
q = cmd.query
TCmd = cmd.TektronixDPO4104BCommands
RSCmd = cmd.RhodeSchwarzRTO1044Commands
# Deliberately oversized stop index: a number larger than the amount of
# captured samples. It is written to ``data_stop`` when the acquisition
# window is reset.
BIG_NUMBER = 1e25 # A number larger than the amount of captured samples
def setget(command):
    """
    Build a property object that wraps the getter and setter of one setting.

    Reading the property passes ``q(command)`` to the owner's ``_send`` and
    returns the result. Assigning a value passes ``aarg(command, value)`` to
    the owner's ``_set``.

    Args:
        command (str): The command being used to get/set. Get will be a query

    Returns:
        property object
    """
    # The inner functions intentionally carry no docstring, so that the
    # resulting property has no __doc__ either.
    def read_setting(self):
        return self._send(q(command))

    def write_setting(self, value):
        return self._set(aarg(command, value))

    return property(read_setting, write_setting)
class AbstractBaseOscilloscope(with_metaclass(abc.ABCMeta, object)):
    """
    A oscilloscope with a high sampling rate in the order of several
    gigasamples. Defines the scope API the DAQ relies on.

    Concrete scopes have to implement ``select_channel`` and
    ``samplingrate``.
    """
    # constants used by the socket connection
    MAXTRIALS = 5
    CONTINOUS_RUN = cmd.RUN_CONTINOUS
    ACQUIRE_ONE = cmd.RUN_SINGLE
    string_encoding = cmd.ENCODING_ISO

    def __init__(self, ip="169.254.68.19", loglevel=20):
        """
        Connect to the scope via its socket server

        Args:
            ip (str): ip of the scope

        Keyword Args:
            loglevel (int): level handed to ``loggers.get_logger``
        """
        self.ip = ip
        # number of consecutive queries which needed a reconnect
        self.connect_trials = 0
        # store a waveform header in case they are all the same
        self.wf_buff_header = None
        self.instrument = vxi11.Instrument(ip)
        self.active_channel = None
        self.logger = loggers.get_logger(loglevel)

    def reopen_socket(self):
        """
        Replace the instrument handle with a fresh one after a timeout.

        Only a new ``vxi11.Instrument`` is created; the previous handle is
        not closed explicitly here.

        Returns:
            None
        """
        self.logger.debug("Reopening socket on ip {}".format(self.ip))
        self.instrument = vxi11.Instrument(self.ip)

    def _send(self, command):
        """
        Send a query to the scope and return its response.

        If the query fails, the connection is reopened and the query is
        repeated once. Queries which needed such a reconnect are counted;
        the counter is cleared again as soon as a query succeeds at the
        first attempt.

        Args:
            command (str): command to be sent to the
                           scope

        Returns:
            the response of the scope

        Raises:
            vxi11.vxi11.Vxi11Exception: if MAXTRIALS consecutive queries
                needed a reconnect. Any exception raised by the repeated
                query is propagated unchanged.
        """
        if self.connect_trials >= self.MAXTRIALS:
            self.connect_trials = 0
            raise vxi11.vxi11.Vxi11Exception("TimeOut")
        self.logger.debug("Sending {}".format(command))
        try:
            response = self.instrument.ask(command, encoding=self.string_encoding)
        except Exception as e:
            # count the failure before retrying, so that a retry which fails
            # as well is not lost from the statistics
            self.connect_trials += 1
            self.logger.debug("Query {} failed ({}), retrying".format(command, e))
            self.reopen_socket()
            response = self.instrument.ask(command, encoding=self.string_encoding)
        else:
            # a clean query proves the connection is healthy again
            self.connect_trials = 0
        return response

    def _set(self, command):
        """
        Send a command but return no response

        Args:
            command (str): command to be send to the scope

        Returns:
            None
        """
        self.logger.debug("Sending {}".format(command))
        self.instrument.write(command, encoding=self.string_encoding)

    def ping(self):
        """
        Check if oscilloscope is connected

        Returns:
            bool: True if the scope gave a non-empty answer to cmd.WHOAMI
        """
        ping = self._send(cmd.WHOAMI)
        self.logger.info("Scope responds to {} with {}".format(cmd.WHOAMI, ping))
        return bool(ping)

    def __repr__(self):
        """
        String representation of the scope. This queries the scope with
        cmd.WHOAMI each time it is called.
        """
        ping = self._send(cmd.WHOAMI)
        return "<" + ping + ">"

    @abc.abstractmethod
    def select_channel(self, channel):
        """
        Select a channel for the data acquisition

        Args:
            channel (int): Channel number

        Returns:
            None
        """
        return

    @abc.abstractproperty
    def samplingrate(self):
        """
        Get the current sampling rate

        Returns:
            float (GSamples/sec)
        """
        return

    def __del__(self):
        """
        Destructor, close connection explicitly.

        Returns:
            None
        """
        # __init__ may have failed before the instrument was created
        instrument = getattr(self, "instrument", None)
        if instrument is not None:
            instrument.close()
# class TektronixWaveform(Waveform):
# """
# A bundle of waveforms
# """
#
# def _convert_volts(self, waveform):
# voltages = ((waveform - (np.ones(len(waveform)) * self.header["yoff"])) \
# * (np.ones(len(waveform)) * self.header["ymult"])) \
# + (np.ones(len(waveform)) * self.header["yzero"])
# return voltages
#
# def volts(self):
# converted = []
# for waveform in self.data:
# converted.append(self._convert_volts(waveform))
#
# return converted
#
class UnknownOscilloscope(AbstractBaseOscilloscope):
    """
    Use for testing and debugging. Fulfills the abstract interface, but
    none of the scope specific functionality is available.
    """

    def select_channel(self, channel):
        """
        Not available for this scope.

        Args:
            channel (int): Channel number

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError("Not implemented!")

    @property
    def samplingrate(self):
        """
        Not available for this scope. Declared as a property, the same way
        the base class and the other scopes in this module declare it.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError("Not Implemented!")
# getters/setters for the header
class TektronixDPO4104B(AbstractBaseOscilloscope):
    """
    Oscilloscope of type DPO4104B manufactured by Tektronix
    """
    # setget properties: reading queries the scope, assigning sends the
    # command together with the assigned value
    source = setget(cmd.SOURCE)
    data_start = setget(cmd.DATA_START)
    data_stop = setget(cmd.DATA_STOP)
    data_width = setget(TCmd.WF_BYTEWIDTH)
    waveform_enc = setget(cmd.WF_ENC)
    acquire = setget(cmd.RUN)
    acquire_mode = setget(TCmd.ACQUISITON_MODE)
    data = setget(cmd.DATA)
    trigger_frequency_enabled = setget(TCmd.TRIGGER_FREQUENCY_ENABLED)
    histbox = setget(cmd.HISTBOX)
    histstart = setget(cmd.HISTSTART)
    histend = setget(cmd.HISTEND)

    # FIXME make it a property
    # transform the binary format to something which is understandable by
    # the struct module
    binary_formats = {"RI": "!b"}
    binary_header_pattern = re.compile("#(?P<bin_head>[0-9]*)")

    def __init__(self, ip, loglevel=20):
        """
        Connect to the scope and prepare it for the readout.

        Args:
            ip (str): ip of the scope

        Keyword Args:
            loglevel (int): passed on to the base class
        """
        AbstractBaseOscilloscope.__init__(self, ip, loglevel=loglevel)
        self.active_channel = TCmd.CH1
        # (start, stop) as last set via set_acquisition_window
        self._data_start_stop_buffer = (None, None)
        # NOTE(review): get_header/set_header are not defined in the visible
        # part of this module. In addition, a property object stored on an
        # instance is not invoked on attribute access, so self.header[...]
        # as used by the methods below can not work like this - confirm the
        # intended header implementation.
        self.header = property(get_header, set_header)
        self.data_width = 1
        # FIXME: future extension
        self._is_running = False
        self._acquisition_single = False
        # prepare the scope - use binary encoding by default
        self._set("DATa:ENCdg FAST")

    def trigger_single(self):
        """
        Set the acquire mode of the scope to cmd.RUN_SINGLE

        Returns:
            None
        """
        self.acquire_mode = cmd.RUN_SINGLE

    def trigger_continuous(self):
        """
        Set the acquire mode of the scope to cmd.RUN_CONTINOUS and start
        the acquisitions

        Returns:
            None
        """
        self.acquire_mode = cmd.RUN_CONTINOUS
        self.acquire = cmd.START_ACQUISITIONS

    def _trigger_acquire(self):
        """
        Acquire one single waveform

        Returns:
            None
        """
        self.acquire = "ON"

    def select_channel(self, channel):
        """
        Select the channel for the readout

        Args:
            channel (int): Channel number (1-4)

        Returns:
            None
        """
        assert 0 < channel < 5, "Channel value has to be 1-4"
        channel_dict = {1: TCmd.CH1, 2: TCmd.CH2, 3: TCmd.CH3, 4: TCmd.CH4}
        self.source = channel_dict[channel]
        self.active_channel = channel_dict[channel]
        # the source is read back from the scope for the log message
        self.logger.info("Selecting channel {}".format(self.source))
        return None

    def _select_active_channel(self):
        """
        Pick the channel which is intended to be used, that is the one
        stored in self.active_channel

        Returns:
            None
        """
        self.source = self.active_channel

    @property
    def samplingrate(self):
        """
        The samplingrate, computed as the inverse of the header's "xincr".

        NOTE(review): earlier documentation stated GSamples/s, however no
        unit conversion is applied here - confirm the unit of "xincr".

        Returns:
            float
        """
        # evaluate the header only once
        rate = 1. / self.header["xincr"]
        self.logger.debug("Got samplingrate of {}".format(rate))
        return rate

    @property
    def triggerrate(self):
        """
        The rate the scope is triggering, as reported by the scope itself
        for the moment of the query. The trigger frequency readout is
        switched on before the query.

        Returns:
            float
        """
        self.logger.debug("The returned value is instantanious!\n "
                          "For serious measurements, gather some statistics!")
        self.trigger_frequency_enabled = TCmd.ON
        freq = float(self._send(TCmd.TRIGGER_FREQUENCYQ))
        return freq

    def reset_acquisition_window(self):
        """
        Reset the acquisition window to the maximum possible acquisition window

        Returns:
            None
        """
        self.data_start = 0
        # temporarily set this to a big bogus number this will result in the
        # correct value for "npoints" later
        self.data_stop = BIG_NUMBER
        self.set_acquisition_window(0, self.header["npoints"])

    @property
    def time_binwidth(self):
        """
        Get the binwidth of the time, which is the "xincr" field of the
        header

        Returns:
            float
        """
        return float(self.header["xincr"])

    @property
    def waveform_bins(self):
        """
        Get the time bin numbers for the waveform voltage data. The bins
        are spaced evenly between data_start and data_stop, with "npoints"
        (taken from the header) entries.

        Returns:
            np.ndarray
        """
        bins = np.linspace(int(self.data_start), int(self.data_stop),
                           int(self.header["npoints"]))
        return bins

    @property
    def waveform_times(self):
        """
        Get the time for the waveform bins, which is the "xs" field of the
        header

        Returns:
            np.ndarray
        """
        return self.header["xs"]

    @property
    def histogram(self):
        """
        Return a histogram which might be recorded
        by the scope

        Returns:
            tuple: (bincenters, bincontent), both np.ndarray
        """
        start = self.histstart
        end = self.histend
        bincontent = self._send(cmd.HISTDATA)
        assert None not in [start, end, bincontent],\
            "Try again! might just be a hickup {} {} {}".format(start, end, bincontent)
        bincontent = np.array([int(b) for b in bincontent.split(",")])
        start = float(start)
        end = float(end)
        nbins = len(bincontent)
        if start > end:
            self.logger.info("Swapping start and end...")
            start, end = end, start
        self.logger.info("Found histogram with {} entries from {:4.2e} to {:4.2e}".format(nbins, start, end))
        binedges = np.linspace(start, end, nbins + 1)
        l_binedges = binedges[:-1]
        r_binedges = binedges[1:]
        # the center of a bin is half a bin width to the right of its
        # *left* edge
        bincenters = l_binedges + (r_binedges - l_binedges) / 2.
        return bincenters, bincontent

    def set_acquisition_window(self, start, stop):
        """
        Set the acquisition window in bin number. The values are remembered
        internally as well, see set_acquisition_window_from_internal_buffer

        Args:
            start (int): start bin
            stop (int): stop bin

        Returns:
            None
        """
        self.data_start = start
        self.data_stop = stop
        self._data_start_stop_buffer = (start, stop)
        # the values are read back from the scope for the log message
        self.logger.info("Set acquisition window to {} - {}".format(self.data_start, self.data_stop))

    def set_acquisition_window_from_internal_buffer(self):
        """
        Use the internal buffer to set the data acquisition window. Might be necessary
        if the channel was switched in the meantime. Nothing is sent to the
        scope if no window has been set via set_acquisition_window before.

        Returns:
            None
        """
        start, stop = self._data_start_stop_buffer
        if start is None or stop is None:
            # nothing buffered yet, do not send None to the scope
            self.logger.debug("No acquisition window buffered, scope settings stay untouched")
            return
        self.data_start, self.data_stop = start, stop

    def set_feature_acquisition_window(self, leading, trailing, n_waveforms=20):
        """
        Set the acquisition window around the most prominent feature in the waveform,
        that is the bin where the absolute value of the averaged waveform
        is largest. In case of several such bins the first one is used.

        Args:
            leading (float): leading ns before the most prominent feature
            trailing (float): trailing ns after the most prominent feature

        Keyword Args
            n_waveforms (int): average over n_waveforms to identify the most prominent feature

        Returns:
            None
        """
        self.reset_acquisition_window()
        _, avg = self.average_waveform(n=n_waveforms)
        wf_bins = self.waveform_bins
        # argmax yields exactly one index, also if the maximum is reached
        # in more than one bin
        feature_x_bin = wf_bins[np.argmax(np.abs(avg))]
        bin_width = self.time_binwidth
        # leading/trailing are given in ns, hence the factor 1e-9
        leading_bins = 1e-9 * float(leading) / bin_width
        trailing_bins = 1e-9 * float(trailing) / bin_width
        data_start = int(feature_x_bin - leading_bins)
        data_stop = int(feature_x_bin + trailing_bins)
        self.set_acquisition_window(data_start, data_stop)
        return None

    def make_n_acquisitions(self, n,
                            trials=20, return_only_charge=False,
                            single_acquisition=True,
                            return_digitizer_levels=False
                            ):
        """
        Acquire n waveforms. Waveforms where all samples have the same
        value and waveforms which are identical to the previously read one
        are skipped.

        Args:
            n (int): Number of waveforms to acquire

        Keyword Args:
            trials (int): Set breaking condition when to abort acquisition
            return_only_charge (bool): don't get the wf, but only integrated charge instead
            single_acquisition (bool): use the scopes single acquisition mode
            return_digitizer_levels (bool): return the waveform data in digitizer levels, not volts.
                                            Saves space for storage, since int8 can be used for 1-bit representation.
                                            ..and there is no float8 for obvious reasons.

        Returns:
            list: [wf_1,wf_2,...], might hold less than n entries if the
                  acquisition failed trials times

        Raises:
            ValueError: if return_only_charge and return_digitizer_levels
                        are requested together
        """
        # check this up front instead of failing on each single acquisition
        if return_only_charge and return_digitizer_levels:
            raise ValueError("Can not be done for digitizer levels! Need to convert to volts!")

        wforms = list()
        trial = 0
        bar = None
        if bar_available:
            bar = pyprind.ProgBar(n, track_time=True, title='Acquiring waveforms...')

        if single_acquisition:
            self.trigger_single()
        else:
            self.trigger_continuous()

        # the first waveform only serves as reference to detect repeated
        # readouts, it is not part of the returned list
        wf_buff = self.acquire_waveform(return_digitizer_levels=return_digitizer_levels)

        while len(wforms) < n:
            try:
                if single_acquisition:
                    self._trigger_acquire()
                wf = self.acquire_waveform(header=self.header, return_digitizer_levels=return_digitizer_levels)
                samples = np.asarray(wf)
                # flatline test, compare elementwise since summed up
                # differences might cancel each other
                if np.all(samples == samples[0]):
                    continue
                # test if scope just returned the same waveform again
                if np.array_equal(samples, wf_buff):
                    continue
                # remember the raw waveform, not the integrated charge
                wf_buff = wf
                if return_only_charge:
                    wf = tools.integrate_wf(self.header, wf)
                wforms.append(wf)
                if bar is not None:
                    bar.update()
            except Exception as e:
                self.logger.critical("Can not acquire wf..{}".format(e))
                trial += 1
            if trial >= trials:
                break
        if bar is not None:
            print(bar)
        return wforms

    def fill_buffer(self):
        """
        Acquire a waveform and keep it in self._wf_buff

        Returns:
            None
        """
        self._wf_buff = self.acquire_waveform()

    def pull(self, buff_header=True, use_buffered_acq_window=True,
             use_channel_info=True):
        """
        Fit in the API for the DAQ. Returns waveform data

        Keyword Args:
            buff_header (bool): buffer the header for subsequent acquisition without
                                changing the parameters of the acquistion (much faster)
                                FIXME! Default value of this should be False, however requires DAQ API change
            use_buffered_acq_window (bool): set this flag to cache the length of the acquisition window
                                            internally so that it does not get resetted when switching channels
            use_channel_info (bool): select the channel on each submit

        Returns:
            dict: the header fields plus the waveform under the key "waveform"
        """
        user_error_msg = """ This pull method is designed to be used in single acquisition mode."""
        assert self.acquire_mode == cmd.RUN_SINGLE, user_error_msg
        # FIXME: The buffer mechanism fails if this is the first
        # waveform at all.
        data = dict()
        if use_channel_info:
            self._select_active_channel()
        if use_buffered_acq_window:
            self.set_acquisition_window_from_internal_buffer()
        wf = self.acquire_waveform(buff_header=buff_header)
        if buff_header:
            # NOTE(review): _header_buff is not assigned anywhere in the
            # visible code - presumably acquire_waveform takes care of it,
            # confirm.
            data.update(self._header_buff)
        else:
            data.update(self.wf_header())
        data["waveform"] = wf
        return data
class RhodeSchwarzRTO1044(AbstractBaseOscilloscope):
    """
    Made by Rhode&Schwarz, scope with sampling rate up to 20GSamples/s
    """

    def __init__(self, ip, loglevel=20):
        """
        Connect to the scope.

        Args:
            ip (str): ip of the scope

        Keyword Args:
            loglevel (int): passed on to the base class
        """
        AbstractBaseOscilloscope.__init__(self, ip, loglevel=loglevel)
        self.active_channel = RSCmd.CH1
        # time.monotonic() of the last call to run(), None before that
        self.run_start_time = None

    def select_channel(self, channel):
        """
        Select the channel for the readout. The choice is only stored in
        self.active_channel, nothing is sent to the scope here.

        Args:
            channel (int): Channel number (1-4)

        Returns:
            None

        Raises:
            KeyError: if channel is not one of 1, 2, 3, 4
        """
        channel_dict = {1: RSCmd.CH1, 2: RSCmd.CH2, 3: RSCmd.CH3, 4: RSCmd.CH4}
        self.active_channel = channel_dict[channel]

    @property
    def samplingrate(self):
        """
        Not available for this scope yet.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError

    def run(self):
        """
        Start continuous acquisitions and remember the start time, which
        is needed for the triggerrate

        Returns:
            None
        """
        self._set(RSCmd.RUN)
        self.run_start_time = time.monotonic()

    def stop(self):
        """
        Send the stop command to the scope

        Returns:
            None
        """
        self._set(RSCmd.STOP)

    def do_single_acquisition(self):
        """
        Send the command for a single acquisition to the scope

        Returns:
            None
        """
        self._set(RSCmd.SINGLE)

    @property
    def triggerrate(self):
        """
        Get the triggerrate of the scope, which is the number of
        acquisitions reported by the scope divided by the seconds elapsed
        since the last call to run()

        Returns:
            float

        Raises:
            RuntimeError: if run() has not been called before
        """
        if self.run_start_time is None:
            raise RuntimeError("Can not calculate the triggerrate, call run() first!")
        # the scope answers with a string, convert before doing the math
        nacq = float(self._send(RSCmd.N_ACQUISITONS))
        interval = time.monotonic() - self.run_start_time
        return nacq / interval