Source code for qslib.cli

# SPDX-FileCopyrightText: 2021 - 2023 Constantine Evans <qslib@mb.costi.net>
#
# SPDX-License-Identifier: EUPL-1.2

from __future__ import annotations

import time
from dataclasses import dataclass
from pathlib import Path

import click
from click_aliases import ClickAliasedGroup

from qslib.common import Experiment, Machine
from qslib.qs_is_protocol import (
    AccessLevelExceeded,
    AuthError,
    CommandError,
    InsufficientAccess,
    NoMatch,
)
from qslib.scpi_commands import AccessLevel
from .version import __version__


@click.group(cls=ClickAliasedGroup)
@click.version_option(__version__, "--version", "-V")
def cli():
    pass


@cli.command()
@click.option(
    "-o",
    "--output",
    help="output file name (defaults to experiment base filename + extension)",
    type=click.Path(),
)
@click.option(
    "-f",
    "--format",
    "format",
    help="Specify the output format (matplotlib formats are supported, eg, pdf, svg, png); defaults to pdf.",
    default="pdf",
)
@click.option(
    "-a",
    "--actual/--no-actual",
    default=False,
    help="NOT FINISHED Include actual temperature readings for finished experiments (not included by default).",
)
@click.option("-n", "--open/--no-open", help="Open the file after creating it.")
@click.argument("experiment", type=click.Path(exists=True))
def protocol_plot(experiment, output, format, actual, open) -> None:
    """Plot the temperature protocol in an experiment."""
    import matplotlib.pyplot as plt

    experiment = Path(experiment)

    exp = Experiment.from_file(experiment)

    if output is None:
        output = experiment.with_suffix("." + format)

    fig, ax = plt.subplots(figsize=(21.0 / 2.54, 15.0 / 2.54))

    exp.protocol.plot_protocol(ax)

    fig.savefig(output, format=format)

    if open:
        click.launch(str(output))


@cli.command()
@click.option(
    "-o",
    "--output",
    help="output file name (defaults to experiment base filename + extension)",
    type=click.Path(),
)
@click.option(
    "-n",
    "--open/--no-open",
    "openfile",
    help="Open the file after creating it.",
    default=True,
)
@click.argument("experiment", type=click.Path(exists=True))
def info_html(experiment, output, openfile) -> None:
    """Create an HTML summary of the experiment, and potentially open it."""

    experiment = Path(experiment)

    exp = Experiment.from_file(experiment)

    if output is None:
        output = experiment.with_suffix(".html")

    with open(output, "w") as f:
        f.write(exp.info_html())

    if openfile:
        click.launch(str(output))


@cli.command()
@click.argument("experiment", type=click.Path(exists=True))
def protocol_desc(experiment: str | Path) -> None:
    """Print a description of the protocol in an experiment."""
    experiment = Path(experiment)

    exp = Experiment.from_file(experiment)

    click.echo(str(exp.protocol))


@cli.command()
@click.argument("experiment", type=click.Path(exists=True))
def export_temperatures(experiment: str | Path) -> None:
    """Export temperature readings from an experiment, as a CSV file."""

    experiment = Path(experiment)

    exp = Experiment.from_file(experiment)

    assert exp.temperatures

    exp.temperatures.to_csv(path_or_buf=click.get_binary_stream("stdout"))  # type: ignore


@cli.command()
@click.argument("experiment", type=click.Path(exists=True))
def export_data(experiment: str | Path) -> None:
    """Export fluorescence reading data from an experiment, as a CSV file."""
    experiment = Path(experiment)

    exp = Experiment.from_file(experiment)

    exp.welldata.to_csv(path_or_buf=click.get_binary_stream("stdout"))  # type: ignore


@cli.command()
@click.argument("experiment", type=click.Path(exists=True))
def info(experiment: str) -> None:
    """Output information on the experiment."""

    exp = Experiment.from_file(experiment)

    click.echo(exp.info())


@cli.command()
@click.argument("experiment", type=click.Path(exists=True))
@click.argument("machine")
def run(experiment: str, machine: str) -> None:
    """Run an experiment."""
    exp = Experiment.from_file(experiment)

    import logging

    logging.basicConfig(level=logging.INFO)

    m = Machine(
        machine,
        max_access_level="Controller",
    )

    with m:
        exp.run(m)


@cli.command()
@click.argument("machine")
@click.argument("state", type=click.Choice(["on", "off"]))
def machine_power(machine: str, state: str) -> None:
    """Turn the lamp/heat-block on or off (if idle)."""
    m = Machine(
        machine,
        max_access_level="Controller",
    )

    with m:
        rs = m.run_status()
        m.machine_status()
        mn = m.run_command("SYST:SETT:NICK?")

        if rs.state != "Idle":
            raise click.UsageError(
                f"Machine {mn} is currently running {rs.name}, not changing power state during run."
            )
        else:
            with m.at_access("Controller"):
                m.power = {"on": True, "off": False}[state]


