from __future__ import annotations

import asyncio
import base64
import io
import logging
import os
import re
import zipfile
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Generator, IO, Literal, overload

import nest_asyncio
import paramiko.pkey
from sshtunnel import SSHTunnelForwarder

from .base import RunStatus, MachineStatus, AccessLevel
from .parser import arglist
from .qsconnection_async import QSConnectionAsync
from .tcprotocol import Protocol
from .util import _unwrap_tags

nest_asyncio.apply()
log = logging.getLogger(__name__)
@dataclass(init=False)
class Machine:
    """
    A connection to a QuantStudio machine.

    The connection can be opened and closed, and reused. A maximum access level can be set and
    changed, which will prevent the access level from going above that level. By default, the
    initial connection is as Observer.

    For clean access, the class provides a context manager for the connection, and the
    ``at_access`` method provides a context manager for access level:

    >>> with Machine('machine', 'password', max_access_level="Controller") as m:
    ...     # Now connected
    ...     print(m.run_status())  # runs at Observer level
    ...     with m.at_access("Controller", exclusive=True):
    ...         # Runs as Controller, and fails if another Controller is connected (exclusive)
    ...         m.abort_current_run()
    ...         m.drawer_open()
    ...     # Now back to Observer
    ...     print(m.status)

    After the outer ``with`` block ends, the machine is disconnected.

    The connection context manager can also be used with :code:`with m:` form for a Machine
    instance :code:`m` that already exists, in which case it will connect and disconnect.

    If you don't want to use these, you can also use :any:`connect` and :any:`disconnect`.

    Note that there is *no supported method* on the machine's server for removing hanging
    connections other than a reboot, and AB's software will not start runs when other connections
    hold Controller level.

    Parameters
    ----------
    host: str
        The host name or IP to connect to.
    password: str
        The password to use. Note that this class does not obscure or protect the password at all,
        because it should not be relied on for security. See :ref:`access-and-security` for more
        information.
    max_access_level: "Observer", "Controller", "Administrator", or "Full"
        The maximum access level to allow. This is *not* the initial access level, which
        will be Observer. The parameter can be changed later by changing the
        :code:`max_access_level` attribute.
    port: int (default = 7000)
        The port to connect to. (Use the normal SCPI port, not the line-editor connection usually
        on 2323).
    connect_now: bool (default = False)
        If True, open the connection as part of construction.
    tunnel_host: str or tuple[str, int], optional
        If set, to a hostname/IP string or (hostname/IP, port) tuple, create an SSH tunnel to the
        tunnel_host in order to connect to the machine, rather than connecting directly. This uses
        paramiko, and will not read your ssh configuration file for any host aliases, but will use
        your keys if you have an ssh-agent running.
    tunnel_user: str, optional
        If set, specify the user for the tunnel connection. If unset, the local user name is used.
    tunnel_key: str or paramiko.pkey.PKey, optional
        If set, specify the filename of a private key, or the paramiko-loaded key, to use for the
        tunnel connection.

    Examples
    --------
    Set up a connection manually, query the run status, and disconnect:

    >>> m = Machine('machine', 'password')
    >>> m.connect()
    >>> print(m.run_status())
    >>> m.disconnect()
    """

    host: str
    password: str | None = None
    max_access_level: AccessLevel | str = AccessLevel.Observer
    port: int = 7000
    _initial_access_level: AccessLevel | str = AccessLevel.Observer
    tunnel_host: str | tuple[str, int] | None = None
    tunnel_user: str | None = None
    # Unquoted: mixing a quoted name into a ``|`` union cannot be evaluated
    # (``str | "..."`` raises TypeError), and paramiko.pkey is imported at module level.
    tunnel_key: str | paramiko.pkey.PKey | None = None
