Source code for instruments.comet.cito_plus_1310

#!/usr/bin/env python
"""Support for Comet Cito Plus RF generator."""

# IMPORTS #####################################################################

from enum import IntEnum
from typing import Union

from instruments.abstract_instruments import Instrument
from instruments.units import ureg as u
from instruments.util_fns import assume_units

# CLASSES #####################################################################


[docs] class CitoPlus1310(Instrument): """Communicate with the Comet Cito Plus 1310 RF generator. Various connection options are available for different models. Note that this driver is only tested with the RS-232 interface and that, according to the manual, communication over TCP/IP is different. Important: Make sure that the correct parity is set in the instrument and when calling the instrument. The default seems to be even parity. Below example used even parity for the communication. In general, all communication parameters (baud rate, parity, etc.) can be set in the instrument itself. Below example just shows one possible configuration. Example: >>> import serial >>> import instruments as ik >>> port = '/dev/ttyUSB0' >>> baud = 115200 >>> inst = ik.comet.CitoPlus1310.open_serial(port, baud, parity=serial.PARITY_EVEN) >>> inst.rf # query RF state False >>> inst.rf = True # turn on RF """
[docs] class RegulationMode(IntEnum): """Regulation modes that are available on the Cito Plus 1310.""" ForwardPower = 0 LoadPower = 1 ProcessControl = 2
def __init__(self, filelike): super().__init__(filelike) self._address = 0x0A self._exception_codes = { 0x01: "Unknown parameter or illegal function code", 0x04: "Value invalid", 0x05: "Parameter not writable", 0x06: "Parameter not readable", 0x07: "Stop", 0x08: "Not allowed", 0x09: "Wrong data type", 0x0A: "Internal error", 0x0B: "Value too high", 0x0C: "Value too low", } self._byte_order = "big" # byte order for command and data self._byte_order_crc = "little" # byte order for CRC-16 checksum @property def name(self) -> str: """Get the name of the instrument.""" data = self.query(self._make_pkg(10)) return data.decode("utf-8") @property def forward_power(self) -> u.Quantity: """Get the actual forward power of the generator in W. :return: Forward power. :rtype: Quantity """ data = self.query(self._make_pkg(8021)) data = int.from_bytes(data, byteorder=self._byte_order) return assume_units(data, u.mW).to(u.W) @property def load_power(self) -> u.Quantity: """Get the actual load power of the generator in W. :return: Load power. :rtype: Quantity """ data = self.query(self._make_pkg(8023)) data = int.from_bytes(data, byteorder=self._byte_order) return assume_units(data, u.mW).to(u.W) @property def output_power(self) -> u.Quantity: """Get/set the set output power of the generator in W. :return: Output power. :rtype: Quantity """ data = self.query(self._make_pkg(1206)) data = int.from_bytes(data, byteorder=self._byte_order) return assume_units(data, u.mW).to(u.W) @output_power.setter def output_power(self, value: u.Quantity) -> None: value = assume_units(value, u.W).to(u.mW) if value < 1 * u.W: value = 0 * u.W # instrument can't set anything lower value = int(value.magnitude) self.sendcmd(self._make_pkg(1206, value)) @property def reflected_power(self) -> u.Quantity: """Get the actual reflected power of the generator in W. :return: Reflected power. :rtype: Quantity """ data = self.query(self._make_pkg(8022)) data = int.from_bytes(data, byteorder=self._byte_order) return assume_units(data, u.mW).to(u.W) @property def regulation_mode(self) -> RegulationMode: """Get/set the regulation mode of the generator. :return: Regulation mode. :rtype: RegulationMode """ data = self.query(self._make_pkg(1201)) return self.RegulationMode(int.from_bytes(data, byteorder=self._byte_order)) @regulation_mode.setter def regulation_mode(self, value) -> None: self.sendcmd(self._make_pkg(1201, value.value)) @property def rf(self) -> bool: """Get/set the RF state. :return: The RF state. :rtype: bool """ data = self.query(self._make_pkg(8000)) return int.from_bytes(data, byteorder=self._byte_order) != 1 @rf.setter def rf(self, value: bool) -> None: data = 1 if value else 0 self.sendcmd(self._make_pkg(1001, data))
[docs] def sendcmd(self, pkg: bytes) -> None: """Write a command to the instrument. Uses the query command to check return, i.e., that everything is fine, but does not return data. :param bytes pkg: The package to send to the instrument. """ self.query(pkg, write_cmd=True)
[docs] def query(self, pkg: bytes, write_cmd=False) -> Union[None, bytes]: """Query instrument. This will check if the command is accepted by the instrument and if not, raise an OSError with the appropriate return code that came back. :param bytes pkg: The package to send to the instrument. :param boolwrite_cmd: If True, this is a write command and will only check if received package the same as sent one. """ self._file.write_raw(pkg) hdr = self._file.read_raw(2) fn_code = hdr[1] if fn_code != 0x41 and fn_code != 0x42: exc_code = self._file.read_raw(1)[0] self._check_exception(fn_code, exc_code) if write_cmd: # read the rest, make sure the packages agree and if not raise OSError. len_to_read = len(pkg) - 2 rest = self._file.read_raw(len_to_read) pkg_return = hdr + rest if pkg_return != pkg: raise OSError("Received package does not match sent package.") return # so it is a query and we expect data data_length = self._file.read_raw(1) data = self._file.read_raw( int.from_bytes(data_length, byteorder=self._byte_order) ) crc = self._file.read_raw(2) crc_exp = _crc16(hdr + data_length + data).to_bytes( 2, byteorder=self._byte_order_crc ) if crc != crc_exp: raise OSError("CRC-16 checksum of returned package does not match.") return data
def _check_exception(self, fn_code: int, exc_code: int) -> None: """Checks if the function code is an exception and raises an OSError if so. :param int fn_code: The function code. :param int exc_code: The exception code. :raises OSError: If the function code is an exception. """ if fn_code != 0x41 or fn_code != 0x42: raise OSError( f"Exception code: {hex(exc_code)}: {self._exception_codes.get(exc_code, 'Unknown')}" ) def _make_hdr(self, fn_code: int) -> bytes: """Make the header according to our init settings. :param int fn_code: The function code to use. :return: The header bytes. :rtype: bytes """ hdr = bytes([self._address, fn_code]) return hdr def _make_pkg(self, cmd_code, data=None, data_length=4): """Create a package to send to the instrument. :param int cmd_code: The command code. :param data: The data to send. If None, this is a read command. Defaults to None. :param int data_length: The length of the data in bytes. Only used when writing. :return: Properly packed data to send to the instrument. :rtype: bytes """ if data is None: fn_code = 0x41 else: fn_code = 0x42 hdr = self._make_hdr(fn_code) cmd = cmd_code.to_bytes(length=2, byteorder=self._byte_order) if data is not None: dat = data.to_bytes(length=data_length, byteorder=self._byte_order) else: dat = (0x01).to_bytes(length=2, byteorder=self._byte_order) pkg = hdr + cmd + dat crc = _crc16(pkg) crc_bytes = crc.to_bytes(2, byteorder=self._byte_order_crc) return pkg + crc_bytes
def _crc16(data: bytes): """Create the CRC-16 checksum for the given data. :param bytes data: The data for which to create the checksum. :return: The CRC-16 checksum. :rtype: int """ crc16tab = [ 0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, 0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440, 0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, 0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841, 0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, 0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41, 0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, 0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040, 0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, 0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441, 0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, 0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840, 0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, 0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40, 0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, 0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041, 0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, 0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441, 0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, 0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840, 0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, 0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40, 0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, 0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041, 0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, 0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440, 0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, 0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841, 0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, 0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41, 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040, ] crc = 0 for dat in data: tmp = (0xFF & crc) ^ dat # only last 16 bits of `crc`! crc = (crc >> 8) ^ crc16tab[tmp] return crc