@cli.command()
@click.argument("machine")
def machine_status(machine: str) -> None:
    """Print the current status of a machine."""
    m = Machine(
        machine,
        max_access_level="Observer",
    )

    m.connect()
    rs = m.run_status()
    ms = m.machine_status()
    mn = m.run_command("SYST:SETT:NICK?")
    m.disconnect()

    drawer = ms.drawer
    if drawer == "Closed":
        drawer = click.style(drawer, fg="green")
    if drawer == "Open":
        drawer = click.style(drawer, fg="yellow")
    if drawer == "Unknown":
        drawer = click.style(drawer, fg="red")

    cover = ms.cover
    if cover == "Down":
        cover = click.style(cover, fg="green")
    if cover == "Up":
        cover = click.style(cover, fg="blue")
    if cover == "Unknown":
        cover = click.style(cover, fg="red")

    lamp = ms.lamp_status.split()[0]
    if lamp == "Off":
        lamp = click.style(lamp, fg="yellow")
    if lamp == "On":
        lamp = click.style(lamp, fg="green")

    state = rs.state
    if state == "Running":
        state = click.style(state, fg="green")
    if state == "Idle":
        state = click.style(state, fg="blue")
    if state == "Error":
        state = click.style(state, fg="red")

    click.echo(f"Machine {mn} is {state}.")
    click.echo(
        f"Drawer is {drawer}, cover is {cover}, and lamp is {lamp} and {ms.led_temperature:.2f} °C."
    )

    click.echo(f"Cover temperature is {ms.cover_temperature:.2f} °C", nl=False)
    if ms.target_controlled["Cover"]:
        click.echo(
            f" and is controlled with a target of {ms.target_temperatures['Cover']:.2f} °C."
        )
    else:
        click.echo(" and is uncontrolled.")

    click.echo(
        "Block temperatures are "
        + ", ".join("{:.2f}".format(x) for x in ms.block_temperatures)
        + " °C."
    )
    click.echo(
        "Sample temperatures are "
        + ", ".join("{:.2f}".format(x) for x in ms.block_temperatures)
        + " °C."
    )

    if all(ms.target_controlled[f"Zone{i}"] for i in range(1, 7)):
        click.echo(
            "Zone temperatures are controlled with a target of "
            + ", ".join(
                "{:.2f}".format(ms.target_temperatures[f"Zone{i}"]) for i in range(1, 7)
            )
            + " °C."
        )
    else:
        click.echo("Zone temperatures are uncontrolled.")

    if rs.state != "Idle":
        click.echo(
            f"Run {rs.name}. Stage {rs.stage}/{rs.num_stages},"
            f" cycle {rs.cycle}/{rs.num_cycles}, and step {rs.step}."
        )

    del m

@cli.command(name="list-stored")
@click.argument("machine_path", nargs=-1)
@click.option("-v", "--verbose", is_flag=True, help="Show detailed file information")
@click.option("-s", "--sort", type=click.Choice(['name', 'time', 'size', 'created']), default='name',
              help="Sort by name, modification time, creation time, or file size")
@click.option("-r", "--reverse", is_flag=True, help="Reverse the sort order")
@click.option("-U", "--created", is_flag=True, help="Use creation time instead of modification time")
def list_stored(machine_path: tuple[str, ...], verbose: bool, sort: str, reverse: bool, created: bool) -> None:
    """List experiments stored on a machine.
    
    MACHINE_PATH can be either just a machine name/address, or machine:pattern where
    pattern is used to filter results using shell-style wildcards (*, ?, etc)."""
    
    for mp in machine_path:
        if mp.startswith("["):
            # Handle IPv6 address in brackets
            closing_bracket = mp.find("]")
            if closing_bracket == -1:
                raise click.UsageError("Missing closing bracket for IPv6 address")
        
            machine = mp[1:closing_bracket]
            if len(mp) > closing_bracket + 1:
                if mp[closing_bracket + 1] != ":":
                    raise click.UsageError("Expected ':' after IPv6 address")
                pattern = mp[closing_bracket + 2:]
            else:
                pattern = "*"
        elif ":" in mp:
            machine, pattern = mp.split(":", 1)
        else:
            machine = mp
            pattern = "*"

        m = Machine(
            machine,
            max_access_level="Observer",
        )

        with m:
            if len(machine_path) > 1:
                click.echo(f"{machine}:")
            try:
                files = m.list_runs_in_storage(pattern, verbose=True)
                
                # Sort the files according to the specified criteria
                if sort == 'name':
                    files.sort(key=lambda x: x['path'], reverse=reverse)
                elif sort == 'time':
                    files.sort(key=lambda x: x['mtime'] if not created else x['ctime'], reverse=reverse)
                elif sort == 'size':
                    files.sort(key=lambda x: x['size'], reverse=reverse)
                    
            except NoMatch:
                click.echo(f"No runs found matching {pattern}")
                continue
            for f in files:
                if verbose:
                    size_str = f"{f['size']/1000/1000:.1f}M" if f['size'] > 1000*1000 else f"{f['size']/1000:.1f}k"
                    time_str = f['ctime'].strftime("%Y-%m-%d %H:%M") if created else f['mtime'].strftime("%Y-%m-%d %H:%M")
                    click.echo(f"{size_str:>8}  {time_str}  {f['path']}")
                else:
                    click.echo(f['path'])

