Source code for autoprotocol.harness

"""
Harness module which helps with Manifest interpretation and running protocols

    :copyright: 2021 by The Autoprotocol Development Team, see AUTHORS
        for more details.
    :license: BSD, see LICENSE for more details

"""

import argparse
import io
import json

from . import UserError
from .compound import Compound, CompoundError
from .container import WellGroup
from .protocol import Protocol
from .unit import Unit, UnitError


# Resource ids used by the --dye_test helpers in this module: "dye4000" is the
# default resource provisioned into preview aliquots, "water" is the default
# resource substituted into provision/dispense instructions.
_DYE_TEST_RS = {"dye4000": "rs18qmhr7t9jwq", "water": "rs17gmh5wafm5p"}


def get_protocol_preview(protocol, name, manifest="manifest.json"):
    """
    Parses the 'preview' section of a manifest to use as protocol inputs

    Example Usage:

    .. code-block:: python

        p = Protocol()
        preview = get_protocol_preview(p, name="qPCR")

    Output:

    .. code-block:: python

        {
            "qPCR_input1": "value",
            "qPCR_input2": "value2",
            "qPCR_group": {
                "group_entry1": "value",
                "group_entry2": "value"
            }
        }

    Parameters
    ----------
    protocol : Protocol
        Protocol object being parsed.
    name : str
        Name of protocol in manifest to get preview for
    manifest : str, optional
        Name of manifest file

    Returns
    -------
    dict
        Dictionary of parameters used as a Protocol input

    Raises
    ------
    RuntimeError
        The manifest file could not be opened or read
    RuntimeError
        Protocol not found in manifest
    RuntimeError
        More than one protocol found in manifest

    """
    try:
        # The context manager closes the handle even when reading fails
        with io.open(manifest, encoding="utf-8") as manifest_file:
            manifest_json = manifest_file.read()
    except IOError as e:
        raise RuntimeError(f"'{manifest}' file not found in directory.") from e
    # Keep the parsed manifest under its own name instead of rebinding the
    # `manifest` path argument
    parsed_manifest = Manifest(json.loads(manifest_json))

    source = [m for m in parsed_manifest.protocols if m["name"] == name]
    if not source:
        raise RuntimeError(
            f"Protocol '{name}' not found in list of protocols in this manifest."
        )
    if len(source) != 1:
        raise RuntimeError(
            f"More than one protocol with name '{name}' was found in the "
            f"manifest. All protocol names in a manifest must be unique for it "
            f"to be valid."
        )
    preview = source[0]["preview"]
    return parsed_manifest.protocol_info(name).parse(protocol, preview)


def param_default(type_desc):
    """
    Compute the value used for an input when none was supplied

    Parameters
    ----------
    type_desc : dict or str
        Description of input type. A bare string is treated as
        ``{"type": <string>}``.

    Returns
    -------
    list or dict or None or object
        The explicit "default" of the description when one is set (except for
        group-choice, which is always expanded), otherwise an empty
        placeholder matching the input type, or None when the type has no
        placeholder.

    """
    if isinstance(type_desc, str):
        type_desc = {"type": type_desc}

    kind = type_desc["type"]
    explicit = type_desc.get("default")

    # group-choice is always expanded so that its per-option inputs get their
    # own defaults; every other type simply returns an explicit default
    if kind != "group-choice" and explicit is not None:
        return explicit

    if kind in ("aliquot+", "aliquot++", "container+"):
        return []
    if kind == "group+":
        return [{}]
    if kind == "group":
        return {
            key: param_default(sub_desc)
            for key, sub_desc in type_desc["inputs"].items()
        }
    if kind == "group-choice":
        per_option = {}
        for option in type_desc.get("options", []):
            option_inputs = option.get("inputs")
            # Options without inputs contribute no entry
            if option_inputs:
                per_option[option.get("value")] = param_default(
                    {"type": "group", "inputs": option_inputs}
                )
        return {"value": explicit, "inputs": per_option}
    if kind == "csv-table":
        return [{}, [{}]]
    return None


