Executors

libEnsemble’s Executors can be used within user functions to provide a simple, portable interface for running and managing user applications.

The Executor provides a portable interface for running applications on any system and any number of compute resources.

Detailed description

An Executor interface is provided by libEnsemble to remove the burden of system interaction from the user and improve workflow portability. Users first register their applications to Executor instances, which then return corresponding Task objects upon submission within user functions.

Task attributes and retrieval functions can be queried to determine the status of running application instances. Functions are also provided to access and interrogate files in the task’s working directory.

libEnsemble’s Executors and Tasks share many features and methods with Python’s native concurrent.futures interface. Executors feature the submit() function for launching apps (detailed below), but currently do not support map() or shutdown(). Tasks are much like futures: they feature the cancel(), cancelled(), running(), done(), result(), and exception() functions from the standard.
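For comparison, the sketch below uses only Python's standard concurrent.futures module (not libEnsemble) to show the future methods that Tasks mirror. The square function and the pool size are arbitrary choices made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """Stand-in for real work; any callable can be submitted."""
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(square, 7)     # returns immediately with a Future
    value = future.result(timeout=10)   # blocks until the call completes
    finished = future.done()            # True once completed or cancelled
    was_cancelled = future.cancelled()  # False: the call ran to completion
    error = future.exception()          # None: no exception was raised

print(value, finished, was_cancelled, error)
```

A libEnsemble Task offers methods of the same names, although, as documented in the Task reference, result() returns the task's status and exception() returns its error code rather than a return value or an exception object.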

The main Executor class can subprocess serial applications in place, while the MPIExecutor is used for running MPI applications.

Typically, users choose and parameterize their Executor objects in their calling scripts, where each executable generator or simulation application is registered to it. Once in the user-side worker code (sim/gen func), the Executor can be retrieved without any need to specify the type.

Once the Executor is retrieved, tasks can be submitted by specifying the app_name from registration in the calling script alongside other optional parameters described in the API.

To set up an MPI executor, register an MPI application, and add it to the ensemble object:

from libensemble import Ensemble
from libensemble.executors import MPIExecutor

exctr = MPIExecutor()
exctr.register_app(full_path="/path/to/my/exe", app_name="sim1")
ensemble = Ensemble(executor=exctr)

In the user simulation function:

def sim_func(H, persis_info, sim_specs, libE_info):

    input_param = str(int(H["x"][0][0]))
    exctr = libE_info["executor"]

    task = exctr.submit(
        app_name="sim1",
        num_procs=8,
        app_args=input_param,
        stdout="out.txt",
        stderr="err.txt",
    )

    # Wait for task to complete
    task.wait()

Example use-cases:

See Running on HPC Systems for illustrations of how common options such as libE_specs["dedicated_mode"] affect the run configuration on clusters and supercomputers.

Example of polling output and killing application:

In the simulation function (sim_f):

import time


def sim_func(H, persis_info, sim_specs, libE_info):
    input_param = str(int(H["x"][0][0]))
    exctr = libE_info["executor"]

    task = exctr.submit(
        app_name="sim1",
        num_procs=8,
        app_args=input_param,
        stdout="out.txt",
        stderr="err.txt",
    )

    timeout_sec = 600
    poll_delay_sec = 1

    while not task.finished:
        # Has manager sent a finish signal
        if exctr.manager_kill_received():
            task.kill()
            my_cleanup()  # placeholder for user-defined cleanup

        # Check output file for error and kill task
        elif task.stdout_exists() and "Error" in task.read_stdout():
            task.kill()

        elif task.runtime > timeout_sec:
            task.kill()  # Timeout

        else:
            time.sleep(poll_delay_sec)
            task.poll()

    print(task.state)  # state may be finished/failed/killed

Users who wish to poll only for manager kill signals and timeouts don’t need to construct a polling loop like the one above; they can instead use the Executor’s built-in polling_loop() method. An alternative to the above simulation function may resemble:

def sim_func(H, persis_info, sim_specs, libE_info):
    input_param = str(int(H["x"][0][0]))
    exctr = libE_info["executor"]

    task = exctr.submit(
        app_name="sim1",
        num_procs=8,
        app_args=input_param,
        stdout="out.txt",
        stderr="err.txt",
    )

    timeout_sec = 600
    poll_delay_sec = 1

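    # poll_manager=True makes the loop also check for manager kill signals (default: False)
    exctr.polling_loop(task, timeout=timeout_sec, delay=poll_delay_sec, poll_manager=True)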

    print(task.state)  # state may be finished/failed/killed

The MPIExecutor autodetects system criteria such as the appropriate MPI launcher and mechanisms to poll and kill tasks. It also has access to the resource manager, which partitions resources among workers, ensuring that runs utilize different resources (e.g., nodes). Furthermore, the MPIExecutor offers resilience by relaunching tasks that fail to start because of system factors.

This module contains the classes Executor and Task. An executor can create and manage multiple tasks. Task attributes are queried to determine status.

The base Executor is only for running local serial-launched applications. To run MPI applications and use detected resources, use the MPIExecutor.

class libensemble.executors.executor.Executor

The executor can create, poll and kill runnable tasks

Class Attributes:

Variables:

Executor – executor: The executor object is stored here and can be retrieved in user functions.

__init__()

Instantiate a new Executor instance.

Returns:

A new Executor object is created. This is typically created in the user calling script.

Return type:

Executor

register_app(full_path, app_name=None, calc_type=None, desc=None, precedent='')

Registers a user application to libEnsemble.

The full_path of the application must be supplied. Either app_name or calc_type can be used to identify the application in user scripts (in the submit function). app_name is recommended.

Parameters:
  • full_path (str) – The full path of the user application to be registered

  • app_name (str, Optional) – Name to identify this application.

  • calc_type (str, Optional) – Calculation type: Set this application as the default ‘sim’ or ‘gen’ function.

  • desc (str, Optional) – Description of this application

  • precedent (str, Optional) – Any str that should directly precede the application full path. Supports the placeholder %LIBENSEMBLE_SIM_DIR% which is replaced at runtime with the simulation directory as a relative path from where the executor was created. This is useful for container exec commands.

Return type:

None
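To make the precedent placeholder concrete, here is a minimal sketch of the substitution described above, written with the standard library only. It is not libEnsemble's implementation: build_runline, the directory paths, and the `container exec --workdir` prefix are all hypothetical.

```python
import posixpath

PLACEHOLDER = "%LIBENSEMBLE_SIM_DIR%"


def build_runline(precedent, full_path, sim_dir, executor_dir):
    """Return the precedent followed by the application path.

    Hypothetical helper for illustration only.
    """
    # Express the simulation directory relative to where the executor was created
    rel_sim_dir = posixpath.relpath(sim_dir, start=executor_dir)
    prefix = precedent.replace(PLACEHOLDER, rel_sim_dir)
    # An empty precedent (the default) leaves the application path unchanged
    return f"{prefix} {full_path}".strip()


runline = build_runline(
    precedent="container exec --workdir %LIBENSEMBLE_SIM_DIR%",
    full_path="/path/to/my/exe",
    sim_dir="/work/ensemble/sim3",
    executor_dir="/work",
)
print(runline)
```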

manager_poll()

Polls for a manager signal

The executor manager_signal attribute will be updated.

Return type:

int | None

manager_kill_received()

Return True if a kill signal has been received from the manager

Return type:

bool

polling_loop(task, timeout=None, delay=0.1, poll_manager=False)

Blocking, generic task status polling loop. Operates until the task finishes, times out, or is killed via a manager signal. On completion, returns a presumptive calc_status integer. Useful for running an application via the Executor until it stops without monitoring its intermediate output.

Parameters:
  • task (object) – a Task object returned by the executor on submission

  • timeout (int, Optional) – Maximum number of seconds for the polling loop to run. Tasks that run longer than this limit are killed. Default: No timeout

  • delay (int or float, Optional) – Sleep duration between polling loop iterations. Default: 0.1 seconds

  • poll_manager (bool, Optional) – Whether to also poll the manager for ‘finish’ or ‘kill’ signals. If detected, the task is killed. Default: False.

Returns:

calc_status – presumptive integer attribute describing the final status of a launched task

Return type:

int
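The behavior described for polling_loop() can be pictured with a standard-library analogue. This sketch polls a subprocess.Popen object rather than a libEnsemble Task, returns descriptive strings instead of calc_status integers, and omits manager signals; poll_until_done is a hypothetical name.

```python
import subprocess
import sys
import time


def poll_until_done(proc, timeout=None, delay=0.1):
    """Block until proc exits or timeout seconds elapse; kill on timeout."""
    start = time.monotonic()
    while proc.poll() is None:  # None means the process is still running
        if timeout is not None and time.monotonic() - start > timeout:
            proc.kill()
            proc.wait()  # reap the killed process
            return "timed_out"
        time.sleep(delay)
    return "finished" if proc.returncode == 0 else "failed"


# One process that exits quickly and one that outlives its timeout
quick = subprocess.Popen([sys.executable, "-c", "pass"])
slow = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

quick_status = poll_until_done(quick, timeout=30)
slow_status = poll_until_done(slow, timeout=0.5)
print(quick_status, slow_status)
```

The point of the pattern is that the caller hands over the whole wait and only inspects the final status, which suits applications whose intermediate output does not need monitoring.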

submit(calc_type=None, app_name=None, app_args=None, stdout=None, stderr=None, dry_run=False, wait_on_start=False, env_script=None)

Create a new task and run as a local serial subprocess.

Returns task object.

Parameters:
  • calc_type (str, Optional) – The calculation type: ‘sim’ or ‘gen’. Only used if app_name is not supplied. Uses default sim or gen application.

  • app_name (str, Optional) – The application name. Must be supplied if calc_type is not.

  • app_args (str, Optional) – A str of the application arguments to be added to task submit command line

  • stdout (str, Optional) – A standard output filename

  • stderr (str, Optional) – A standard error filename

  • dry_run (bool, Optional) – Whether this is a dry_run - no task will be launched; instead runline is printed to logger (at INFO level)

  • wait_on_start (bool or int, Optional) – Whether to wait for task to be polled as RUNNING (or other active/end state) before continuing. If an integer N is supplied, wait at most N seconds.

  • env_script (str, Optional) – The full path of a shell script to set up the environment for the launched task. This will be run in the subprocess, and not affect the worker environment. The script should start with a shebang.

Returns:

task – The launched task object

Return type:

Task

Tasks are created and returned by the Executor’s submit(). Tasks can be polled, killed, and waited on with the respective poll, kill, and wait functions. Task information can be queried through instance attributes and query functions.

class libensemble.executors.executor.Task(app=None, app_args=None, workdir=None, stdout=None, stderr=None, workerid=None, dry_run=False)

Manages the creation, configuration and status of a launchable task

workdir_exists()

Returns true if the task’s workdir exists

Return type:

bool | None

file_exists_in_workdir(filename)

Returns true if the named file exists in the task’s workdir

Parameters:

filename (str)

Return type:

bool

read_file_in_workdir(filename)

Opens and reads the named file in the task’s workdir

Parameters:

filename (str)

Return type:

str

stdout_exists()

Returns true if the task’s stdout file exists in the workdir

Return type:

bool

read_stdout()

Opens and reads the task’s stdout file in the task’s workdir

Return type:

str

stderr_exists()

Returns true if the task’s stderr file exists in the workdir

Return type:

bool

read_stderr()

Opens and reads the task’s stderr file in the task’s workdir

Return type:

str

poll()

Polls and updates the status attributes of the task

Return type:

None

wait(timeout=None)

Waits on completion of the task or raises TimeoutExpired exception

Status attributes of task are updated on completion.

Parameters:

timeout (int or float, Optional) – Time in seconds after which a TimeoutExpired exception is raised. If not set, then simply waits until completion. Note that the task is not automatically killed on timeout.

Return type:

None

result(timeout=None)

Wrapper for task.wait() that also returns the task’s status on completion.

Parameters:

timeout (int or float, Optional) – Time in seconds after which a TimeoutExpired exception is raised. If not set, then simply waits until completion. Note that the task is not automatically killed on timeout.

Return type:

str

exception(timeout=None)

Wrapper for task.wait() that instead returns the task’s error code on completion.

Parameters:

timeout (int or float, Optional) – Time in seconds after which a TimeoutExpired exception is raised. If not set, then simply waits until completion. Note that the task is not automatically killed on timeout.

running()

Return True if task is currently running.

Return type:

bool

done()

Return True if task is finished.

Return type:

bool

kill(wait_time=60)

Kills or cancels the supplied task

Parameters:

wait_time (int, Optional) – Time in seconds to wait for termination between sending the SIGTERM and SIGKILL signals.

Return type:

None

Sends SIGTERM, waits for a period of <wait_time> for graceful termination, then sends a hard kill with SIGKILL. If <wait_time> is 0, we go immediately to SIGKILL; if <wait_time> is None, we never do a SIGKILL.
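The escalation from SIGTERM to SIGKILL can be sketched with subprocess.Popen. This illustrates the pattern under the wait_time semantics described above and is not libEnsemble's code; terminate_then_kill is a hypothetical helper. On POSIX systems, Popen.terminate() sends SIGTERM and Popen.kill() sends SIGKILL.

```python
import subprocess
import sys


def terminate_then_kill(proc, wait_time=60):
    """Ask proc to stop, escalating to a hard kill after wait_time seconds."""
    if wait_time == 0:
        proc.kill()  # skip the graceful request entirely
        proc.wait()
        return
    proc.terminate()  # graceful request (SIGTERM on POSIX)
    if wait_time is None:
        return  # never escalate to a hard kill
    try:
        proc.wait(timeout=wait_time)
    except subprocess.TimeoutExpired:
        proc.kill()  # hard kill (SIGKILL on POSIX)
        proc.wait()


# Stop a long-running child process, allowing up to 5 seconds for a clean exit
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
terminate_then_kill(proc, wait_time=5)
exit_code = proc.returncode
print(exit_code)
```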

cancel()

Wrapper for task.kill() without waiting

Return type:

None

cancelled()

Return True if the task was successfully cancelled.

Return type:

bool

Note

These attributes should not be set directly. Tasks are launched by the Executor, and task information can be queried through the task attributes below and the query functions.

task.state:

(string) The task status. One of ("UNKNOWN" | "CREATED" | "WAITING" | "RUNNING" | "FINISHED" | "USER_KILLED" | "FAILED" | "FAILED_TO_START")

task.process:

(process obj) The process object used by the underlying process manager (e.g., return value of subprocess.Popen).

task.errcode:

(int) The error code (or return code) used by the underlying process manager.

task.finished:

(boolean) True means task has finished running - not whether it was successful.

task.success:

(boolean) Did task complete successfully (e.g., the return code is zero)?

task.runtime:

(int) Time in seconds that task has been running.

task.submit_time:

(int) Time since epoch that task was submitted.

task.total_time:

(int) Total time from task submission to completion (only available when task is finished).

Run configuration attributes - some will be autogenerated:

task.workdir:

(string) Work directory for the task

task.name:

(string) Name of task - autogenerated

task.app:

(app obj) The application/executable to use, registered using exctr.register_app

task.app_args:

(string) Application arguments as a string

task.stdout:

(string) Name of file where the standard output of the task is written (in task.workdir)

task.stderr:

(string) Name of file where the standard error of the task is written (in task.workdir)

task.dry_run:

(boolean) True if task corresponds to dry run (no actual submission)

task.runline:

(string) Complete, parameterized command to be subprocessed to launch app

This module launches and controls the running of MPI applications.

In order to create an MPI executor, the script should contain:

from libensemble.executors.mpi_executor import MPIExecutor

exctr = MPIExecutor()

The MPIExecutor will use system resource information supplied by the libEnsemble resource manager when submitting tasks.

class libensemble.executors.mpi_executor.MPIExecutor(custom_info={})

Bases: Executor

The MPI executor can create, poll and kill runnable MPI tasks

Parameters:

custom_info (dict, Optional) – Provide custom overrides to selected variables that are usually auto-detected. See below.

custom_info usage

The MPIExecutor automatically detects MPI runners and launch mechanisms. However, it is possible to override the detected information using the custom_info argument, which takes a dictionary of values.

The allowable fields are:

'mpi_runner' [string]:
    Select runner: 'mpich', 'openmpi', 'aprun', 'srun', 'jsrun', 'custom'
    All except 'custom' relate to runner classes in libEnsemble.
    Custom allows user to define their own run-lines but without parsing
    arguments or making use of auto-resources.
'runner_name' [string]:
    The literal string that appears at the front of the run command.
    This is typically 'mpirun', 'srun', etc., and can be a full path.
    Defaults exist for all runners except 'custom'.
'subgroup_launch' [bool]:
    Whether MPI runs should be initiated in a new process group. This needs
    to be correct for kills to work correctly. Use the standalone test at
    libensemble/tests/standalone_tests/kill_test to determine correct value
    for a system.

For example:

customizer = {'mpi_runner': 'mpich',
              'runner_name': 'wrapper -x mpich'}

from libensemble.executors.mpi_executor import MPIExecutor
exctr = MPIExecutor(custom_info=customizer)

submit(calc_type=None, app_name=None, num_procs=None, num_nodes=None, procs_per_node=None, num_gpus=None, machinefile=None, app_args=None, stdout=None, stderr=None, stage_inout=None, hyperthreads=False, dry_run=False, wait_on_start=False, extra_args=None, auto_assign_gpus=False, match_procs_to_gpus=False, env_script=None, mpi_runner_type=None)

Creates a new task, and either executes or schedules execution.

Returns task object.

The user must supply either the app_name or calc_type arguments (app_name is recommended). All other arguments are optional.

Parameters:
  • calc_type (str, Optional) – The calculation type: ‘sim’ or ‘gen’. Only used if app_name is not supplied. Uses default sim or gen application.

  • app_name (str, Optional) – The application name. Must be supplied if calc_type is not.

  • num_procs (int, Optional) – The total number of processes (MPI ranks)

  • num_nodes (int, Optional) – The number of nodes

  • procs_per_node (int, Optional) – The processes per node

  • num_gpus (int, Optional) – The total number of GPUs

  • machinefile (str, Optional) – Name of a machinefile

  • app_args (str, Optional) – A string of the application arguments to be added to task submit command line

  • stdout (str, Optional) – A standard output filename

  • stderr (str, Optional) – A standard error filename

  • stage_inout (str, Optional) – A directory to copy files from; default will take from current directory

  • hyperthreads (bool, Optional) – Whether to submit MPI tasks to hyperthreads

  • dry_run (bool, Optional) – Whether this is a dry_run - no task will be launched; instead runline is printed to logger (at INFO level)

  • wait_on_start (bool, Optional) – Whether to wait for task to be polled as RUNNING (or other active/end state) before continuing.

  • extra_args (str, Optional) – Additional command line arguments to supply to MPI runner. If arguments are recognized as MPI resource configuration (num_procs, num_nodes, procs_per_node) they will be used in resources determination unless also supplied in the direct options.

  • auto_assign_gpus (bool, Optional) – Auto-assign GPUs available to this worker using either the method supplied in configuration or determined by detected environment. Default: False

  • match_procs_to_gpus (bool, Optional) – For use with auto_assign_gpus. Auto-assigns MPI processors to match the assigned GPUs. Default: False unless auto_assign_gpus is True and no other CPU configuration is supplied.

  • env_script (str, Optional) – The full path of a shell script to set up the environment for the launched task. This will be run in the subprocess, and not affect the worker environment. The script should start with a shebang.

  • mpi_runner_type ((str|dict), Optional) – An MPI runner to be used for this submit only. Supply either a string for the MPI runner type or a dictionary for detailed configuration (see custom_info on MPIExecutor constructor). This will not change the default MPI runner for the executor. Example string inputs are “mpich”, “openmpi”, “srun”, “jsrun”, “aprun”.

Returns:

task – The launched task object

Return type:

Task

Note that if some combination of num_procs, num_nodes, and procs_per_node is provided, these will be honored if possible. If resource detection is on and these are omitted, then the available resources will be divided among workers.
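One way to picture how a partial specification can be completed is the relation num_procs = num_nodes × procs_per_node for a uniform layout. The helper below is a hypothetical sketch of that arithmetic only; it does not consult the resource manager, and it is not necessarily how libEnsemble resolves these options.

```python
def complete_mpi_config(num_procs=None, num_nodes=None, procs_per_node=None):
    """Fill in whichever one of the three values is missing.

    Hypothetical helper: assumes every node runs the same number of processes.
    """
    supplied = sum(v is not None for v in (num_procs, num_nodes, procs_per_node))
    if supplied < 2:
        raise ValueError("need at least two of the three values")

    if num_procs is None:
        num_procs = num_nodes * procs_per_node
    elif num_nodes is None:
        if num_procs % procs_per_node:
            raise ValueError("num_procs is not a multiple of procs_per_node")
        num_nodes = num_procs // procs_per_node
    elif procs_per_node is None:
        if num_procs % num_nodes:
            raise ValueError("num_procs is not a multiple of num_nodes")
        procs_per_node = num_procs // num_nodes

    # When all three were supplied, check that they agree
    if num_procs != num_nodes * procs_per_node:
        raise ValueError("inconsistent MPI configuration")
    return num_procs, num_nodes, procs_per_node


config = complete_mpi_config(num_procs=8, num_nodes=2)
print(config)
```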

manager_kill_received()

Return True if a kill signal has been received from the manager

Return type:

bool

manager_poll()

Polls for a manager signal

The executor manager_signal attribute will be updated.

Return type:

int | None

polling_loop(task, timeout=None, delay=0.1, poll_manager=False)

Blocking, generic task status polling loop. Operates until the task finishes, times out, or is killed via a manager signal. On completion, returns a presumptive calc_status integer. Useful for running an application via the Executor until it stops without monitoring its intermediate output.

Parameters:
  • task (object) – a Task object returned by the executor on submission

  • timeout (int, Optional) – Maximum number of seconds for the polling loop to run. Tasks that run longer than this limit are killed. Default: No timeout

  • delay (int or float, Optional) – Sleep duration between polling loop iterations. Default: 0.1 seconds

  • poll_manager (bool, Optional) – Whether to also poll the manager for ‘finish’ or ‘kill’ signals. If detected, the task is killed. Default: False.

Returns:

calc_status – presumptive integer attribute describing the final status of a launched task

Return type:

int

register_app(full_path, app_name=None, calc_type=None, desc=None, precedent='')

Registers a user application to libEnsemble.

The full_path of the application must be supplied. Either app_name or calc_type can be used to identify the application in user scripts (in the submit function). app_name is recommended.

Parameters:
  • full_path (str) – The full path of the user application to be registered

  • app_name (str, Optional) – Name to identify this application.

  • calc_type (str, Optional) – Calculation type: Set this application as the default ‘sim’ or ‘gen’ function.

  • desc (str, Optional) – Description of this application

  • precedent (str, Optional) – Any str that should directly precede the application full path. Supports the placeholder %LIBENSEMBLE_SIM_DIR% which is replaced at runtime with the simulation directory as a relative path from where the executor was created. This is useful for container exec commands.

Return type:

None

Class-specific attributes can be set directly to alter the behavior of the MPI Executor. However, they should be used with caution, because they may not be implemented in other executors.

max_submit_attempts:

(int) Maximum number of launch attempts for a given task. Default: 5.

fail_time:

(int or float) Only if wait_on_start is set. Maximum run time to failure in seconds that results in relaunch. Default: 2.

retry_delay_incr:

(int or float) Delay increment between launch attempts in seconds. Default: 5. (i.e., First retry after 5 seconds, then 10 seconds, then 15, etc…)

Example. To increase resilience against submission failures:

taskctrl = MPIExecutor()
taskctrl.max_submit_attempts = 8
taskctrl.fail_time = 5
taskctrl.retry_delay_incr = 10
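
As a rough picture of the resulting schedule, the sketch below lists the delay before each retry. It assumes the delay grows by retry_delay_incr with every retry, as in the 5, 10, 15 second example above, and that no delay follows the final attempt. retry_delays is a hypothetical helper, not part of libEnsemble.

```python
def retry_delays(max_submit_attempts=5, retry_delay_incr=5):
    """Delay in seconds before each retry (one fewer retry than attempts)."""
    return [retry_delay_incr * n for n in range(1, max_submit_attempts)]


default_delays = retry_delays()
resilient_delays = retry_delays(max_submit_attempts=8, retry_delay_incr=10)

print(default_delays)
print(resilient_delays)
print(sum(resilient_delays))  # worst-case total time spent waiting between attempts
```

Under these assumptions, raising both values lengthens the worst-case wait considerably, which is worth weighing against the time allotted to each simulation.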