@cli.command()
@click.argument("machine")
@click.argument("experiment")
@click.option("-o", "--output", type=click.Path())
def copy(
    machine: str,
    experiment: str,
    output: str | None,
) -> None:
    """Copy experiment from machine storage."""
    m = Machine(
        machine,
        max_access_level="Observer",
    )

    if output is None:
        output = experiment

    with m:
        exp = Experiment.from_machine(m, experiment)
        exp.save_file(output, update_files=False)

@cli.command()
@click.argument("path", nargs=-1)
@click.option("-f", "--force/--no-force", default=False, help="Overwrite existing files")
@click.option("-q", "--quiet/--no-quiet", default=False, help="Suppress output messages")
@click.option("-o", "--output-dir", type=click.Path(), help="Output directory for saved files")
def get_eds(path: tuple[str, ...], force: bool, quiet: bool, output_dir: str | None) -> None:
    """Get all experiments matching a name from the machine.
    
    Each argument should be of the form machine:name, where machine is the machine address
    and name is the glob pattern to match experiment names against.
    """
    if output_dir is not None:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
    else:
        output_path = Path(".")

    for machine_name in path:
        try:
            machine, name = machine_name.split(":", 1)
        except ValueError:
            click.echo(f"Error: argument '{machine_name}' must be in format 'machine:name'", err=True)
            continue

        m = Machine(machine, max_access_level="Observer")
        try:
            runs = m.list_runs_in_storage(name)
        except NoMatch:
            click.echo(f"No runs found matching {name} on {machine}", err=True)
            continue

        for r in runs:
            click.echo(f"Getting {r} from {machine}: ", nl=False)
            exp = Experiment.from_machine(m, r)

            oname = exp.runtitle_safe

            output_file = output_path / (oname + ".eds")
            if not force and output_file.exists():
                if not quiet:
                    click.echo(f"skipping, {output_file} already exists")
                continue
                
            if not quiet:
                click.echo(f"saving to {output_file}")
            exp.save_file(output_file, update_files=False)