def convert_param(protocol, val, type_desc):
    """
    Convert parameters based on their input types

    A missing value (None) is first replaced by ``param_default(type_desc)``;
    if that is still None, None is returned without any conversion.

    Parameters
    ----------
    protocol : Protocol
        Protocol object being parsed.
    val : str or int or bool or dict or list
        Parameter value to be converted.
    type_desc : dict or str
        Description of input type.

    Returns
    -------
    list or dict or Unit or int or str or float or bool or None
        Converted parameter of the relevant type

    Raises
    ------
    RuntimeError
        Invalid aliquot reference provided for aliquot, aliquot+, aliquot++
    RuntimeError
        Invalid container reference provided for container, container+
    RuntimeError
        Invalid compound provided for compound, compound+
    RuntimeError
        Invalid unit-type provided
    RuntimeError
        Invalid temperature condition provided
    RuntimeError
        Invalid format provided for integer or decimal
    RuntimeError
        Invalid value or format provided for group, group+ or group-choice
    RuntimeError
        Invalid format provided for thermocycle or thermocycle_step
    RuntimeError
        Invalid format provided for csv-table input
    ValueError
        Unknown input type provided

    """
    if isinstance(type_desc, str):
        type_desc = {"type": type_desc}
    if val is None:
        val = param_default(type_desc)
    if val is None:  # still None?
        return None
    input_type = type_desc["type"]

    if input_type == "aliquot":
        try:
            # References look like "<container name>/<well index>"; the
            # container name itself may contain slashes
            container = ("/").join(val.split("/")[0:-1])
            well_idx = val.split("/")[-1]
            return protocol.refs[container].container.well(well_idx)
        except (KeyError, AttributeError, ValueError) as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"'{val}' (supplied to input '{label}') is not "
                f"a valid reference to an aliquot"
            ) from e
    elif input_type == "aliquot+":
        try:
            return WellGroup([convert_param(protocol, a, "aliquot") for a in val])
        except RuntimeError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type aliquot+) is "
                f"improperly formatted."
            ) from e
    elif input_type == "aliquot++":
        try:
            return [convert_param(protocol, aqs, "aliquot+") for aqs in val]
        except RuntimeError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type aliquot++) is "
                f"improperly formatted."
            ) from e
    elif input_type == "compound":
        try:
            return Compound(val["format"], val["value"])
        except CompoundError as e:
            raise RuntimeError(f"Invalid Compound; Details: {e.value}") from e
    elif input_type == "compound+":
        try:
            return [convert_param(protocol, cont, "compound") for cont in val]
        except RuntimeError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type compound+) is "
                f"improperly formatted."
            ) from e
    elif input_type == "container":
        try:
            return protocol.refs[val].container
        except KeyError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"'{val}' (supplied to input '{label}') is not "
                f"a valid reference to a container"
            ) from e
    elif input_type == "container+":
        try:
            return [convert_param(protocol, cont, "container") for cont in val]
        except RuntimeError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type container+) is "
                f"improperly formatted."
            ) from e
    elif input_type in [
        "amount_concentration",
        "frequency",
        "length",
        "mass_concentration",
        "time",
        "volume",
        "volume_concentration",
        # TODO: Deprecate the following two types in next major release
        "concentration(mass)",
        "concentration(molar)",
    ]:
        try:
            return Unit(val)
        except UnitError as e:
            raise RuntimeError(
                f"The value supplied ({e.value}) as a unit of '{input_type}' is "
                f"improperly formatted. Units of {input_type} must be in the form: "
                f"'number:unit'"
            ) from e
    elif input_type == "temperature":
        try:
            # Named storage conditions are passed through untouched
            if val in [
                "ambient",
                "warm_30",
                "warm_35",
                "warm_37",
                "cold_4",
                "cold_20",
                "cold_80",
                "cold_196",
            ]:
                return val
            return Unit(val)
        except UnitError as e:
            raise RuntimeError(
                f"Invalid temperature value for {e.value}: "
                f"temperature input types must be either "
                f"storage conditions (ex: 'cold_20') or "
                f"temperature units in the form of "
                f"'number:unit'"
            ) from e
    # NOTE: these two must be equality tests. `x in "bool"` is a substring
    # test and would accept e.g. "oo" or "" as a boolean type.
    elif input_type == "bool":
        return bool(val)
    elif input_type == "csv":
        return val
    elif input_type in ["string", "choice"]:
        return str(val)
    elif input_type == "integer":
        try:
            return int(val)
        except ValueError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type integer) is "
                f"improperly formatted."
            ) from e
    elif input_type == "decimal":
        try:
            return float(val)
        except ValueError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type decimal) is "
                f"improperly formatted."
            ) from e
    elif input_type == "group":
        try:
            return {
                k: convert_param(protocol, val.get(k), type_desc["inputs"][k])
                for k in type_desc["inputs"]
            }
        except KeyError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type group) is "
                f"missing a(n) {e} field."
            ) from e
        except AttributeError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type group) is "
                f"improperly formatted."
            ) from e
    elif input_type == "group+":
        try:
            return [
                {
                    k: convert_param(protocol, x.get(k), type_desc["inputs"][k])
                    for k in type_desc["inputs"]
                }
                for x in val
            ]
        except (TypeError, AttributeError) as e:
            # Use the same label fallback as every other branch; indexing
            # type_desc["label"] here would raise KeyError for unlabeled inputs
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' "
                f"(type group+) must be in the form of a list of dictionaries"
            ) from e
        except KeyError as e:
            label = type_desc.get("label") or "[unknown]"
            raise RuntimeError(
                f"The value supplied to input '{label}' (type group+) is "
                f"missing a(n) {e} field."
            ) from e
    elif input_type == "group-choice":
        try:
            return {
                "value": val["value"],
                "inputs": {
                    opt["value"]: convert_param(
                        protocol,
                        val["inputs"].get(opt["value"]),
                        # An option may omit "inputs" (param_default accepts
                        # that as well); treat it as an empty group
                        {"type": "group", "inputs": opt.get("inputs") or {}},
                    )
                    for opt in type_desc["options"]
                    if opt["value"] == val["value"]
                },
            }
        except (KeyError, AttributeError) as e:
            label = type_desc.get("label") or "[unknown]"
            # A KeyError carries the missing key as its first argument
            missing = e.args[0] if isinstance(e, KeyError) and e.args else None
            if missing in ("value", "inputs"):
                raise RuntimeError(
                    f"The value supplied to input '{label}' "
                    f"(type group-choice) is missing a(n) {e} field."
                ) from e
            # Never fall through silently: that would return None for a
            # malformed value
            raise RuntimeError(
                f"The value supplied to input '{label}' "
                f"(type group-choice) is improperly formatted."
            ) from e
    elif input_type == "thermocycle":
        try:
            return [
                {
                    "cycles": g["cycles"],
                    "steps": [
                        convert_param(protocol, s, "thermocycle_step")
                        for s in g["steps"]
                    ],
                }
                for g in val
            ]
        except (TypeError, KeyError) as e:
            raise RuntimeError(_thermocycle_error_text()) from e

    elif input_type == "thermocycle_step":
        try:
            output = {"duration": Unit(val["duration"])}
        except UnitError as e:
            raise RuntimeError(
                f"Invalid duration value for {e.value}: duration input types "
                f"must be time units in the form of 'number:unit'"
            ) from e

        try:
            # A step has either a gradient or a single temperature
            if "gradient" in val:
                output["gradient"] = {
                    "top": Unit(val["gradient"]["top"]),
                    "bottom": Unit(val["gradient"]["bottom"]),
                }
            else:
                output["temperature"] = Unit(val["temperature"])
        except UnitError as e:
            raise RuntimeError(
                f"Invalid temperature value for {e.value}: thermocycle "
                f"temperature input types must be temperature units in the "
                f"form of 'number:unit'"
            ) from e

        if "read" in val:
            output["read"] = val["read"]

        return output

    elif input_type == "csv-table":
        # val is [<header -> input type>, [<row dict>, ...]]. Errors are
        # reported against the last cell reached, or against the csv-table
        # input itself when no cell was reached.
        label = type_desc.get("label") or "[unknown]"
        try:
            values = []
            for i, row in enumerate(val[1]):
                value = {}
                for header, header_value in row.items():
                    column_type = val[0].get(header)
                    label = f"csv-table item ({i}): {header}"
                    # A supplied cell needs a declared (string) column type;
                    # report it as a formatting problem of the table
                    if header_value is not None and not isinstance(
                        column_type, str
                    ):
                        raise TypeError(
                            f"no input type declared for column {header!r}"
                        )
                    value[header] = convert_param(
                        protocol,
                        header_value,
                        type_desc={"type": column_type, "label": label},
                    )

                values.append(value)

            return values

        except (AttributeError, IndexError, TypeError) as e:
            raise RuntimeError(
                f"The values supplied to {label} (type csv-table) are "
                f"improperly formatted. Format must be a list of dictionaries "
                f"with the first dictionary comprising keys with associated "
                f"column input types."
            ) from e

    else:
        raise ValueError(f"Unknown input type {input_type!r}")


