Skip to content

model_ensembler

Top-level package for model_ensembler.

batcher

extra_ctx = contextvars.ContextVar('extra') module-attribute

Main ensembler module with execution core code

BatchExecutor

Bases: object

Create an executor for an ensemble configuration.

The purpose of this class is to act as the extensible master executor for the ensemble configuration provided. It handles the event loop and should be used to contain and control the execution overall.

Source code in build/lib/model_ensembler/batcher.py
class BatchExecutor(object):
    """Create an executor for an ensemble configuration.

    The purpose of this class is to act as the extensible master executor for
    the ensemble configuration provided. It handles the event loop and should
    be used to contain and control the execution overall.
    """

    def __init__(self, cfg, backend="slurm", extra_vars=None):
        """Constructor.

        Args:
            cfg (object): EnsembleConfig ensemble configuration.
            backend (str): Backend to execute on,
                        should be one of {'dummy'|'slurm'}.
            extra_vars (dict|list|None): Additional variables, applied last
                        to every run's context (a mapping or an iterable of
                        (name, value) pairs). None means no extra variables.
        """
        self._cfg = cfg

        # Avoid a shared mutable default: every executor gets its own empty
        # collection when no extra variables are supplied.
        if extra_vars is None:
            extra_vars = []

        self._init_cluster(backend)
        self._init_ctx(extra_vars)

    def _init_cluster(self, backend):
        """Initialise the cluster backend for batch execution.

        Args:
            backend (str): identifier for backend in
            `model_ensembler.cluster`.

        Raises:
            NotImplementedError: If the cluster backend specified is not
                supported, i.e. ``model_ensembler.cluster.<backend>`` cannot
                be imported. The import error is chained as the cause.
        """
        nom = "model_ensembler.cluster.{}".format(backend)
        try:
            mod = importlib.import_module(nom)
        except ModuleNotFoundError as e:
            # Chain the original error so a missing dependency inside an
            # existing backend module is not hidden behind this message.
            raise NotImplementedError("No {} implementation exists in "
                                      "model_ensembler.cluster!".
                                      format(backend)) from e

        init_hpc_backend(nom)
        cluster_ctx.set(mod)

    def _init_ctx(self, extra_vars):
        """Initialise contexts.

        Initialise the root context vars for batch execution and set
        extra context vars that can be added last thing to the run.

        Args:
            extra_vars (dict|list): Additional variables.
        """
        # Merge the ensemble-level vars into the (possibly pre-existing) root
        # run context dict.
        var_dict = run_ctx.get(dict())
        var_dict.update(self._cfg.vars)
        run_ctx.set(var_dict)

        extra_ctx.set(extra_vars)

    def run(self, loop=None):
        """Run the executor.

        This will establish the event loop (unless one is supplied), run the
        preprocessing actions for the ensemble, cycle through executing the
        batches and then run postprocessing actions. Exceptions are not
        handled here: the event loop is always shut down and closed, and any
        exception then propagates to the caller.

        Args:
            loop (object): Event loop to use. When None, the loop returned by
                ``asyncio.get_event_loop()`` is used. The loop is closed when
                the run finishes.
        """
        logging.info("Running batcher")

        try:
            # The ``loop`` argument used to be silently overwritten; honour
            # a caller-supplied loop and only fall back to the default one.
            if loop is None:
                loop = asyncio.get_event_loop()
            else:
                # Make the supplied loop current so loop-bound primitives
                # created outside a running loop (e.g. the per-batch
                # semaphores) attach to it on Python versions that bind at
                # construction time.
                asyncio.set_event_loop(loop)

            loop.run_until_complete(
                run_task_items(self._cfg.pre_process))

            # Should batch be in function signature?
            for batch in self._cfg.batches:
                do_batch_execution(loop, batch, repeat=batch.repeat)
                # do_batch_execution(loop, batch) moves to
                # self.execute(loop, batch)

            loop.run_until_complete(
                run_task_items(self._cfg.post_process))

        finally:
            if loop is not None:
                loop.run_until_complete(loop.shutdown_asyncgens())
                loop.close()

    def execute(self, loop, batch):
        """Execute a single batch on ``loop``.

        Placeholder extension point: as noted in ``run``, the per-batch
        logic of ``do_batch_execution`` is intended to move here.

        Raises:
            NotImplementedError: Always; not implemented yet.
        """
        raise NotImplementedError("")
__init__(cfg, backend='slurm', extra_vars=[])

Constructor.

Parameters:

Name Type Description Default
cfg object

EnsembleConfig ensemble configuration.

required
backend str

Backend to execute on, should be one of {'dummy'|'slurm'}.

'slurm'
extra_vars list

Additional variables.

[]
Source code in build/lib/model_ensembler/batcher.py
def __init__(self, cfg, backend="slurm", extra_vars=None):
    """Constructor.

    Args:
        cfg (object): EnsembleConfig ensemble configuration.
        backend (str): Backend to execute on,
                    should be one of {'dummy'|'slurm'}.
        extra_vars (dict|list|None): Additional variables, applied last to
                    every run's context. None means no extra variables.
    """
    self._cfg = cfg

    # Avoid a shared mutable default: every call gets its own empty
    # collection when no extra variables are supplied.
    if extra_vars is None:
        extra_vars = []

    self._init_cluster(backend)
    self._init_ctx(extra_vars)
run(loop=None)

Run the executor.

This will establish the event loop, run the preprocessing actions for the ensemble, cycle through executing the batches and then run postprocessing actions. Exceptions are not handled here: the event loop is always shut down and closed, and any exception then propagates to the caller.

Parameters:

Name Type Description Default
loop object

Event loop.

None
Source code in build/lib/model_ensembler/batcher.py
def run(self, loop=None):
    """Run the executor.

    This will establish the event loop (unless one is supplied), run the
    preprocessing actions for the ensemble, cycle through executing the
    batches and then run postprocessing actions. Exceptions are not handled
    here: the event loop is always shut down and closed, and any exception
    then propagates to the caller.

    Args:
        loop (object): Event loop to use. When None, the loop returned by
            ``asyncio.get_event_loop()`` is used. The loop is closed when the
            run finishes.
    """
    logging.info("Running batcher")

    try:
        # The ``loop`` argument used to be silently overwritten; honour a
        # caller-supplied loop and only fall back to the default one.
        if loop is None:
            loop = asyncio.get_event_loop()
        else:
            # Make the supplied loop current so loop-bound primitives created
            # outside a running loop (e.g. the per-batch semaphores) attach to
            # it on Python versions that bind at construction time.
            asyncio.set_event_loop(loop)

        loop.run_until_complete(
            run_task_items(self._cfg.pre_process))

        # Should batch be in function signature?
        for batch in self._cfg.batches:
            do_batch_execution(loop, batch, repeat=batch.repeat)
            # do_batch_execution(loop, batch) moves to
            # self.execute(loop, batch)

        loop.run_until_complete(
            run_task_items(self._cfg.post_process))

    finally:
        if loop is not None:
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.close()

do_batch_execution(loop, batch, repeat=False)

Execute a batch configuration.

Parameters:

Name Type Description Default
loop object

Event loop.

required
batch object

Batch configuration.

required
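repeat bool

If truthy, keep repeating the batch cycle until a pre_batch or post_batch check fails; otherwise run a single cycle.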

False

Returns:

Type Description
str

Returns the string "Success" on completion.

Raises:

Type Description
ProcessingException

If there is pre or post_batch processing error.

Source code in build/lib/model_ensembler/batcher.py
def do_batch_execution(loop, batch, repeat=False):
    """Execute a batch configuration.

    Each cycle runs the batch's ``pre_batch`` tasks, builds one
    ``run_batch_item`` coroutine per run that is not being skipped, hands
    them to ``run_runner`` together with ``batch.maxruns``, adds the index of
    any run whose result carries no job to the skip list, and finally runs
    the ``post_batch`` tasks. Runs are prepared with ``batch.basedir`` as the
    working directory; the original working directory is restored
    afterwards, even if an exception escapes.

    Args:
        loop (object): Event loop.
        batch (object): Batch configuration.
        repeat (bool): If truthy, keep cycling until a pre_batch/post_batch
            failure or until every run index is skipped (capped at 999,999
            cycles); otherwise run exactly one cycle.

    Returns:
        (str): The string "Success" on completion.

    Raises:
        Any exception other than ``ProcessingException`` raised by the
        pre_batch/post_batch stages propagates to the caller; a
        ``ProcessingException`` from those stages is logged and ends the
        cycle loop instead.
    """

    logging.info("Start batch: {}".format(datetime.utcnow()))
    logging.debug(pformat(batch))

    args = Arguments()
    # Work on a copy: indexes of failed submissions are appended below and
    # must not mutate the list held by Arguments(), which would otherwise
    # carry them into every subsequent batch.
    skip_indexes = list(args.indexes) if args.indexes else list()
    batch_ctx.set(batch)

    # Batch-level variables for the run context: everything except the task
    # stages and the run list (exact key match - the old ``k in "runs"`` was
    # a substring test), dropping unset optional scheduler settings so they
    # do not overwrite values already in the context.
    batch_dict = {k: v
                  for k, v in batch._asdict().items()
                  if not (k.startswith(("pre_", "post_")) or k == "runs")
                  and not (k in ("cluster", "email", "nodes", "ntasks",
                                 "length")
                           and v is None)}

    run_vars = run_ctx.get()
    run_vars.update(batch_dict)
    run_ctx.set(run_vars)

    # We are process dependent here, so this is where we have the choice of
    # concurrency strategies but each batch
    # is dependent on chdir remaining consistent after this point.
    orig = os.getcwd()
    if not os.path.exists(batch.basedir):
        os.makedirs(batch.basedir, exist_ok=True)
    os.chdir(batch.basedir)

    try:
        # TODO: Gross implementation for #26 - repeat parameter, this should
        #  be abstracted away into executor implementations
        #  (BatchExecutor.execute) and made to work better
        if not repeat:
            repeat_count = 2
        else:
            logging.warning("We are due to repeat until a batch check fails "
                            "us")
            repeat_count = 1000000

        for rep_i in range(1, repeat_count):
            logging.info("Running cycle {}".format(rep_i))

            batch_tasks = list()
            _batch_job_sems[batch.name] = asyncio.Semaphore(batch.maxjobs)

            try:
                loop.run_until_complete(run_task_items(batch.pre_batch))
            except ProcessingException:
                logging.error("We have received a pre_batch failure, "
                              "will stop execution")
                break

            # Only indexes that name an actual run count here, so stray
            # out-of-range indexes cannot make the batch look exhausted.
            if set(range(len(batch.runs))).issubset(skip_indexes):
                logging.error("No longer able to run this batch, all runs "
                              "are in the indexes to skip")
                break

            for idx, run in enumerate(batch.runs):
                # Auto-generated context vars for run
                run['idx'] = idx
                run['id'] = "{}-{}".format(batch.name, run['idx'])
                run['dir'] = os.path.abspath(os.path.join(os.getcwd(),
                                                          run['id']))
                run['batch_idx'] = rep_i

                if idx < args.skips:
                    logging.warning("Skipping run index {} due to {} skips, "
                                    "run ID: {}".format(idx, args.skips,
                                                        run['id']))
                    continue

                if idx in skip_indexes:
                    logging.warning("Skipping run index {} due to being in "
                                    "skip indexes, run ID: {}".
                                    format(idx, run['id']))
                    continue

                # At this point the context changes at root to property
                # based.
                # NOTE(review): run_ctx.get() returns the shared root dict,
                # so these updates mutate it in place and keys from earlier
                # runs (and the extra vars) remain in it afterwards.
                ctx_dict = run_ctx.get()
                ctx_dict.update(run)
                ctx_dict.update(extra_ctx.get())

                Run = collections.namedtuple('Run',
                                             field_names=ctx_dict.keys())
                r = Run(**ctx_dict)
                batch_tasks.append(run_batch_item(r))

            batch_results = loop.run_until_complete(
                run_runner(batch.maxruns, batch_tasks))

            for idx, (job, run) in enumerate(batch_results):
                logging.debug("Batch {} result #{} from run {}: job {}".format(
                    batch.name, idx, run.idx, str(job)
                ))

                if not job and run.idx not in skip_indexes:
                    logging.warning("Result #{} for run {} indicates "
                                    "unsuccessful submission, adding to "
                                    "indexes to skip".format(idx, run.idx))
                    skip_indexes.append(run.idx)

            try:
                loop.run_until_complete(run_task_items(batch.post_batch))
            except ProcessingException:
                logging.error("We have received a post_batch failure, "
                              "will stop execution")
                break
    finally:
        # Restore the caller's working directory even if a stage raised.
        os.chdir(orig)

    logging.info("Batch {} completed: {}".
                 format(batch.name, datetime.utcnow()))
    # TODO: return batch windows/info
    return "Success"

run_batch_item(run) async

Execute a run configuration.

Parameters:

Name Type Description Default
run object

Specific run configuration.

required

Returns:

Name Type Description
job_id int

Job id number.

run object

Specific run configuration.

Raises:

Type Description
TemplatingError

Not propagated: if a job cannot be templated, the error is logged and the run is skipped.

ProcessingException

Not propagated: if an individual run failure is caught, it is logged and the run is abandoned without failing the batch.

Source code in build/lib/model_ensembler/batcher.py
async def run_batch_item(run):
    """Execute a run configuration.

    Prepares the run directory and templates the run, runs the batch's
    ``pre_run`` tasks, then, unless the ``no_submission`` argument is set,
    submits the job through the cluster backend (gated by the batch's job
    semaphore and a ``jobs`` check limited to ``batch.maxjobs``). It waits
    for the job to reach a start or finish state, polls until it reaches a
    finish state, and finally runs the ``post_run`` tasks.

    Args:
        run (object): Specific run configuration.

    Returns:
        tuple: ``(job_id, run)``. ``job_id`` is the value returned by the
        backend's ``submit_job`` (falsy if submission failed), or None when
        no submission happened (templating failure, submission disabled, or
        a ``pre_run`` failure).

    Note:
        ``TemplatingError`` and ``ProcessingException`` are caught here and
        do not propagate: a templating failure returns early, and a
        processing failure abandons only this run, not the batch.
    """

    # TODO: my understanding is that all context from here through to end
    #  methods/tasks will now be under run_context in do_batch_execution
    batch = batch_ctx.get()
    run_ctx.set(run)
    cluster = cluster_ctx.get()

    logging.info("Start run {} at {}".format(run.id, datetime.utcnow()))
    logging.debug(pformat(run))

    args = Arguments()
    job_id = None

    try:
        await prepare_run_directory(batch, run)
        process_templates(run, batch.templates)
    except TemplatingError as e:
        # We catch gracefully and just prevent the run from happening
        logging.error("We cannot template the job {}: {}".format(run.id, e))
        return job_id, run

    # It's very tempting to move pre_run, but don't: we DO NOT execute until
    # job is templated. Instead I've created the ability to run tasks prior
    # to preparation/templating of the job for scenarios where you don't want
    # the templating to error out/job to even be prepared
    try:
        await run_task_items(batch.pre_run)

        if args.no_submission:
            logging.info("Skipping actual slurm submission based on arguments")
        else:
            async with _batch_job_sems[batch.name]:
                func = getattr(model_ensembler.tasks, "jobs")
                check = collections.namedtuple("check", ["args"])

                # Wait until the backend reports capacity for this batch.
                await run_check(func, check({
                    "limit": batch.maxjobs,
                    "match": batch.name,
                }))

                job_id = await cluster.submit_job(run, script=batch.job_file)

                if not job_id:
                    # Not inside an except block: logging.exception here would
                    # attach a meaningless "NoneType: None" traceback. Report
                    # the run, since only this run failed to submit.
                    logging.error(
                        "{} could not be submitted, we won't continue".format(
                            run.id
                        ))
                else:
                    running = False
                    state = None

                    # Phase 1: wait until the scheduler knows the job and it
                    # has started (or already finished).
                    while not running:
                        try:
                            job = await cluster.find_id(job_id)
                            state = job.state
                        except (IndexError, ValueError) as e:
                            logging.warning("Job {} not registered yet, "
                                            "or error encountered".
                                            format(job_id))
                            logging.exception(e)

                        if state and (
                                state in cluster.START_STATES or
                                state in cluster.FINISH_STATES):
                            running = True
                        else:
                            await asyncio.sleep(args.submit_timeout)

                    # Phase 2: poll until the job reaches a finish state.
                    while True:
                        try:
                            job = await cluster.find_id(job_id)
                            state = job.state
                        except (IndexError, ValueError):
                            logging.exception("Job status for run {} retrieval"
                                              " whilst slurm running, waiting "
                                              "and retrying".
                                              format(run.id))
                            await asyncio.sleep(args.error_timeout)
                            continue

                        logging.debug("{} monitor got state {} for job {}".
                                      format(run.id, state, job_id))

                        if state in cluster.FINISH_STATES:
                            logging.info("{} monitor got state {} for job {}".
                                         format(run.id, state, job_id))
                            break
                        else:
                            await asyncio.sleep(args.running_timeout)

        await run_task_items(batch.post_run)
    except ProcessingException:
        logging.error("Run failure caught, abandoning {} but not the "
                      "batch".format(run.id))

    logging.info("End run {} at {}".format(run.id, datetime.utcnow()))
    return job_id, run

cli

check()

CLI-native sanity checking. Combines a pre-set sanity-check configuration with the user's CLI arguments (e.g. dummy/slurm) into a single argument list, which is parsed and passed to main().

Allows checking that the installation was successful.

Source code in build/lib/model_ensembler/cli.py
def check():
    """Run the bundled sanity-check ensemble from the command line.

    The pre-set configuration path is placed ahead of whatever the user typed
    (for example the backend, ``dummy`` or ``slurm``). The combined list is
    parsed and handed to ``main()``. Useful for confirming that the
    installation works.

    NOTE: the configuration path is relative, so it is resolved against the
    current working directory when the file is opened.
    """
    # The pre-set configuration must come first so that it fills the
    # "configuration" positional argument.
    sanity_argv = ["examples/sanity-check.yml", *sys.argv[1:]]
    main(parse_args(sanity_argv))

main(args=None)

CLI entry point.

Source code in build/lib/model_ensembler/cli.py
def main(args=None):
    """CLI entry point.

    Parses the command line when no arguments are supplied, optionally
    daemonises, configures logging named after the configuration file, loads
    the ensemble configuration and runs it through a ``BatchExecutor``.

    Args:
        args (object): Pre-parsed arguments as returned by ``parse_args``;
            when None they are parsed from ``sys.argv``.
    """
    args = parse_args() if args is None else args

    if args.daemon:
        background_fork(True)

    log_name = "{}".format(os.path.basename(args.configuration))
    setup_logging(log_name, verbose=args.verbose)
    logging.info("Model Ensemble Runner")

    ensemble = EnsembleConfig(args.configuration)
    # TODO: get_batch_executor
    executor = BatchExecutor(ensemble, args.backend, dict(args.extra))
    executor.run()

parse_args(args_list=None)

Parse command line parameters.

Returns:

Type Description
object

Arguments(), immutable instance from .utils.

Source code in build/lib/model_ensembler/cli.py
def parse_args(args_list=None):
    """Parse command line parameters.

    Args:
        args_list (list): Optional list of argument strings to parse instead
            of the real command line (``check()`` uses this to inject a
            pre-set configuration). None means ``sys.argv[1:]``.

    Returns:
        (object): Arguments(), immutable instance from ``.utils``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--daemon",
                        help="Daemonise the ensembler", default=False,
                        action="store_true")

    # TODO: Need to validate the argument selections/group certain commands
    parser.add_argument("-v", "--verbose",
                        help="Log verbosely", default=False,
                        action="store_true")
    parser.add_argument("-c", "--no-checks",
                        help="Do not run check commands", default=False,
                        action="store_true")
    parser.add_argument("-s", "--no-submission",
                        help="Do not try to submit job, just log the step",
                        default=False, action="store_true")
    parser.add_argument("-p", "--pickup",
                        help="Continue a previous set of runs by picking up "
                        "existing directories rather than assuming they need "
                        "to be created; for example if the ensemble has "
                        "previously failed",
                        default=False, action="store_true")
    parser.add_argument("-l", "--shell",
                        help="Allows the user to specify the shell passed to "
                             "subprocess execs.",
                        default="/bin/bash", type=str)

    # FIXME: These should not be applied in multi-batch ensembles
    parser.add_argument("-k", "--skips",
                        help="Number of run entries to skip", default=0,
                        type=int)

    # The batcher treats these as indexes to SKIP, so the help says so.
    parser.add_argument("-i", "--indexes",
                        help="Comma-separated run indexes to skip, "
                             "e.g. 0,3,5",
                        type=parse_indexes)

    parser.add_argument("-ct", "--check-timeout", default=30, type=int)
    parser.add_argument("-st", "--submit-timeout", default=20, type=int)
    parser.add_argument("-rt", "--running-timeout", default=60, type=int)
    parser.add_argument("-et", "--error-timeout", default=120, type=int)

    parser.add_argument("-ms", "--max-stagger", default=1, type=int)

    parser.add_argument("-x", "--extra-vars", dest="extra", nargs="*",
                        default=[], type=parse_extra_vars)

    parser.add_argument("configuration")
    parser.add_argument("backend", default="slurm", choices=("slurm", "dummy"),
                        nargs="?")

    # ArgumentParser.parse_args(None) already falls back to sys.argv[1:], so
    # a pre-set list (as passed by check()) and the real command line share
    # one code path.
    parsed_args = parser.parse_args(args_list)

    # Prefer retaining immutable Arguments()
    # by not using the instance as a namespace
    return Arguments(**vars(parsed_args))

parse_extra_vars(arg)

Method for processing extra var arguments.

Parameters:

Name Type Description Default
arg str

A single extra var argument in name=value form.

required

Returns:

Type Description
tuple

Name and value for the argument to be overridden.

Raises: argparse.ArgumentTypeError: If the argument does not match the name=value format.

Source code in build/lib/model_ensembler/cli.py
def parse_extra_vars(arg):
    """Split one ``--extra-vars`` entry into its name and value.

    The name is everything before the first ``=`` and must be non-empty; the
    value is the remainder (which may itself contain ``=``) and must also be
    non-empty.

    Args:
        arg (str): A single extra var argument in ``name=value`` form.

    Returns:
        (tuple): ``(name, value)`` for the variable to be overridden.

    Raises:
        argparse.ArgumentTypeError: If ``arg`` is not in ``name=value`` form.
    """
    matched = re.match(r'^([^=]+)=(.+)$', arg)
    if not matched:
        raise argparse.ArgumentTypeError(
            "Argument does not match name=value format: {}".format(arg))

    name, value = matched.groups()
    return name, value

parse_indexes(argv)

Method for ensuring a CSV string of integers.

Parameters:

Name Type Description Default
argv str

Comma-delimited list of integers, e.g. "1,2,3".

required

Returns:

Type Description
list

Matched integer values.

Raises:

Type Description
ArgumentTypeError

If argv is not CSV delimited integer list.

Source code in build/lib/model_ensembler/cli.py
def parse_indexes(argv):
    """Convert a comma-separated string of non-negative integers to a list.

    Args:
        argv (str): Comma-delimited integers, e.g. ``"0,3,5"``.

    Returns:
        (list): The integers, in the order given.

    Raises:
        argparse.ArgumentTypeError: If ``argv`` is not a comma-delimited
            list of integers.
    """
    csv_of_ints = r'^([0-9]+,)*[0-9]+$'
    if not re.match(csv_of_ints, argv):
        raise argparse.ArgumentTypeError(
            "{} is not a CSV delimited integer list of indexes".format(argv))

    return list(map(int, argv.split(",")))

config

Batch

Bases: BatchSpec, TaskArrayMixin

Class to represent a single batch within the ensembler configuration.

Parameters:

Name Type Description Default
pre_batch object

TaskArray configuration setup.

None
pre_run object

TaskArray configuration setup.

None
post_run object

TaskArray configuration setup.

None
post_batch object

TaskArray configuration setup.

None
**kwargs dict

Keyword arguments for BatchSpec instance.

{}
Source code in build/lib/model_ensembler/config.py
class Batch(BatchSpec, TaskArrayMixin):
    """Class to represent a single batch within the ensembler configuration.

    The four task stages are kept as private attributes and exposed as
    properties that yield ``Task`` objects via ``TaskArrayMixin.task_array``.

    Args:
        pre_batch (object): TaskArray configuration setup.
        pre_run (object): TaskArray configuration setup.
        post_run (object): TaskArray configuration setup.
        post_batch (object): TaskArray configuration setup.
        **kwargs (dict): Keyword arguments for BatchSpec instance.
    """
    def __init__(self,
                 *args,
                 pre_batch=None, pre_run=None, post_run=None, post_batch=None,
                 **kwargs):
        # NOTE(review): *args/**kwargs are not forwarded here; presumably
        # BatchSpec consumes them in __new__ (instances support _asdict(),
        # so it looks like a namedtuple) - confirm.
        super().__init__()
        stages = (
            ("_pre_batch", pre_batch),
            ("_pre_run", pre_run),
            ("_post_run", post_run),
            ("_post_batch", post_batch),
        )
        for attr_name, raw_tasks in stages:
            setattr(self, attr_name, raw_tasks)

    @property
    def pre_batch(self):
        """Tasks for the pre_batch stage.

        Returns:
            (generator): Task objects built from ``pre_batch``.
        """
        return self.task_array("_pre_batch")

    @property
    def pre_run(self):
        """Tasks for the pre_run stage.

        Returns:
            (generator): Task objects built from ``pre_run``.
        """
        return self.task_array("_pre_run")

    @property
    def post_run(self):
        """Tasks for the post_run stage.

        Returns:
            (generator): Task objects built from ``post_run``.
        """
        return self.task_array("_post_run")

    @property
    def post_batch(self):
        """Tasks for the post_batch stage.

        Returns:
            (generator): Task objects built from ``post_batch``.
        """
        return self.task_array("_post_batch")
post_batch property

Property decorator managing post batch tasks.

Returns:

Type Description
list

Post batch tasks.

post_run property

Property decorator managing post run tasks.

Returns:

Type Description
list

Post run tasks.

pre_batch property

Property decorator managing pre batch tasks.

Returns:

Type Description
list

Pre batch tasks.

pre_run property

Property decorator managing pre run tasks.

Returns:

Type Description
list

Pre run tasks.

EnsembleConfig

Bases: YAMLConfig, TaskArrayMixin

Class to represent the entirety of the ensembler configuration.

Parameters:

Name Type Description Default
*args tuple

See YAMLConfig.

()
**kwargs dict

Arbitrary keyword arguments, passed through to YAMLConfig.

{}
Source code in build/lib/model_ensembler/config.py
class EnsembleConfig(YAMLConfig, TaskArrayMixin):
    """Whole-ensemble view of a validated model-ensemble YAML configuration.

    Exposes the ``vars``, ``pre_process``, ``post_process`` and ``batches``
    entries of the ``ensemble`` section. The task lists are yielded as
    ``Task`` objects and each batch is wrapped in a ``Batch``.

    Args:
        *args (tuple): See ``YAMLConfig``.
        **kwargs (dict): Keyword arguments passed through to ``YAMLConfig``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All four entries are read eagerly; a missing one raises KeyError.
        section = self._data['ensemble']
        self._vars = section['vars']
        self._pre_process = section['pre_process']
        self._post_process = section['post_process']
        self._batches = section['batches']

    @property
    def pre_process(self):
        """Ensemble-wide preprocessing tasks.

        Returns:
            (generator): Task objects built from ``pre_process``.
        """
        return self.task_array("_pre_process")

    @property
    def post_process(self):
        """Ensemble-wide postprocessing tasks.

        Returns:
            (generator): Task objects built from ``post_process``.
        """
        return self.task_array("_post_process")

    @property
    def batches(self):
        """Batches contained in the ensemble configuration.

        A fresh ``Batch`` is constructed from each raw batch mapping on every
        access.

        Returns:
            (list): One ``Batch`` per entry of ``batches``.
        """
        return [Batch(**raw_batch) for raw_batch in self._batches]

    @property
    def vars(self):
        """Variables declared in the ensemble configuration.

        Returns:
            (dict): The raw ``vars`` mapping from the configuration.
        """
        return self._vars
batches property

Property decorator managing batches contained in ensemble config.

Returns:

Type Description
list

Batches contained in the ensemble configuration.

post_process property

Property decorator managing postprocessing attributes.

Returns:

Type Description
list

Postprocessing Tasks.

pre_process property

Property decorator managing preprocessing attributes.

Returns:

Type Description
list

Preprocessing Tasks.

vars property

Property decorator managing vars from the ensemble configuration.

Returns:

Type Description
dict

vars from ensemble configuration.

Task

Bases: TaskSpec

Task definition class derived from the TaskSpec namedtuple

Source code in build/lib/model_ensembler/config.py
class Task(TaskSpec):
    """A single configured task.

    Thin subclass of ``TaskSpec`` that adds no behaviour of its own.
    Instances are built from raw task mappings by
    ``TaskArrayMixin.task_array``.
    """

TaskArrayMixin

Generates sets of Task objects from Batch object members

Source code in build/lib/model_ensembler/config.py
class TaskArrayMixin:
    """Mixin that turns raw task lists stored on an object into Task objects."""

    def task_array(self, attr):
        """Lazily build Task objects from a configuration attribute.

        Args:
            attr (str): Name of the member attribute holding the raw task
                        definitions (an iterable of keyword mappings).

        Yields:
            (Task): One ``Task(**raw_task)`` per raw definition. Nothing is
                    yielded when the attribute is empty or None.
        """
        # The attribute is looked up when iteration starts, not at call time.
        for raw_task in getattr(self, attr) or ():
            yield Task(**raw_task)
task_array(attr)

Yields Tasks from a configuration instance.

Parameters:

Name Type Description Default
attr str

Name of the member property in the configuration that defines the list of Task objects.

required

Yields:

Type Description
object

Task object

Source code in build/lib/model_ensembler/config.py
def task_array(self, attr):
    """Lazily build Task objects from a configuration attribute.

    Args:
        attr (str): Name of the member attribute holding the raw task
                    definitions (an iterable of keyword mappings).

    Yields:
        (Task): One ``Task(**raw_task)`` per raw definition. Nothing is
                yielded when the attribute is empty or None.
    """
    raw_tasks = getattr(self, attr)
    if not raw_tasks:
        return
    yield from (Task(**raw_task) for raw_task in raw_tasks)

YAMLConfig

Configuration processor for model-ensemble YAML-based configurations.

Parameters:

Name Type Description Default
configuration str

Name of the YAML configuration to load.

required
Source code in build/lib/model_ensembler/config.py
class YAMLConfig():
    """Configuration processor for model-ensemble YAML-based configurations.

    Loads the YAML configuration, copies any ``ensemble.batch_config``
    defaults into each batch, and validates the result against the JSON
    schema ``model-ensemble.json``.

    Args:
        configuration (str): Name of the YAML configuration to load.
    """

    def __init__(self, configuration):
        # NOTE(review): ``path`` is a module-level name not visible here;
        # presumably the directory holding model-ensemble.json - confirm.
        self._schema = os.path.join(path, "model-ensemble.json")
        self._configuration_file = configuration

        self._schema_data, self._data = \
            self.__class__.validate(self._schema, self._configuration_file)

    @staticmethod
    def validate(json_schema, yaml_file):
        """Validate a YAML configuration against a JSON schema.

        Keys from ``ensemble.batch_config`` are copied into every entry of
        ``ensemble.batches`` that does not already define them, before
        schema validation.

        Args:
            json_schema (str): Name of schema to validate against.
            yaml_file (str): Name of the configuration to validate.

        Returns:
            (tuple): contains JSON schema, YAML data.

        Raises:
            RuntimeError: If "name" or "basedir" is specified in
                        "batch_config:" instead of in each entry of
                        "batches:".
            jsonschema.ValidationError: If the merged configuration does
                        not satisfy the schema.
        """
        logging.debug("Assessing {} against {}".format(
            json_schema, yaml_file
        ))

        with open(yaml_file, "r") as fh:
            # NOTE(review): whether this is safe for untrusted files depends
            # on which ``Loader`` is imported (SafeLoader vs an unsafe one).
            yaml_data = load(fh, Loader=Loader)

        # FIXME: this is a cheat for extreme batch numbers by allowing common
        #  parameters
        # Malformed documents (empty file, missing or non-mapping "ensemble")
        # skip the merge and are left for schema validation below, instead
        # of failing here with a TypeError/KeyError.
        ensemble = yaml_data.get("ensemble") \
            if isinstance(yaml_data, dict) else None
        if isinstance(ensemble, dict) and "batch_config" in ensemble:
            batch_config = ensemble["batch_config"] or {}

            # Checked once up front, so the error is raised even when there
            # are no batches, and before any batch has been modified.
            if any(k in ("name", "basedir") for k in batch_config):
                raise RuntimeError("'name' and 'basedir' should be defined "
                                   "for each batch, rather than all batches. "
                                   "Please move these from 'batch_config:' "
                                   "to 'batches:'.")

            for batch in ensemble.get("batches") or []:
                for k, v in batch_config.items():
                    if k not in batch:
                        batch[k] = v

        with open(json_schema, "r") as fh:
            json_data = json.load(fh)

        try:
            jsonschema.validate(instance=yaml_data, schema=json_data)
        except jsonschema.ValidationError:
            logging.error("There's an error with configuration file: {}".
                          format(yaml_file))
            raise
        logging.info("Validated configuration file {} successfully".
                     format(yaml_file))
        return json_data, yaml_data
validate(json_schema, yaml_file) staticmethod

Validate a YAML configuration against a JSON schema.

Parameters:

Name Type Description Default
json_schema str

Name of schema to validate against.

required
yaml_file str

Name of the configuration to validate.

required

Returns:

Type Description
tuple

contains JSON schema, YAML data.

Raises:

Type Description
RuntimeError

If "name" or "basedir" is specified in "batch_config:" instead of in each entry of "batches:".

Source code in build/lib/model_ensembler/config.py
@staticmethod
def validate(json_schema, yaml_file):
    """Validate a YAML configuration against a JSON schema.

    Keys from ``ensemble.batch_config`` are copied into every entry of
    ``ensemble.batches`` that does not already define them, before schema
    validation.

    Args:
        json_schema (str): Name of schema to validate against.
        yaml_file (str): Name of the configuration to validate.

    Returns:
        (tuple): contains JSON schema, YAML data.

    Raises:
        RuntimeError: If "name" or "basedir" is specified in
                    "batch_config:" instead of in each entry of "batches:".
        jsonschema.ValidationError: If the merged configuration does not
                    satisfy the schema.
    """
    logging.debug("Assessing {} against {}".format(
        json_schema, yaml_file
    ))

    with open(yaml_file, "r") as fh:
        # NOTE(review): whether this is safe for untrusted files depends on
        # which ``Loader`` is imported (SafeLoader vs an unsafe one).
        yaml_data = load(fh, Loader=Loader)

    # FIXME: this is a cheat for extreme batch numbers by allowing common
    #  parameters
    # Malformed documents (empty file, missing or non-mapping "ensemble")
    # skip the merge and are left for schema validation below, instead of
    # failing here with a TypeError/KeyError.
    ensemble = yaml_data.get("ensemble") \
        if isinstance(yaml_data, dict) else None
    if isinstance(ensemble, dict) and "batch_config" in ensemble:
        batch_config = ensemble["batch_config"] or {}

        # Checked once up front, so the error is raised even when there are
        # no batches, and before any batch has been modified.
        if any(k in ("name", "basedir") for k in batch_config):
            raise RuntimeError("'name' and 'basedir' should be defined "
                               "for each batch, rather than all batches. "
                               "Please move these from 'batch_config:' "
                               "to 'batches:'.")

        for batch in ensemble.get("batches") or []:
            for k, v in batch_config.items():
                if k not in batch:
                    batch[k] = v

    with open(json_schema, "r") as fh:
        json_data = json.load(fh)

    try:
        jsonschema.validate(instance=yaml_data, schema=json_data)
    except jsonschema.ValidationError:
        logging.error("There's an error with configuration file: {}".
                      format(yaml_file))
        raise
    logging.info("Validated configuration file {} successfully".
                 format(yaml_file))
    return json_data, yaml_data

exceptions

TemplatingError

Bases: RuntimeError

For templating issues

Source code in build/lib/model_ensembler/exceptions.py
3
4
5
class TemplatingError(RuntimeError):
    """Raised when a run directory or its templates cannot be prepared or rendered."""

runners

run_check(func, check) async

Run a check configuration.

Parameters:

Name Type Description Default
func callable

Async check method.

required
check dict

Check configuration.

required

Raises:

Type Description
CheckException

Any exception from the called check.

Source code in build/lib/model_ensembler/runners.py
async def run_check(func, check):
    """Run a check configuration until it passes.

    The check is awaited repeatedly, sleeping ``Arguments().check_timeout``
    seconds between attempts, until it returns a truthy result.

    Args:
        func (callable): Async check method, awaited as
            ``func(ctx, **check.args)`` where ``ctx`` is the current value of
            ``model_ensembler.batcher.run_ctx``.
        check (object): Check configuration exposing an ``args`` attribute;
            a falsy ``args`` means the check takes no keyword arguments.

    Raises:
        CheckException: Any exception from the called check.
    """
    result = False
    args = Arguments()

    while not result:
        try:
            logging.debug("PRE CHECK")
            # Mirror run_task: a check configured without args gets none.
            check_args = check.args or {}
            ctx = model_ensembler.batcher.run_ctx.get()
            result = await func(ctx, **check_args)
            logging.debug("POST CHECK")
        except Exception as e:
            logging.exception(e)
            raise CheckException(
                "Issues with flight checks, abandoning") from e

        if not result:
            logging.debug("Cannot continue, waiting {} seconds for next check".
                          format(args.check_timeout))
            await asyncio.sleep(args.check_timeout)

run_runner(limit, tasks) async

Runs a list of tasks asynchronously.

Given a particular limit, establish a semaphore and run up to limit tasks. Once the list of tasks is complete, return.

Parameters:

Name Type Description Default
limit int

Maximum number of tasks allowed to run concurrently.

required
tasks list

Tasks and checks.

required

Returns:

Type Description
list

Completed tasks.

Source code in build/lib/model_ensembler/runners.py
async def run_runner(limit, tasks):
    """Runs a list of tasks asynchronously.

    Given a particular limit, establish a semaphore and run up to limit tasks
    at the same time. Once every task has completed, return their results.

    Args:
        limit (int): Maximum number of tasks allowed to run concurrently.
        tasks (list): Awaitables (tasks and checks) to run.

    Returns:
        (list): Results of the completed tasks, in the same order as
            ``tasks``.
    """

    # TODO: return run task windows/info
    sem = asyncio.Semaphore(limit)

    async def sem_task(task):
        # Each awaitable only starts running once it holds a semaphore slot.
        async with sem:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))

run_task(func, task) async

Run a task configuration.

Parameters:

Name Type Description Default
func callable

Async task method.

required
task dict

Task configuration.

required

Raises:

Type Description
TaskException

Any exception from the called task.

Returns:

Type Description
bool

True if task runs without exception.

Source code in build/lib/model_ensembler/runners.py
async def run_task(func, task):
    """Run a task configuration once.

    Args:
        func (callable): Async task method, awaited as
            ``func(ctx, **task.args)`` where ``ctx`` is the current value of
            ``model_ensembler.batcher.run_ctx``.
        task (object): Task configuration exposing an ``args`` attribute;
            a falsy ``args`` means the task takes no keyword arguments.

    Raises:
        TaskException: Any exception from the called task.

    Returns:
        (bool): True if task runs without exception.
    """
    try:
        task_args = task.args or {}
        ctx = model_ensembler.batcher.run_ctx.get()
        await func(ctx, **task_args)
    except Exception as e:
        logging.exception(e)
        raise TaskException("Issues with running task, abandoning") from e
    return True

run_task_items(items) async

Run a set of tasks and checks.

Run the list of tasks and check items. The configuration references the model_ensembler.tasks method to use, and the context/configuration provides the arguments. TaskException and CheckException are trapped and rethrown as ProcessingException.

Parameters:

Name Type Description Default
items list

Tasks and checks.

required

Raises:

Type Description
ProcessingException

A common exception thrown for failures in the individual tasks.

Source code in build/lib/model_ensembler/runners.py
async def run_task_items(items):
    """Run a set of tasks and checks in order.

    Each item's ``name`` is looked up as a function on
    ``model_ensembler.tasks``. Functions whose ``check`` attribute is truthy
    are run via ``run_check`` (retried until they pass); all others are run
    once via ``run_task``. TaskException and CheckException are trapped and
    rethrown as ProcessingException.

    Args:
        items (list): Tasks and checks; each item exposes a ``name``
            attribute naming the function in ``model_ensembler.tasks``.

    Raises:
        ProcessingException: A common exception thrown for failures in the
                            individual tasks.
    """
    try:
        ctx = model_ensembler.batcher.run_ctx.get()

        for item in items:
            func = getattr(model_ensembler.tasks, item.name)

            logging.debug("TASK CWD: {}".format(os.getcwd()))
            logging.debug("TASK CTX: {}".format(pformat(ctx)))
            logging.debug("TASK FUNC: {}".format(pformat(item)))

            if func.check:
                await run_check(func, item)
            else:
                await run_task(func, item)
    except (TaskException, CheckException) as e:
        # Keep the original failure as the explicit cause for tracebacks.
        raise ProcessingException(e) from e

templates

prepare_run_directory(batch, run) async

Prepare the directory for each run from the batch templates.

Parameters:

Name Type Description Default
batch object

Whole batch configuration.

required
run object

Specific run configuration.

required

Raises:

Type Description
TemplatingError

If the template directory cannot be copied from source to destination.

Source code in build/lib/model_ensembler/templates.py
async def prepare_run_directory(batch, run):
    """Prepare the directory for a run from the batch template directory.

    When pickup mode is enabled (``Arguments().pickup``) and the run
    directory already exists, only the files listed in ``batch.templates``
    are re-copied from ``batch.templatedir`` so they can be re-rendered.
    Otherwise a new run directory is created and the contents of
    ``batch.templatedir`` are copied into it with ``rsync -aXE``.

    Args:
        batch (object): Whole batch configuration.
        run (object): Specific run configuration.

    Raises:
        TemplatingError: If the run directory already exists outside pickup
                        mode, or the template directory cannot be copied
                        from source to destination.
    """
    args = Arguments()

    if args.pickup and os.path.exists(run.dir):
        logging.info("Picked up previous job directory for run {}".
                     format(run.id))

        for tmpl_file in batch.templates:
            src_path = os.path.join(batch.templatedir, tmpl_file)
            dst_path = shutil.copy(src_path, os.path.join(run.dir, tmpl_file))
            logging.info("Re-copied {} to {} for template regeneration".
                         format(src_path, dst_path))
    else:
        if os.path.exists(run.dir):
            raise TemplatingError("Run directory {} already exists".
                                  format(run.dir))

        os.makedirs(run.dir, mode=0o775)

        # Trailing slashes make rsync copy the *contents* of templatedir.
        # The argv list is passed straight to exec, rather than formatting
        # and re-splitting a string, so paths containing spaces or quotes
        # reach rsync intact.
        argv = ["rsync", "-aXE",
                "{}/".format(batch.templatedir),
                "{}/".format(run.dir)]
        logging.info(" ".join(argv))
        proc = await asyncio.create_subprocess_exec(*argv)
        rc = await proc.wait()

        if rc != 0:
            raise TemplatingError("Could not grab template directory {} to {}".
                                  format(batch.templatedir, run.dir))

process_templates(run, template_list)

Render templates based on provided context.

Parameters:

Name Type Description Default
run object

Specific run configuration.

required
template_list list

Paths to template sources.

required

Raises:

Type Description
TemplatingError

If cannot template using the provided format.

Source code in build/lib/model_ensembler/templates.py
def process_templates(run, template_list):
    """Render Jinja2 templates inside a run directory.

    Each entry of ``template_list`` is a ``.j2`` path relative to ``run.dir``.
    It is rendered with the run configuration available as ``run``. The
    result is written next to it without the ``.j2`` suffix, given the
    template's file mode, and the template file is then deleted.

    Args:
        run (object): Specific run configuration.
        template_list (list): Paths to template sources.

    Raises:
        TemplatingError: If an entry is not a ``.j2`` file, or it cannot be
            read, rendered or written.
    """
    for tmpl_file in template_list:
        if not tmpl_file.endswith(".j2"):
            raise TemplatingError("{} does not appear to be a Jinja2 template "
                                  "(.j2)".format(tmpl_file))

        tmpl_path = os.path.join(run.dir, tmpl_file)
        try:
            with open(tmpl_path, "r") as fh:
                tmpl_data = fh.read()

            dst_file = tmpl_path[:-len(".j2")]
            logging.info("Templating {} to {}".format(tmpl_path, dst_file))
            tmpl = jinja2.Template(tmpl_data)
            dst_data = tmpl.render(run=run)
            with open(dst_file, "w+") as fh:
                fh.write(dst_data)
            # Preserve e.g. the executable bit of templated job scripts.
            os.chmod(dst_file, os.stat(tmpl_path).st_mode)
            os.unlink(tmpl_path)
        except (OSError, jinja2.TemplateError) as e:
            raise TemplatingError(
                "Could not template {}".format(tmpl_file)) from e

utils

Arguments

Bases: object

Singleton holding the parsed arguments. The values are shared by all instances and are meant to be treated as read-only; nothing enforces this.

Source code in build/lib/model_ensembler/utils.py
class Arguments(object):
    """Process-wide singleton holding the parsed arguments.

    The first construction stores its keyword arguments on a single shared
    inner object. Later constructions ignore their keyword arguments.
    Attribute reads on any ``Arguments`` wrapper are forwarded to that
    shared object.
    """

    class __Arguments(object):
        """Inner holder whose attributes are the stored arguments."""

        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                setattr(self, key, value)

    # The single shared holder, created lazily by the first constructor call.
    instance = None

    def __init__(self, **kwargs):
        if Arguments.instance:
            return
        Arguments.instance = Arguments.__Arguments(**kwargs)

    def __getattr__(self, item):
        # Only reached for names not found on the wrapper itself.
        shared = self.instance
        return getattr(shared, item)
__Arguments

Bases: object

Arguments inner singleton

Source code in build/lib/model_ensembler/utils.py
class __Arguments(object):
    """Plain holder that stores each keyword argument as an attribute."""

    def __init__(self, **kwargs):
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

background_fork(double=False)

Allows for the calling process to fork into the background.

Parameters:

Name Type Description Default
double bool

If true, we'll fork again as to allow the parent of the first child to kill it, leaving the daemon process.

False

Raises:

Type Description
SystemExit

Always in the exiting parent process (status 0), or with status 1 if the fork fails.

Source code in build/lib/model_ensembler/utils.py
def background_fork(double=False):
    """Allows for the calling process to fork into the background.

    The parent process exits and the child starts a new session with
    ``os.setsid()``. With ``double`` set, the session leader forks once more
    and exits. The surviving grandchild is then not a session leader, so it
    cannot reacquire a controlling terminal.

    Args:
        double (bool): If true, we'll fork again as to allow the parent of
                       the first child to kill it, leaving the daemon process.

    Raises:
        SystemExit: Always in each exiting parent (status 0); with status 1
                    (after printing to stderr) if a fork fails.
    """
    def _fork_and_exit_parent():
        # Fork; only the child returns from this helper.
        try:
            pid = os.fork()
        except OSError as e:
            print("Fork failed: {} ({})".format(e.errno, e.strerror),
                  file=sys.stderr)
            sys.exit(1)
        if pid > 0:
            sys.exit(0)

    _fork_and_exit_parent()
    os.setsid()

    if double:
        # No second setsid(): it would make the grandchild a session leader
        # again, defeating the purpose of the double fork.
        _fork_and_exit_parent()

setup_logging(name='', level=logging.INFO, verbose=False, logdir=os.path.join('logs'), logformat='[%(asctime)-20s :%(levelname)-8s] - %(message)s')

Sets up logging library for output with user friendly options.

Parameters:

Name Type Description Default
name string

A string indicating the name of the caller.

''
level int

logging level enum.

INFO
verbose bool

Shorthand for setting level to logging.DEBUG.

False
logdir string

If logging to a file, allow directory destination.

join('logs')
logformat string

Allow specification of logging format string.

'[%(asctime)-20s :%(levelname)-8s] - %(message)s'
Source code in build/lib/model_ensembler/utils.py
def setup_logging(name='',
                  level=logging.INFO,
                  verbose=False,
                  logdir=os.path.join("logs"),
                  logformat="[%(asctime)-20s :%(levelname)-8s] - %(message)s",
                  ):
    """Sets up `logging` library for output with user friendly options.

    Configures the root logger via ``logging.basicConfig``. When ``logdir`` is
    truthy, it also attaches a midnight (UTC) rotating file handler writing
    to ``<logdir>/<name>.log``, creating ``logdir`` if needed.

    Args:
        name (string): A string indicating the name of the caller.
        level (int): `logging` level enum.
        verbose (bool): Shorthand for setting level to `logging.DEBUG`.
        logdir (string): If logging to a file, allow directory destination.
            A falsy value (``None`` or ``""``) disables file logging.
        logformat (string): Allow specification of `logging` format string.
    """
    if verbose:
        level = logging.DEBUG

    # Only create the directory when file logging is actually requested;
    # os.makedirs would fail on a falsy logdir.
    if logdir:
        os.makedirs(logdir, exist_ok=True)

    logging.basicConfig(
        level=level,
        format=logformat,
        datefmt="%d-%m-%y %T",
    )

    if logdir:
        file_handler = logging.handlers.TimedRotatingFileHandler(
            os.path.join(logdir, "{}.log".format(name)),
            when='midnight',
            utc=True
        )
        file_handler.setLevel(level)
        file_formatter = logging.Formatter(
            fmt='%(asctime)-25s%(levelname)-17s%(message)s',
            datefmt='%H:%M:%S'
        )
        file_handler.setFormatter(file_formatter)
        logging.getLogger().addHandler(file_handler)