def __init__(
    self,
    host: str,
    password: str | None = None,
    max_access_level: AccessLevel | str = AccessLevel.Observer,
    port: int = 7000,
    connect_now: bool = False,
    tunnel_host: str | tuple[str, int] | None = None,
    tunnel_user: str | None = None,
    tunnel_key: str | paramiko.pkey.PKey | None = None,
    _initial_access_level: AccessLevel | str = AccessLevel.Observer,
):
    """Store the connection settings and, if requested, connect immediately.

    Parameters
    ----------
    host : str
        Host name or IP of the machine.
    password : str | None
        Password passed on to the underlying connection.
    max_access_level : AccessLevel | str
        Ceiling enforced by ``set_access_level``; coerced to ``AccessLevel``.
    port : int
        Port of the machine's SCPI server.
    connect_now : bool
        If True, ``connect()`` is called at the end of construction.
    tunnel_host, tunnel_user, tunnel_key
        SSH tunnel settings; a tunnel is used whenever ``tunnel_host`` is not None.
    _initial_access_level : AccessLevel | str
        Access level handed to the underlying connection when it is opened.
    """
    # Connection handles are set first so that __del__ always finds them,
    # even if one of the AccessLevel coercions below raises.
    self._tunnel: SSHTunnelForwarder | None = None
    self._qsc: QSConnectionAsync | None = None

    self.host = host
    self.port = port
    self.password = password
    self.max_access_level = AccessLevel(max_access_level)
    self._initial_access_level = AccessLevel(_initial_access_level)
    self.tunnel_host = tunnel_host
    self.tunnel_user = tunnel_user
    self.tunnel_key = tunnel_key

    if connect_now:
        self.connect()
@property
def _use_tunnel(self) -> bool:
    """True when ``tunnel_host`` is set, i.e. the connection should go through an SSH tunnel."""
    tunnel_configured = self.tunnel_host is not None
    return tunnel_configured
def connect(self) -> None:
    """Open the connection.

    If ``tunnel_host`` is set, an SSH tunnel to the machine's host/port is started
    first and the connection is made to the tunnel's local port on ``localhost``;
    otherwise the connection goes directly to ``host``/``port``.

    If connecting fails, a tunnel started by this call is stopped again and the
    instance is left in the disconnected state before the error is re-raised.
    """
    loop = asyncio.get_event_loop()

    tunnel = None
    if self._use_tunnel:
        tunnel = SSHTunnelForwarder(
            self.tunnel_host,
            ssh_username=self.tunnel_user,
            ssh_pkey=self.tunnel_key,
            remote_bind_address=(self.host, self.port),
        )
        tunnel.start()
        port = tunnel.local_bind_port
        host = "localhost"
    else:
        port = self.port
        host = self.host

    try:
        qsc = QSConnectionAsync(
            host,
            port,
            password=self.password,
            initial_access_level=AccessLevel(self._initial_access_level),
        )
        loop.run_until_complete(qsc.connect())
    except BaseException:
        # Do not leave a forwarding tunnel running for a connection that never opened.
        if tunnel is not None:
            tunnel.stop()
        raise

    # Only publish the handles once the connection is actually up, so that
    # run_command()/disconnect()/__del__ never see a half-open connection.
    if tunnel is not None:
        self._tunnel = tunnel
    self._qsc = qsc
def __enter__(self) -> Machine:
    """Connect on entering a ``with`` block and hand back this instance."""
    self.connect()
    machine = self
    return machine
def run_command(self, command: str) -> str:
    """Send one SCPI command and return the machine's reply as a string.

    Waits for OK, not just NEXT.

    Parameters
    ----------
    command : str
        The command to run.

    Returns
    -------
    str
        Response message (after "OK", not including it).

    Raises
    ------
    ConnectionError
        This instance is not currently connected.
    CommandError
        Received an Error response.
    """
    connection = self._qsc
    if connection is None:
        raise ConnectionError(f"Not connected to {self.host}.")

    # Drive the async connection to completion from synchronous code.
    event_loop = asyncio.get_event_loop()
    reply = event_loop.run_until_complete(connection.run_command(command))
    return reply
def run_command_to_ack(self, command: str) -> str:
    """Send one SCPI command and return as soon as it has been acknowledged.

    Returns after the command is processed (OK or NEXT), but potentially
    before it has completed (NEXT).

    Parameters
    ----------
    command : str
        The command to run.

    Returns
    -------
    str
        Response message (after "OK" or "NEXT", likely "" in the latter case).

    Raises
    ------
    ConnectionError
        This instance is not currently connected.
    CommandError
        Received an Error response.
    """
    connection = self._qsc
    if connection is None:
        raise ConnectionError(f"Not connected to {self.host}")

    event_loop = asyncio.get_event_loop()
    # just_ack=True asks the connection not to wait for final completion.
    reply = event_loop.run_until_complete(
        connection.run_command(command, just_ack=True)
    )
    return reply
def define_protocol(self, protocol: Protocol) -> None:
    """Send a protocol definition to the machine.

    This *is not related* to a particular experiment. The name on the machine
    is set by the protocol.

    Parameters
    ----------
    protocol : Protocol
        The protocol to send; its ``to_command()`` output is what gets run.
    """
    scpi_command = f"{protocol.to_command()}"
    self.run_command(scpi_command)
