Skip to content

cluster

dummy

current_jobs(ctx, match) async

Dummy method to find current jobs.

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
match str

Jobs to match the job list with.

required

Returns:

Type Description
list

Current jobs.

Source code in model_ensembler/cluster/dummy.py
async def current_jobs(ctx, match):
    """Dummy method listing the locally tracked jobs that are still active.

    Args:
        ctx (object): Context object for retrieving configuration. It is
            not consulted by this dummy implementation.
        match (str): Prefix that a job name has to start with.

    Returns:
        (list): Entries of the job registry whose name starts with `match`
            and whose state is one of START_STATES.
    """
    # The registry is only read here, never rebound
    global _jobs

    active = []
    for candidate in _jobs.values():
        if not candidate.name.startswith(match):
            continue
        if candidate.state in START_STATES:
            active.append(candidate)

    return active

find_id(job_id) async

Dummy method to find local job id.

Parameters:

Name Type Description Default
job_id int

Local job identifier.

required

Returns:

Type Description
int

job id.

Raises:

Type Description
LookupError

If more than one job matches the given job id.

Source code in model_ensembler/cluster/dummy.py
async def find_id(job_id):
    """Dummy method to look up a locally tracked job by its identifier.

    Args:
        job_id (int): Local job identifier, compared against the name of
            each entry in the job registry.

    Returns:
        (object): The single matching registry entry, or None when no
            entry matches.

    Raises:
        LookupError: If more than one entry matches the identifier.
    """
    global _jobs

    matches = [entry for entry in _jobs.values() if entry.name == job_id]

    # Ambiguity is an error; absence is reported to the caller as None
    if len(matches) > 1:
        raise LookupError("{} jobs found for ID {}".format(len(matches),
                                                           job_id))
    if not matches:
        return None
    return matches[0]

submit_job(ctx, script=None) async

Dummy method to submit job locally.

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
script str

Script name to submit.

None

Returns:

Type Description
int

Job ID.

Source code in model_ensembler/cluster/dummy.py
async def submit_job(ctx, script=None):
    """Dummy method to submit a job locally.

    Waits a random number of seconds (at most the configured
    `max_stagger`), records the job in the registry under `ctx.dir` with
    state SUBMITTED and then starts a thread that runs `threaded_job`.

    Args:
        ctx (object): Context object for retrieving configuration; its
            `dir` and `id` attributes are read.
        script (str): Script name to submit.

    Returns:
        (int): Job ID, taken from `ctx.id`.
    """
    # TODO: ugh, we could use contextvars for this
    global _jobs

    # Stagger the submission by a random delay
    delay = random.randint(0, Arguments().max_stagger)
    logging.debug(
        "Sleeping for {} seconds before submission".format(delay))
    await asyncio.sleep(delay)

    # Register the job before the worker starts, since the worker looks
    # it up by run directory
    with _dict_lock:
        _jobs[ctx.dir] = Job(ctx.id, "SUBMITTED", False, False)

    worker = threading.Thread(target=threaded_job, args=(ctx.dir, script))
    worker.start()

    return ctx.id

threaded_job(run_dir, script)

Dummy method to set off local job

Parameters:

Name Type Description Default
run_dir str

Directory script is running in.

required
script str

Name of script to run.

required
Source code in model_ensembler/cluster/dummy.py
def threaded_job(run_dir, script):
    """Dummy method to set off local job

    Marks the job registered under `run_dir` as RUNNING, executes the
    script from within that directory, waits for it to exit and then
    records the job as finished.

    If the script cannot be launched at all (for example it is missing or
    not executable) the job is recorded as FAILED instead of being left in
    the RUNNING state indefinitely.

    Args:
        run_dir (str): Directory script is running in; also the key of the
            job in the job registry.
        script (str): Name of script to run, relative to `run_dir`.
    """
    global _jobs

    with _dict_lock:
        _jobs[run_dir] = Job(_jobs[run_dir].name,
                             "RUNNING",
                             True,
                             False)
        # Snapshot the entry under the lock so the log line below does not
        # read the shared registry unprotected
        running_job = _jobs[run_dir]

    logging.info("DUMMY RUN: {} - {}".format(run_dir, running_job))

    final_state = "COMPLETED"
    try:
        result = subprocess.run("./{}".format(script), cwd=run_dir)
    except OSError:
        # Without this the thread would die here and the registry entry
        # would stay RUNNING forever.
        # NOTE(review): "FAILED" mirrors the SLURM state name - confirm the
        # consumers of Job.state treat it as a terminal state.
        logging.exception(
            "DUMMY RUN: could not launch {} in {}".format(script, run_dir))
        final_state = "FAILED"
    else:
        # TODO: a non-zero exit status is still recorded as COMPLETED
        if result.returncode != 0:
            logging.warning("DUMMY RUN: {} in {} exited with status {}".
                            format(script, run_dir, result.returncode))

    with _dict_lock:
        _jobs[run_dir] = Job(_jobs[run_dir].name,
                             final_state,
                             True,
                             True)

slurm

current_jobs(ctx, match) async

Method to get list of current jobs

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
match str

Jobs to match the job list with.

required

Returns:

Type Description
list

Filtered jobs.

