Source code for qslib.plate_setup

# SPDX-FileCopyrightText: 2021 - 2023 Constantine Evans <const@costi.net>
# SPDX-FileCopyrightText: 2021 - 2023 Constantine Evans <qslib@mb.costi.net>
#
# SPDX-License-Identifier: EUPL-1.2

"""Code for handling plate setup."""
from __future__ import annotations

import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import BytesIO
from typing import (
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Union,
    TYPE_CHECKING
)
from uuid import uuid1

import attrs
import numpy as np
import pandas as pd
import tabulate

if TYPE_CHECKING:
    from kithairon import PickList, Labware
    from typing_extensions import Self

from .qsconnection_async import QSConnectionAsync

_ROWALPHAS = "ABCDEFGHIJKLMNOP"
_ROWALPHAS_96 = "ABCDEFGH"

_WELLNAMES_96 = [x + str(y) for x in _ROWALPHAS_96 for y in range(1, 13)]
_WELLNAMES_384 = [x + str(y) for x in _ROWALPHAS for y in range(1, 25)]

_WELLNAMESET_96 = set(_WELLNAMES_96)
_WELLNAMESET_384 = set(_WELLNAMES_384)

_WELLALPHREF_96 = [(x, f"{y}") for x in _ROWALPHAS_96 for y in range(1, 13)]
_WELLALPHREF_384 = [(x, f"{y}") for x in _ROWALPHAS for y in range(1, 25)]


def _process_color_from_str_int(x: str) -> Tuple[int, int, int, int]:
    """From a string that represents a signed int32 (this choice make no sense),
    interpret it as a unsigned 32-bit integer, then unpack the bits to get R,G,B,A."""

    color_bytes: Tuple[int, int, int, int] = tuple(
        int(b) for b in int(x).to_bytes(4, "little", signed=True)
    )  # type: ignore

    return color_bytes


def _color_to_str_int(x: Tuple[int, int, int, int]) -> str:
    return str(int.from_bytes(bytes(x), "little", signed=True))


def _str_or_list_to_list(v: str | Sequence[str]) -> list[str]:
    if isinstance(v, str):
        return [v]
    return list(v)