def read_dir_as_zip(self, path: str, leaf: str = "FILE") -> zipfile.ZipFile:
    """Read a directory on the machine as a zip archive.

    Parameters
    ----------
    path : str
        Path on the machine.
    leaf : str, optional
        Leaf to use, by default "FILE".

    Returns
    -------
    zipfile.ZipFile
        The returned zip file, opened from an in-memory buffer.
    """
    raw_reply = self.run_command_bytes(f"{leaf}:ZIPREAD? {path}")
    # The base64 payload sits between 7 leading and 10 trailing wrapper bytes
    # (presumably the quoting tags around the reply -- confirm against the protocol).
    encoded_archive = raw_reply[7:-10]
    archive_bytes = base64.decodebytes(encoded_archive)
    return zipfile.ZipFile(io.BytesIO(archive_bytes))
@overload
def list_files(
    self,
    path: str,
    *,
    leaf: str = "FILE",
    verbose: Literal[True],
    recursive: bool = False,
) -> list[dict[str, Any]]:
    ...


@overload
def list_files(
    self,
    path: str,
    *,
    leaf: str = "FILE",
    verbose: Literal[False] = False,
    recursive: bool = False,
) -> list[str]:
    ...


@overload
def list_files(
    self,
    path: str,
    *,
    leaf: str = "FILE",
    verbose: bool,
    recursive: bool = False,
) -> list[str] | list[dict[str, Any]]:
    ...


def list_files(
    self,
    path: str,
    *,
    leaf: str = "FILE",
    verbose: bool = False,
    recursive: bool = False,
) -> list[str] | list[dict[str, Any]]:
    """List the entries under a path on the machine.

    Parameters
    ----------
    path : str
        Path on the machine to list.
    leaf : str, optional
        Leaf to use, by default "FILE".
    verbose : bool, optional
        If False (default), return the listed lines as plain strings. If True,
        run the listing with ``-verbose`` and return one dict per entry, holding
        ``"path"`` plus whatever options the machine reported for that entry.
    recursive : bool, optional
        With ``verbose=True``, descend into entries whose ``type`` is ``"folder"``
        and return their contents in place of the folder entry itself.

    Returns
    -------
    list[str] | list[dict[str, Any]]
        Plain lines (non-verbose) or parsed entries (verbose).

    Raises
    ------
    NotImplementedError
        ``recursive=True`` was requested without ``verbose=True``.
    """
    if not verbose:
        if recursive:
            raise NotImplementedError
        # The first and last lines of the reply are wrapper lines, not entries.
        return self.run_command(f"{leaf}:LIST? {path}").split("\n")[1:-1]

    lines = self.run_command(f"{leaf}:LIST? -verbose {path}").split("\n")[1:-1]
    parsed_lines = [arglist.parseString(line) for line in lines]

    entries: list[dict[str, Any]] = []
    for parsed in parsed_lines:
        entry: dict[str, Any] = {"path": parsed["arglist"]["args"][0]}  # type: ignore
        entry |= parsed["arglist"]["opts"]  # type: ignore
        if entry["type"] == "folder" and recursive:
            entries += self.list_files(
                entry["path"], leaf=leaf, verbose=True, recursive=True
            )
        else:
            entries.append(entry)
    return entries
def read_file(
    self, path: str, context: str | None = None, leaf: str = "FILE"
) -> bytes:
    """Read a file from the machine.

    Parameters
    ----------
    path : str
        File path on the machine.
    context : str | None (default None)
        Context prepended to the path; a trailing ":" is added if missing.
        None or an empty string means no context prefix.
    leaf : str (default FILE)
        Leaf to use for the READ? command.

    Returns
    -------
    bytes
        The decoded file contents.
    """
    if not context:
        prefix = ""
    else:
        prefix = context if context.endswith(":") else context + ":"

    raw_reply = self.run_command_bytes(f"{leaf}:READ? {prefix}{path}")
    # Strip the 7 leading / 10 trailing wrapper bytes around the base64 payload
    # (presumably the quoting tags -- confirm against the protocol).
    encoded = raw_reply[7:-10]
    return base64.decodebytes(encoded)