Source code in model_ensembler/cluster/slurm.py
async def current_jobs(ctx, match):
    """Method to get list of current jobs

    Queries `squeue` for the partition named by `ctx.cluster` and keeps
    the jobs whose name starts with `match` and whose state is one of
    START_STATES. A failed query is retried after waiting for the
    configured `check_timeout`, until one succeeds.

    Args:
        ctx (object): Context object for retrieving configuration; its
            `cluster` and `dir` attributes are read.
        match (str): Jobs to match the job list with.

    Returns:
        (list): Filtered jobs, each a dict with "name" and "state" keys.
            May be empty.
    """
    args = Arguments()
    filtered_jobs = None

    # An empty list is a valid answer, so only a failed query (None)
    # triggers another attempt
    while filtered_jobs is None:
        try:
            res = await execute_command("squeue -o \"%j,%T\" -h -p {}".
                                        format(ctx.cluster),
                                        cwd=ctx.dir)
            output = res.stdout.decode()
        except Exception as e:
            logging.warning("Could not retrieve list: {}".format(e))
            # Back off before retrying rather than hammering the scheduler
            await asyncio.sleep(args.check_timeout)
        else:
            filtered_jobs = []
            # One job per line; split on the last comma so job names that
            # contain spaces or commas are kept intact
            for line in output.splitlines():
                name, sep, state = line.strip().rpartition(",")
                if not sep:
                    # Blank or malformed line, nothing to record
                    continue
                if name.startswith(match) and state in START_STATES:
                    filtered_jobs.append({"name": name, "state": state})

            logging.debug("SLURM JOBS result: {}".
                          format(pformat(filtered_jobs)))

    return filtered_jobs

find_id(job_id) async

Method to find SLURM job by ID.

This method provides an interface to the sacct SLURM accounting utility to identify a job and return it along with its state.

Parameters:

Name Type Description Default
job_id int

SLURM job identifier.

required

Returns:

Name Type Description
jobs list

Job objects including name and state.

Raises:

Type Description
ValueError

If cannot retrieve job from list.

Source code in model_ensembler/cluster/slurm.py
async def find_id(job_id):
    """Method to find SLURM job by ID.

    This method provides an interface to the sacct SLURM accounting
    utility to identify a job and return it along with its state. The
    lookup is repeated, waiting for the configured `check_timeout` between
    attempts, until the output can be parsed.

    Args:
        job_id (int): SLURM job identifier.

    Returns:
        (object): Job with the name, the first word of the reported state,
            and `started`/`finished` flags telling whether sacct reported
            a start and an end time.
    """
    job = None
    args = Arguments()
    # Placeholders reported instead of a timestamp when the job has not
    # started or ended yet.
    # NOTE(review): "Unknown" is what the original code tested for; "None"
    # and "" are assumed to occur on some Slurm releases - confirm.
    unset_times = ("Unknown", "None", "")

    while not job:
        try:
            res = await execute_command("sacct -XnP -j {} "
                                        "-o jobname,state,start,end".
                                        format(job_id))
            output = res.stdout.decode().strip()
            (name, state, started, finished) = output.split("|")
            # Keep only the first word, e.g. "CANCELLED by 1234"
            state = state.split()[0]
        except (ValueError, IndexError):
            # Wrong number of fields or an empty state: the record is not
            # available (yet), so wait and ask again
            logging.debug("Could not retrieve job from list")
            await asyncio.sleep(args.check_timeout)
        else:
            job = Job(
                name=name,
                state=state,
                # A real timestamp means the event has happened
                started=started not in unset_times,
                finished=finished not in unset_times
            )

    logging.debug("SLURM find result name: {}".format(job.name))
    return job

submit_job(ctx, script=None) async

Method to submit jobs to SLURM.

Parameters:

Name Type Description Default
ctx object

Context object for retrieving configuration.

required
script str

Script name to submit.

None

Returns:

Type Description
int

Job ID.

Source code in model_ensembler/cluster/slurm.py
async def submit_job(ctx, script=None):
    """Method to submit jobs to SLURM.

    Waits a random number of seconds (at most the configured
    `max_stagger`), runs `sbatch --no-requeue` on the script from within
    `ctx.dir` and extracts the job ID from the confirmation line.

    Args:
        ctx (object): Context object for retrieving configuration; its
            `dir` attribute is read.
        script (str): Script name to submit.

    Returns:
        (int): Job ID, or None if no "Submitted batch job <id>" line was
            found in the sbatch output.
    """
    # MULTILINE so the confirmation is found even when other lines
    # surround it on stdout
    r_sbatch_id = re.compile(r'Submitted batch job (\d+)$', re.MULTILINE)
    args = Arguments()

    # Don't smash the scheduler immediately, it appears to have the potential
    # to cause problems.
    sleep_for = random.randint(0, args.max_stagger)
    logging.debug(
        "Sleeping for {} seconds before submission".format(sleep_for))
    await asyncio.sleep(sleep_for)
    res = await execute_command("sbatch --no-requeue {}".format(script),
                                cwd=ctx.dir)
    output = res.stdout.decode()

    sbatch_match = r_sbatch_id.search(output)
    if not sbatch_match:
        # Leave a trace of why no job ID is being returned
        logging.warning("Could not find a job ID in sbatch output: {!r}".
                        format(output))
        return None

    job_id = int(sbatch_match.group(1))
    logging.info("Submitted job with ID {}".format(job_id))
    return job_id