[docs] @attrs.define(init=False) class Sample: name: str color: Tuple[int, int, int, int] = attrs.field(default=(255, 0, 0, 255)) properties: dict[str, str] = attrs.field(factory=dict) description: str | None = None wells: list[str] = attrs.field( factory=list, converter=_str_or_list_to_list, on_setattr=attrs.setters.convert ) def __init__( self, name: str, uuid: str | None = None, color: Tuple[int, int, int, int] = (255, 0, 0, 255), properties: dict[str, str] | None = None, description: str | None = None, wells: str | list[str] | None = None, ) -> None: if properties is None: properties = dict() if "SP_UUID" not in properties: if not uuid: uuid = uuid1().hex properties["SP_UUID"] = uuid if wells is None: wells = list() self.__attrs_init__(name, color, properties, description, wells=wells) # type: ignore @property def uuid(self) -> str: return self.properties["SP_UUID"] @uuid.setter def uuid(self, val: str) -> None: self.properties["SP_UUID"] = val
[docs] @classmethod def from_platesetup_sample(cls, se: ET.Element) -> Sample: name = se.findtext("Name") or "Unnamed" color = _process_color_from_str_int(se.findtext("Color") or "-1") keys = [ x.text for x in se.findall("CustomProperty/Property") if x.text is not None ] values = [ x.text for x in se.findall("CustomProperty/Value") if x.text is not None ] properties = {key: value for key, value in zip(keys, values)} return cls( name=name, color=color, properties=properties, description=se.findtext("Description"), )
[docs] def to_xml(self) -> ET.Element: x = ET.Element("Sample") ET.SubElement(x, "Name").text = self.name ET.SubElement(x, "Color").text = _color_to_str_int(self.color) if self.description: ET.SubElement(x, "Description").text = self.description u = ET.SubElement(x, "CustomProperty") for key in self.properties.keys(): ET.SubElement(u, "Property").text = key ET.SubElement(u, "Property").text = "SP_UUID" for value in self.properties.values(): ET.SubElement(u, "Value").text = value ET.SubElement(u, "Value").text = self.uuid return x
@attrs.define() class _SampleWellsView(Mapping[str, list[str]]): samples_by_name: Dict[str, Sample] def __getitem__(self, name: str) -> list[str]: return self.samples_by_name[name].wells def __setitem__(self, name: str, wells: str | list[str]) -> None: if isinstance(wells, str): wells = [wells] try: self.samples_by_name[name].wells = wells except KeyError: self.samples_by_name[name] = Sample(name=name, wells=wells) def __len__(self) -> int: return len(self.samples_by_name) def __iter__(self) -> Iterator[str]: return iter(self.samples_by_name)
[docs] @dataclass class PlateSetup: samples_by_name: Dict[str, Sample] plate_type: Literal[96, 384] = 96 @property def sample_wells(self): return _SampleWellsView(self.samples_by_name)
[docs] @classmethod def from_platesetup_xml(cls, platexml: ET.Element) -> PlateSetup: # type: ignore pt = platexml.find("PlateKind/Type") if pt is None: raise ValueError qs_platetype = pt.text if qs_platetype == "TYPE_8X12": plate_type = 96 # type: Literal[96, 384] elif qs_platetype == "TYPE_16X24": plate_type = 384 else: raise ValueError sample_fvs = platexml.findall( "FeatureMap/Feature/Id[.='sample']/../../FeatureValue" ) samples_by_name: Dict[str, Sample] = dict() samples_by_uuid: Dict[str, Sample] = dict() sample_wells: Dict[str, list[str]] = dict() wn = _WELLNAMES_96 if plate_type == 96 else _WELLNAMES_384 for fv in sample_fvs: if x := fv.findtext("Index"): idx = int(x) else: raise ValueError if y := fv.find("FeatureItem/Sample"): sample = Sample.from_platesetup_sample(y) if sample.name in samples_by_name.keys(): assert sample == samples_by_name[sample.name] assert sample == samples_by_uuid[sample.uuid] sample_wells[sample.name].append(wn[idx]) else: assert sample.uuid not in samples_by_uuid.keys() samples_by_name[sample.name] = sample samples_by_uuid[sample.uuid] = sample sample_wells[sample.name] = [wn[idx]] return cls(sample_wells, samples_by_name, plate_type=plate_type)
def __init__( self, sample_wells: Mapping[str, str | List[str]] | None = None, samples: Iterable[Sample] | Mapping[str, Sample] = tuple(), plate_type: Literal[96, 384] = 96, ) -> None: assert plate_type in (96, 384) self.plate_Type = plate_type if isinstance(samples, Mapping): self.samples_by_name = dict(samples) else: self.samples_by_name = {s.name: s for s in samples} if sample_wells is not None: for name, wells in sample_wells.items(): if name in self.samples_by_name: if isinstance(wells, str): wells = [wells] self.samples_by_name[name].wells = wells else: self.samples_by_name[name] = Sample(name, wells=wells) @property def well_sample(self): well_sample_name = pd.Series( np.full(8 * 12, None, object), index=_WELLNAMES_96 if self.plate_Type == 96 else _WELLNAMES_384, ) for s, ws in self.sample_wells.items(): for w in ws: well_sample_name.loc[w] = s return well_sample_name
[docs] def get_wells(self, samples_or_wells: str | Sequence[str]) -> list[str]: """ Given a sample, well, or list of the two, returns the corresponding wells. Note that this relies on samples not having well-like names. """ wells = [] if isinstance(samples_or_wells, str): samples_or_wells = [samples_or_wells] for sw in samples_or_wells: if sw.upper() in ( _WELLNAMESET_96 if self.plate_Type == 96 else _WELLNAMESET_384 ): wells.append(sw.upper()) else: wells += self.sample_wells[sw] return wells
[docs] def get_descriptive_string(self, name: str) -> str: if (w := name.upper()) in ( _WELLNAMESET_96 if self.plate_Type == 96 else _WELLNAMESET_384 ): return w sample = self.samples_by_name[name] return sample.description or sample.name
[docs] def well_samples_as_array(self) -> np.ndarray[Any, Any]: return self.well_sample.to_numpy().reshape((8, 12))
[docs] def to_lineprotocol(self, timestamp: int, run_name: str | None = None) -> list[str]: if run_name: rts = f',run_name="{run_name}"' else: rts = "" return [ f'platesetup,row={r},col={c} sample="{s}"{rts} {timestamp}' for ((r, c), s) in zip( _WELLALPHREF_96 if self.plate_type == 96 else _WELLALPHREF_384, self.well_sample, ) ]
[docs] @classmethod def from_array( cls, array: Union[np.ndarray, pd.DataFrame], *, make_unique: bool = False ) -> PlateSetup: raise NotImplementedError
[docs] @classmethod def from_tsv(cls, tsvstr: str) -> PlateSetup: raise NotImplementedError
[docs] def to_table( self, headers: Sequence[Union[str, int]] = list(range(1, 13)), tablefmt: str = "orgtbl", showindex: Sequence[str] | None = None, **kwargs: Any, ) -> str: if showindex is None: showindex = _ROWALPHAS_96 if self.plate_Type == 96 else _ROWALPHAS return tabulate.tabulate( self.well_samples_as_array(), tablefmt=tablefmt, headers=[str(x) for x in headers], showindex=showindex, **kwargs, )
[docs] def update_xml(self, root: ET.Element) -> None: samplemap = root.find("FeatureMap/Feature/Id[.='sample']/../..") e: Optional[ET.Element] e = ET.SubElement(root, "PlateKind") ET.SubElement(e, "Type").text = ( "TYPE_8X12" if self.plate_Type == 96 else "TYPE_16X24" ) ET.SubElement(e, "Name").text = ( "96-Well Plate (8x12)" if self.plate_Type == 96 else "384-Well Plate (16x24)" ) ET.SubElement(e, "RowCount").text = "8" if self.plate_Type == 96 else "16" ET.SubElement(e, "ColumnCount").text = "12" if self.plate_Type == 96 else "24" if not samplemap: e = ET.SubElement(root, "FeatureMap") v = ET.SubElement(e, "Feature") ET.SubElement(v, "Id").text = "sample" ET.SubElement(v, "Name").text = "sample" samplemap = e ws = np.array(self.well_sample) for welli in range(0, self.plate_Type): if ws[welli]: e = samplemap.find(f"FeatureValue/Index[.='{welli}']/../FeatureItem") if not e: e = ET.SubElement(samplemap, "FeatureValue") ET.SubElement(e, "Index").text = str(welli) e = ET.SubElement(e, "FeatureItem") if s := e.find("Sample"): e.remove(s) e.append(self.samples_by_name[ws[welli]].to_xml()) else: if e := samplemap.find(f"FeatureValue/Index[.='{welli}']/.."): samplemap.remove(e)
[docs] @classmethod async def from_machine( cls, c: QSConnectionAsync, runtitle: Optional[str] = None ) -> PlateSetup: s = await c.get_sds_file("plate_setup.xml", runtitle=runtitle) x = ET.parse(BytesIO(s), parser=None) return cls.from_platesetup_xml(x.getroot())
def __repr__(self) -> str: return f"PlateSetup(samples {self.sample_wells.keys()}))" def __str__(self) -> str: s = "" if self.sample_wells: s += "Plate setup:\n\n" for sample, wells in self.sample_wells.items(): s += f" - {sample}: {wells}\n" return s def _repr_markdown_(self) -> str: if len(self.sample_wells) < 12: return str(self) else: return self.to_table(tablefmt="pipe")
[docs] @classmethod def from_picklist(cls, picklist: 'PickList' | str, plate_name: str | None = None, labware: 'Labware' | None = None) -> 'Self': """Create a PlateSetup from a Kithairon PickList. Parameters ---------- picklist: PickList or str The picklist to read. If a string, it is treated as a path to a CSV picklist. plate_name: str or None The destination plate that the PlateSetup is for. If None, and there is only one destination plate, that one is used. If there are multiple destination plates, the user must specify one. labware: Labware or None The Kithairon labware to use. If None, the default labware is used. Raises ------ ValueError If: - There is more than one destination plate and no plate_name is specified. - There are multiple sample names in a single well. - The destination plate is not a destination plate type in the Labware. - The destination plate shape is not 96 or 384 (taken from Labware) """ try: import polars as pl except ImportError: raise ValueError("Polars is required to use this method") try: import kithairon as kt except ImportError: raise ValueError("Kithairon is required to use this method") if isinstance(picklist, str): picklist = kt.PickList.read_csv(picklist) destplates = picklist.data.select("Destination Plate Name", "Destination Plate Type").unique() if plate_name is None: if len(destplates) > 1: raise ValueError("Multiple destination plates found; please specify one: " + ", ".join(destplates["Destination Plate Name"].to_list())) plate_name = destplates["Destination Plate Name"][0] plate_type = destplates["Destination Plate Type"][0] else: if plate_name not in destplates["Destination Plate Name"].to_list(): raise ValueError(f"Destination plate {plate_name} not a destination. Destinations are: " + ", ".join(destplates["Destination Plate Name"].to_list())) plate_type = destplates.filter(pl.col("Destination Plate Name") == plate_name)["Destination Plate Type"][0] if labware is None: lw = kt.labware.get_default_labware() else: lw = labware try: plate = lw[plate_type] except KeyError: raise ValueError(f"Labware does not have a plate type {plate_type}") if plate.shape == (8, 12): plate_type = 96 elif plate.shape == (16, 24): plate_type = 384 else: raise ValueError(f"Plate shape {plate.shape} not supported") if plate.usage != "DEST": raise ValueError(f"Plate {plate_name} is not a destination plate type") plate = picklist.data.filter(pl.col("Destination Plate Name") == plate_name) u = pl.col("Destination Sample Name").unique() sample_names = plate.group_by("Destination Well").agg( u.alias("sample_names"), u.len().alias("n_sample_names") ) errs = sample_names.filter(pl.col("n_sample_names") > 1).sort("Destination Well") if not errs.is_empty(): errstring = ", ".join(f"{r['Destination Well']}: {r['sample_names']}" for r in errs.iter_rows(named=True)) raise ValueError(f"Multiple sample names found in well(s): {errstring}") return cls( samples=[ Sample(r["sample_names"][0], wells=r["Destination Well"]) for r in sample_names.iter_rows(named=True) ], plate_type=plate_type )