def write_file(self, path: str, data: str | bytes) -> None:
    """Write data to a file on the machine with ``FILE:WRITE``.

    Parameters
    ----------
    path : str
        Destination path on the machine.
    data : str | bytes
        Contents to write; a str is encoded with the default ``str.encode()`` first.
        The contents are sent base64-encoded inside a ``quote.base64`` block.
    """
    payload = data.encode() if isinstance(data, str) else data
    message = b"".join(
        (
            b"FILE:WRITE ",
            path.encode(),
            b" <quote.base64>\n",
            base64.encodebytes(payload),
            b"\n</quote.base64>",
        )
    )
    self.run_command_bytes(message)
def list_runs_in_storage(self) -> list[str]:
    """List runs in machine storage.

    Returns
    -------
    list[str]
        Run filenames: the entries of ``public_run_complete:`` that end in
        "eds", with the ``public_run_complete:`` prefix removed. Retrieve with
        load_run_from_storage (to open as :any:`Experiment`) or
        save_run_from_storage (to download and save it without opening.)
    """
    storage_prefix = "public_run_complete:"
    listing = self.run_command("FILE:LIST? public_run_complete:")

    run_names = []
    # The first and last lines of the reply are wrapper lines, not entries.
    for entry in listing.split("\n")[1:-1]:
        if entry.endswith("eds"):
            run_names.append(entry.removeprefix(storage_prefix))
    return run_names
