Source code for qslib.plate_setup

# SPDX-FileCopyrightText: 2021 - 2023 Constantine Evans <const@costi.net>
# SPDX-FileCopyrightText: 2021 - 2023 Constantine Evans <qslib@mb.costi.net>
#
# SPDX-License-Identifier: EUPL-1.2

"""Code for handling plate setup."""
from __future__ import annotations

import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import BytesIO
from typing import (
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Union,
)
from uuid import uuid1

import attrs
import numpy as np
import pandas as pd
import tabulate

from .qsconnection_async import QSConnectionAsync

_ROWALPHAS = "ABCDEFGHIJKLMNOP"
_ROWALPHAS_96 = "ABCDEFGH"

_WELLNAMES_96 = [x + str(y) for x in _ROWALPHAS_96 for y in range(1, 13)]
_WELLNAMES_384 = [x + str(y) for x in _ROWALPHAS for y in range(1, 25)]

_WELLNAMESET_96 = set(_WELLNAMES_96)
_WELLNAMESET_384 = set(_WELLNAMES_384)

_WELLALPHREF_96 = [(x, f"{y}") for x in _ROWALPHAS_96 for y in range(1, 13)]
_WELLALPHREF_384 = [(x, f"{y}") for x in _ROWALPHAS for y in range(1, 25)]


def _process_color_from_str_int(x: str) -> Tuple[int, int, int, int]:
    """From a string that represents a signed int32 (this choice make no sense),
    interpret it as a unsigned 32-bit integer, then unpack the bits to get R,G,B,A."""

    color_bytes: Tuple[int, int, int, int] = tuple(
        int(b) for b in int(x).to_bytes(4, "little", signed=True)
    )  # type: ignore

    return color_bytes


def _color_to_str_int(x: Tuple[int, int, int, int]) -> str:
    return str(int.from_bytes(bytes(x), "little", signed=True))


def _str_or_list_to_list(v: str | Sequence[str]) -> list[str]:
    if isinstance(v, str):
        return [v]
    return list(v)


