Skip to content

model_ensembler

Top-level package for model_ensembler.

batcher

extra_ctx = contextvars.ContextVar('extra') module-attribute

Main ensembler module with execution core code

BatchExecutor

Bases: object

Create an executor for an ensemble configuration.

The purpose of this is to act as the extensible master executor for the ensemble configuration provided. It handles the event loop and should be used to contain and control the execution overall.

Source code in model_ensembler/batcher.py
class BatchExecutor(object):
    """Create an executor for an ensemble configuration.

    The purpose of this is to act as the extensible master executor for the
    ensemble configuration provided. It handles the event loop and should be
    used to contain and control the execution overall.
    """

    def __init__(self, cfg, backend="slurm", extra_vars=None):
        """Constructor.

        Args:
            cfg (object): EnsembleConfig ensemble configuration.
            backend (str): Backend to execute on,
                        should be one of {'dummy'|'slurm'}.
            extra_vars (list): Additional variables, defaults to an empty
                        list when omitted.
        """
        self._cfg = cfg

        self._init_cluster(backend)
        # Build a fresh list per call rather than sharing one mutable default
        # object between every executor instance
        self._init_ctx([] if extra_vars is None else extra_vars)

    def _init_cluster(self, backend):
        """Initialise the cluster backend for batch execution.

        Args:
            backend (str): identifier for backend in
            `model_ensembler.cluster`.

        Raises:
            NotImplementedError: If the backend module cannot be imported
                from `model_ensembler.cluster`.
        """
        nom = "model_ensembler.cluster.{}".format(backend)
        try:
            mod = importlib.import_module(nom)
        except ModuleNotFoundError as e:
            # Chain the original error: it records which module was actually
            # missing (the backend itself or something the backend imports)
            raise NotImplementedError("No {} implementation exists in "
                                      "model_ensembler.cluster!".
                                      format(backend)) from e

        init_hpc_backend(nom)
        cluster_ctx.set(mod)

    def _init_ctx(self, extra_vars):
        """Initialise contexts.

        Initialise the root context vars for batch execution and set
        extra context vars that can be added last thing to the run.

        Args:
            extra_vars (list): Additional variables.
        """
        var_dict = run_ctx.get(dict())
        var_dict.update(self._cfg.vars)
        run_ctx.set(var_dict)

        extra_ctx.set(extra_vars)

    def run(self, loop=None):
        """Run the executor.

        This will establish the event loop, run the preprocessing actions for
        the ensemble, cycle through executing the batches and then run
        postprocessing actions. Exceptions are not handled here: they
        propagate to the caller once the event loop has been shut down and
        closed.

        Args:
            loop (object): Event loop to run on. When None the loop returned
                by ``asyncio.get_event_loop()`` is used. The loop is closed
                before this method returns.
        """
        logging.info("Running batcher")

        try:
            # Honour a caller-supplied loop, only look one up when not given
            if loop is None:
                loop = asyncio.get_event_loop()
            loop.run_until_complete(
                run_task_items(self._cfg.pre_process))

            # Should batch be in function signature?
            for batch in self._cfg.batches:
                do_batch_execution(loop, batch, repeat=batch.repeat)
                # do_batch_execution(loop, batch) moves to
                # self.execute(loop, batch)

            loop.run_until_complete(
                run_task_items(self._cfg.post_process))

        finally:
            # loop is still None if acquiring the event loop itself failed
            if loop:
                loop.run_until_complete(loop.shutdown_asyncgens())
                loop.close()

    # Placeholder for per-batch execution: the comments in run() note that
    # do_batch_execution is intended to move here. Not implemented yet.
    def execute(self, loop, batch):
        raise NotImplementedError("")
__init__(cfg, backend='slurm', extra_vars=[])

Constructor.

Parameters:

Name Type Description Default
cfg object

EnsembleConfig ensemble configuration.

required
backend str

Backend to execute on, should be one of {'dummy'|'slurm'}.

'slurm'
extra_vars list

Additional variables.

[]
Source code in model_ensembler/batcher.py
def __init__(self, cfg, backend="slurm", extra_vars=None):
    """Constructor.

    Args:
        cfg (object): EnsembleConfig ensemble configuration.
        backend (str): Backend to execute on,
                    should be one of {'dummy'|'slurm'}.
        extra_vars (list): Additional variables, defaults to an empty
                    list when omitted.
    """
    self._cfg = cfg

    self._init_cluster(backend)
    # Build a fresh list per call rather than sharing one mutable default
    # object between every executor instance
    self._init_ctx([] if extra_vars is None else extra_vars)
run(loop=None)

Run the executor.

This will establish the event loop, run the preprocessing actions for the ensemble, cycle through executing the batches and then run postprocessing actions. The event loop is always shut down and closed on exit; exceptions currently receive no specific handling and propagate to the caller.

Parameters:

Name Type Description Default
loop object

Event loop.

None
Source code in model_ensembler/batcher.py
def run(self, loop=None):
    """Run the executor.

    This will establish the event loop, run the preprocessing actions for
    the ensemble, cycle through executing the batches and then run
    postprocessing actions. Exceptions are not handled here: they
    propagate to the caller once the event loop has been shut down and
    closed.

    Args:
        loop (object): Event loop to run on. When None the loop returned
            by ``asyncio.get_event_loop()`` is used. The loop is closed
            before this method returns.
    """
    logging.info("Running batcher")

    try:
        # Honour a caller-supplied loop, only look one up when not given
        if loop is None:
            loop = asyncio.get_event_loop()
        loop.run_until_complete(
            run_task_items(self._cfg.pre_process))

        # Should batch be in function signature?
        for batch in self._cfg.batches:
            do_batch_execution(loop, batch, repeat=batch.repeat)
            # do_batch_execution(loop, batch) moves to
            # self.execute(loop, batch)

        loop.run_until_complete(
            run_task_items(self._cfg.post_process))

    finally:
        # loop is still None if acquiring the event loop itself failed
        if loop:
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.close()

do_batch_execution(loop, batch, repeat=False)

Execute a batch configuration.

Parameters:

Name Type Description Default
loop object

Event loop.

required
batch object

Batch configuration.

required
repeat number

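Flag: when truthy, keep repeating the batch cycle until a pre_batch or post_batch task fails or every run is in the skip indexes; when falsy, run a single cycle.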

False

Returns:

Type Description
str

Returns "Success" on completion.

Raises:

Type Description
ProcessingException

If there is pre or post_batch processing error.

Source code in model_ensembler/batcher.py
def do_batch_execution(loop, batch, repeat=False):
    """Execute a batch configuration.

    Publishes the batch into the batch and run context vars, changes into
    the batch base directory (creating it if required) and then performs one
    or more cycles of: pre_batch tasks, one ``run_batch_item`` task per run
    that is not skipped, and post_batch tasks. Runs whose result carries no
    job are added to the skip indexes for later cycles. The original working
    directory is restored on exit, including when an error propagates.

    Args:
        loop (object): Event loop used to drive the coroutines.
        batch (object): Batch configuration.
        repeat (bool): When falsy a single cycle is run. When truthy the
            cycle repeats until a pre_batch/post_batch task raises
            ProcessingException, every run is in the skip indexes, or
            999999 cycles have been run.

    Returns:
        (str): "Success" once the batch loop has ended.
    """

    logging.info("Start batch: {}".format(datetime.utcnow()))
    logging.debug(pformat(batch))

    args = Arguments()
    skip_indexes = args.indexes if args.indexes else list()
    batch_ctx.set(batch)

    # Fields that are left out of the run context when they are unset
    optional_keys = ["cluster", "email", "nodes", "ntasks", "length"]

    # Everything except the task hooks and the run list becomes a run var.
    # Note the equality test on "runs": a substring test (k in "runs") would
    # also exclude keys such as "run" or "n".
    batch_dict = {k: v
                  for k, v in batch._asdict().items()
                  if not (k.startswith("pre_")
                          or k.startswith("post_")
                          or k == "runs")
                  and not (k in optional_keys and v is None)}

    run_vars = run_ctx.get()
    run_vars.update(batch_dict)
    run_ctx.set(run_vars)

    # We are process dependent here, so this is where we have the choice of
    # concurrency strategies but each batch
    # is dependent on chdir remaining consistent after this point.
    orig = os.getcwd()
    if not os.path.exists(batch.basedir):
        os.makedirs(batch.basedir, exist_ok=True)
    os.chdir(batch.basedir)

    try:
        # TODO: Gross implementation for #26 - repeat parameter, this should
        #  be abstracted away into executor implementations
        #  (BatchExecutor.execute) and made to work better
        if not repeat:
            repeat_count = 2
        else:
            logging.warning("We are due to repeat until a batch check fails "
                            "us")
            repeat_count = 1000000

        for rep_i in range(1, repeat_count):
            logging.info("Running cycle {}".format(rep_i))

            batch_tasks = list()
            _batch_job_sems[batch.name] = asyncio.Semaphore(batch.maxjobs)

            try:
                loop.run_until_complete(run_task_items(batch.pre_batch))
            except ProcessingException:
                logging.error("We have received a pre_batch failure, "
                              "will stop execution")
                break

            if len(set(skip_indexes)) == len(batch.runs):
                logging.error("No longer able to run this batch, all runs "
                              "are in the indexes to skip")
                break

            for idx, run in enumerate(batch.runs):
                # Auto-generated context vars for run
                run['idx'] = idx
                run['id'] = "{}-{}".format(batch.name, run['idx'])
                run['dir'] = os.path.abspath(
                    os.path.join(os.getcwd(), run['id']))
                run['batch_idx'] = rep_i

                if idx < args.skips:
                    logging.warning("Skipping run index {} due to {} skips, "
                                    "run ID: {}".
                                    format(idx, args.skips, run['id']))
                    continue

                if idx in skip_indexes:
                    logging.warning("Skipping run index {} due to being in "
                                    "skip indexes, run ID: {}".
                                    format(idx, run['id']))
                    continue

                # At this point the context changes at root to property based
                ctx_dict = run_ctx.get()
                ctx_dict.update(run)
                ctx_dict.update(extra_ctx.get())

                Run = collections.namedtuple('Run',
                                             field_names=ctx_dict.keys())
                r = Run(**ctx_dict)
                task = run_batch_item(r)
                batch_tasks.append(task)

            batch_results = loop.run_until_complete(
                run_runner(batch.maxruns, batch_tasks))

            for idx, result in enumerate(batch_results):
                job, run = result
                logging.debug("Batch {} result #{} from run {}: job {}".format(
                    batch.name, idx, run.idx, str(job)
                ))

                if not job and run.idx not in skip_indexes:
                    logging.warning("Result #{} for run {} indicates "
                                    "unsuccessful submission, adding to "
                                    "indexes to skip".
                                    format(idx, run.idx))
                    skip_indexes.append(run.idx)

            try:
                loop.run_until_complete(run_task_items(batch.post_batch))
            except ProcessingException:
                logging.error("We have received a post_batch failure, "
                              "will stop execution")
                break
    finally:
        # Always hand the caller back its original working directory, even
        # when a task raises something we do not handle
        os.chdir(orig)

    logging.info("Batch {} completed: {}".
                 format(batch.name, datetime.utcnow()))
    # TODO: return batch windows/info
    return "Success"

run_batch_item(run) async

Execute a run configuration.

Parameters:

Name Type Description Default
run object

Specific run configuration.

required

Returns:

Name Type Description
job_id int

Job id number.

run object

Specific run configuration.

Raises:

Type Description
TemplatingError

If a job cannot be templated.

ProcessingException

If an individual run failure is caught.

Source code in model_ensembler/batcher.py
async def run_batch_item(run):
    """Execute a run configuration.

    Prepares and templates the run directory, executes the batch pre_run
    tasks, submits the job through the cluster backend (unless submission is
    disabled by arguments), polls until the job reaches a finish state and
    then executes the post_run tasks.

    Args:
        run (object): Specific run configuration.

    Returns:
        job_id: Identifier returned by the cluster backend on submission, or
            None when the job was not templated, not submitted or the
            submission failed.
        run (object): Specific run configuration.

    Note:
        TemplatingError (during preparation/templating) and
        ProcessingException (during pre_run, submission or post_run) are
        caught and logged here rather than raised to the caller.
    """

    # TODO: my understanding is that all context from here through to end
    #  methods/tasks will now be under run_context in do_batch_execution
    batch = batch_ctx.get()
    run_ctx.set(run)
    cluster = cluster_ctx.get()

    logging.info("Start run {} at {}".format(run.id, datetime.utcnow()))
    logging.debug(pformat(run))

    args = Arguments()
    job_id = None

    try:
        await prepare_run_directory(batch, run)
        process_templates(run, batch.templates)
    except TemplatingError as e:
        # We catch gracefully and just prevent the run from happening
        logging.error("We cannot template the job {}: {}".format(run.id, e))
        return job_id, run

    # It's very tempting to move pre_run, but don't: we DO NOT execute until
    # job is templated. Instead I've created the ability to run tasks prior
    # to preparation/templating of the job for scenarios where you don't want
    # the templating to error out/job to even be prepared
    try:
        await run_task_items(batch.pre_run)

        if args.no_submission:
            logging.info("Skipping actual slurm submission based on arguments")
        else:
            async with _batch_job_sems[batch.name]:
                func = getattr(model_ensembler.tasks, "jobs")
                check = collections.namedtuple("check", ["args"])

                await run_check(func, check({
                    "limit": batch.maxjobs,
                    "match": batch.name,
                }))

                job_id = await cluster.submit_job(run, script=batch.job_file)

                if not job_id:
                    # No exception is active here, so log a plain error:
                    # logging.exception would append a meaningless
                    # "NoneType: None" traceback
                    logging.error(
                        "{} could not be submitted, we won't continue".format(
                            batch.name
                        ))
                else:
                    running = False
                    state = None

                    # Phase one: wait for the scheduler to report the job as
                    # started (or already finished)
                    while not running:
                        try:
                            job = await cluster.find_id(job_id)
                            state = job.state
                        except (IndexError, ValueError) as e:
                            logging.warning("Job {} not registered yet, "
                                            "or error encountered".
                                            format(job_id))
                            logging.exception(e)

                        if state and (
                                state in cluster.START_STATES or
                                state in cluster.FINISH_STATES):
                            running = True
                        else:
                            await asyncio.sleep(args.submit_timeout)

                    # Phase two: poll until the job reaches a finish state
                    while True:
                        try:
                            job = await cluster.find_id(job_id)
                            state = job.state
                        except (IndexError, ValueError):
                            logging.exception("Job status for run {} retrieval"
                                              " whilst slurm running, waiting "
                                              "and retrying".
                                              format(run.id))
                            await asyncio.sleep(args.error_timeout)
                            continue

                        logging.debug("{} monitor got state {} for job {}".
                                      format(run.id, state, job_id))

                        if state in cluster.FINISH_STATES:
                            logging.info("{} monitor got state {} for job {}".
                                         format(run.id, state, job_id))
                            break
                        else:
                            await asyncio.sleep(args.running_timeout)

        await run_task_items(batch.post_run)
    except ProcessingException:
        logging.error("Run failure caught, abandoning {} but not the "
                      "batch".format(run.id))

    logging.info("End run {} at {}".format(run.id, datetime.utcnow()))
    return job_id, run

cli

check()

CLI-native sanity checking. Contains a pre-set sanity check configuration and combines it with the user's CLI arguments in a list (e.g. dummy/slurm), which is parsed and passed to main().

Allow checking of successful installation.

Source code in model_ensembler/cli.py
def check():
    """Run the bundled sanity check ensemble from the command line.

    Places the pre-set sanity check configuration in front of whatever
    arguments the user supplied on the command line (e.g. dummy/slurm),
    parses the combined list and passes the result to main().

    Allows checking of a successful installation.
    """
    # The pre-set configuration takes the first positional slot, everything
    # the user typed follows it unchanged
    sanity_config = "examples/sanity-check.yml"
    combined = [sanity_config] + sys.argv[1:]

    main(parse_args(combined))

main(args=None)

CLI entry point.

Source code in model_ensembler/cli.py
def main(args=None):
    """CLI entry point.

    Args:
        args (object): Parsed arguments; when None they are obtained from
            parse_args().
    """
    if args is None:
        args = parse_args()

    if args.daemon:
        background_fork(True)

    # Logging is set up under the configuration file's base name
    log_name = "{}".format(os.path.basename(args.configuration))
    setup_logging(log_name, verbose=args.verbose)

    logging.info("Model Ensemble Runner")

    config = EnsembleConfig(args.configuration)
    # TODO: get_batch_executor
    executor = BatchExecutor(config, args.backend, dict(args.extra))
    executor.run()

parse_args(args_list=None)

Parse command line parameters.

Returns:

Type Description
object

Arguments(), immutable instance from .utils.

Source code in model_ensembler/cli.py
def parse_args(args_list=None):
    """Parse command line parameters.

    Args:
        args_list (list): Argument strings to parse instead of the process
            command line; when None the command line is used.

    Returns:
        (object): Arguments(), immutable instance from ``.utils``.
    """
    parser = argparse.ArgumentParser()

    def add_flag(short, long, help_text):
        # Every boolean switch is off by default and turned on by presence
        parser.add_argument(short, long, help=help_text, default=False,
                            action="store_true")

    add_flag("-d", "--daemon", "Daemonise the ensembler")

    # TODO: Need to validate the argument selections/group certain commands
    add_flag("-v", "--verbose", "Log verbosely")
    add_flag("-c", "--no-checks", "Do not run check commands")
    add_flag("-s", "--no-submission",
             "Do not try to submit job, just log the step")
    add_flag("-p", "--pickup",
             "Continue a previous set of runs by picking up "
             "existing directories rather, than assuming to create "
             "them; for example if ensemble has previously failed")

    parser.add_argument("-l", "--shell",
                        help="Allows the user to specify the shell passed to "
                             "subprocess execs.",
                        default="/bin/bash", type=str)

    # FIXME: These should not be applied in multi-batch ensembles
    parser.add_argument("-k", "--skips",
                        help="Number of run entries to skip", default=0,
                        type=int)

    parser.add_argument("-i", "--indexes",
                        help="Specify which indexes to run",
                        type=parse_indexes)

    # Integer tunables: the four timeouts followed by the submission stagger
    int_options = (
        ("-ct", "--check-timeout", 30),
        ("-st", "--submit-timeout", 20),
        ("-rt", "--running-timeout", 60),
        ("-et", "--error-timeout", 120),
        ("-ms", "--max-stagger", 1),
    )
    for short, long, default in int_options:
        parser.add_argument(short, long, default=default, type=int)

    parser.add_argument("-x", "--extra-vars", dest="extra", nargs="*",
                        default=[], type=parse_extra_vars)

    parser.add_argument("configuration")
    parser.add_argument("backend", default="slurm", choices=("slurm", "dummy"),
                        nargs="?")

    # Passing None makes argparse read the process command line, so a single
    # call covers both the normal and the pre-set configuration case
    parsed_args = parser.parse_args(args_list)

    # Prefer retaining immutable Arguments()
    # by not using the instance as a namespace
    return Arguments(**vars(parsed_args))

parse_extra_vars(arg)

Method for processing extra var arguments.

Parameters:

Name Type Description Default
arg tuple

Collection of extra var arguments.

required

Returns:

Type Description
tuple

Name and value for the argument to be overridden.

Raises: argparse.ArgumentTypeError: If arguments do not match.

Source code in model_ensembler/cli.py
def parse_extra_vars(arg):
    """Split a single ``name=value`` extra var argument.

    The name is everything before the first ``=`` and must not be empty;
    the value is everything after it and must not be empty either.

    Args:
        arg (str): One extra var argument as given on the command line.

    Returns:
        (tuple): Name and value for the argument to be overridden.

    Raises:
        argparse.ArgumentTypeError: If the argument is not in
            name=value format.
    """
    arg_match = re.match(r'^([^=]+)=(.+)$', arg)

    # Guard clause: reject anything that is not name=value up front
    if arg_match is None:
        raise argparse.ArgumentTypeError("Argument does not match "
                                         "name=value format: {}".format(arg))

    name, value = arg_match.groups()
    return name, value

parse_indexes(argv)

Method for ensuring a CSV string of integers.

Parameters:

Name Type Description Default
argv list

expecting delimited integer list.

required

Returns:

Type Description
list

Matched integer values.

Raises:

Type Description
ArgumentTypeError

If argv is not CSV delimited integer list.

Source code in model_ensembler/cli.py
def parse_indexes(argv):
    """Convert a comma separated string of integers into a list.

    Args:
        argv (str): Expecting comma delimited non-negative integers,
            for example ``"0,3,7"``.

    Returns:
        (list): Matched integer values, in the order given.

    Raises:
        argparse.ArgumentTypeError: If argv is not CSV delimited integer list.
    """
    # Guard clause: only digits separated by single commas are accepted
    if not re.match(r'^([0-9]+,)*[0-9]+$', argv):
        raise argparse.ArgumentTypeError(
            "{} is not a CSV delimited integer list "
            "of indexes".format(argv))

    return list(map(int, argv.split(",")))

cluster

dummy

current_jobs(ctx, match) async

Dummy method to find current jobs.

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
match str

Jobs to match the job list with.

required

Returns:

Type Description
list

Current jobs.

Source code in model_ensembler/cluster/dummy.py
async def current_jobs(ctx, match):
    """Dummy method to list the currently active local jobs.

    Args:
        ctx (object): Context object for retrieving configuration, not used
            by the dummy backend.
        match (str): Prefix that a job name has to start with.

    Returns:
        (list): Jobs from the local registry whose name starts with match
            and whose state is one of START_STATES.
    """
    active = []

    for candidate in _jobs.values():
        if candidate.name.startswith(match) \
                and candidate.state in START_STATES:
            active.append(candidate)

    return active
find_id(job_id) async

Dummy method to find local job id.

Parameters:

Name Type Description Default
job_id int

Local job identifier.

required

Returns:

Type Description
int

job id.

Raises:

Type Description
LookupError

If job id not found.

Source code in model_ensembler/cluster/dummy.py
async def find_id(job_id):
    """Dummy method to look up a local job by its identifier.

    Args:
        job_id: Local job identifier, compared against each job's name.

    Returns:
        The single matching job from the local registry, or None when no
        job matches.

    Raises:
        LookupError: If more than one job matches the identifier.
    """
    matches = [entry for entry in _jobs.values() if entry.name == job_id]

    # More than one hit is ambiguous, refuse to pick one
    if len(matches) > 1:
        raise LookupError("{} jobs found for ID {}".format(len(matches),
                                                           job_id))

    return matches[0] if matches else None
submit_job(ctx, script=None) async

Dummy method to submit job locally.

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
script str

Script name to submit.

None

Returns:

Type Description
int

Job ID.

Source code in model_ensembler/cluster/dummy.py
async def submit_job(ctx, script=None):
    """Dummy method to submit job locally.

    Sleeps for a random number of seconds up to the max_stagger argument,
    registers the job as SUBMITTED under the run directory and starts a
    thread that executes the script.

    Args:
        ctx (object): Context object for retrieving configuration; its
            ``dir`` and ``id`` attributes are read.
        script (str): Script name to submit.

    Returns:
        The ``id`` of ctx, which doubles as the job identifier.
    """
    # TODO: ugh, we could use contextvars for this
    args = Arguments()

    sleep_for = random.randint(0, args.max_stagger)
    logging.debug(
        "Sleeping for {} seconds before submission".format(sleep_for))
    await asyncio.sleep(sleep_for)

    # Register the job before its thread starts, threaded_job reads this entry
    with _dict_lock:
        _jobs[ctx.dir] = Job(ctx.id, "SUBMITTED", False, False)

    worker = threading.Thread(target=threaded_job, args=(ctx.dir, script))
    worker.start()
    return ctx.id
threaded_job(run_dir, script)

Dummy method to set off local job

Parameters:

Name Type Description Default
run_dir str

Directory script is running in.

required
script str

Name of script to run.

required
Source code in model_ensembler/cluster/dummy.py
def threaded_job(run_dir, script):
    """Dummy method to set off local job

    Marks the registered job as RUNNING, executes the script from within
    the run directory and then marks the job as COMPLETED.

    Args:
        run_dir (str): Directory script is running in.
        script (str): Name of script to run.
    """
    def _record(state, started, finished):
        # Swap the registry entry for a new Job that keeps the same name
        with _dict_lock:
            _jobs[run_dir] = Job(_jobs[run_dir].name,
                                 state,
                                 started,
                                 finished)

    _record("RUNNING", True, False)

    logging.info("DUMMY RUN: {} - {}".format(run_dir, _jobs[run_dir]))
    subprocess.run("./{}".format(script), cwd=run_dir)

    # TODO: failed
    _record("COMPLETED", True, True)

slurm

current_jobs(ctx, match) async

Method to get list of current jobs

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
match str

Jobs to match the job list with.

required

Returns:

Type Description
list

Filtered jobs.

Source code in model_ensembler/cluster/slurm.py
async def current_jobs(ctx, match):
    """Method to get list of current jobs

    Queries ``squeue`` for the partition named by ``ctx.cluster`` and keeps
    the jobs whose name starts with match and whose state is one of
    START_STATES. A failing query is logged and retried after the
    check_timeout argument has elapsed, until one succeeds.

    Args:
        ctx (object): Context object for retrieving configuration; its
            ``cluster`` and ``dir`` attributes are read.
        match (str): Prefix that a job name has to start with.

    Returns:
        (list): Filtered jobs as dicts with "name" and "state" keys, which
            may be empty.
    """
    filtered_jobs = None
    args = Arguments()

    # An empty list is a valid answer, so only None means "no result yet"
    while filtered_jobs is None:
        try:
            res = await execute_command("squeue -o \"%j,%T\" -h -p {}".
                                        format(ctx.cluster),
                                        cwd=ctx.dir)
            output = res.stdout.decode()
        except Exception as e:
            logging.warning("Could not retrieve list: {}".format(e))
            # Back off before retrying instead of hammering the scheduler in
            # a tight loop while it is failing
            await asyncio.sleep(args.check_timeout)
        else:
            jobs = []
            for line in output.split():
                fields = line.strip().split(",")
                jobs.append({"name": fields[0], "job_state": fields[1]})

            filtered_jobs = [{"name": j['name'], "state": j["job_state"]}
                             for j in jobs
                             if j['name'].startswith(match)
                             and j['job_state'] in START_STATES]

            logging.debug("SLURM JOBS result: {}".
                          format(pformat(filtered_jobs)))

    return filtered_jobs
find_id(job_id) async

Method to find SLURM job by ID.

This method provides an interface to the sacct SLURM accounting utility to identify a job and return it along with its state.

Parameters:

Name Type Description Default
job_id int

SLURM job identifier.

required

Returns:

Name Type Description
jobs list

Job objects including name and state.

Raises:

Type Description
ValueError

If cannot retrieve job from list.

Source code in model_ensembler/cluster/slurm.py
async def find_id(job_id):
    """Method to find SLURM job by ID.

    This method provides an interface to the sacct SLURM accounting utility
    to identify a job and return it along with its state. The lookup is
    retried, waiting for the check_timeout argument between attempts, until
    sacct returns a line that can be parsed.

    Args:
        job_id (int): SLURM job identifier.

    Returns:
        (object): Job with the name, the first word of the reported state,
            and started/finished flags that are True once sacct reports a
            start/end time other than "Unknown".
    """
    job = None
    args = Arguments()

    while not job:
        try:
            res = await execute_command("sacct -XnP -j {} "
                                        "-o jobname,state,start,end".
                                        format(job_id))
            output = res.stdout.decode().strip()
            (name, state, started, finished) = output.split("|")
        except ValueError:
            # Output did not split into exactly four fields, typically
            # because the job is not known to accounting yet
            logging.debug("Could not retrieve job from list")
            await asyncio.sleep(args.check_timeout)
        else:
            job = Job(
                name=name,
                # Keep only the first word, e.g. "CANCELLED by 1234"
                state=state.split()[0],
                # sacct prints "Unknown" for a time that has not happened
                # yet, so the flags are set when the value is anything else
                started=started != "Unknown",
                finished=finished != "Unknown"
            )

    logging.debug("SLURM find result name: {}".format(job.name))
    return job
submit_job(ctx, script=None) async

Method to submit jobs to SLURM.

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
script str

Script name to submit.

None

Returns:

Type Description
int

Job ID.

Source code in model_ensembler/cluster/slurm.py
async def submit_job(ctx, script=None):
    """Method to submit jobs to SLURM.

    Sleeps for a random number of seconds up to the max_stagger argument,
    runs ``sbatch --no-requeue`` on the script from the run directory and
    extracts the job ID from its output.

    Args:
        ctx (object): Context object for retrieving configuration; its
            ``dir`` attribute is used as working directory.
        script (str): Script name to submit.

    Returns:
        (int): Job ID, or None when the sbatch output does not start with
            the expected submission message.
    """
    args = Arguments()

    # Don't smash the scheduler immediately, it appears to have the potential
    # to cause problems.
    sleep_for = random.randint(0, args.max_stagger)
    logging.debug(
        "Sleeping for {} seconds before submission".format(sleep_for))
    await asyncio.sleep(sleep_for)

    res = await execute_command("sbatch --no-requeue {}".format(script),
                                cwd=ctx.dir)

    sbatch_match = re.match(r'Submitted batch job (\d+)$',
                            res.stdout.decode())
    if sbatch_match is None:
        return None

    job_id = sbatch_match.group(1)
    logging.info("Submitted job with ID {}".format(job_id))
    return int(job_id)

config

Batch

Bases: BatchSpec, TaskArrayMixin

Class to represent the entirety of the ensembler configuration.

Parameters:

Name Type Description Default
pre_batch object

TaskArray configuration setup.

None
pre_run object

TaskArray configuration setup.

None
post_run object

TaskArray configuration setup.

None
post_batch object

TaskArray configuration setup.

None
**kwargs tuple

Keyword arguments for BatchSpec instance.

{}
Source code in model_ensembler/config.py
class Batch(BatchSpec, TaskArrayMixin):
    """Class to represent a single batch and its associated task arrays.

    The four task-array arguments are stored privately and exposed through
    the properties below, which pass them through ``task_array``.

    Args:
        pre_batch (object): TaskArray configuration setup.
        pre_run (object): TaskArray configuration setup.
        post_run (object): TaskArray configuration setup.
        post_batch (object): TaskArray configuration setup.
        **kwargs (tuple): Keyword arguments for BatchSpec instance.
    """
    def __init__(self,
                 *args,
                 pre_batch=None, pre_run=None, post_run=None, post_batch=None,
                 **kwargs):
        # NOTE(review): *args/**kwargs are not forwarded here; presumably
        # BatchSpec consumes them at construction (e.g. in __new__) - confirm
        # against the BatchSpec definition.
        super().__init__()
        self._pre_batch = pre_batch
        self._pre_run = pre_run
        self._post_run = post_run
        self._post_batch = post_batch

    @property
    def pre_batch(self):
        """Pre batch tasks, built from the stored ``pre_batch`` configuration.

        Returns:
            (object): Result of ``task_array("_pre_batch")``; presumably an
                iterable of Task objects - verify against ``task_array``.
        """
        return self.task_array("_pre_batch")

    @property
    def pre_run(self):
        """Pre run tasks, built from the stored ``pre_run`` configuration.

        Returns:
            (object): Result of ``task_array("_pre_run")``; presumably an
                iterable of Task objects - verify against ``task_array``.
        """
        return self.task_array("_pre_run")

    @property
    def post_run(self):
        """Post run tasks, built from the stored ``post_run`` configuration.

        Returns:
            (object): Result of ``task_array("_post_run")``; presumably an
                iterable of Task objects - verify against ``task_array``.
        """
        return self.task_array("_post_run")

    @property
    def post_batch(self):
        """Post batch tasks, built from the stored ``post_batch`` configuration.

        Returns:
            (object): Result of ``task_array("_post_batch")``; presumably an
                iterable of Task objects - verify against ``task_array``.
        """
        return self.task_array("_post_batch")
post_batch property

Property decorator managing post batch tasks.

Returns:

Type Description
list

Post batch tasks.

post_run property

Property decorator managing post run tasks.

Returns:

Type Description
list

Post run tasks.

pre_batch property

Property decorator managing pre batch tasks.

Returns:

Type Description
list

Pre batch tasks.

pre_run property

Property decorator managing pre run tasks.

Returns:

Type Description
list

Pre run tasks.

EnsembleConfig

Bases: YAMLConfig, TaskArrayMixin

Class to represent the entirety of the ensembler configuration.

Parameters:

Name Type Description Default
*args tuple

See YAMLConfig.

()
**kwargs tuple

arbitrary keyword arguments.

{}
Source code in model_ensembler/config.py
class EnsembleConfig(YAMLConfig, TaskArrayMixin):
    """Ensemble-level view over a loaded YAML configuration.

    After the parent initialiser has populated ``self._data``, the ``vars``,
    ``pre_process``, ``post_process`` and ``batches`` entries of its
    ``ensemble`` section are kept on the instance and exposed as properties.

    Args:
        *args (tuple): See ``YAMLConfig``.
        **kwargs (tuple): arbitrary keyword arguments.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ensemble = self._data['ensemble']
        self._vars = ensemble['vars']
        self._pre_process = ensemble['pre_process']
        self._post_process = ensemble['post_process']
        self._batches = ensemble['batches']

    @property
    def pre_process(self):
        """Preprocessing tasks of the ensemble.

        Returns:
            (object): Result of ``task_array("_pre_process")``.
        """
        return self.task_array("_pre_process")

    @property
    def post_process(self):
        """Postprocessing tasks of the ensemble.

        Returns:
            (object): Result of ``task_array("_post_process")``.
        """
        return self.task_array("_post_process")

    @property
    def batches(self):
        """Batches contained in the ensemble configuration.

        Returns:
            (list): One ``Batch`` per entry of the configured batches, each
                built by passing the entry as keyword arguments.
        """
        return [Batch(**spec) for spec in self._batches]

    @property
    def vars(self):
        """The ``vars`` entry of the ensemble configuration.

        Returns:
            (dict): vars from ensemble configuration.
        """
        return self._vars
batches property

Property decorator managing batches contained in ensemble config.

Returns:

Type Description
list

Batches contained in the ensemble configuration.

post_process property

Property decorator managing postprocessing attributes.

Returns:

Type Description
list

Postprocessing Tasks.

pre_process property

Property decorator managing preprocessing attributes.

Returns:

Type Description
list

Preprocessing Tasks.

vars property

Property decorator managing vars from the ensemble configuration.

Returns:

Type Description
dict

vars from ensemble configuration.

Task

Bases: TaskSpec

Task definition class derived from the TaskSpec namedtuple

Source code in model_ensembler/config.py
class Task(TaskSpec):
    """Task definition; a plain subclass of TaskSpec adding nothing to it."""

TaskArrayMixin

Generates sets of Task objects from Batch object members

Source code in model_ensembler/config.py
class TaskArrayMixin:
    """Mixin that turns raw task definitions held on an object into Tasks."""

    def task_array(self, attr):
        """Lazily build Task objects from one attribute of this instance.

        Args:
            attr (str): Name of the attribute holding the raw task
                        definitions; each entry is expanded as keyword
                        arguments to ``Task``.

        Yields:
            (object): Task object; nothing is yielded when the attribute is
                      empty or None.
        """
        raw_tasks = getattr(self, attr)
        if not raw_tasks:
            return
        for definition in raw_tasks:
            yield Task(**definition)
task_array(attr)

Yields Tasks from a configuration instance.

Parameters:

Name Type Description Default
attr str

Name of the member property in the configuration that defines the list of Task objects.

required

Yields:

Type Description
object

Task object

Source code in model_ensembler/config.py
def task_array(self, attr):
    """Lazily build Task objects from one attribute of ``self``.

    Args:
        attr (str): Name of the attribute holding the raw task definitions;
                    each entry is expanded as keyword arguments to ``Task``.

    Yields:
        (object): Task object; nothing is yielded when the attribute is
                  empty or None.
    """
    raw_tasks = getattr(self, attr)
    if not raw_tasks:
        return
    for definition in raw_tasks:
        yield Task(**definition)

YAMLConfig

Configuration processor for model-ensemble YAML-based configurations.

Parameters:

Name Type Description Default
configuration str

Name of the YAML configuration to load.

required
Source code in model_ensembler/config.py
class YAMLConfig():
    """Configuration processor for model-ensemble YAML-based configurations.

    On construction the configuration file is loaded and validated against
    the ``model-ensemble.json`` schema; the parsed schema and configuration
    are kept in ``_schema_data`` and ``_data``.

    Args:
        configuration (str): Name of the YAML configuration to load.
    """

    def __init__(self, configuration):
        self._schema = os.path.join(path, "model-ensemble.json")
        self._configuration_file = configuration

        self._schema_data, self._data = \
            self.__class__.validate(self._schema, self._configuration_file)

    @staticmethod
    def validate(json_schema, yaml_file):
        """Validate a YAML configuration against a JSON schema.

        Entries under ``ensemble.batch_config`` are copied into every batch
        that does not define the same key itself, before validation happens.

        Args:
            json_schema (str): Name of schema to validate against.
            yaml_file (str): Name of the configuration to validate.

        Returns:
            (tuple): contains JSON schema, YAML data.

        Raises:
            RuntimeError: If "name" or "basedir" are specified in
                        "batch_config:" instead of "batches:".
            jsonschema.ValidationError: If the configuration does not
                        conform to the schema.
        """
        logging.debug("Assessing {} against {}".format(
            yaml_file, json_schema
        ))

        # NOTE(review): Loader comes from the file's imports; if it is a
        # non-safe YAML loader only trusted configurations should be loaded.
        with open(yaml_file, "r") as fh:
            yaml_data = load(fh, Loader=Loader)

        # FIXME: this is a cheat for extreme batch numbers by allowing common
        #  parameters
        if "batch_config" in yaml_data["ensemble"]:
            batch_config = yaml_data["ensemble"]["batch_config"]

            for batch in yaml_data["ensemble"]["batches"]:
                for k, v in batch_config.items():
                    if k in ["name", "basedir"]:
                        raise RuntimeError(
                            "'name' and 'basedir' should be defined "
                            "for each batch, rather than all batches. "
                            "Please move these from 'batch_config:' "
                            "to 'batches:'.")
                    # Batch-specific values win over the shared defaults
                    batch.setdefault(k, v)

        with open(json_schema, "r") as fh:
            json_data = json.load(fh)

        try:
            jsonschema.validate(instance=yaml_data, schema=json_data)
        except jsonschema.ValidationError:
            logging.error("There's an error with configuration file: {}".
                          format(yaml_file))
            raise
        logging.info("Validated configuration file {} successfully".
                     format(yaml_file))
        return json_data, yaml_data
validate(json_schema, yaml_file) staticmethod

Validate a YAML configuration against a JSON schema.

Parameters:

Name Type Description Default
json_schema str

Name of schema to validate against.

required
yaml_file str

Name of the configuration to validate.

required

Returns:

Type Description
tuple

contains JSON schema, YAML data.

Raises:

Type Description
RuntimeError

If "name" and "basedir" are specified in "batch_config:" instead of "batches:".

Source code in model_ensembler/config.py
@staticmethod
def validate(json_schema, yaml_file):
    """Validate a YAML configuration against a JSON schema.

    Entries under ``ensemble.batch_config`` are copied into every batch that
    does not define the same key itself, before validation happens.

    Args:
        json_schema (str): Name of schema to validate against.
        yaml_file (str): Name of the configuration to validate.

    Returns:
        (tuple): contains JSON schema, YAML data.

    Raises:
        RuntimeError: If "name" or "basedir" are specified in
                    "batch_config:" instead of "batches:".
        jsonschema.ValidationError: If the configuration does not conform
                    to the schema.
    """
    logging.debug("Assessing {} against {}".format(
        yaml_file, json_schema
    ))

    # NOTE(review): Loader comes from the file's imports; if it is a
    # non-safe YAML loader only trusted configurations should be loaded.
    with open(yaml_file, "r") as fh:
        yaml_data = load(fh, Loader=Loader)

    # FIXME: this is a cheat for extreme batch numbers by allowing common
    #  parameters
    if "batch_config" in yaml_data["ensemble"]:
        batch_config = yaml_data["ensemble"]["batch_config"]

        for batch in yaml_data["ensemble"]["batches"]:
            for k, v in batch_config.items():
                if k in ["name", "basedir"]:
                    raise RuntimeError(
                        "'name' and 'basedir' should be defined "
                        "for each batch, rather than all batches. "
                        "Please move these from 'batch_config:' "
                        "to 'batches:'.")
                # Batch-specific values win over the shared defaults
                batch.setdefault(k, v)

    with open(json_schema, "r") as fh:
        json_data = json.load(fh)

    try:
        jsonschema.validate(instance=yaml_data, schema=json_data)
    except jsonschema.ValidationError:
        logging.error("There's an error with configuration file: {}".
                      format(yaml_file))
        raise
    logging.info("Validated configuration file {} successfully".
                 format(yaml_file))
    return json_data, yaml_data

exceptions

TemplatingError

Bases: RuntimeError

For templating issues

Source code in model_ensembler/exceptions.py
3
4
5
class TemplatingError(RuntimeError):
    """Error type for templating issues."""

runners

run_check(func, check) async

Run a check configuration.

Parameters:

Name Type Description Default
func callable

Async check method.

required
check dict

Check configuration.

required

Raises:

Type Description
CheckException

Any exception from the called check.

Source code in model_ensembler/runners.py
async def run_check(func, check):
    """Run a check configuration, repeating it until it passes.

    The check is called with the current run context and the configured
    arguments. While it returns a falsy result, ``check_timeout`` seconds
    (from ``Arguments``) are awaited and the check is attempted again.

    Args:
        func (callable): Async check method.
        check (object): Check configuration; its ``args`` attribute supplies
            the keyword arguments and may be empty or None.

    Raises:
        CheckException: Any exception from the called check, chained as the
            cause.
    """
    result = False
    args = Arguments()

    while not result:
        try:
            logging.debug("PRE CHECK")
            ctx = model_ensembler.batcher.run_ctx.get()
            # A check configured without arguments is called with none,
            # matching how run_task treats missing arguments
            check_args = check.args if check.args else dict()
            result = await func(ctx, **check_args)
            logging.debug("POST CHECK")
        except Exception as e:
            logging.exception(e)
            raise CheckException(
                "Issues with flight checks, abandoning") from e

        if not result:
            logging.debug("Cannot continue, waiting {} seconds for next check".
                          format(args.check_timeout))
            await asyncio.sleep(args.check_timeout)

run_runner(limit, tasks) async

Runs a list of tasks asynchronously.

Given a particular limit, establish a semaphore and run up to limit tasks. Once the list of tasks is complete, return.

Parameters:

Name Type Description Default
limit int

Maximum number of tasks allowed to run at the same time.

required
tasks list

Tasks and checks.

required

Returns:

Type Description
list

Completed tasks.

Source code in model_ensembler/runners.py
async def run_runner(limit, tasks):
    """Await a collection of awaitables with bounded concurrency.

    A semaphore initialised with ``limit`` guards each awaitable, so at most
    ``limit`` of them are being awaited at any one time. The call returns
    once all of them have completed.

    Args:
        limit (int): Maximum number of tasks awaited concurrently.
        tasks (list): Awaitables (tasks and checks) to run.

    Returns:
        (list): Results of the awaitables, in the order they were given.
    """

    # TODO: return run task windows/info
    gate = asyncio.Semaphore(limit)

    async def _guarded(awaitable):
        # Hold a semaphore slot for the whole lifetime of the awaitable
        async with gate:
            return await awaitable

    guarded = [_guarded(entry) for entry in tasks]
    return await asyncio.gather(*guarded)

run_task(func, task) async

Run a task configuration.

Parameters:

Name Type Description Default
func callable

Async task method to call with the run context.

required
task dict

Task configuration.

required

Raises:

Type Description
TaskException

Any exception from the called task.

Returns:

Type Description
bool

True if task runs without exception.

Source code in model_ensembler/runners.py
async def run_task(func, task):
    """Run a task configuration.

    The task method is called once with the current run context and the
    configured arguments.

    Args:
        func (callable): Async task method.
        task (object): Task configuration; its ``args`` attribute supplies
            the keyword arguments and may be empty or None.

    Raises:
        TaskException: Any exception from the called task, chained as the
            cause.

    Returns:
        (bool): True if task runs without exception.
    """
    try:
        args = task.args if task.args else dict()
        ctx = model_ensembler.batcher.run_ctx.get()
        await func(ctx, **args)
    except Exception as e:
        logging.exception(e)
        # This is a task failure, not a check failure, so say so
        raise TaskException("Issues with task execution, abandoning") from e
    return True

run_task_items(items) async

Run a set of task and checks.

Run the list of task and check items. Each item's configuration names the model_ensembler.tasks method to use, and the context/configuration provides the arguments. TaskException and CheckException are trapped and rethrown as ProcessingException.

Parameters:

Name Type Description Default
items list

Tasks and checks.

required

Raises:

Type Description
ProcessingException

A common exception thrown for failures in the individual tasks.

Source code in model_ensembler/runners.py
async def run_task_items(items):
    """Run a sequence of task and check items one after another.

    Each item's ``name`` is looked up on ``model_ensembler.tasks``. Methods
    whose ``check`` attribute is truthy are run as checks, the rest as
    tasks. TaskException and CheckException are trapped and rethrown as
    ProcessingException.

    Args:
        items (list): Tasks and checks.

    Raises:
        ProcessingException: A common exception thrown for failures in the
                            individual tasks.
    """
    try:
        ctx = model_ensembler.batcher.run_ctx.get()

        for item in items:
            func = getattr(model_ensembler.tasks, item.name)

            logging.debug("TASK CWD: {}".format(os.getcwd()))
            logging.debug("TASK CTX: {}".format(pformat(ctx)))
            logging.debug("TASK FUNC: {}".format(pformat(item)))

            # Checks are repeated until they pass; tasks are run once
            runner = run_check if func.check else run_task
            await runner(func, item)
    except (TaskException, CheckException) as e:
        raise ProcessingException(e)

tasks

CheckException

Bases: EnsembleException

For check failures

Source code in model_ensembler/tasks/exceptions.py
class CheckException(EnsembleException):
    """Exception type for check failures."""

ProcessingException

Bases: EnsembleException

For general processing failures

Source code in model_ensembler/tasks/exceptions.py
class ProcessingException(EnsembleException):
    """Exception type for general processing failures."""

TaskException

Bases: EnsembleException

For task processing failures

Source code in model_ensembler/tasks/exceptions.py
class TaskException(EnsembleException):
    """Exception type for task processing failures."""

check(ctx, cmd, cwd=None, log=False, fail=False, shell=None) async

Check: Call arbitrary command as a check.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
fail bool

If true, then the check returning nonzero will raise an error rather than return false, meaning run abandonment rather than recheck will take place.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Raises:

Type Description
FailureNotToleratedError

Error when fail is true and the check returns a nonzero code.

Source code in model_ensembler/tasks/sys.py
@check_task
async def check(ctx, cmd, cwd=None, log=False, fail=False, shell=None):
    """Check: Run an arbitrary command and treat its exit status as the result.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        fail (bool, optional): When true, a nonzero return code raises an
            error instead of returning false, so the run is abandoned rather
            than the check being retried.
        shell (str, optional):  See ``utils.execute_command``.

    Returns:
        (bool): True if return code is zero, false otherwise.

    Raises:
        FailureNotToleratedError: When ``fail`` is true and the command
            returns a nonzero code.
    """

    logging.info("Running check: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    code = outcome.returncode
    logging.debug("Check return code {}".format(code))

    if code != 0 and fail:
        raise FailureNotToleratedError("Check failed and it is not tolerable")
    return code == 0

execute(ctx, cmd, cwd=None, log=False, shell=None) async

Process: Call arbitrary command as a processing task.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def execute(ctx, cmd, cwd=None, log=False, shell=None):
    """Process: Run an arbitrary command as a processing task.

    Args:
        ctx (object): Contextual configuration.
        cmd (str):  See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True if return code is zero, false otherwise.
    """
    logging.info("Running command: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    return outcome.returncode == 0

jobs(ctx, limit, match) async

Check: Assert whether number of jobs in SLURM is under limit.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
limit int

Number of jobs to check for.

required
match str

Prefix to match jobs by.

required

Returns:

Type Description
bool

True if number of jobs is less than limit, otherwise false.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def jobs(ctx, limit, match):
    """Check: Assert whether the number of current jobs is under a limit.

    The job list is requested from the cluster backend while holding its
    ``job_lock``.

    Args:
        ctx (object): Contextual configuration.
        limit (int): Number of jobs to check for.
        match (str): Prefix to match jobs by.

    Returns:
        (bool): True if number of jobs is less than limit, otherwise false.
    """

    # TODO: match with regex
    async with cluster.job_lock:
        active = await cluster.current_jobs(ctx, match)
        count = len(active)
        under_limit = count < int(limit)

        logging.debug("Jobs in action {} with limit {}".format(
            count, limit))

    return under_limit

move(ctx, dest, include=None, exclude=None, cwd=None) async

Process: rsync current working directory contents.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
dest str

Path to copy ctx.id named directory to.

required
include List[str]

rsync include specifiers.

None
exclude List[str]

rsync exclude specifiers, defaults to "*" when calling rsync if include specifiers are given and no exclude specifiers are provided.

None
cwd str

See utils.execute_command.

None

Returns:

Type Description
bool

true if return code is zero, false otherwise.

Raises:

Type Description
RuntimeError

If did not provide necessary context attribute for using this processing task.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def move(ctx, dest, include=None, exclude=None, cwd=None):
    """Process: rsync current working directory contents.

    The contents of ``./`` are copied with ``rsync -aXE`` into a directory
    named after ``ctx.id`` underneath ``dest``.

    Args:
        ctx (object): Contextual configuration; must provide ``id``.
        dest (str): Path to copy ctx.id named directory to.
        include (List[str], optional): rsync include specifiers.
        exclude (List[str], optional): rsync exclude specifiers, defaults to
            "*" when calling rsync if include specifiers are given and no
            exclude specifiers are provided.
        cwd (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): true if return code is zero, false otherwise.

    Raises:
        RuntimeError: If did not provide necessary context attribute for using
            this processing task.
    """

    if not hasattr(ctx, "id"):
        raise RuntimeError("No ID available for move")

    # TODO: Type checking
    include = include if include else []
    # Honour caller-supplied excludes; only fall back to excluding everything
    # else when includes were given without any excludes
    if not exclude:
        exclude = ["*"] if include else []
    dest = os.path.join(dest, ctx.id)

    # Surrounding double quotes are stripped from each specifier before it
    # is re-quoted for the command line
    include_opts = ["--include=\"{}\"".format(i.strip("\"")) for i in include]
    exclude_opts = ["--exclude=\"{}\"".format(e.strip("\"")) for e in exclude]
    liststr = " ".join(include_opts) + " " + " ".join(exclude_opts)

    cmd = "rsync -aXE {} ./ {}/".format(liststr, dest)
    logging.info(cmd)

    res = await execute_command(cmd, cwd)
    return res.returncode == 0

quota(ctx, atleast, mnt=None) async

Check: Make sure quota is sufficient.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
atleast int

Amount in kB.

required
mnt str

Path for mount to check quota on if explicitly required.

None

Returns:

Type Description
bool

True if the remaining quota (limit minus usage) is at least atleast, false otherwise.

Raises:

Type Description
IndexError / TypeError

If quota cannot be determined.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def quota(ctx, atleast, mnt=None):
    """Check: Make sure quota is sufficient.

    Runs ``quota -uw`` (with ``-f mnt`` when a mount is given) and reads the
    second and third whitespace-separated fields of the last output line as
    usage and limit.

    Args:
        ctx (object): Contextual configuration.
        atleast (int): Amount in kB.
        mnt (str, optional): Path for mount to check quota on if explicitly
                            required.

    Returns:
        (bool): True if the remaining quota (limit minus usage) is at least
            ``atleast``; false otherwise, or if the quota information could
            not be parsed.
    """
    # Command responds in 1k blocks
    path_arg = " -f " + mnt if mnt else ""
    quota_cmd = "quota -uw" + path_arg
    res = await execute_command(quota_cmd)
    quota_out = res.stdout.decode()

    try:
        fields = quota_out.splitlines()[-1].split()
        usage = int(fields[1])
        limit = int(fields[2])
        atleast = int(atleast)
    except (IndexError, TypeError, ValueError):
        # ValueError covers non-numeric fields, which int() rejects; treat
        # them like missing fields so the check reports false
        logging.exception("Could not reliably determine quota information")
        return False

    remaining = limit - usage
    res = remaining >= atleast

    if not res:
        logging.warning("Quota remaining {} is less than {}".
                        format(remaining, atleast))
    return res

remove(ctx, directory=None) async

Process: Remove directory using shutil.rmtree.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
directory str

Specify the directory to remove, otherwise this will use the path specified by ctx.dir.

None

Returns:

Type Description
bool

True if the directory was removed, false if removal failed with an OSError.

Raises:

Type Description
OSError

If directory cannot be removed.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def remove(ctx, directory=None):
    """Process: Remove a directory tree using shutil.rmtree.

    Args:
        ctx (object): Contextual configuration.
        directory (str, optional): Directory to remove; when not given the
            path in ctx.dir is used instead.

    Returns:
        (bool): True if the directory was removed, false if removal failed
            with an OSError (which is logged rather than raised).
    """
    target = directory if directory else ctx.dir

    logging.info("Attempting to remove data on {}".format(target))

    try:
        shutil.rmtree(target)
    except OSError as err:
        logging.exception("Could not remove {}: {}".
                          format(target, err.strerror))
        return False
    else:
        return True

submit(ctx, script=None) async

Process: Submit a new job to SLURM.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
script str

Slurm submission script for sbatch.

None

Returns:

Type Description
int

Job identifier.

Source code in model_ensembler/tasks/hpc.py
@processing_task
async def submit(ctx, script=None):
    """Process: Submit a new job through the cluster backend.

    When a script is given it is handed to ``cluster.submit_job`` while
    holding ``cluster.job_lock``; without a script nothing is submitted.

    Args:
        ctx (object): Contextual configuration.
        script (str, optional): Slurm submission script for sbatch.

    Returns:
        None: Always; the value returned by ``cluster.submit_job`` is not
            passed on.
    """

    # TODO: check this as an optional argument avoids run submission
    #  as intended
    if not script:
        return None

    async with cluster.job_lock:
        await cluster.submit_job(ctx, script)
    return None

exceptions

Task Exceptions

Exceptions relating to tasks

CheckException

Bases: EnsembleException

For check failures

Source code in model_ensembler/tasks/exceptions.py
class CheckException(EnsembleException):
    """Exception type for check failures."""
EnsembleException

Bases: Exception

Common ensemble exception for common implementation

Source code in model_ensembler/tasks/exceptions.py
7
8
9
class EnsembleException(Exception):
    """Common base exception shared by the ensemble exception types."""
FailureNotToleratedError

Bases: RuntimeError

Check failure error cannot be tolerated

Source code in model_ensembler/tasks/exceptions.py
class FailureNotToleratedError(RuntimeError):
    """Error type for a check failure that cannot be tolerated."""
ProcessingException

Bases: EnsembleException

For general processing failures

Source code in model_ensembler/tasks/exceptions.py
class ProcessingException(EnsembleException):
    """Exception type for general processing failures."""
TaskException

Bases: EnsembleException

For task processing failures

Source code in model_ensembler/tasks/exceptions.py
class TaskException(EnsembleException):
    """Exception type for task processing failures."""

hpc

init_hpc_backend(backend)

Initialise cluster backend to use.

Parameters:

Name Type Description Default
backend str

Cluster backend to use, given as an importable module name.

required

Raises:

Type Description
ImportError / ModuleNotFoundError

If cluster backend cannot be imported.

Source code in model_ensembler/tasks/hpc.py
def init_hpc_backend(backend):
    """Import the cluster backend and bind it to the module-level ``cluster``.

    Args:
        backend (str): Importable module name of the cluster backend to use.

    Raises:
        ImportError/ModuleNotFoundError: If cluster backend cannot be
            imported; the failure is logged before being re-raised.
    """
    global cluster

    logging.info("Importing {}".format(backend))
    try:
        module = importlib.import_module(backend)
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so it lands here
        logging.exception("Couldn't dynamically import cluster backend")
        raise
    cluster = module
jobs(ctx, limit, match) async

Check: Assert whether number of jobs in SLURM is under limit.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
limit int

Number of jobs to check for.

required
match str

Prefix to match jobs by.

required

Returns:

Type Description
bool

True if number of jobs is less than limit, otherwise false.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def jobs(ctx, limit, match):
    """Check: Assert whether the number of current jobs is under a limit.

    The job list is requested from the cluster backend while holding its
    ``job_lock``.

    Args:
        ctx (object): Contextual configuration.
        limit (int): Number of jobs to check for.
        match (str): Prefix to match jobs by.

    Returns:
        (bool): True if number of jobs is less than limit, otherwise false.
    """

    # TODO: match with regex
    async with cluster.job_lock:
        active = await cluster.current_jobs(ctx, match)
        count = len(active)
        under_limit = count < int(limit)

        logging.debug("Jobs in action {} with limit {}".format(
            count, limit))

    return under_limit
quota(ctx, atleast, mnt=None) async

Check: Make sure quota is sufficient.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
atleast int

Amount in kB.

required
mnt str

Path for mount to check quota on if explicitly required.

None

Returns:

Type Description
bool

True if the remaining quota (limit minus usage) is at least atleast, false otherwise.

Raises:

Type Description
IndexError / TypeError

If quota cannot be determined.

Source code in model_ensembler/tasks/hpc.py
@check_task
async def quota(ctx, atleast, mnt=None):
    """Check: Make sure quota is sufficient.

    Runs ``quota -uw`` (with ``-f mnt`` when a mount is given) and reads the
    second and third whitespace-separated fields of the last output line as
    usage and limit.

    Args:
        ctx (object): Contextual configuration.
        atleast (int): Amount in kB.
        mnt (str, optional): Path for mount to check quota on if explicitly
                            required.

    Returns:
        (bool): True if the remaining quota (limit minus usage) is at least
            ``atleast``; false otherwise, or if the quota information could
            not be parsed.
    """
    # Command responds in 1k blocks
    path_arg = " -f " + mnt if mnt else ""
    quota_cmd = "quota -uw" + path_arg
    res = await execute_command(quota_cmd)
    quota_out = res.stdout.decode()

    try:
        fields = quota_out.splitlines()[-1].split()
        usage = int(fields[1])
        limit = int(fields[2])
        atleast = int(atleast)
    except (IndexError, TypeError, ValueError):
        # ValueError covers non-numeric fields, which int() rejects; treat
        # them like missing fields so the check reports false
        logging.exception("Could not reliably determine quota information")
        return False

    remaining = limit - usage
    res = remaining >= atleast

    if not res:
        logging.warning("Quota remaining {} is less than {}".
                        format(remaining, atleast))
    return res
submit(ctx, script=None) async

Process: Submit a new job to SLURM.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
script str

Slurm submission script for sbatch.

None

Returns:

Type Description
int

Job identifier.

Source code in model_ensembler/tasks/hpc.py
@processing_task
async def submit(ctx, script=None):
    """Process: Submit a new job through the cluster backend.

    When a script is given it is handed to ``cluster.submit_job`` while
    holding ``cluster.job_lock``; without a script nothing is submitted.

    Args:
        ctx (object): Contextual configuration.
        script (str, optional): Slurm submission script for sbatch.

    Returns:
        None: Always; the value returned by ``cluster.submit_job`` is not
            passed on.
    """

    # TODO: check this as an optional argument avoids run submission
    #  as intended
    if not script:
        return None

    async with cluster.job_lock:
        await cluster.submit_job(ctx, script)
    return None

sys

check(ctx, cmd, cwd=None, log=False, fail=False, shell=None) async

Check: Call arbitrary command as a check.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
fail bool

If true, then the check returning nonzero will raise an error rather than return false, meaning run abandonment rather than recheck will take place.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Raises:

Type Description
FailureNotToleratedError

Error when fail is true and the check returns a nonzero code.

Source code in model_ensembler/tasks/sys.py
@check_task
async def check(ctx, cmd, cwd=None, log=False, fail=False, shell=None):
    """Check: Call arbitrary command as a check.

    The command is run through ``utils.execute_command`` and the check
    passes when it exits with return code zero.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        fail (bool, optional): If true, a nonzero return code raises an
            error instead of returning false, meaning run abandonment
            rather than recheck will take place.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True if return code is zero, false otherwise.

    Raises:
        FailureNotToleratedError: When fail is true and the command returns
            a nonzero code.
    """

    logging.info("Running check: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    logging.debug("Check return code {}".format(outcome.returncode))

    passed = outcome.returncode == 0
    if not passed and fail:
        raise FailureNotToleratedError("Check failed and it is not tolerable")
    return passed
execute(ctx, cmd, cwd=None, log=False, shell=None) async

Process: Call arbitrary command as a processing task.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
cmd str

See utils.execute_command.

required
cwd str

See utils.execute_command.

None
log bool

See utils.execute_command.

False
shell str

See utils.execute_command.

None

Returns:

Type Description
bool

True if return code is zero, false otherwise.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def execute(ctx, cmd, cwd=None, log=False, shell=None):
    """Process: Call arbitrary command as a processing task.

    The command is handed to ``utils.execute_command`` and the task reports
    whether it exited with return code zero.

    Args:
        ctx (object): Contextual configuration.
        cmd (str): See ``utils.execute_command``.
        cwd (str, optional): See ``utils.execute_command``.
        log (bool, optional): See ``utils.execute_command``.
        shell (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): True if return code is zero, false otherwise.
    """
    logging.info("Running command: {}".format(cmd))

    outcome = await execute_command(cmd, cwd, log, shell)
    return outcome.returncode == 0
move(ctx, dest, include=None, exclude=None, cwd=None) async

Process: rsync current working directory contents.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
dest str

Path to copy ctx.id named directory to.

required
include List[str]

rsync include specifiers.

None
exclude List[str]

rsync exclude specifiers, defaults to "*" when calling rsync if include specifiers are given and no exclude specifiers are provided.

None
cwd str

See utils.execute_command.

None

Returns:

Type Description
bool

true if return code is zero, false otherwise.

Raises:

Type Description
RuntimeError

If did not provide necessary context attribute for using this processing task.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def move(ctx, dest, include=None, exclude=None, cwd=None):
    """Process: rsync current working directory contents.

    Builds and runs ``rsync -aXE <filters> ./ <dest>/<ctx.id>/`` through
    ``utils.execute_command``.

    Args:
        ctx (object): Contextual configuration, must provide an ``id``.
        dest (str): Path to copy ctx.id named directory to.
        include (List[str], optional): rsync include specifiers.
        exclude (List[str], optional): rsync exclude specifiers, defaults to
            "*" when calling rsync if include specifiers are given and no
            exclude specifiers are provided.
        cwd (str, optional): See ``utils.execute_command``.

    Returns:
        (bool): true if return code is zero, false otherwise.

    Raises:
        RuntimeError: If did not provide necessary context attribute for using
            this processing task.
    """

    if not hasattr(ctx, "id"):
        raise RuntimeError("No ID available for move")

    # TODO: Type checking
    include = include if include else []
    # Only fall back to a default when the caller gave no excludes; the
    # previous single conditional expression replaced caller-supplied
    # excludes with an empty list.
    if not exclude:
        exclude = ["*"] if include else []
    dest = os.path.join(dest, ctx.id)

    # Includes are emitted before excludes, each wrapped in double quotes
    # after stripping any quotes the caller already put around the pattern.
    filters = ['--include="{}"'.format(i.strip('"')) for i in include]
    filters += ['--exclude="{}"'.format(e.strip('"')) for e in exclude]
    liststr = " ".join(filters)

    cmd = "rsync -aXE {} ./ {}/".format(liststr, dest)
    logging.info(cmd)

    res = await execute_command(cmd, cwd)
    return res.returncode == 0
remove(ctx, directory=None) async

Process: Remove directory using shutil.rmtree.

Parameters:

Name Type Description Default
ctx object

Contextual configuration.

required
directory str

Specify the directory to remove, otherwise this will use the path specified by ctx.dir.

None

Returns:

Type Description
bool

True if the directory was removed, false if removal failed with an OSError.

Raises:

Type Description
OSError

If directory cannot be removed.

Source code in model_ensembler/tasks/sys.py
@processing_task
async def remove(ctx, directory=None):
    """Process: Remove directory using shutil.rmtree.

    Args:
        ctx (object): Contextual configuration.
        directory (str, optional): Specify the directory to remove, otherwise
            this will use the path specified by ctx.dir.

    Returns:
        (bool): True if the directory tree was removed, false if
            ``shutil.rmtree`` raised an OSError (which is logged rather
            than propagated).
    """
    target = directory if directory else ctx.dir

    logging.info("Attempting to remove data on {}".format(target))

    try:
        shutil.rmtree(target)
    except OSError as err:
        logging.exception("Could not remove {}: {}".
                          format(target, err.strerror))
        return False
    else:
        return True

utils

execute_command(cmd, cwd=None, log=False, shell=None) async

Standard handling for calling external command.

Parameters:

Name Type Description Default
cmd str

The relative path of the command being called to cwd.

required
cwd str

The current working directory to call the cmd from, passed to subprocess.

None
log bool

If true, output stdout/stderr to logfile in cwd.

False
shell str

Which shell to ask subprocess to invoke when processing the command, will default to bash internally.

None

Returns:

Type Description
object

Namespace containing the returncode, stdout and stderr from the process that was invoked.

Source code in model_ensembler/tasks/utils.py
async def execute_command(cmd, cwd=None, log=False, shell=None):
    """Standard handling for calling external command.

    The command is run through a shell with stderr redirected into stdout,
    so all output is captured as a single stream.

    Args:
        cmd (str): The relative path of the command being called to cwd.
        cwd (str, optional): The current working directory to call the cmd
            from, passed to subprocess.
        log (bool, optional): If true, and the command produced output,
            write that output to a timestamped logfile in cwd.
        shell (str, optional): Which shell to ask subprocess to invoke when
            processing the command, defaults to the ``shell`` attribute of
            the global ``Arguments``.

    Returns:
        (object): Namespace containing the returncode, stdout and stderr from
            the process that was invoked. ``stderr`` is always None because
            the stream is merged into ``stdout``.
    """

    logging.debug("Executing command {0}, cwd {1}".
                  format(cmd, cwd if cwd else "unset"))

    start_dt = datetime.now()

    args = Arguments()
    shell = args.shell if not shell else shell

    proc = await asyncio.create_subprocess_shell(
        cmd,
        executable=shell,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
        cwd=cwd)

    # communicate() waits for the process to terminate, so the return code
    # is available afterwards without a separate wait()
    (stdout, stderr) = await proc.communicate()

    if log and stdout:
        log_name = "execute_command.{}.log".\
            format(start_dt.strftime("%H%M%S.%f"))

        if cwd:
            log_name = os.path.join(cwd, log_name)

        with open(log_name, "w") as fh:
            fh.write(stdout.decode())

        logging.info("Command log written to {}".format(log_name))

    ret = types.SimpleNamespace(
        returncode=proc.returncode, stdout=stdout, stderr=stderr)

    if ret.returncode != 0:
        # stderr was merged into stdout, so the captured output is the only
        # place an error message can be found (ret.stderr is None)
        output = stdout.decode(errors="replace") if stdout else ""
        logging.warning("Command returned err: {}".format(output))
    else:
        logging.debug("Command successful")
    return ret
flight_task(func, check=True)

Decorator for making func as a task, providing context preprocessing.

Parameters:

Name Type Description Default
func callable

Callable to wrap with context.

required
check bool

Determine whether the func is to be treated as a check or another type of action (checks can be skipped).

True

Returns:

Type Description
func

The wrapped function that can process the context provided appropriately.

Source code in model_ensembler/tasks/utils.py
def flight_task(func, check=True):
    """Decorator for making func as a task, providing context preprocessing.

    The wrapper resolves ``run.<name>`` string keyword arguments against the
    context, forces ``cwd`` to ``ctx.dir`` where applicable, and skips
    checks entirely when the global arguments request it.

    Args:
        func (callable): Callable to wrap with context.
        check (bool, optional): Determine whether the func is to be treated
            as a check or another type of action (checks can be skipped).

    Returns:
        (func): The wrapped function that can process the context provided
            appropriately; it carries a ``check`` attribute holding the
            value given here.
    """

    @functools.wraps(func)
    def task_wrapper(ctx, *args, **kwargs):
        config = Arguments()

        for key in kwargs:
            value = kwargs[key]
            try:
                # "run.<name>" strings are swapped for the context attribute
                # named by the second dot-separated component
                if type(value) is str and value.startswith("run."):
                    kwargs[key] = getattr(ctx, value.split(".")[1])
            finally:
                # Logged even when the attribute lookup above fails
                logging.debug("Calling with {} = {}".format(
                    key, kwargs[key])
                )

        # FIXME: context magic for execute_command calls below,
        #  not necessarily the best manner to achieve this as it would be
        #  better handled via the decorator
        if hasattr(ctx, 'dir'):
            if 'cwd' in inspect.signature(func).parameters:
                kwargs['cwd'] = ctx.dir

        if check and config.no_checks:
            logging.info("Skipping checks for {}".format(func.__name__))
            return True

        return func(ctx, *args, **kwargs)

    task_wrapper.check = check
    return task_wrapper

templates

prepare_run_directory(batch, run) async

Preparing directory for each run from batch templates

Parameters:

Name Type Description Default
batch object

Whole batch configuration.

required
run object

Specific run configuration.

required

Raises:

Type Description
TemplatingError

If template directory cannot be moved from source to destination.

Source code in model_ensembler/templates.py
async def prepare_run_directory(batch, run):
    """Prepare the directory for a run from the batch templates.

    When picking up a previous execution and the run directory already
    exists, only the batch template files are re-copied into it. Otherwise
    the run directory is created and populated from the template directory
    with rsync.

    Args:
        batch (object): Whole batch configuration.
        run (object): Specific run configuration.

    Raises:
        TemplatingError: If the run directory already exists when not
                        picking up, or if the template directory cannot be
                        copied from source to destination.
    """
    args = Arguments()

    if args.pickup and os.path.exists(run.dir):
        logging.info("Picked up previous job directory for run {}".
                     format(run.id))

        for tmpl_file in batch.templates:
            src_path = os.path.join(batch.templatedir, tmpl_file)
            dst_path = shutil.copy(src_path, os.path.join(run.dir, tmpl_file))
            logging.info("Re-copied {} to {} for template regeneration".
                         format(src_path, dst_path))
    else:
        if os.path.exists(run.dir):
            raise TemplatingError("Run directory {} already exists".
                                  format(run.dir))

        os.makedirs(run.dir, mode=0o775)

        # Trailing slashes make rsync copy the directory contents
        src = "{}/".format(batch.templatedir)
        dst = "{}/".format(run.dir)
        logging.info("rsync -aXE {} {}".format(src, dst))

        # Pass the arguments as a list rather than re-splitting a formatted
        # string, so paths containing whitespace or quotes stay intact
        proc = await asyncio.create_subprocess_exec("rsync", "-aXE", src, dst)
        rc = await proc.wait()

        if rc != 0:
            raise TemplatingError("Could not grab template directory {} to {}".
                                  format(batch.templatedir, run.dir))

process_templates(run, template_list)

Render templates based on provided context.

Parameters:

Name Type Description Default
run object

Specific run configuration.

required
template_list list

Paths to template sources.

required

Raises:

Type Description
TemplatingError

If cannot template using the provided format.

Source code in model_ensembler/templates.py
def process_templates(run, template_list):
    """Render templates based on provided context.

    Each template is read from the run directory, rendered with Jinja2
    using ``run`` as the only context variable, written next to the source
    without the ``.j2`` suffix and with the source's file mode, after which
    the template source is removed.

    Args:
        run (object): Specific run configuration.
        template_list (list): Paths to template sources, relative to
            ``run.dir``.

    Raises:
        TemplatingError: If a path does not end in ``.j2`` or a file
            operation fails while templating.
    """
    for tmpl_file in template_list:
        if not tmpl_file.endswith(".j2"):
            raise TemplatingError("{} does not appear to be a Jinja2 template "
                                  "(.j2)".format(tmpl_file))

        tmpl_path = os.path.join(run.dir, tmpl_file)
        dst_file = tmpl_path[:-3]

        try:
            with open(tmpl_path, "r") as fh:
                tmpl_data = fh.read()

            logging.info("Templating {} to {}".format(tmpl_path, dst_file))
            dst_data = jinja2.Template(tmpl_data).render(run=run)
            with open(dst_file, "w") as fh:
                fh.write(dst_data)

            # Carry the template's permissions over before deleting it
            os.chmod(dst_file, os.stat(tmpl_path).st_mode)
            os.unlink(tmpl_path)
        except OSError as e:
            raise TemplatingError(
                "Could not template {}".format(tmpl_file)) from e

utils

Arguments

Bases: object

Singleton implementation of the arguments as an immutable object

Source code in model_ensembler/utils.py
class Arguments(object):
    """Singleton holder for the arguments.

    The first construction stores its keyword arguments on a shared inner
    object; later constructions ignore their keyword arguments and expose
    that same shared object through attribute access.
    """

    class __Arguments(object):
        """Inner object carrying the argument values as attributes"""

        def __init__(self, **kwargs):
            for name, value in kwargs.items():
                setattr(self, name, value)

    # Shared inner object, created by the first Arguments(...) call
    instance = None

    def __init__(self, **kwargs):
        if Arguments.instance:
            return
        Arguments.instance = Arguments.__Arguments(**kwargs)

    def __getattr__(self, item):
        # Only reached for names not found on the wrapper itself
        shared = self.instance
        return getattr(shared, item)
__Arguments

Bases: object

Arguments inner singleton

Source code in model_ensembler/utils.py
class __Arguments(object):
    """Inner singleton object holding the argument values as attributes"""

    def __init__(self, **kwargs):
        # Every keyword argument becomes an attribute of the same name
        for name in kwargs:
            setattr(self, name, kwargs[name])

background_fork(double=False)

Allows for the calling process to fork into the background.

Parameters:

Name Type Description Default
double bool

If true, fork a second time so that the parent of the first child can kill it, leaving only the daemon process.

False

Raises:

Type Description
OSError

If fork fails.

Source code in model_ensembler/utils.py
def background_fork(double=False):
    """Allows for the calling process to fork into the background.

    The parent exits with status 0 straight after the fork; the child
    starts a new session and carries on.

    Args:
        double (bool): If true, the child repeats the whole procedure once
                       more (a second fork), so that the first child exits
                       and only the daemon process is left.

    Note:
        A failing fork is not propagated as an exception: the error is
        printed and the process exits with status 1.
    """
    try:
        if os.fork() > 0:
            # Parent side of the fork is finished
            sys.exit(0)
    except OSError as err:
        print("Fork failed: {} ({})".format(err.errno, err.strerror))
        sys.exit(1)

    # Child side: start a new session
    os.setsid()

    if double:
        # NOTE(review): the recursive call also runs os.setsid() again in
        # the second child - confirm that is intended
        background_fork()

setup_logging(name='', level=logging.INFO, verbose=False, logdir=os.path.join('logs'), logformat='[%(asctime)-20s :%(levelname)-8s] - %(message)s')

Sets up logging library for output with user friendly options.

Parameters:

Name Type Description Default
name string

A string indicating the name of the caller.

''
level int

logging level enum.

INFO
verbose bool

Shorthand for setting level to logging.DEBUG.

False
logdir string

If logging to a file, allow directory destination.

join('logs')
logformat string

Allow specification of logging format string.

'[%(asctime)-20s :%(levelname)-8s] - %(message)s'
Source code in model_ensembler/utils.py
def setup_logging(name='',
                  level=logging.INFO,
                  verbose=False,
                  logdir=os.path.join("logs"),
                  logformat="[%(asctime)-20s :%(levelname)-8s] - %(message)s",
                  ):
    """Sets up `logging` library for output with user friendly options.

    Configures the root logger through `logging.basicConfig` and, when a
    log directory is given, additionally attaches a file handler writing to
    ``<logdir>/<name>.log`` that rotates at midnight UTC.

    Args:
        name (string): A string indicating the name of the caller, used as
            the log file's base name.
        level (int): `logging` level enum.
        verbose (bool): Shorthand for setting level to `logging.DEBUG`.
        logdir (string): If logging to a file, allow directory destination.
            The directory is created when missing; pass an empty value to
            disable file logging.
        logformat (string): Allow specification of `logging` format string
            (applies to the `basicConfig` output, the file handler uses its
            own fixed format).
    """
    if verbose:
        level = logging.DEBUG

    logging.basicConfig(
        level=level,
        format=logformat,
        datefmt="%d-%m-%y %T",
    )

    if logdir:
        # Only touch the filesystem when file logging is requested; the
        # directory used to be created before this guard, which failed for
        # an empty or None logdir
        os.makedirs(logdir, exist_ok=True)

        file_handler = logging.handlers.TimedRotatingFileHandler(
            os.path.join(logdir, "{}.log".format(name)),
            when='midnight',
            utc=True
        )
        file_handler.setLevel(level)
        file_formatter = logging.Formatter(
            fmt='%(asctime)-25s%(levelname)-17s%(message)s',
            datefmt='%H:%M:%S'
        )
        file_handler.setFormatter(file_formatter)
        logging.getLogger().addHandler(file_handler)