class ProtocolInfo(object):
    """
    Input type descriptions of a single protocol entry of a manifest

    Parameters
    ----------
    json_dict : dict
        Protocol entry; its "inputs" value is kept as `input_types`.

    """

    def __init__(self, json_dict):
        self.input_types = json_dict["inputs"]

    def parse(self, protocol, inputs):
        """
        Declare the refs of `inputs` on `protocol` and convert its parameters

        Parameters
        ----------
        protocol : Protocol
            Protocol object on which every entry of ``inputs["refs"]`` is
            declared through ``protocol.ref``.
        inputs : dict
            Dictionary with a "refs" and a "parameters" entry.

        Returns
        -------
        dict
            One entry per key of `input_types`, holding the result of
            `convert_param` for the matching supplied parameter.

        """
        ref_specs = inputs["refs"]
        supplied = inputs["parameters"]

        # Optional aliquot fields and the Well setter each one is passed to,
        # in the order they are applied
        optional_setters = (
            ("name", "set_name"),
            ("mass", "set_mass"),
            ("properties", "set_properties"),
            ("contextual_custom_properties", "set_ctx_properties"),
            ("compounds", "set_compounds"),
        )

        for ref_name in ref_specs:
            spec = ref_specs[ref_name]
            container = protocol.ref(
                ref_name,
                spec.get("id"),
                spec["type"],
                storage=spec.get("store"),
                discard=spec.get("discard"),
                cover=spec.get("cover"),
                properties=spec.get("properties"),
                ctx_properties=spec.get("contextual_custom_properties"),
            )
            aliquots = spec.get("aliquots")
            if not aliquots:
                continue
            for well_idx in aliquots:
                aliquot = aliquots[well_idx]
                # The volume is mandatory, everything else is optional
                container.well(well_idx).set_volume(aliquot["volume"])
                for field, setter_name in optional_setters:
                    if field in aliquot:
                        getattr(container.well(well_idx), setter_name)(
                            aliquot[field]
                        )

        return {
            key: convert_param(protocol, supplied.get(key), self.input_types[key])
            for key in self.input_types
        }