[docs] @attrs.define(init=False) class Sample: name: str color: Tuple[int, int, int, int] = attrs.field(default=(255, 0, 0, 255)) properties: dict[str, str] = attrs.field(factory=dict) description: str | None = None wells: list[str] = attrs.field( factory=list, converter=_str_or_list_to_list, on_setattr=attrs.setters.convert ) def __init__( self, name: str, uuid: str | None = None, color: Tuple[int, int, int, int] = (255, 0, 0, 255), properties: dict[str, str] | None = None, description: str | None = None, wells: str | list[str] | None = None, ) -> None: if properties is None: properties = dict() if "SP_UUID" not in properties: if not uuid: uuid = uuid1().hex properties["SP_UUID"] = uuid if wells is None: wells = list() self.__attrs_init__(name, color, properties, description, wells=wells) # type: ignore @property def uuid(self) -> str: return self.properties["SP_UUID"] @uuid.setter def uuid(self, val: str) -> None: self.properties["SP_UUID"] = val
[docs] @classmethod def from_platesetup_sample(cls, se: ET.Element) -> Sample: name = se.findtext("Name") or "Unnamed" color = _process_color_from_str_int(se.findtext("Color") or "-1") keys = [ x.text for x in se.findall("CustomProperty/Property") if x.text is not None ] values = [ x.text for x in se.findall("CustomProperty/Value") if x.text is not None ] properties = {key: value for key, value in zip(keys, values)} return cls( name=name, color=color, properties=properties, description=se.findtext("Description"), )
[docs] def to_xml(self) -> ET.Element: x = ET.Element("Sample") ET.SubElement(x, "Name").text = self.name ET.SubElement(x, "Color").text = _color_to_str_int(self.color) if self.description: ET.SubElement(x, "Description").text = self.description u = ET.SubElement(x, "CustomProperty") for key in self.properties.keys(): ET.SubElement(u, "Property").text = key ET.SubElement(u, "Property").text = "SP_UUID" for value in self.properties.values(): ET.SubElement(u, "Value").text = value ET.SubElement(u, "Value").text = self.uuid return x
@attrs.define() class _SampleWellsView(Mapping[str, list[str]]): samples_by_name: Dict[str, Sample] def __getitem__(self, name: str) -> list[str]: return self.samples_by_name[name].wells def __setitem__(self, name: str, wells: str | list[str]) -> None: if isinstance(wells, str): wells = [wells] try: self.samples_by_name[name].wells = wells except KeyError: self.samples_by_name[name] = Sample(name=name, wells=wells) def __len__(self) -> int: return len(self.samples_by_name) def __iter__(self) -> Iterator[str]: return iter(self.samples_by_name)
[docs] @dataclass class PlateSetup: samples_by_name: Dict[str, Sample] plate_type: Literal[96, 384] = 96 @property def sample_wells(self): return _SampleWellsView(self.samples_by_name)
[docs] @classmethod def from_platesetup_xml(cls, platexml: ET.Element) -> PlateSetup: # type: ignore pt = platexml.find("PlateKind/Type") if pt is None: raise ValueError qs_platetype = pt.text if qs_platetype == "TYPE_8X12": plate_type = 96 # type: Literal[96, 384] elif qs_platetype == "TYPE_16X24": plate_type = 384 else: raise ValueError sample_fvs = platexml.findall( "FeatureMap/Feature/Id[.='sample']/../../FeatureValue" ) samples_by_name: Dict[str, Sample] = dict() samples_by_uuid: Dict[str, Sample] = dict() sample_wells: Dict[str, list[str]] = dict() wn = _WELLNAMES_96 if plate_type == 96 else _WELLNAMES_384 for fv in sample_fvs: if x := fv.findtext("Index"): idx = int(x) else: raise ValueError if y := fv.find("FeatureItem/Sample"): sample = Sample.from_platesetup_sample(y) if sample.name in samples_by_name.keys(): assert sample == samples_by_name[sample.name] assert sample == samples_by_uuid[sample.uuid] sample_wells[sample.name].append(wn[idx]) else: assert sample.uuid not in samples_by_uuid.keys() samples_by_name[sample.name] = sample samples_by_uuid[sample.uuid] = sample sample_wells[sample.name] = [wn[idx]] return cls(sample_wells, samples_by_name, plate_type=plate_type)
def __init__( self, sample_wells: Mapping[str, str | List[str]] | None = None, samples: Iterable[Sample] | Mapping[str, Sample] = tuple(), plate_type: Literal[96, 384] = 96, ) -> None: assert plate_type in (96, 384) self.plate_Type = plate_type if isinstance(samples, Mapping): self.samples_by_name = dict(samples) else: self.samples_by_name = {s.name: s for s in samples} if sample_wells is not None: for name, wells in sample_wells.items(): if name in self.samples_by_name: if isinstance(wells, str): wells = [wells] self.samples_by_name[name].wells = wells else: self.samples_by_name[name] = Sample(name, wells=wells) @property def well_sample(self): well_sample_name = pd.Series( np.full(8 * 12, None, object), index=_WELLNAMES_96 if self.plate_Type == 96 else _WELLNAMES_384, ) for s, ws in self.sample_wells.items(): for w in ws: well_sample_name.loc[w] = s return well_sample_name
[docs] def get_wells(self, samples_or_wells: str | Sequence[str]) -> list[str]: """ Given a sample, well, or list of the two, returns the corresponding wells. Note that this relies on samples not having well-like names. """ wells = [] if isinstance(samples_or_wells, str): samples_or_wells = [samples_or_wells] for sw in samples_or_wells: if sw.upper() in ( _WELLNAMESET_96 if self.plate_Type == 96 else _WELLNAMESET_384 ): wells.append(sw.upper()) else: wells += self.sample_wells[sw] return wells
[docs] def get_descriptive_string(self, name: str) -> str: if (w := name.upper()) in ( _WELLNAMESET_96 if self.plate_Type == 96 else _WELLNAMESET_384 ): return w sample = self.samples_by_name[name] return sample.description or sample.name
[docs] def well_samples_as_array(self) -> np.ndarray[Any, Any]: return self.well_sample.to_numpy().reshape((8, 12))
[docs] def to_lineprotocol(self, timestamp: int, run_name: str | None = None) -> list[str]: if run_name: rts = f',run_name="{run_name}"' else: rts = "" return [ f'platesetup,row={r},col={c} sample="{s}"{rts} {timestamp}' for ((r, c), s) in zip( _WELLALPHREF_96 if self.plate_type == 96 else _WELLALPHREF_384, self.well_sample, ) ]
[docs] @classmethod def from_array( cls, array: Union[np.ndarray, pd.DataFrame], *, make_unique: bool = False ) -> PlateSetup: raise NotImplementedError
[docs] @classmethod def from_tsv(cls, tsvstr: str) -> PlateSetup: raise NotImplementedError
[docs] def to_table( self, headers: Sequence[Union[str, int]] = list(range(1, 13)), tablefmt: str = "orgtbl", showindex: Sequence[str] | None = None, **kwargs: Any, ) -> str: if showindex is None: showindex = _ROWALPHAS_96 if self.plate_Type == 96 else _ROWALPHAS return tabulate.tabulate( self.well_samples_as_array(), tablefmt=tablefmt, headers=[str(x) for x in headers], showindex=showindex, **kwargs, )
[docs] def update_xml(self, root: ET.Element) -> None: samplemap = root.find("FeatureMap/Feature/Id[.='sample']/../..") e: Optional[ET.Element] e = ET.SubElement(root, "PlateKind") ET.SubElement(e, "Type").text = ( "TYPE_8X12" if self.plate_Type == 96 else "TYPE_16X24" ) ET.SubElement(e, "Name").text = ( "96-Well Plate (8x12)" if self.plate_Type == 96 else "384-Well Plate (16x24)" ) ET.SubElement(e, "RowCount").text = "8" if self.plate_Type == 96 else "16" ET.SubElement(e, "ColumnCount").text = "12" if self.plate_Type == 96 else "24" if not samplemap: e = ET.SubElement(root, "FeatureMap") v = ET.SubElement(e, "Feature") ET.SubElement(v, "Id").text = "sample" ET.SubElement(v, "Name").text = "sample" samplemap = e ws = np.array(self.well_sample) for welli in range(0, self.plate_Type): if ws[welli]: e = samplemap.find(f"FeatureValue/Index[.='{welli}']/../FeatureItem") if not e: e = ET.SubElement(samplemap, "FeatureValue") ET.SubElement(e, "Index").text = str(welli) e = ET.SubElement(e, "FeatureItem") if s := e.find("Sample"): e.remove(s) e.append(self.samples_by_name[ws[welli]].to_xml()) else: if e := samplemap.find(f"FeatureValue/Index[.='{welli}']/.."): samplemap.remove(e)
[docs] @classmethod async def from_machine( cls, c: QSConnectionAsync, runtitle: Optional[str] = None ) -> PlateSetup: s = await c.get_sds_file("plate_setup.xml", runtitle=runtitle) x = ET.parse(BytesIO(s), parser=None) return cls.from_platesetup_xml(x.getroot())
def __repr__(self) -> str: return f"PlateSetup(samples {self.sample_wells.keys()}))" def __str__(self) -> str: s = "" if self.sample_wells: s += "Plate setup:\n\n" for sample, wells in self.sample_wells.items(): s += f" - {sample}: {wells}\n" return s def _repr_markdown_(self) -> str: if len(self.sample_wells) < 12: return str(self) else: return self.to_table(tablefmt="pipe")