[docs] @dataclass class OutP: level: bool
[docs] def verbose(self, s: str): if self.level: click.secho(s, fg="blue")
[docs] def out(self, s: str): click.secho(s, nl=False)
[docs] def good(self, s: str): click.secho(s, fg="green")
[docs] def warn(self, s: str): click.secho(s, fg="yellow")
[docs] def error(self, s: str): click.secho(s, fg="red")
[docs] class NoNewAccess(BaseException): ...
[docs] class NoAccess(BaseException): ...
[docs] def set_default_access(m: Machine, p: OutP): limitcontexts = m.run_command("CONF* access accessLimits").split() p.verbose("Current access:") for lc in limitcontexts: al = m.run_command(f"CONF? access accessLimits {lc}") p.verbose(f"\t{lc}: {al}") p.out("Setting default (passwordless) maximum access level to CONTROLLER: ") m.run_command("CONF= access accessLimits default CONTROLLER") m.run_command("CONF= access accessLimits eth0 CONTROLLER") p.good("done.")
[docs] def add_controller_password(m: Machine, p: OutP, newpass: str): p.verbose("Current password information:") passwords = m.run_command("CONF* access secrets").split() for password in passwords: pp = m.run_command(f"CONF? access secrets {password}") p.verbose(f"\t{password} : {pp}") p.out("Adding controller password: ") m.run_command(f"CONF= -createMissing access secrets {newpass} CONTROLLER") p.good("done.")
[docs] def add_administrator_password(m: Machine, p: OutP, newpass: str): p.verbose("Current password information:") passwords = m.run_command("CONF* access secrets").split() for password in passwords: pp = m.run_command(f"CONF? access secrets {password}") p.verbose(f"\t{password} : {pp}") p.out("Adding administrator password: ") m.run_command(f"CONF= -createMissing access secrets {newpass} ADMINISTRATOR") p.good("done.")
[docs] def start_ssh_backup(m: Machine, p: OutP, sshpass: str): p.out("Starting SSH in case of failure: ") m.run_command("SYST:EXEC 'dropbear -Y appletini &'") psout = m.run_command("SYST:EXEC -verbose 'ps'") assert "dropbear" in psout p.good("done.") p.warn( f"If procedure fails, SSH is running with user root and password {sshpass}" " (but note options required to connect)." )
[docs] def check_access( host: str, p: OutP, controller_pw: str | None, admin_pw: str | None, default_controller: bool, ): errs: list[CommandError] = [] if controller_pw is not None: p.out("Checking controller password (machine LED should turn yellow): ") m = Machine( host, max_access_level=AccessLevel.Controller, password=controller_pw ) try: with m.ensured_connection(AccessLevel.Controller): m.run_command("LED:YELLOWON") time.sleep(2) m.run_command("LED:BLUEON") p.good("succeeded.") except AuthError as e: p.error("Controller password was not set successfully.") errs.append(e) if admin_pw is not None: p.out("Checking administrator password (machine LED should turn red): ") m = Machine(host, max_access_level=AccessLevel.Administrator, password=admin_pw) try: with m.ensured_connection(AccessLevel.Administrator): m.run_command("LED:REDON") time.sleep(2) m.run_command("LED:BLUEON") p.good("succeeded.") except AuthError as e: p.error("Administrator password was not set successfully.") errs.append(e) if default_controller: p.out( "Checking passwordless controller access (machine LED should turn green): " ) m = Machine(host, max_access_level=AccessLevel.Controller) try: with m.ensured_connection(AccessLevel.Controller): m.run_command("LED:GREENON") time.sleep(2) m.run_command("LED:BLUEON") p.good("succeeded.") except AccessLevelExceeded as e: p.error("Setting passwordless controller access failed.") errs.append(e) if errs: raise NoNewAccess
[docs] def stop_ssh_backup(m: Machine, p: OutP): p.out("Stopping SSH: ") m.run_command("SYST:EXEC 'killall dropbear'") psout = m.run_command("SYST:EXEC -verbose 'ps'") assert "dropbear" not in psout p.good("done.")
[docs] def error_ssh(m: Machine, p: OutP): raise NotImplementedError
[docs] def restart_is(m: Machine, p: OutP): p.out("Restarting zygote on machine: ") with m: with m.at_access(AccessLevel.Controller): try: m.restart_system() except ConnectionError: p.good("succeeded.") else: p.good("command succeeded.") p.out("Waiting 30 seconds for restart: ") time.sleep(30) p.good("done.") p.out("Checking connection: ") with m: pass p.good("succeeded.")
@cli.command() @click.argument("host") @click.argument("current_password") @click.option("-c", "--controller-password", default=None) @click.option("-a", "--admin-password", default=None) @click.option("-d", "--default-controller/--no-default-controller", default=False) @click.option("-v", "--verbose/--no-verbose", default=False) def setup_machine( host: str, current_password: str, controller_password: str | None, admin_password: str | None, default_controller: bool, verbose: bool, ): p = OutP(verbose) m = Machine( host, password=current_password, max_access_level=AccessLevel.Controller ) try: with m.ensured_connection(AccessLevel.Controller): start_ssh_backup(m, p, "emergencypassword") if controller_password: add_controller_password(m, p, controller_password) if admin_password: add_administrator_password(m, p, admin_password) if default_controller: set_default_access(m, p) except AuthError as e: p.error("Current password was not valid.") if verbose: p.error(str(e)) except InsufficientAccess as e: p.error( "Current password was valid, but insufficient. This script needs at least Controller access." ) p.error(str(e)) restart_is(m, p) try: check_access(host, p, controller_password, admin_password, default_controller) with m: with m.at_access(AccessLevel.Controller): stop_ssh_backup(m, p) except NoNewAccess: # Do we have *any* access? try: with Machine( host, password=current_password, max_access_level=AccessLevel.Controller ) as m: stop_ssh_backup(m, p) except Exception as e: p.error("Access failed even with old password.") p.error(f"Error was {e}") p.error("Machine can be accessed directly via SSH using:") p.error( f" ssh root@{host} -p 2222 -o KexAlgorithms=diffie-hellman-group1-sha1" " -o HostKeyAlgorithms='+ssh-rsa'" ) p.error("Password is 'emergencypassword'.") p.error("DO NOT REBOOT THE MACHINE or SSH access will be lost.") else: p.error("New access failed, but old password still works.") if __name__ == "__main__": cli()