class Manifest(object):
    """
    Object representation of a manifest.json file

    Parameters
    ----------
    json_dict : dict
        Parsed contents of a manifest.json file with the following format:

        .. code-block:: none

            {
              "format": "python",
              "license": "MIT",
              "description": "This is a protocol.",
              "protocols": [
                {
                  "name": "SampleProtocol",
                  "version": "1.0.0",
                  "command_string": "python sample_protocol.py",
                  "preview": {
                    "refs": {},
                    "parameters": {},
                    "inputs": {},
                    "dependencies": []
                  }
                }
              ]
            }

    """

    def __init__(self, json_dict):
        self.protocols = json_dict["protocols"]

    def protocol_info(self, name):
        """
        Look up the protocol entry called `name`

        Parameters
        ----------
        name : str
            Value of the "name" field of the wanted protocol entry.

        Returns
        -------
        ProtocolInfo
            Wrapper around the first protocol entry with a matching name.

        Raises
        ------
        RuntimeError
            No protocol entry has a matching "name" field

        """
        try:
            # Only the lookup can run out of candidates, so only the lookup
            # is guarded
            entry = next(p for p in self.protocols if p["name"] == name)
        except StopIteration as e:
            raise RuntimeError(
                f"Harness.run(): {name} does not match the "
                f"'name' field of any protocol in the "
                f"associated manifest.json file."
            ) from e
        return ProtocolInfo(entry)
