Skip to content

tasks

CheckException

Bases: EnsembleException

For check failures

Source code in model_ensembler/tasks/exceptions.py
class CheckException(EnsembleException):
    """Raised to signal that a check has failed."""

ProcessingException

Bases: EnsembleException

For general processing failures

Source code in model_ensembler/tasks/exceptions.py
class ProcessingException(EnsembleException):
    """Raised to signal a general processing failure."""

TaskException

Bases: EnsembleException

For task processing failures

Source code in model_ensembler/tasks/exceptions.py
class TaskException(EnsembleException):
    """Raised to signal a task processing failure."""

check(ctx, cmd, cwd=None, log=False, fail=False, shell=None) async

Check: Call arbitrary command as a check.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
fail bool

If true, then the check returning nonzero will raise an error rather than return false, meaning run abandonment rather than recheck will take place.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Raises:

Type Description
FailureNotToleratedError

Error when fail is true and the check returns a nonzero code.

Source code in model_ensembler/tasks/sys.py
@check_task
async def check(ctx, cmd, cwd=None, log=False, fail=False, shell=None):
    """Check: run an arbitrary command and report whether it succeeded.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        fail (bool, optional): When true, a nonzero exit status raises
            instead of returning False, so the run is abandoned rather than
            the check being retried.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True when the command exits with status zero, else False.

    Raises:
        FailureNotToleratedError: If ``fail`` is true and the command exits
            with a nonzero status.
    """
    logging.info("Running check: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    exit_code = outcome.returncode
    logging.debug("Check return code {}".format(exit_code))

    passed = exit_code == 0
    if not passed and fail:
        raise FailureNotToleratedError("Check failed and it is not tolerable")
    return passed

execute(ctx, cmd, cwd=None, log=False, shell=None) async

Process: Call arbitrary command as a processing task.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def execute(ctx, cmd, cwd=None, log=False, shell=None):
    """Process: run an arbitrary command as a processing task.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True when the command exits with status zero, else False.
    """
    logging.info("Running command: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    return outcome.returncode == 0

jobs(ctx, limit, match) async

Check: Assert whether number of jobs in SLURM is under limit.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
limit int

Maximum number of jobs; the check passes only while strictly fewer matching jobs exist.

required
match str

Prefix to match jobs by.

required

Returns:

Type Description
bool

True if number of jobs is less than limit, otherwise false.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def jobs(ctx, limit, match):
    """Check: report whether the number of matching cluster jobs is under limit.

    The query runs on the ``cluster`` backend installed by
    ``init_hpc_backend``, while holding that backend's ``job_lock``.

    Args:
        ctx (object): Contextual configuration.
        limit (int): Maximum number of jobs; the check passes only while
            strictly fewer matching jobs exist.
        match (str): Prefix to match jobs by.

    Returns:
        (bool): True if the number of matching jobs is less than limit,
            otherwise False.
    """
    # TODO: match with regex
    async with cluster.job_lock:
        matching = await cluster.current_jobs(ctx, match)
        count = len(matching)
        under_limit = count < int(limit)
        logging.debug("Jobs in action {} with limit {}".format(count, limit))

    return under_limit

move(ctx, dest, include=None, exclude=None, cwd=None) async

Process: rsync current working directory contents.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
dest str

Path to copy ctx.id named directory to.

required
include List[str]

rsync include specifiers.

None
exclude List[str]

rsync exclude specifiers, defaults to "*" when calling rsync if include specifiers are given and no exclude specifiers are provided.

None
cwd str

See utils.execute_command.

None

Returns:

Type Description
bool

true if return code is zero, false otherwise.

Raises:

Type Description
RuntimeError

If the context does not provide the id attribute that this processing task requires.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def move(ctx, dest, include=None, exclude=None, cwd=None):
    """Process: rsync the working directory's contents to ``dest/<ctx.id>/``.

    Args:
        ctx (object): Contextual configuration; must provide an ``id``.
        dest (str): Path under which a directory named after ``ctx.id`` is
            the rsync target.
        include (List[str], optional): rsync include specifiers.
        exclude (List[str], optional): rsync exclude specifiers. If omitted
            while include specifiers are given, defaults to ``["*"]`` so
            that only the included paths are transferred.
        cwd (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True if rsync exits with status zero, False otherwise.

    Raises:
        RuntimeError: If the context has no ``id`` attribute.
    """
    if not hasattr(ctx, "id"):
        raise RuntimeError("No ID available for move")

    # TODO: Type checking
    include = include if include else []
    if exclude:
        # Caller-supplied excludes are honoured. Previously they were
        # silently replaced by an empty list.
        pass
    elif include:
        # Only transfer what was explicitly included.
        exclude = ["*"]
    else:
        exclude = []
    dest = os.path.join(dest, ctx.id)

    # Surrounding quotes are stripped, then re-added, so each pattern reaches
    # rsync as a single argument without shell globbing.
    filters = ["--include=\"{}\"".format(i.strip("\"")) for i in include] + \
        ["--exclude=\"{}\"".format(e.strip("\"")) for e in exclude]

    cmd = "rsync -aXE {} ./ {}/".format(" ".join(filters), dest)
    logging.info(cmd)

    res = await execute_command(cmd, cwd)
    return res.returncode == 0

quota(ctx, atleast, mnt=None) async

Check: Make sure quota is sufficient.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
atleast int

Amount in kB.

required
mnt str

Path for mount to check quota on if explicitly required.

None

Returns:

Type Description
bool

True if the available space (limit minus usage) is at least atleast, false otherwise.

Raises:

Type Description
IndexError / TypeError

Not propagated: when the quota cannot be determined, the error is logged and the check returns false.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def quota(ctx, atleast, mnt=None):
    """Check: Make sure quota is sufficient.

    Runs ``quota -uw`` (optionally ``-f <mnt>``) and parses the last output
    line as ``<filesystem> <usage> <limit> ...``, in 1k blocks.

    Args:
        ctx (object): Contextual configuration.
        atleast (int): Amount in kB that must remain available.
        mnt (str, optional): Path for mount to check quota on if explicitly
            required.

    Returns:
        (bool): True if the remaining quota (limit - usage) is at least
            ``atleast``. False otherwise, including when the quota output
            cannot be parsed.
    """
    # Command responds in 1k blocks
    path_arg = " -f " + mnt if mnt else ""
    quota_cmd = "quota -uw" + path_arg
    res = await execute_command(quota_cmd)
    quota_out = res.stdout.decode(errors="replace")

    try:
        fields = quota_out.splitlines()[-1].split()
        # quota(1) marks usage over the soft limit with a trailing "*".
        usage = int(fields[1].rstrip("*"))
        limit = int(fields[2])
        atleast = int(atleast)
    except (IndexError, TypeError, ValueError):
        # ValueError covers non-numeric output, e.g. a "...: none" line when
        # no quota applies, or an unparseable ``atleast``.
        logging.exception("Could not reliably determine quota information")
        return False

    res = (limit - usage) >= atleast

    if not res:
        logging.warning("Quota remaining {} is less than {}".
                        format(limit - usage, atleast))
    return res

remove(ctx, directory=None) async

Process: Remove directory using shutil.rmtree.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
directory str

Specify the directory to remove, otherwise this will use the path specified by ctx.dir.

None

Returns:

Type Description
bool

True if the directory was removed, false otherwise.

Raises:

Type Description
OSError

Not propagated: if the directory cannot be removed, the error is logged and false is returned.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def remove(ctx, directory=None):
    """Process: Remove directory using shutil.rmtree.

    Args:
        ctx (object): Contextual configuration.
        directory (str, optional): Directory to remove. If empty or None,
            ``ctx.dir`` is used instead.

    Returns:
        (bool): True if the directory was removed. False if ``shutil.rmtree``
            raised ``OSError``, which is logged rather than propagated.
    """
    if not directory:
        directory = ctx.dir

    logging.info("Attempting to remove data on {}".format(directory))

    # NOTE: shutil.rmtree is synchronous, so it blocks the event loop while
    #  it runs.
    try:
        shutil.rmtree(directory)
    except OSError as e:
        # Best-effort: report failure through the return value, don't raise.
        logging.exception("Could not remove {}: {}".
                          format(directory, e.strerror))
        return False
    return True

submit(ctx, script=None) async

Process: Submit a new job to SLURM.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
script str

Slurm submission script for sbatch.

None

Returns:

Type Description
None

Always None; the job identifier from the cluster backend is not returned.

Source code in model_ensembler/tasks/hpc.py
@processing_task
async def submit(ctx, script=None):
    """Process: Submit a new job to the cluster backend (e.g. SLURM).

    Args:
        ctx (object): Contextual configuration.
        script (str, optional): Slurm submission script for sbatch. It is
            passed to ``cluster.submit_job``. If empty or None, nothing is
            submitted.

    Returns:
        None: Always. The result of ``cluster.submit_job`` is discarded.
    """

    # TODO: check this as an optional argument avoids run submission
    #  as intended
    if script:
        # Same lock the ``jobs`` check holds while counting jobs.
        async with cluster.job_lock:
            await cluster.submit_job(ctx, script)

    return None

exceptions

Task Exceptions

Exceptions relating to tasks

CheckException

Bases: EnsembleException

For check failures

Source code in model_ensembler/tasks/exceptions.py
class CheckException(EnsembleException):
    """Raised to signal that a check has failed."""

EnsembleException

Bases: Exception

Common ensemble exception for common implementation

Source code in model_ensembler/tasks/exceptions.py
7
8
9
class EnsembleException(Exception):
    """Shared base class for the ensembler's task exceptions."""

FailureNotToleratedError

Bases: RuntimeError

Check failure error cannot be tolerated

Source code in model_ensembler/tasks/exceptions.py
class FailureNotToleratedError(RuntimeError):
    """Raised when a failed check must abort the run instead of being retried."""

ProcessingException

Bases: EnsembleException

For general processing failures

Source code in model_ensembler/tasks/exceptions.py
class ProcessingException(EnsembleException):
    """Raised to signal a general processing failure."""

TaskException

Bases: EnsembleException

For task processing failures

Source code in model_ensembler/tasks/exceptions.py
class TaskException(EnsembleException):
    """Raised to signal a task processing failure."""

hpc

init_hpc_backend(backend)

Initialise cluster backend to use.

Parameters:

Name Type Description Default
backend str

Name of the cluster backend module to import.

required

Raises:

Type Description
ImportError / ModuleNotFoundError

If cluster backend cannot be imported.

Source code in model_ensembler/tasks/hpc.py
def init_hpc_backend(backend):
    """Import a cluster backend module and install it as the global ``cluster``.

    Args:
        backend (str): Importable module name of the cluster backend.

    Raises:
        ImportError/ModuleNotFoundError: If the backend module cannot be
            imported. The failure is logged first, then re-raised, and the
            global ``cluster`` is left untouched.
    """
    global cluster

    logging.info("Importing {}".format(backend))
    try:
        module = importlib.import_module(backend)
    except ImportError:
        # ModuleNotFoundError subclasses ImportError, so it is handled here too.
        logging.exception("Couldn't dynamically import cluster backend")
        raise
    cluster = module

jobs(ctx, limit, match) async

Check: Assert whether number of jobs in SLURM is under limit.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
limit int

Maximum number of jobs; the check passes only while strictly fewer matching jobs exist.

required
match str

Prefix to match jobs by.

required

Returns:

Type Description
bool

True if number of jobs is less than limit, otherwise false.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def jobs(ctx, limit, match):
    """Check: report whether the number of matching cluster jobs is under limit.

    The query runs on the ``cluster`` backend installed by
    ``init_hpc_backend``, while holding that backend's ``job_lock``.

    Args:
        ctx (object): Contextual configuration.
        limit (int): Maximum number of jobs; the check passes only while
            strictly fewer matching jobs exist.
        match (str): Prefix to match jobs by.

    Returns:
        (bool): True if the number of matching jobs is less than limit,
            otherwise False.
    """
    # TODO: match with regex
    async with cluster.job_lock:
        matching = await cluster.current_jobs(ctx, match)
        count = len(matching)
        under_limit = count < int(limit)
        logging.debug("Jobs in action {} with limit {}".format(count, limit))

    return under_limit

quota(ctx, atleast, mnt=None) async

Check: Make sure quota is sufficient.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
atleast int

Amount in kB.

required
mnt str

Path for mount to check quota on if explicitly required.

None

Returns:

Type Description
bool

True if the available space (limit minus usage) is at least atleast, false otherwise.

Raises:

Type Description
IndexError / TypeError

Not propagated: when the quota cannot be determined, the error is logged and the check returns false.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def quota(ctx, atleast, mnt=None):
    """Check: Make sure quota is sufficient.

    Runs ``quota -uw`` (optionally ``-f <mnt>``) and parses the last output
    line as ``<filesystem> <usage> <limit> ...``, in 1k blocks.

    Args:
        ctx (object): Contextual configuration.
        atleast (int): Amount in kB that must remain available.
        mnt (str, optional): Path for mount to check quota on if explicitly
            required.

    Returns:
        (bool): True if the remaining quota (limit - usage) is at least
            ``atleast``. False otherwise, including when the quota output
            cannot be parsed.
    """
    # Command responds in 1k blocks
    path_arg = " -f " + mnt if mnt else ""
    quota_cmd = "quota -uw" + path_arg
    res = await execute_command(quota_cmd)
    quota_out = res.stdout.decode(errors="replace")

    try:
        fields = quota_out.splitlines()[-1].split()
        # quota(1) marks usage over the soft limit with a trailing "*".
        usage = int(fields[1].rstrip("*"))
        limit = int(fields[2])
        atleast = int(atleast)
    except (IndexError, TypeError, ValueError):
        # ValueError covers non-numeric output, e.g. a "...: none" line when
        # no quota applies, or an unparseable ``atleast``.
        logging.exception("Could not reliably determine quota information")
        return False

    res = (limit - usage) >= atleast

    if not res:
        logging.warning("Quota remaining {} is less than {}".
                        format(limit - usage, atleast))
    return res

submit(ctx, script=None) async

Process: Submit a new job to SLURM.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
script str

Slurm submission script for sbatch.

None

Returns:

Type Description
None

Always None; the job identifier from the cluster backend is not returned.

Source code in model_ensembler/tasks/hpc.py
@processing_task
async def submit(ctx, script=None):
    """Process: Submit a new job to the cluster backend (e.g. SLURM).

    Args:
        ctx (object): Contextual configuration.
        script (str, optional): Slurm submission script for sbatch. It is
            passed to ``cluster.submit_job``. If empty or None, nothing is
            submitted.

    Returns:
        None: Always. The result of ``cluster.submit_job`` is discarded.
    """

    # TODO: check this as an optional argument avoids run submission
    #  as intended
    if script:
        # Same lock the ``jobs`` check holds while counting jobs.
        async with cluster.job_lock:
            await cluster.submit_job(ctx, script)

    return None

sys

check(ctx, cmd, cwd=None, log=False, fail=False, shell=None) async

Check: Call arbitrary command as a check.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
fail bool

If true, then the check returning nonzero will raise an error rather than return false, meaning run abandonment rather than recheck will take place.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Raises:

Type Description
FailureNotToleratedError

Error when fail is true and the check returns a nonzero code.

Source code in model_ensembler/tasks/sys.py
@check_task
async def check(ctx, cmd, cwd=None, log=False, fail=False, shell=None):
    """Check: run an arbitrary command and report whether it succeeded.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        fail (bool, optional): When true, a nonzero exit status raises
            instead of returning False, so the run is abandoned rather than
            the check being retried.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True when the command exits with status zero, else False.

    Raises:
        FailureNotToleratedError: If ``fail`` is true and the command exits
            with a nonzero status.
    """
    logging.info("Running check: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    exit_code = outcome.returncode
    logging.debug("Check return code {}".format(exit_code))

    passed = exit_code == 0
    if not passed and fail:
        raise FailureNotToleratedError("Check failed and it is not tolerable")
    return passed

execute(ctx, cmd, cwd=None, log=False, shell=None) async

Process: Call arbitrary command as a processing task.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def execute(ctx, cmd, cwd=None, log=False, shell=None):
    """Process: run an arbitrary command as a processing task.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True when the command exits with status zero, else False.
    """
    logging.info("Running command: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    return outcome.returncode == 0

move(ctx, dest, include=None, exclude=None, cwd=None) async

Process: rsync current working directory contents.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
dest str

Path to copy ctx.id named directory to.

required
include List[str]

rsync include specifiers.

None
exclude List[str]

rsync exclude specifiers, defaults to "*" when calling rsync if include specifiers are given and no exclude specifiers are provided.

None
cwd str

See utils.execute_command.

None

Returns:

Type Description
bool

true if return code is zero, false otherwise.

Raises:

Type Description
RuntimeError

If the context does not provide the id attribute that this processing task requires.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def move(ctx, dest, include=None, exclude=None, cwd=None):
    """Process: rsync the working directory's contents to ``dest/<ctx.id>/``.

    Args:
        ctx (object): Contextual configuration; must provide an ``id``.
        dest (str): Path under which a directory named after ``ctx.id`` is
            the rsync target.
        include (List[str], optional): rsync include specifiers.
        exclude (List[str], optional): rsync exclude specifiers. If omitted
            while include specifiers are given, defaults to ``["*"]`` so
            that only the included paths are transferred.
        cwd (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True if rsync exits with status zero, False otherwise.

    Raises:
        RuntimeError: If the context has no ``id`` attribute.
    """
    if not hasattr(ctx, "id"):
        raise RuntimeError("No ID available for move")

    # TODO: Type checking
    include = include if include else []
    if exclude:
        # Caller-supplied excludes are honoured. Previously they were
        # silently replaced by an empty list.
        pass
    elif include:
        # Only transfer what was explicitly included.
        exclude = ["*"]
    else:
        exclude = []
    dest = os.path.join(dest, ctx.id)

    # Surrounding quotes are stripped, then re-added, so each pattern reaches
    # rsync as a single argument without shell globbing.
    filters = ["--include=\"{}\"".format(i.strip("\"")) for i in include] + \
        ["--exclude=\"{}\"".format(e.strip("\"")) for e in exclude]

    cmd = "rsync -aXE {} ./ {}/".format(" ".join(filters), dest)
    logging.info(cmd)

    res = await execute_command(cmd, cwd)
    return res.returncode == 0

remove(ctx, directory=None) async

Process: Remove directory using shutil.rmtree.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
directory str

Specify the directory to remove, otherwise this will use the path specified by ctx.dir.

None

Returns:

Type Description
bool

True if the directory was removed, false otherwise.

Raises:

Type Description
OSError

Not propagated: if the directory cannot be removed, the error is logged and false is returned.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def remove(ctx, directory=None):
    """Process: Remove directory using shutil.rmtree.

    Args:
        ctx (object): Contextual configuration.
        directory (str, optional): Directory to remove. If empty or None,
            ``ctx.dir`` is used instead.

    Returns:
        (bool): True if the directory was removed. False if ``shutil.rmtree``
            raised ``OSError``, which is logged rather than propagated.
    """
    if not directory:
        directory = ctx.dir

    logging.info("Attempting to remove data on {}".format(directory))

    # NOTE: shutil.rmtree is synchronous, so it blocks the event loop while
    #  it runs.
    try:
        shutil.rmtree(directory)
    except OSError as e:
        # Best-effort: report failure through the return value, don't raise.
        logging.exception("Could not remove {}: {}".
                          format(directory, e.strerror))
        return False
    return True

utils

execute_command(cmd, cwd=None, log=False, shell=None) async

Standard handling for calling external command.

Parameters:

Name Type Description Default
cmd str

The shell command to run; relative paths in it are resolved against cwd.

required
cwd str

The current working directory to call the cmd from, passed to subprocess.

None
log bool

If true, output stdout/stderr to logfile in cwd.

False
shell str

Which shell to ask subprocess to invoke when processing the command, will default to bash internally.

None

Returns:

Type Description
object

Namespace containing the returncode, stdout (with stderr merged into it) and stderr (always None, since it is redirected to stdout) from the process that was invoked.

Source code in model_ensembler/tasks/utils.py
async def execute_command(cmd, cwd=None, log=False, shell=None):
    """Standard handling for calling external command.

    The command is run through a shell with stderr merged into stdout. As a
    result, all output appears in the returned ``stdout`` and ``stderr`` is
    always ``None``.

    Args:
        cmd (str): The shell command to run. Relative paths in it resolve
            against ``cwd``.
        cwd (str, optional): The current working directory to call the cmd
            from, passed to subprocess.
        log (bool, optional): If true and there is output, write it to a
            timestamped ``execute_command.<HHMMSS.ffffff>.log`` file. The
            file goes in cwd, or in the process working directory when cwd
            is unset.
        shell (str, optional): Shell executable used to run the command.
            When not given, falls back to the configured
            ``Arguments().shell``.

    Returns:
        (object): Namespace with ``returncode``, ``stdout`` (bytes, holding
            combined stdout/stderr) and ``stderr`` (always None) from the
            invoked process.
    """

    logging.debug("Executing command {0}, cwd {1}".
                  format(cmd, cwd if cwd else "unset"))

    start_dt = datetime.now()

    # Only consult the global configuration when no shell was supplied.
    if not shell:
        shell = Arguments().shell

    proc = await asyncio.create_subprocess_shell(
        cmd,
        executable=shell,
        stdout=asyncio.subprocess.PIPE,
        # Merge stderr into stdout so output stays interleaved as produced.
        # Consequently communicate() yields None for stderr.
        stderr=asyncio.subprocess.STDOUT,
        cwd=cwd)

    # communicate() drains the pipes and waits for the process to exit,
    # so no separate proc.wait() is needed.
    (stdout, stderr) = await proc.communicate()

    if log and stdout:
        log_name = "execute_command.{}.log".format(
            start_dt.strftime("%H%M%S.%f"))

        if cwd:
            log_name = os.path.join(cwd, log_name)

        with open(log_name, "w", encoding="utf-8") as fh:
            # Tolerate non-UTF-8 output instead of failing the task.
            fh.write(stdout.decode(errors="replace"))

        logging.info("Command log written to {}".format(log_name))

    ret = types.SimpleNamespace(
        returncode=proc.returncode, stdout=stdout, stderr=stderr)

    if ret.returncode != 0:
        # stderr is always None here (merged above), so report the exit code
        # and the combined output instead.
        logging.warning("Command returned err {}: {}".format(
            ret.returncode,
            stdout.decode(errors="replace").strip() if stdout else ""))
    else:
        logging.debug("Command successful")
    return ret

flight_task(func, check=True)

Decorator for making func as a task, providing context preprocessing.

Parameters:

Name Type Description Default
func callable

Callable to wrap with context.

required
check bool

Determine whether the func is to be treated as a check or another type of action (checks can be skipped).

True

Returns:

Type Description
func

The wrapped function that can process the context provided appropriately.

Source code in model_ensembler/tasks/utils.py
def flight_task(func, check=True):
    """Decorator for making func as a task, providing context preprocessing.

    On each call, the returned wrapper does the following before delegating
    to ``func(ctx, *args, **kwargs)``:

    * Replaces any keyword argument whose value is a string of the form
      ``"run.<attr>"`` with ``getattr(ctx, "<attr>")``.
    * Sets ``cwd`` to ``ctx.dir`` when ``ctx`` has a ``dir`` attribute and
      ``func`` accepts a ``cwd`` parameter.
    * Returns ``True`` without calling ``func`` when
      ``Arguments().no_checks`` is set and ``check`` is true.

    Args:
        func (callable): Callable to wrap with context.
        check (bool, optional): Determine whether the func is to be treated
            as a check or another type of action (checks can be skipped).

    Returns:
        (func): The wrapped function, with its ``check`` attribute set to
            ``check``.
    """
    # The signature of func never changes, so inspect it once up front
    # rather than on every call.
    accepts_cwd = "cwd" in inspect.signature(func).parameters

    @functools.wraps(func)
    def new_func(ctx, *args, **kwargs):
        config = Arguments()

        # Reassigning existing keys while iterating is safe: the dict's size
        # does not change.
        for key, value in kwargs.items():
            try:
                if isinstance(value, str) and value.startswith("run."):
                    # Only the first component after "run." is resolved.
                    kwargs[key] = getattr(ctx, value.split(".")[1])
            finally:
                # Logged even when the attribute lookup fails, before the
                # exception propagates.
                logging.debug("Calling with {} = {}".format(
                    key, kwargs[key])
                )

        # FIXME: context magic for execute_command calls below,
        #  not necessarily the best manner to achieve this as it would be
        #  better handled via the decorator
        # NOTE(review): this overrides any cwd the caller passed explicitly;
        #  confirm that is intended.
        if hasattr(ctx, 'dir') and accepts_cwd:
            kwargs['cwd'] = ctx.dir

        if config.no_checks and check:
            logging.info("Skipping checks for {}".format(func.__name__))
            # NOTE(review): for coroutine functions the caller receives a
            #  plain True here rather than an awaitable; confirm callers
            #  handle both.
            return True
        return func(ctx, *args, **kwargs)

    new_func.check = check
    return new_func