Source code for instruments.abstract_instruments.instrument

#!/usr/bin/env python
"""
Provides the base Instrument class for all instruments.
"""

# IMPORTS #####################################################################


import os
import collections
import socket
import struct
import typing_extensions
import urllib.parse as parse

from serial import SerialException
from serial.tools.list_ports import comports
import pyvisa
import usb.core

from instruments.abstract_instruments.comm import (
    SocketCommunicator,
    USBCommunicator,
    VisaCommunicator,
    FileCommunicator,
    LoopbackCommunicator,
    GPIBCommunicator,
    AbstractCommunicator,
    USBTMCCommunicator,
    VXI11Communicator,
    serial_manager,
)
from instruments.optional_dep_finder import numpy
from instruments.errors import AcknowledgementError, PromptError

# CONSTANTS ###################################################################

_DEFAULT_FORMATS = collections.defaultdict(lambda: ">b")
_DEFAULT_FORMATS.update({1: ">b", 2: ">h", 4: ">i"})

# CLASSES #####################################################################


[docs] class Instrument: """ This is the base instrument class from which all others are derived from. It provides the basic implementation for all communication related tasks. In addition, it also contains several class methods for opening connections via the supported hardware channels. """ def __init__(self, filelike, *args, **kwargs): # Check to make sure filelike is a subclass of AbstractCommunicator if isinstance(filelike, AbstractCommunicator): self._file = filelike else: raise TypeError( "Instrument must be initialized with a filelike " "object that is a subclass of " "AbstractCommunicator." ) # Record if we're using the Loopback Communicator and put class in # testing mode so we can disable sleeps in class implementations self._testing = isinstance(self._file, LoopbackCommunicator) self._prompt = None self._terminator = "\n" # COMMAND-HANDLING METHODS # def _ack_expected(self, msg=""): # pylint: disable=unused-argument,no-self-use return None def _authenticate(self, auth): """ Authenticate with credentials for establishing the communication with the instrument. :param auth: Credentials for authentication. """ raise NotImplementedError
[docs] def sendcmd(self, cmd): """ Sends a command without waiting for a response. :param str cmd: String containing the command to be sent. """ self._file.sendcmd(str(cmd)) ack_expected_list = self._ack_expected( cmd ) # pylint: disable=assignment-from-none if not isinstance(ack_expected_list, (list, tuple)): ack_expected_list = [ack_expected_list] for ack_expected in ack_expected_list: if ack_expected is None: break ack = self.read() if ack != ack_expected: raise AcknowledgementError( "Incorrect ACK message received: got {} " "expected {}".format(ack, ack_expected) ) if self.prompt is not None: prompt = self.read(len(self.prompt)) if prompt != self.prompt: raise PromptError( "Incorrect prompt message received: got {} " "expected {}".format(prompt, self.prompt) )
[docs] def query(self, cmd, size=-1): """ Executes the given query. :param str cmd: String containing the query to execute. :param int size: Number of bytes to be read. Default is read until termination character is found. :return: The result of the query as returned by the connected instrument. :rtype: `str` """ ack_expected_list = self._ack_expected( cmd ) # pylint: disable=assignment-from-none if not isinstance(ack_expected_list, (list, tuple)): ack_expected_list = [ack_expected_list] if ack_expected_list[0] is None: # Case no ACK value = self._file.query(cmd, size) else: # Case with ACKs _ = self._file.query(cmd, size=0) # Send the cmd, don't read for ack_expected in ack_expected_list: # Read and verify ACKs ack = self.read() if ack != ack_expected: raise AcknowledgementError( f"Incorrect ACK message received: got {ack} expected {ack_expected}" ) value = self.read(size) # Now read in our return data if self.prompt is not None: prompt = self.read(len(self.prompt)) if prompt != self.prompt: raise PromptError( f"Incorrect prompt message received: got {prompt} expected {self.prompt}" ) return value
[docs] def read(self, size=-1, encoding="utf-8"): """ Read the last line. :param int size: Number of bytes to be read. Default is read until termination character is found. :return: The result of the read as returned by the connected instrument. :rtype: `str` """ return self._file.read(size, encoding)
[docs] def read_raw(self, size=-1): """ Read the raw last line. :param int size: Number of bytes to be read. Default is read until termination character is found. :return: The result of the read as returned by the connected instrument. :rtype: `str` """ return self._file.read_raw(size)
# PROPERTIES # @property def timeout(self): """ Gets/sets the communication timeout for this instrument. Note that setting this value after opening the connection is not supported for all connection types. :type: `int` """ return self._file.timeout @timeout.setter def timeout(self, newval): self._file.timeout = newval @property def address(self): """ Gets/sets the target communication of the instrument. This is useful for situations when running straight from a Python shell and your instrument has enumerated with a different address. An example when this can happen is if you are using a USB to Serial adapter and you disconnect/reconnect it. :type: `int` for GPIB address, `str` for other """ return self._file.address @address.setter def address(self, newval): self._file.address = newval @property def terminator(self): """ Gets/sets the terminator used for communication. For communication options where this is applicable, the value corresponds to the ASCII character used for termination in decimal format. Example: 10 sets the character to NEWLINE. :type: `int`, or `str` for GPIB adapters. """ return self._file.terminator @terminator.setter def terminator(self, newval): self._file.terminator = newval @property def prompt(self): """ Gets/sets the prompt used for communication. The prompt refers to a character that is sent back from the instrument after it has finished processing your last command. Typically this is used to indicate to an end-user that the device is ready for input when connected to a serial-terminal interface. In IK, the prompt is specified that that it (and its associated termination character) are read in. The value read in from the device is also checked against the stored prompt value to make sure that everything is still in sync. :type: `str` """ return self._prompt @prompt.setter def prompt(self, newval): self._prompt = newval # BASIC I/O METHODS #
[docs] def write(self, msg): """ Write data string to the connected instrument. This will call the write method for the attached filelike object. This will typically bypass attaching any termination characters or other communication channel related work. .. seealso:: `Instrument.sendcmd` if you wish to send a string to the instrument, while still having InstrumentKit handle termination characters and other communication channel related work. :param str msg: String that will be written to the filelike object (`Instrument._file`) attached to this instrument. """ self._file.write(msg)
[docs] def binblockread(self, data_width, fmt=None): """ " Read a binary data block from attached instrument. This requires that the instrument respond in a particular manner as EOL terminators naturally can not be used in binary transfers. The format is as follows: #{number of following digits:1-9}{num of bytes to be read}{data bytes} :param int data_width: Specify the number of bytes wide each data point is. One of [1,2,4]. :param str fmt: Format string as specified by the :mod:`struct` module, or `None` to choose a format automatically based on the data width. Typically you can just specify `data_width` and leave this default. """ # This needs to be a # symbol for valid binary block symbol = self._file.read_raw(1) if symbol != b"#": # Check to make sure block is valid raise OSError( "Not a valid binary block start. Binary blocks " "require the first character to be #, instead got " "{}".format(symbol) ) else: # Read in the num of digits for next part digits = int(self._file.read_raw(1)) # Read in the num of bytes to be read num_of_bytes = int(self._file.read_raw(digits)) # Make or use the required format string. if fmt is None: fmt = _DEFAULT_FORMATS[data_width] # Read in the data bytes, and pass them to numpy using the specified # data type (format). # This is looped in case a communication timeout occurs midway # through transfer and multiple reads are required tries = 3 data = self._file.read_raw(num_of_bytes) while len(data) < num_of_bytes: old_len = len(data) data += self._file.read_raw(num_of_bytes - old_len) if old_len == len(data): tries -= 1 if tries == 0: raise OSError( "Did not read in the required number of bytes" "during binblock read. Got {}, expected " "{}".format(len(data), num_of_bytes) ) if numpy: return numpy.frombuffer(data, dtype=fmt) return struct.unpack(f"{fmt[0]}{int(len(data)/data_width)}{fmt[-1]}", data)
# CLASS METHODS # URI_SCHEMES = [ "serial", "tcpip", "gpib+usb", "gpib+serial", "visa", "file", "usbtmc", "vxi11", "test", ]
[docs] @classmethod def open_from_uri(cls, uri): # pylint: disable=too-many-return-statements,too-many-branches """ Given an instrument URI, opens the instrument named by that URI. Instrument URIs are formatted with a scheme, such as ``serial://``, followed by a location that is interpreted differently for each scheme. The following examples URIs demonstrate the currently supported schemes and location formats:: serial://COM3 serial:///dev/ttyACM0 tcpip://192.168.0.10:4100 gpib+usb://COM3/15 gpib+serial://COM3/15 gpib+serial:///dev/ttyACM0/15 # Currently non-functional. visa://USB::0x0699::0x0401::C0000001::0::INSTR usbtmc://USB::0x0699::0x0401::C0000001::0::INSTR test:// For the ``serial`` URI scheme, baud rates may be explicitly specified using the query parameter ``baud=``, as in the example ``serial://COM9?baud=115200``. If not specified, the baud rate is assumed to be 115200. :param str uri: URI for the instrument to be loaded. :rtype: `Instrument` .. seealso:: `PySerial`_ documentation for serial port URI format .. _PySerial: http://pyserial.sourceforge.net/ """ # Make sure that urlparse knows that we want query strings. for scheme in cls.URI_SCHEMES: if scheme not in parse.uses_query: parse.uses_query.append(scheme) # Break apart the URI using urlparse. This returns a named tuple whose # parts describe the incoming URI. parsed_uri = parse.urlparse(uri) # We always want the query string to provide keyword args to the # class method. # FIXME: This currently won't work, as everything is strings, # but the other class methods expect ints or floats, depending. kwargs = parse.parse_qs(parsed_uri.query) if parsed_uri.scheme == "serial": # Ex: serial:///dev/ttyACM0 # We want to pass just the netloc and the path to PySerial, # sending the query string as kwargs. Thus, we should make the # device name here. dev_name = parsed_uri.netloc if parsed_uri.path: dev_name = os.path.join(dev_name, parsed_uri.path) # We should handle the baud rate separately, however, to ensure # that the default is set correctly and that the type is `int`, # as expected. if "baud" in kwargs: kwargs["baud"] = int(kwargs["baud"][0]) else: kwargs["baud"] = 115200 return cls.open_serial(dev_name, **kwargs) elif parsed_uri.scheme == "tcpip": # Ex: tcpip://192.168.0.10:4100 host, port = parsed_uri.netloc.split(":") port = int(port) return cls.open_tcpip(host, port, **kwargs) elif parsed_uri.scheme == "gpib+usb" or parsed_uri.scheme == "gpib+serial": # Ex: gpib+usb://COM3/15 # scheme="gpib+usb", netloc="COM3", path="/15" # Make a new device path by joining the netloc (if any) # with all but the last segment of the path. uri_head, uri_tail = os.path.split(parsed_uri.path) dev_path = os.path.join(parsed_uri.netloc, uri_head) return cls.open_gpibusb(dev_path, int(uri_tail), **kwargs) elif parsed_uri.scheme == "visa": # Ex: visa://USB::{VID}::{PID}::{SERIAL}::0::INSTR # where {VID}, {PID} and {SERIAL} are to be replaced with # the vendor ID, product ID and serial number of the USB-VISA # device. return cls.open_visa(parsed_uri.netloc, **kwargs) elif parsed_uri.scheme == "usbtmc": # TODO: check for other kinds of usbtmc URLs. # Ex: usbtmc can take URIs exactly like visa://. return cls.open_usbtmc(parsed_uri.netloc, **kwargs) elif parsed_uri.scheme == "file": return cls.open_file( os.path.join(parsed_uri.netloc, parsed_uri.path), **kwargs ) elif parsed_uri.scheme == "vxi11": # Examples: # vxi11://192.168.1.104 # vxi11://TCPIP::192.168.1.105::gpib,5::INSTR return cls.open_vxi11(parsed_uri.netloc, **kwargs) elif parsed_uri.scheme == "test": return cls.open_test(**kwargs) else: raise NotImplementedError("Invalid scheme or not yet " "implemented.")
[docs] @classmethod def open_tcpip(cls, host, port, auth=None): """ Opens an instrument, connecting via TCP/IP to a given host and TCP port. :param str host: Name or IP address of the instrument. :param int port: TCP port on which the insturment is listening. :param auth: Authentication credentials to establish connection. Type depends on instrument and authentication method used. :rtype: `Instrument` :return: Object representing the connected instrument. .. seealso:: `~socket.socket.connect` for description of `host` and `port` parameters in the TCP/IP address family. """ conn = socket.socket() conn.connect((host, port)) ret_cls = cls(SocketCommunicator(conn), auth=auth) return ret_cls
# pylint: disable=too-many-arguments
[docs] @classmethod def open_serial( cls, port=None, baud=9600, vid=None, pid=None, serial_number=None, timeout=3, write_timeout=3, ): """ Opens an instrument, connecting via a physical or emulated serial port. Note that many instruments which connect via USB are exposed to the operating system as serial ports, so this method will very commonly be used for connecting instruments via USB. This method can be called by either supplying a port as a string, or by specifying vendor and product IDs, and an optional serial number (used when more than one device with the same IDs is attached). If both the port and IDs are supplied, the port will default to the supplied port string, else it will search the available com ports for a port matching the defined IDs and serial number. :param str port: Name of the the port or device file to open a connection on. For example, ``"COM10"`` on Windows or ``"/dev/ttyUSB0"`` on Linux. :param int baud: The baud rate at which instrument communicates. :param int vid: the USB port vendor id. :param int pid: the USB port product id. :param str serial_number: The USB port serial_number. :param float timeout: Number of seconds to wait when reading from the instrument before timing out. :param float write_timeout: Number of seconds to wait when writing to the instrument before timing out. :rtype: `Instrument` :return: Object representing the connected instrument. .. seealso:: `~serial.Serial` for description of `port`, baud rates and timeouts. """ if port is None and vid is None: raise ValueError( "One of port, or the USB VID/PID pair, must be " "specified when " ) if port is not None and vid is not None: raise ValueError( "Cannot specify both a specific port, and a USB" "VID/PID pair." ) if (vid is not None and pid is None) or (pid is not None and vid is None): raise ValueError( "Both VID and PID must be specified when opening" "a serial connection via a USB VID/PID pair." ) if port is None: match_count = 0 for _port in comports(): # If no match on vid/pid, go to next comport if not _port.pid == pid or not _port.vid == vid: continue # If we specified a serial num, verify then break if serial_number is not None and _port.serial_number == serial_number: port = _port.device break # If no provided serial number, match, but also keep a count if serial_number is None: port = _port.device match_count += 1 # If we found more than 1 vid/pid device, but no serial number, # raise an exception due to ambiguity if match_count > 1: raise SerialException( "Found more than one matching serial " "port from VID/PID pair" ) # if the port is still None after that, raise an error. if port is None and vid is not None: err_msg = ( "Could not find a port with the attributes vid: {vid}, " "pid: {pid}, serial number: {serial_number}" ) raise ValueError( err_msg.format( vid=vid, pid=pid, serial_number="any" if serial_number is None else serial_number, ) ) ser = serial_manager.new_serial_connection( port, baud=baud, timeout=timeout, write_timeout=write_timeout ) return cls(ser)
[docs] @classmethod def open_gpibusb(cls, port, gpib_address, timeout=3, write_timeout=3, model="gi"): """ Opens an instrument, connecting via a `Galvant Industries GPIB-USB adapter`_. :param str port: Name of the the port or device file to open a connection on. Note that because the GI GPIB-USB adapter identifies as a serial port to the operating system, this should be the name of a serial port. :param int gpib_address: Address on the connected GPIB bus assigned to the instrument. :param float timeout: Number of seconds to wait when reading from the instrument before timing out. :param float write_timeout: Number of seconds to wait when writing to the instrument before timing out. :param str model: The brand of adapter to be connected to. Currently supported is "gi" for Galvant Industries, and "pl" for Prologix LLC. :rtype: `Instrument` :return: Object representing the connected instrument. .. seealso:: `~serial.Serial` for description of `port` and timeouts. .. _Galvant Industries GPIB-USB adapter: galvant.ca/#!/store/gpibusb """ ser = serial_manager.new_serial_connection( port, baud=460800, timeout=timeout, write_timeout=write_timeout ) return cls(GPIBCommunicator(ser, gpib_address, model))
[docs] @classmethod def open_gpibethernet(cls, host, port, gpib_address, model="pl"): """ Opens an instrument, connecting via a Prologix GPIBETHERNET adapter. :param str host: Name or IP address of the instrument. :param int port: TCP port on which the insturment is listening. :param int gpib_address: Address on the connected GPIB bus assigned to the instrument. :param str model: The brand of adapter to be connected to. Currently supported is "gi" for Galvant Industries, and "pl" for Prologix LLC. .. warning:: This function has been setup for use with the Prologix GPIBETHERNET adapter but has not been tested as confirmed working. """ conn = socket.socket() conn.connect((host, port)) return cls(GPIBCommunicator(conn, gpib_address, model))
[docs] @classmethod def open_visa(cls, resource_name): """ Opens an instrument, connecting using the VISA library. Note that `PyVISA`_ and a VISA implementation must both be present and installed for this method to function. :param str resource_name: Name of a VISA resource representing the given instrument. :rtype: `Instrument` :return: Object representing the connected instrument. .. seealso:: `National Instruments help page on VISA resource names <http://zone.ni.com/reference/en-XX/help/371361J-01/lvinstio/visa_resource_name_generic/>`_. .. _PyVISA: http://pyvisa.sourceforge.net/ """ version = list(map(int, pyvisa.__version__.split("."))) while len(version) < 3: version += [0] if version[0] >= 1 and version[1] >= 6: ins = pyvisa.ResourceManager().open_resource(resource_name) else: ins = pyvisa.instrument(resource_name) # pylint: disable=no-member return cls(VisaCommunicator(ins))
[docs] @classmethod def open_test(cls, stdin=None, stdout=None): """ Opens an instrument using a loopback communicator for a test connection. The primary use case of this is to instantiate a specific instrument class without requiring an actual physical connection of any kind. This is also very useful for creating unit tests through the parameters of this class method. :param stdin: The stream of data coming from the instrument :type stdin: `io.BytesIO` or `None` :param stdout: Empty data stream that will hold data sent from the Python class to the loopback communicator. This can then be checked for the contents. :type stdout: `io.BytesIO` or `None` :return: Object representing the virtually-connected instrument """ return cls(LoopbackCommunicator(stdin, stdout))
[docs] @classmethod def open_usbtmc(cls, *args, **kwargs): """ Opens an instrument, connecting to a USB-TMC device using the Python `usbtmc` library. .. warning:: The operational status of this is unknown. It is suggested that you connect via the other provided class methods. For Linux, if you have the ``usbtmc`` kernel module, the `~instruments.Instrument.open_file` class method will work. On Windows, using the `~instruments.Instrument.open_visa` class method along with having the VISA libraries installed will work. :return: Object representing the connected instrument """ usbtmc_comm = USBTMCCommunicator(*args, **kwargs) return cls(usbtmc_comm)
[docs] @classmethod def open_vxi11(cls, *args, **kwargs): """ Opens a vxi11 enabled instrument, connecting using the python library `python-vxi11`_. This package must be present and installed for this method to function. :rtype: `Instrument` :return: Object representing the connected instrument. .. _python-vxi11: https://github.com/python-ivi/python-vxi11 """ vxi11_comm = VXI11Communicator(*args, **kwargs) return cls(vxi11_comm)
[docs] @classmethod def open_usb(cls, vid, pid): """ Opens an instrument, connecting via a raw USB stream. .. note:: Note that raw USB a very uncommon of connecting to instruments, even for those that are connected by USB. Most will identify as either serial ports (in which case, `~instruments.Instrument.open_serial` should be used), or as USB-TMC devices. On Linux, USB-TMC devices can be connected using `~instruments.Instrument.open_file`, provided that the ``usbtmc`` kernel module is loaded. On Windows, some such devices can be opened using the VISA library and the `~instruments.Instrument.open_visa` method. :param str vid: Vendor ID of the USB device to open. :param str pid: Product ID of the USB device to open. :rtype: `Instrument` :return: Object representing the connected instrument. """ dev = usb.core.find(idVendor=vid, idProduct=pid) if dev is None: raise OSError("No such device found.") return cls(USBCommunicator(dev))
[docs] @classmethod def open_file(cls, filename): """ Given a file, treats that file as a character device file that can be read from and written to in order to communicate with the instrument. This may be the case, for instance, if the instrument is connected by the Linux ``usbtmc`` kernel driver. :param str filename: Name of the character device to open. :rtype: `Instrument` :return: Object representing the connected instrument. """ return cls(FileCommunicator(filename))
def __enter__(self) -> typing_extensions.Self: return self def __exit__(self, *exc): self._file.__exit__(*exc)