def run(fn, protocol_name=None, seal_after_run=True, protocol_class=None):
    """
    Run the protocol specified by the function.

    If protocol_name is passed, use preview parameters from the protocol with
    the matching "name" value in the manifest.json file to run the given
    function. Otherwise, take configuration JSON file from the command line
    and run the given function.

    The resulting protocol (or, when the function raises a UserError, an
    "errors" document) is printed as JSON.

    Parameters
    ----------
    fn : function
        Function that generates Autoprotocol
    protocol_name : str, optional
        str matching the "name" value in the manifest.json file
    seal_after_run : bool, optional
        Implicitly add a seal/cover to all stored refs within the protocol
        using seal_on_store()
    protocol_class : Protocol, optional
        References the base protocol class to be used for instantiation. If
        not provided, defaults to using Autoprotocol Python's default
        Protocol implementation

    Raises
    ------
    TypeError
        If protocol_class provided is not a subclass of Protocol

    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", help="JSON-formatted protocol configuration file")
    parser.add_argument(
        "--dye_test",
        help="Execute protocol by pre-filling preview aliquots with OrangeG "
        "dye, and provisioning water only.",
        action="store_true",
    )
    args = parser.parse_args()

    # Context managers make sure the file handles are closed again
    with io.open(args.config, encoding="utf-8") as config_file:
        source = json.loads(config_file.read())

    if protocol_class is None:
        protocol = Protocol()
    else:
        if not issubclass(protocol_class, Protocol):
            raise TypeError(
                "Protocol class provided needs to be subclass of " "Protocol"
            )
        protocol = protocol_class()

    # Number of leading instructions that provision dye. It stays 0 when no
    # dye was added, so the --dye_test conversion below is always well defined
    # (also when --dye_test is given without a protocol_name).
    num_dye_steps = 0

    # pragma pylint: disable=protected-access
    if protocol_name:
        with io.open("manifest.json", encoding="utf-8") as manifest_file:
            manifest_json = manifest_file.read()
        manifest = Manifest(json.loads(manifest_json))
        params = manifest.protocol_info(protocol_name).parse(protocol, source)
        # Add dye to preview aliquots if --dye_test included as an optional
        # argument
        if args.dye_test:
            num_dye_steps = _add_dye_to_preview_refs(protocol)
    else:
        params = protocol._ref_containers_and_wells(source["parameters"])
    # pragma pylint: enable=protected-access

    try:
        fn(protocol, params)
        if seal_after_run:
            seal_on_store(protocol)
        # Convert all provisions to water if --dye_test is included as an
        # optional argument
        if args.dye_test:
            last_index = len(protocol.instructions) - 1
            # Nothing to convert when no instruction follows the dye steps;
            # the converters reject an empty index range
            if num_dye_steps <= last_index:
                _convert_provision_instructions(protocol, num_dye_steps, last_index)
                _convert_dispense_instructions(protocol, num_dye_steps, last_index)
    except UserError as e:
        print(
            json.dumps({"errors": [{"message": e.message, "info": e.info}]}, indent=2)
        )
        return

    print(json.dumps(protocol.as_dict(), indent=2))
def _add_dye_to_preview_refs(protocol, rs=_DYE_TEST_RS["dye4000"]):
    """
    Provision dye into every non-empty well of every ref of the protocol

    Parameters
    ----------
    protocol : Protocol
        Protocol whose refs are filled.
    rs : str, optional
        Resource id to provision, defaults to the "dye4000" test resource.

    Returns
    -------
    int
        Number of instructions added to the protocol

    Raises
    ------
    RuntimeError
        Any ref has a container id

    """
    # Store starting number of instructions
    starting_num = len(protocol.instructions)

    # For each ref in protocol
    for ref_obj in protocol.refs.values():
        ref_cont = ref_obj.container

        # Raise RuntimeError if any refs have an id, to avoid adding dye to
        # real samples
        if ref_cont.id:
            raise RuntimeError(
                "Cannot run a dye test when any ref has a defined container "
                "id. Please resubmit using only new containers."
            )

        # Add dye to each well
        for well in ref_cont.all_wells():
            current_vol = well.volume
            if current_vol and current_vol > Unit(0, "microliter"):
                protocol.provision(rs, well, current_vol)
                # Put the well back to the volume it had before provisioning
                well.set_volume(current_vol)

    # Return number of instructions added
    return len(protocol.instructions) - starting_num


def _validate_instruction_range(protocol, first_index, last_index):
    """
    Check that first_index..last_index is a valid, non-empty range of
    instruction indices of the protocol; raises ValueError otherwise.
    """
    if not isinstance(first_index, int):
        raise ValueError("first_index must be a non-negative integer")
    if not isinstance(last_index, int):
        raise ValueError("last_index must be a non-negative integer")
    if first_index < 0:
        raise ValueError("Indices out of range. first_index must be 0 or greater")
    max_index = len(protocol.instructions) - 1
    if first_index > max_index:
        raise ValueError(
            "Indices out of range. The last instruction index in the protocol "
            "is %d" % max_index
        )
    if last_index > max_index:
        raise ValueError(
            "Indices out of range. The last instruction index in the protocol "
            "is %d" % max_index
        )
    if last_index < first_index:
        raise ValueError("last_index must be greater than or equal to first_index")


def _convert_provision_instructions(
    protocol, first_index, last_index, rs=_DYE_TEST_RS["water"]
):
    """
    Point every provision instruction in the index range at resource `rs`

    Parameters
    ----------
    protocol : Protocol
        Protocol whose instructions are modified in place.
    first_index : int
        Index of the first instruction to consider.
    last_index : int
        Index of the last instruction to consider (inclusive).
    rs : str, optional
        Resource id to use, defaults to the "water" test resource.

    Raises
    ------
    ValueError
        The indices are not integers or are out of range

    """
    # Make sure inputs are valid
    _validate_instruction_range(protocol, first_index, last_index)

    for instruction in protocol.instructions[first_index : last_index + 1]:
        if instruction.op == "provision":
            instruction.data["resource_id"] = rs


def _convert_dispense_instructions(
    protocol, first_index, last_index, rs=_DYE_TEST_RS["water"]
):
    """
    Point every dispense instruction in the index range at resource `rs`

    A "resource_id" entry is overwritten; a "reagent" entry is removed and
    replaced by a "resource_id" entry.

    Parameters
    ----------
    protocol : Protocol
        Protocol whose instructions are modified in place.
    first_index : int
        Index of the first instruction to consider.
    last_index : int
        Index of the last instruction to consider (inclusive).
    rs : str, optional
        Resource id to use, defaults to the "water" test resource.

    Raises
    ------
    ValueError
        The indices are not integers or are out of range

    """
    # Make sure inputs are valid
    _validate_instruction_range(protocol, first_index, last_index)

    for instruction in protocol.instructions[first_index : last_index + 1]:
        if instruction.op == "dispense":
            if "resource_id" in instruction.data:
                instruction.data["resource_id"] = rs
            if "reagent" in instruction.data:
                instruction.data.pop("reagent", None)
                instruction.data["resource_id"] = rs


def _thermocycle_error_text():
    """
    Returns formatted error text for thermocycle value errors
    """

    return """Thermocycle input types must take a list of dictionaries in the form of:
  [{"cycles": integer,
    "steps": [{
      "duration": duration,
      "temperature": temperature
      "read": boolean (optional)
    }]
  }]