def load_run_from_storage(self, path: str) -> "Experiment":  # type: ignore
    """Load a run from machine storage as an Experiment.

    Parameters
    ----------
    path : str
        Run filename in machine storage, passed on to
        ``Experiment.from_machine_storage``.

    Returns
    -------
    Experiment
        The loaded experiment.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid a circular import -- confirm).
    from .experiment import Experiment

    return Experiment.from_machine_storage(self, path)
def save_run_from_storage(
    self,
    machine_path: str,
    download_path: str | os.PathLike[str] | IO[bytes],
    overwrite: bool = False,
) -> None:
    """Download a file from run storage on the machine.

    Parameters
    ----------
    machine_path : str
        Filename on the machine (read from the ``public_run_complete`` context).
    download_path : str | os.PathLike | IO[bytes]
        Filename (string or path object) to download to, or an open binary file.
    overwrite : bool, optional
        If False and provided a filename rather than an open file, will not
        overwrite existing files; by default False.

    Raises
    ------
    FileExistsError
        ``download_path`` names an existing file and ``overwrite`` is False.
    """
    fdata = self.read_file(machine_path, context="public_run_complete")

    if isinstance(download_path, (str, os.PathLike)):
        # "xb" is exclusive creation: opening fails if the file already exists.
        mode = "wb" if overwrite else "xb"
        with open(download_path, mode) as file:
            file.write(fdata)
    else:
        # Already-open file object: the caller owns it, so it is not closed here.
        download_path.write(fdata)
def run_command_bytes(self, command: str | bytes) -> bytes:
    """Run an SCPI command, and return the response as bytes (undecoded).

    The command is sent through the connection's underlying protocol object
    rather than through ``QSConnectionAsync.run_command``.

    NOTE(review): the previous documentation stated that this returns after the
    command is processed (OK or NEXT), potentially before it has completed --
    confirm against the protocol implementation.

    Parameters
    ----------
    command : str | bytes
        Command to run; a str is encoded with the default ``str.encode()`` first.

    Returns
    -------
    bytes
        The raw response message.

    Raises
    ------
    ConnectionError
        This instance is not currently connected.
    CommandError
        Received an Error response.
    """
    connection = self._qsc
    if connection is None:
        raise ConnectionError(f"Not connected to {self.host}.")

    raw_command = command.encode() if isinstance(command, str) else command
    event_loop = asyncio.get_event_loop()
    return event_loop.run_until_complete(
        connection._protocol.run_command(raw_command)
    )
def run_status(self) -> RunStatus:
    """Return information on the status of any run, built by ``RunStatus.from_machine``."""
    current = RunStatus.from_machine(self)
    return current
def machine_status(self) -> MachineStatus:
    """Return information on the status of the machine, built by ``MachineStatus.from_machine``."""
    current = MachineStatus.from_machine(self)
    return current
def get_running_protocol(self) -> Protocol:
    """Return the protocol referenced by the machine's ``${Protocol}`` variable.

    The protocol body is fetched with ``PROT?``, and its name, sample volume and
    run mode with ``RET``; these are reassembled into a single ``PROT`` command
    string and parsed with ``Protocol.from_command``.
    """
    protocol_body = _unwrap_tags(self.run_command("PROT? ${Protocol}"))

    settings_reply = self.run_command("RET ${Protocol} ${SampleVolume} ${RunMode}")
    # Expects exactly three whitespace-separated values in the reply.
    protocol_name, sample_volume, run_mode = settings_reply.split()

    # NOTE(review): the option is spelled "-runmod" here -- confirm that this is
    # the spelling Protocol.from_command expects.
    full_command = (
        f"PROT -volume={sample_volume} -runmod={run_mode} {protocol_name} "
        + protocol_body
    )
    return Protocol.from_command(full_command)
def set_access_level(
    self,
    access_level: AccessLevel | str,
    exclusive: bool = False,
    stealth: bool = False,
    _log: bool = True,
) -> None:
    """Change this connection's access level with the ``ACC`` command.

    Parameters
    ----------
    access_level : AccessLevel | str
        Level to take; coerced to ``AccessLevel``.
    exclusive : bool, optional
        Passed to the machine as ``-exclusive``.
    stealth : bool, optional
        Passed to the machine as ``-stealth``.
    _log : bool, optional
        If True (default), log the change at INFO level.

    Raises
    ------
    ValueError
        The requested level is above ``max_access_level``.
    """
    requested = AccessLevel(access_level)
    ceiling = AccessLevel(self.max_access_level)
    if requested > ceiling:
        raise ValueError(
            f"Access level {requested} is above maximum {self.max_access_level}."
            " Change max_access level to continue."
        )

    command = f"ACC -stealth={stealth} -exclusive={exclusive} {requested}"
    self.run_command(command)

    if not _log:
        return
    log.info(f"Took access level {requested} {exclusive=} {stealth=}")
def get_access_level(
    self,
) -> tuple[AccessLevel, bool, bool]:
    """Query the current access level with ``ACC?``.

    Returns
    -------
    tuple[AccessLevel, bool, bool]
        ``(access_level, exclusive, stealth)``; the two flags are True only when
        the machine reported the literal text ``True``.

    Raises
    ------
    ValueError
        The reply did not have the expected
        ``-stealth=... -exclusive=... LEVEL`` form; the reply is the message.
    """
    reply = self.run_command("ACC?")
    parsed = re.match(r"^-stealth=(\w+) -exclusive=(\w+) (\w+)", reply)
    if parsed is None:
        raise ValueError(reply)

    stealth_text, exclusive_text, level_text = parsed.groups()
    return AccessLevel(level_text), exclusive_text == "True", stealth_text == "True"
def drawer_open(self) -> None:
    """Open the machine drawer using the OPEN command.

    This will ensure proper cover/drawer operation. It *will not check run
    status*, and will open and close the drawer during runs and potentially
    during imaging.
    """
    open_command = "OPEN"
    self.run_command(open_command)
def drawer_close(self, lower_cover: bool = False) -> None:
    """Close the machine drawer using the CLOSE command.

    This will ensure proper cover/drawer operation. It *will not check run
    status*, and will open and close the drawer during runs and potentially
    during imaging.

    By default, it will not lower the cover automatically after closing; use
    lower_cover=True to do so.
    """
    self.run_command("CLOSE")
    if not lower_cover:
        return
    self.cover_lower()
@property
def status(self) -> RunStatus:
    """Return the current status of the run, built by ``RunStatus.from_machine``."""
    current = RunStatus.from_machine(self)
    return current
@property
def drawer_position(self) -> Literal["Open", "Closed", "Unknown"]:
    """Return the drawer position from the DRAW? command.

    The machine's reply is returned as-is, without validation.
    """
    position = self.run_command("DRAW?")
    return position  # type: ignore
@property
def cover_position(self) -> Literal["Up", "Down", "Unknown"]:
    """Return the cover position from the ENG? command.

    Note that this does not always seem to work.

    Raises
    ------
    ValueError
        The machine replied with something other than "Up", "Down" or "Unknown".
    """
    reply = self.run_command("ENG?")
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would silently let unexpected replies through.
    if reply not in ("Up", "Down", "Unknown"):
        raise ValueError(f"Unexpected cover position: {reply}")
    return reply  # type: ignore
def cover_lower(self) -> None:
    """Lower/engage the plate cover, closing the drawer if needed.

    The drawer is closed first (without asking it to lower the cover, which
    would call back into this method), then ``COVerDOWN`` is sent.
    """
    self.drawer_close(lower_cover=False)
    cover_command = "COVerDOWN"
    self.run_command(cover_command)
def __exit__(self, exc_type: type, exc: Exception, tb: Any) -> None:
    """Disconnect on leaving a ``with`` block.

    Returns None, so an exception raised inside the block is not suppressed.
    """
    self.disconnect()
    return None
def __del__(self) -> None:
    """Disconnect on garbage collection if a connection is still open."""
    # getattr: __del__ also runs on instances whose __init__ raised before
    # ``_qsc`` was assigned, where a plain attribute access would fail.
    if getattr(self, "_qsc", None) is not None:
        self.disconnect()
def disconnect(self) -> None:
    """Cleanly disconnect from the machine.

    The connection handle is always cleared and any SSH tunnel is always
    stopped, even if the disconnect itself raises; the error is then propagated.

    Raises
    ------
    ConnectionError
        This instance is not currently connected.
    """
    if self._qsc is None:
        raise ConnectionError(f"Not connected to {self.host}.")

    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._qsc.disconnect())
    finally:
        # Release everything regardless of how the disconnect went, so a failed
        # disconnect cannot leave a tunnel running or be retried from __del__.
        self._qsc = None
        if self._tunnel is not None:
            tunnel, self._tunnel = self._tunnel, None
            tunnel.stop()
def abort_current_run(self) -> None:
    """Abort (stop immediately) the current run."""
    abort_command = "AbortRun ${RunTitle}"
    self.run_command(abort_command)
def stop_current_run(self) -> None:
    """Stop (stop after cycle end) the current run."""
    stop_command = "StopRun ${RunTitle}"
    self.run_command(stop_command)
def pause_current_run(self) -> None:
    """Pause the current run now.

    Uses ``run_command_to_ack``, so this returns once the command is
    acknowledged rather than once it has completed.
    """
    pause_command = "PAUSe"
    self.run_command_to_ack(pause_command)
def pause_current_run_at_temperature(self) -> None:
    """Placeholder: not implemented, always raises NotImplementedError."""
    raise NotImplementedError
def resume_current_run(self) -> None:
    """Resume the current run.

    Uses ``run_command_to_ack``, so this returns once the command is
    acknowledged rather than once it has completed.
    """
    resume_command = "RESume"
    self.run_command_to_ack(resume_command)
@property
def power(self) -> bool:
    """Get and set the machine's operational power (lamp, etc) as a bool.

    Setting this to False will not turn off the machine, just power down
    the lamp, temperature control, etc. It will do so even if there is
    currently a run.

    Raises
    ------
    ValueError
        On reading, the machine reported something other than on/off
        (compared case-insensitively).
    """
    state = self.run_command("POW?").lower()
    known_states = {"on": True, "off": False}
    if state not in known_states:
        raise ValueError(f"Unexpected power status: {state}")
    return known_states[state]


@power.setter
def power(self, value: Literal["on", "off", True, False]):  # type: ignore
    # Only the bool singletons are translated; any other value is sent as given.
    setting = "on" if value is True else ("off" if value is False else value)
    self.run_command(f"POW {setting}")
@property
def current_run_name(self) -> str | None:
    """Name of current run, or None if no run is active.

    The machine's reply to ``RUNTitle?`` is returned unchanged, except that a
    reply of "-" is reported as None.
    """
    title = self.run_command("RUNTitle?")
    return None if title == "-" else title
@contextmanager
def at_access(
    self,
    access_level: AccessLevel | str,
    exclusive: bool = False,
    stealth: bool = False,
) -> Generator[Machine, None, None]:
    """Context manager that temporarily switches to another access level.

    The current level (with its exclusive/stealth flags) is read first, the
    requested level is taken for the duration of the ``with`` block, and the
    previous level is restored afterwards -- also when the block raises, so an
    error inside the block does not leave the elevated level held.

    Parameters
    ----------
    access_level : AccessLevel | str
        Level to hold inside the block.
    exclusive : bool, optional
        Passed on to ``set_access_level``.
    stealth : bool, optional
        Passed on to ``set_access_level``.

    Yields
    ------
    Machine
        This instance.
    """
    fac, fex, fst = self.get_access_level()
    self.set_access_level(access_level, exclusive, stealth, _log=False)
    log.info(f"Took access level {access_level} {exclusive=} {stealth=}.")
    try:
        yield self
    finally:
        self.set_access_level(fac, fex, fst, _log=False)
        log.info(
            f"Dropped access level {access_level}, returning to {fac} exclusive={fex} stealth={fst}."
        )