Source code for qslib.monitor

# SPDX-FileCopyrightText: 2021 - 2023 Constantine Evans <qslib@mb.costi.net>
#
# SPDX-License-Identifier: EUPL-1.2

from __future__ import annotations

import asyncio
import functools
import logging
import os
import re
import shutil
import time
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, TextIO, Tuple, Type, Union, cast

import numpy as np
import numpy.typing as npt
from influxdb_client import InfluxDBClient, Point  # , Point, WritePrecision
from influxdb_client.client.write_api import ASYNCHRONOUS
from nio.client import AsyncClient
from nio.client.async_client import AsyncClientConfig
from nio.responses import JoinedRoomsError

from qslib.plate_setup import PlateSetup
from qslib.qs_is_protocol import CommandError
from qslib.qsconnection_async import FilterDataFilename, QSConnectionAsync
from qslib.scpi_commands import AccessLevel, ArgList

log = logging.getLogger("monitor")

LEDSTATUS = re.compile(
    rb"Temperature:([+\-\d.]+) Current:([+\-\d.]+) Voltage:([+\-\d.]+) JuncTemp:([+\-\d.]+)"
)


[docs] @dataclass class LEDStatus: temperature: float current: float voltage: float junctemp: float
[docs] @dataclass(frozen=True) class MatrixConfig: password: str user: str room: str host: str encryption: bool = False
[docs] @dataclass(frozen=True) class InfluxConfig: token: str org: str bucket: str url: str
[docs] @dataclass(frozen=True) class MachineConfig: password: Union[str, None] = None name: str = "localhost" host: str = "localhost" port: str | None = None retries: int = 3 compile: bool = False
[docs] @dataclass(frozen=True) class SyncConfig: completed_directory: Union[str, None] = None in_progress_directory: Union[str, None] = None
[docs] @dataclass(frozen=True) class Config: matrix: Union[MatrixConfig, None] = None influxdb: Union[InfluxConfig, None] = None machine: MachineConfig = MachineConfig() sync: SyncConfig = SyncConfig()
[docs] @dataclass class RunState: name: Optional[str] = None stage: Optional[int | str] = None cycle: Optional[int] = None step: Optional[int] = None plate_setup: Optional[PlateSetup] = None
[docs] async def refresh(self, c: QSConnectionAsync) -> None: runmsg = ArgList.from_string(await c.run_command("RunProgress?")) name = cast(str, runmsg.opts["RunTitle"]) if name == "-": self.name = None else: self.name = re.sub(r"(<([\w.]+)>)?([^<]+)(</[\w.]+>)?", r"\3", name) stage = runmsg.opts["Stage"] if stage == "-": self.stage = None else: self.stage = cast(int, stage) cycle = runmsg.opts["Cycle"] if cycle == "-": self.cycle = None else: self.cycle = cast(int, cycle) step = runmsg.opts["Step"] if self.stage else None if step == "-": self.step = None else: self.step = cast(Optional[int], step) if self.name: try: self.plate_setup = await PlateSetup.from_machine(c) except CommandError: self.plate_setup = None
[docs] @classmethod async def from_machine(cls: Type[RunState], c: QSConnectionAsync) -> RunState: n = cls.__new__(cls) await n.refresh(c) return n
[docs] def statemsg(self, timestamp: str) -> str: s = f'run_state name="{self.name}"' if self.stage: s += f",stage={self.stage}i,cycle={self.cycle}i,step={self.step}i" else: s += ",stage=0i,cycle=0i,step=0i" # FIXME: not great s += f" {timestamp}" return s
[docs] @dataclass class MachineState: zone_targets: List[float] zone_controls: List[bool] cover_target: float cover_control: bool drawer: str
[docs] async def refresh(self, c: QSConnectionAsync) -> None: targmsg = ArgList.from_string(await c.run_command("TBC:SETT?")) self.cover_target = cast(float, targmsg.opts["Cover"]) self.zone_targets = cast( List[float], [targmsg.opts[f"Zone{i}"] for i in range(1, 7)] ) contmsg = ArgList.from_string(await c.run_command("TBC:CONT?")) self.cover_control = cast(bool, contmsg.opts["Cover"]) self.zone_controls = cast( List[bool], [contmsg.opts[f"Zone{i}"] for i in range(1, 7)] ) self.drawer = await c.run_command("DRAW?")
[docs] @classmethod async def from_machine(cls, c: QSConnectionAsync) -> MachineState: n = cast(MachineState, cls.__new__(cls)) await n.refresh(c) return n
# def targetmsg(timestamp): # return ""
[docs] @dataclass class State: run: RunState machine: MachineState
[docs] @classmethod async def from_machine(cls, c: QSConnectionAsync) -> State: run = await RunState.from_machine(c) machine = await MachineState.from_machine(c) return cls(run, machine)
# def parse_fd_fn(x: str) -> Tuple[str, int, int, int, int]: # s = re.search(r"S(\d{2})_C(\d{3})_T(\d{2})_P(\d{4})_M(\d)_X(\d)_filterdata.xml$", x) # assert s is not None # return (f"x{s[6]}-m{s[5]}", int(s[1]), int(s[2]), int(s[3]), int(s[4]))
[docs] def index_to_filename_ref(i: Tuple[str, int, int, int, int]) -> str: x, s, c, t, p = i return f"S{s:02}_C{c:03}_T{t:02}_P{p:04}_M{x[4]}_X{x[1]}"
[docs] async def get_runinfo(c: QSConnectionAsync) -> State: state = await State.from_machine(c) return state
[docs] class Collector: def __init__(self, config: Config): self.config = config if self.config.influxdb: self.idbclient = InfluxDBClient( url=self.config.influxdb.url, token=self.config.influxdb.token, org=self.config.influxdb.org, ) self.idbw = self.idbclient.write_api(write_options=ASYNCHRONOUS) else: self.idbw = None if self.config.matrix: self.matrix_config = AsyncClientConfig( encryption_enabled=self.config.matrix.encryption ) self.matrix_client = AsyncClient( self.config.matrix.host, self.config.matrix.user, store_path="./matrix_store/", config=self.matrix_config, ) log.info(config.sync) self.run_log_file: TextIO | None = None
[docs] def inject( self, t: str | Iterable[str | Point] | Point, flush: bool = False ) -> None: if self.idbw: self.idbw.write(bucket=self.config.influxdb.bucket, record=t) # type:ignore if flush: self.idbw.flush() else: pass
[docs] async def matrix_announce(self, msg: str) -> None: assert self.config.matrix await self.matrix_client.room_send( room_id=self.config.matrix.room, message_type="m.room.message", content={"msgtype": "m.text", "body": msg}, ignore_unverified_devices=True, ) await self.matrix_client.sync()
[docs] async def setup_new_rundir( self, connection: QSConnectionAsync, name: str, *, firstmsg: str | None = None, overwrite: bool = False, ) -> None: # name = name.replace(" ", "_") assert self.ipdir is not None if not self.ipdir.is_dir(): log.error(f"Can't open in-progress directory {self.ipdir}.") return dirpath = self.ipdir / name if dirpath.exists() and (not overwrite): log.error(f"In-progress directory for {name} already exists.") return elif dirpath.exists(): assert dirpath != self.ipdir shutil.rmtree(dirpath) dirpath.mkdir() zf = await connection.read_dir_as_zip(name, "experiment") zf.extractall(dirpath) (dirpath / "apldbio" / "sds" / "quant").mkdir(exist_ok=True) (dirpath / "apldbio" / "sds" / "filter").mkdir(exist_ok=True) (dirpath / "apldbio" / "sds" / "calibrations").mkdir(exist_ok=True) self.run_log_file = (dirpath / "apldbio" / "sds" / "messages.log").open("a") if firstmsg is not None: self.run_log_file.write(firstmsg) self.run_log_file.flush()
@property def ipdir(self) -> Path | None: x = self.config.sync.in_progress_directory if x is None: return None else: return Path(x)
[docs] def run_ip_path(self, name: str) -> Path: # name = name.replace(" ", "_") if (ipdir := self.ipdir) is None: raise ValueError return ipdir / name / "apldbio" / "sds"
[docs] async def compile_eds(self, connection: QSConnectionAsync, name: str) -> None: # name = name.replace(" ", "_") # Wait 5 minutes in case machine compiles it (AB sofware run) await asyncio.sleep(300.0) try: await connection.set_access_level(AccessLevel.Controller) await connection.compile_eds(name) except FileNotFoundError as e: raise e finally: await connection.set_access_level(AccessLevel.Observer)
[docs] async def sync_completed(self, connection: QSConnectionAsync, name: str) -> None: # name = name.replace(" ", "_") try: await self.compile_eds(connection, name) except FileNotFoundError: pass dir = Path(cast(str, self.config.sync.completed_directory)) if not dir.is_dir(): log.error(f"Can't sync completed EDS to invalid path {dir}.") return path = dir / (name + ".eds") if path.exists(): log.error(f"Completed EDS already exists for {name}.") return try: with path.open("wb") as f: edsfile = await connection.read_file(f"public_run_complete:{name}.eds") f.write(edsfile) except Exception as e: log.error(f"Error synchronizing completed EDS {name}: {e}") return if self.ipdir: import shutil if (self.ipdir / name).exists(): shutil.rmtree(self.ipdir / name) if (x := (self.ipdir / (name + ".eds"))).exists(): x.unlink()
[docs] async def docollect( self, args: Dict[str, Union[str, int, bool, float]], state: State, connection: QSConnectionAsync, ) -> None: if state.run.plate_setup: pa: npt.NDArray[ np.object_ ] | None = state.run.plate_setup.well_samples_as_array() else: pa = None run = cast(str, args["run"]) if run.startswith('"'): run = run[1:-1] del args["run"] for k, v in args.items(): if k != "run": args[k] = int(v) pl = [ FilterDataFilename.fromstring(x) for x in await connection.get_expfile_list( "{run}/apldbio/sds/filter/S{stage:02}_C{cycle:03}" "_T{step:02}_P{point:04}_*_filterdata.xml".format( run=run, **cast(Dict[str, int], args) ), allow_nomatch=True, ) ] pl.sort() toget = [x for x in pl if x.is_same_point(pl[-1])] lp: List[str] = [] files: list[tuple[str, bytes]] = [] if ( self.ipdir and ( self.ipdir / run / "apldbio" / "sds" / "filter" # .replace(" ", "_") ).exists() ): for fdf in toget: fdr, files_one = await connection.get_filterdata_one( fdf, return_files=True ) lp += fdr.to_lineprotocol(run_name=run, sample_array=pa) files += files_one else: for fdf in toget: lp += (await connection.get_filterdata_one(fdf)).to_lineprotocol( run_name=run, sample_array=pa ) self.inject(lp, flush=True) for path, data in files: fullpath = self.run_ip_path(run) / path with fullpath.open("wb") as f: f.write(data) if ( self.ipdir and ( self.ipdir / run / "apldbio" / "sds" / "filter" # .replace(" ", "_") ).exists() ): saferun = run # .replace(" ", "_") ipp = self.ipdir / saferun with zipfile.ZipFile(self.ipdir / (saferun + ".eds"), "w") as z: for root, _, zfiles in os.walk(ipp): for zfile in zfiles: fpath = os.path.join(root, zfile) z.write(fpath, os.path.relpath(fpath, ipp))
[docs] async def handle_run_msg( self: Collector, state: State, c: QSConnectionAsync, topic: bytes, message: bytes, timestamp: float | None, ) -> None: topic_str = topic.decode() message_str = message.decode() # Are we logging? if self.run_log_file is not None: self.run_log_file.write(f"{topic_str} {timestamp} {message_str}") self.run_log_file.flush() assert timestamp is not None timestamp = int(1e9 * timestamp) msg = ArgList.from_string(message_str) log.debug(msg) contents = msg.args action = cast(str, contents[0]) if action == "Stage": assert isinstance(contents[1], (str, int)) state.run.stage = contents[1] self.inject( Point("run_action") .tag("type", action.lower()) .field(action.lower(), contents[1]) .time(timestamp) .to_line_protocol() ) self.inject( Point("run_status") .tag("type", action.lower()) .field(action.lower(), contents[1]) .time(timestamp) .to_line_protocol() ) elif action == "Cycle": state.run.cycle = int(contents[1]) self.inject( Point("run_action") .tag("type", action.lower()) .field(action.lower(), contents[1]) .time(timestamp) .to_line_protocol() ) self.inject( Point("run_status") .tag("type", action.lower()) .field(action.lower(), contents[1]) .time(timestamp) .to_line_protocol() ) elif action == "Step": state.run.step = int(contents[1]) self.inject( Point("run_status") .tag("type", action.lower()) .field(action.lower(), contents[1]) .time(timestamp) .to_line_protocol() ) self.inject( Point("run_action") .tag("type", action.lower()) .field(action.lower(), contents[1]) .time(timestamp) .to_line_protocol() ) elif action == "Holding": self.inject( f'run_action,type=Holding holdtime={msg.opts["time"]} {timestamp}' # noqa: E501 ) elif action == "Ramping": # TODO: check zones state.machine.zone_targets = [ float(x) for x in cast(list[float], msg.opts["targets"][0]) # type: ignore ] self.inject( f'run_action,type={action} run_name="{state.run.name}" {timestamp}' # noqa: E501 ) elif action == "Acquiring": self.inject( f'run_action,type={action} run_name="{state.run.name}" {timestamp}' # noqa: E501 ) elif action in ["Error", "Ended", "Aborted", "Stopped", "Starting"]: self.inject( f'run_action,type={action} run_name="{state.run.name}" {timestamp}' # noqa: E501 ) asyncio.tasks.create_task( self.matrix_announce( f"{self.config.machine.name} status: {action} {' '.join(str(x) for x in contents[1:])}" # noqa: E501 ) ) if action == "Ended": self.run_log_file = None if self.config.machine.compile: assert state.run.name compdir = self.config.sync.completed_directory if compdir != "": # This will need to compile and sync asyncio.tasks.create_task( self.sync_completed(c, state.run.name) ) else: # No sync; just compile asyncio.tasks.create_task(self.compile_eds(c, state.run.name)) elif action == "Starting": if self.ipdir: newname: str = cast(str, contents[1]) newname.strip('"') asyncio.tasks.create_task( self.setup_new_rundir( c, newname, firstmsg=f"\n{topic_str} {timestamp} {message_str}", ) ) elif action == "Collected": self.inject( f'run_action,type={action} run_name="{state.run.name}" {timestamp}' # noqa: E501 ) asyncio.tasks.create_task(self.docollect(msg.opts, state, c)) else: self.inject( Point("run_action") .tag("type", "Other") .tag("run_name", state.run.name) .field("message", " ".join(str(x) for x in contents)) .time(timestamp) ) await state.run.refresh(c) await state.machine.refresh(c) log.info(message_str) self.inject(state.run.statemsg(str(timestamp))) if state.run.plate_setup: self.inject( state.run.plate_setup.to_lineprotocol(timestamp, state.run.name) ) if self.idbw: self.idbw.flush()
[docs] async def handle_led( self, topic: bytes, message: bytes, timestamp: float | None ) -> None: # Are we logging? if self.run_log_file is not None: self.run_log_file.write(f"{topic.decode()} {timestamp} {message.decode()}") self.run_log_file.flush() assert timestamp is not None ls = LEDSTATUS.match(message) assert ls p = ( Point("lamp") .field("temperature", float(ls[1].decode())) .field("current", float(ls[2].decode())) .field("voltage", float(ls[3].decode())) .field("junctemp", float(ls[4].decode())) .time(int(1e9 * timestamp)) ) self.inject(p, flush=True)
[docs] async def handle_msg( self, state: State, c: QSConnectionAsync, topic: bytes, message: bytes, timestamp: float | None, ) -> None: # Are we logging? if self.run_log_file is not None: self.run_log_file.write(f"{topic.decode()} {timestamp} {message.decode()}") self.run_log_file.flush() assert timestamp is not None args = ArgList.from_string(message.decode()).opts log.debug(f"Handling message {topic.decode()} {message.decode()}") if topic == b"Temperature": recs = [] for i, (s, b, t) in enumerate( zip( # FIXME: parsing weirdness: these are single-element tuples [float(x) for x in cast(list[float], args["sample"][0])], # type: ignore [float(x) for x in cast(list[float], args["block"][0])], # type: ignore state.machine.zone_targets, ) ): recs.append( f"temperature,loc=zones,zone={i} sample={s},block={b},target={t} {int(1e9 * timestamp)}" # noqa: E501 ) recs.append( Point("temperature").tag("loc", "cover").field("cover", args["cover"]) ) recs.append( Point("temperature") .tag("loc", "heatsink") .field("heatsink", args["heatsink"]) ) self.inject(recs) elif topic == b"Time": p = Point("run_time") for key in ["elapsed", "remaining", "active"]: if key in args.keys(): p = p.field(key, args[key]) p.time(int(1e9 * timestamp)) self.inject(p) if self.idbw: self.idbw.flush()
[docs] async def monitor(self, connected_fut: asyncio.Future[bool] | None = None) -> None: if self.config.matrix is not None: await self.matrix_client.login(self.config.matrix.password) joinedroomresp = await self.matrix_client.joined_rooms() if isinstance(joinedroomresp, JoinedRoomsError): log.error(joinedroomresp) joinedrooms = [] else: joinedrooms = joinedroomresp.rooms if self.config.matrix.room not in joinedrooms: await self.matrix_client.join(self.config.matrix.room) async with QSConnectionAsync( host=self.config.machine.host, port=int(self.config.machine.port) if self.config.machine.port is not None else None, password=self.config.machine.password, ) as c: log.info("monitor connected") # Are we currently *in* a run? If so, we'll need to get info. state = await get_runinfo(c) log.info(f"status info: {state}") self.inject(state.run.statemsg(str(time.time_ns()))) if state.run.plate_setup: self.inject( state.run.plate_setup.to_lineprotocol( time.time_ns(), state.run.name ) ) if self.idbw: self.idbw.flush() # Setup directory if run already started: if state.run.name and self.ipdir: await self.setup_new_rundir(c, state.run.name, overwrite=True) await c.run_command("SUBS -timestamp Temperature Time Run LEDStatus") log.debug("subscriptions made") for t in [b"Temperature", b"Time"]: c._protocol.topic_handlers[t] = functools.partial( self.handle_msg, state, c ) c._protocol.topic_handlers[b"Run"] = functools.partial( self.handle_run_msg, state, c ) c._protocol.topic_handlers[b"LEDStatus"] = self.handle_led log.info(c._protocol.topic_handlers) if connected_fut is not None: connected_fut.set_result(True) ok = True while ok: await asyncio.wait((c._protocol.lostconnection,), timeout=60) # Have we lost the connection? if c._protocol.lostconnection.done(): log.error("Lost connection.") ok = False # Are we actually fine? if time.time() - c._protocol.last_received <= 60.0: continue # No, we have a sleep timeout. Send a test command. try: await asyncio.wait_for(c.run_command("ISTAT?"), 30.0) except TimeoutError: log.error( "No data received in 5 minutes and ISTAT? test timed out. Trying to disconnect." ) await c.disconnect() raise TimeoutError
[docs] async def reliable_monitor( self, connected_fut: asyncio.Future[bool] | None = None ) -> None: log.info("starting reconnectable monitoring") restart = True successive_failures = 0 while restart: try: await self.monitor(connected_fut=connected_fut) except asyncio.exceptions.TimeoutError as e: successive_failures = 0 log.warn(f"lost connection with timeout {e}") except OSError as e: log.error(f"connectio error {e}, retrying") except Exception as e: if self.config.machine.retries - successive_failures > 0: log.error( f"Error {repr(e)}, retrying {self.config.machine.retries-successive_failures} times" # noqa: E501 ) successive_failures += 1 else: log.critical(f"giving up, error {e}") restart = False log.debug("awaiting retry") await asyncio.sleep(30)