--or--
  [{"cycles": integer,
    "steps": [{
      "duration": duration,
      "gradient": {
        "top": temperature,
        "bottom": temperature
      }
      "read": boolean (optional)
    }]
  }]
(You can intermix gradient and non-gradient steps)"""
def seal_on_store(protocol):
    """
    Implicitly adds seal/cover instructions to the end of a run for containers
    that do not have a cover. Cover type applied defaults first to "seal" if
    it's within the capabilities of the container type, otherwise to "cover".

    Example Usage:

    .. code-block:: python

        def example_method(protocol, params):
            cont = params['container']
            protocol.transfer(cont.well("A1"), cont.well("A2"), "10:microliter")
            protocol.seal(cont)
            protocol.unseal(cont)
            protocol.cover(cont)
            protocol.uncover(cont)

    Autoprotocol Output:

    .. code-block:: json

        {
          "refs": {
            "plate": {
              "new": "96-pcr",
              "store": {
                "where": "ambient"
              }
            }
          },
          "instructions": [
            {
              "groups": [
                {
                  "transfer": [
                    {
                      "volume": "10.0:microliter",
                      "to": "plate/1",
                      "from": "plate/0"
                    }
                  ]
                }
              ],
              "op": "pipette"
            },
            {
              "object": "plate",
              "type": "ultra-clear",
              "op": "seal"
            },
            {
              "object": "plate",
              "op": "unseal"
            },
            {
              "lid": "universal",
              "object": "plate",
              "op": "cover"
            },
            {
              "object": "plate",
              "op": "uncover"
            },
            {
              "type": "ultra-clear",
              "object": "plate",
              "op": "seal"
            }
          ]
        }

    Parameters
    ----------
    protocol : Protocol
        Protocol whose stored, uncovered refs get a seal or cover instruction

    """
    for ref in protocol.refs.values():
        # Only refs that are stored need a closure
        if not ref.opts.store:
            continue
        container = ref.container
        if container.is_covered() or container.is_sealed():
            continue

        container_type = container.container_type
        default_method = container_type.prioritize_seal_or_cover
        sealable = "seal" in container_type.capabilities
        coverable = "cover" in container_type.capabilities

        # Honor the container type's preferred method when it is supported,
        # otherwise fall back to seal first, then cover
        if default_method == "seal" and sealable:
            protocol.seal(container, container_type.seal_types[0])
        elif default_method == "cover" and coverable:
            protocol.cover(container, container_type.cover_types[0])
        elif sealable:
            protocol.seal(container, container_type.seal_types[0])
        elif coverable:
            protocol.cover(container, container_type.cover_types[0])