Executors
libEnsemble’s Executors can be used within user functions to provide a simple, portable interface for running and managing user applications on any system and with any number of compute resources.
Detailed description
An Executor interface is provided by libEnsemble to remove the burden
of system interaction from the user and improve workflow portability. Users
first register their applications to Executor instances, which then return
corresponding Task objects upon submission within user functions.
Task attributes and retrieval functions can be queried to determine the status of running application instances. Functions are also provided to access and interrogate files in the task’s working directory.
libEnsemble’s Executors and Tasks share many features and methods with
Python’s native concurrent.futures interface. Executors provide the
submit() function for launching apps (detailed below), but currently do
not support map() or shutdown(). Tasks are much like futures:
they provide the cancel(), cancelled(), running(), done(),
result(), and exception() functions from that interface.
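For comparison, the snippet below shows the standard-library pattern that the Task interface mirrors. It uses concurrent.futures from the Python standard library, not libEnsemble, to run a short child process; the helper name run_app is hypothetical. libEnsemble Tasks expose the same done(), cancelled(), and result() style of query on a running application.

```python
import concurrent.futures
import subprocess
import sys


def run_app(args):
    """Hypothetical helper: run a child process to completion, return its exit code."""
    return subprocess.run(args, capture_output=True, text=True).returncode


# Standard-library analogue only: this is NOT the libEnsemble API.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(run_app, [sys.executable, "-c", "print('hello')"])
    exit_code = future.result(timeout=60)  # Blocks until done, like task.result()

print(future.done())       # True: the future has completed
print(future.cancelled())  # False: it ran to completion
print(exit_code)           # 0: the child process exited cleanly
```

One difference worth noting: per the Task API description below, a libEnsemble Task's result() returns the task's final status string rather than the application's own return value.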
The main Executor class can subprocess serial applications in place,
while the MPIExecutor is used for running MPI applications.
Typically, users choose and parameterize their Executor objects in their
calling scripts, where each executable generator or simulation application is
registered to it. Within the user-side worker code (sim/gen function), the Executor
can be retrieved without specifying its type.
Once the Executor is retrieved, tasks can be submitted by specifying the
app_name from registration in the calling script alongside other optional
parameters described in the API.
To set up an MPI executor, register an MPI application and add the executor to the ensemble object:
from libensemble import Ensemble
from libensemble.executors import MPIExecutor
exctr = MPIExecutor()
exctr.register_app(full_path="/path/to/my/exe", app_name="sim1")
ensemble = Ensemble(executor=exctr)
In user simulation function:
def sim_func(H, persis_info, sim_specs, libE_info):
    input_param = str(int(H["x"][0][0]))
    exctr = libE_info["executor"]

    task = exctr.submit(
        app_name="sim1",
        num_procs=8,
        app_args=input_param,
        stdout="out.txt",
        stderr="err.txt",
    )

    # Wait for task to complete
    task.wait()
Example use-cases:
- Electrostatic Forces example: Launches the forces.x MPI application.
- Forces example with GPUs: Auto-assigns GPUs via the executor.
See Running on HPC Systems for illustrations
of how common options such as libE_specs["dedicated_mode"] affect the
run configuration on clusters and supercomputers.
Example of polling output and killing application:
In simulation function (sim_f):
import time


def sim_func(H, persis_info, sim_specs, libE_info):
    input_param = str(int(H["x"][0][0]))
    exctr = libE_info["executor"]

    task = exctr.submit(
        app_name="sim1",
        num_procs=8,
        app_args=input_param,
        stdout="out.txt",
        stderr="err.txt",
    )

    timeout_sec = 600
    poll_delay_sec = 1

    while not task.finished:
        time.sleep(poll_delay_sec)
        task.poll()  # Update task.state, task.finished, and task.runtime
        if task.finished:
            break

        # Has manager sent a finish signal?
        if exctr.manager_kill_received():
            task.kill()
            my_cleanup()  # Hypothetical user-defined cleanup routine

        # Check output file for error and kill task
        elif task.stdout_exists() and "Error" in task.read_stdout():
            task.kill()

        elif task.runtime > timeout_sec:
            task.kill()  # Timeout

    print(task.state)  # state may be FINISHED, FAILED, or USER_KILLED
Users who only need to poll for manager kill signals and timeouts don’t need to
construct a polling loop like the one above; they can instead use the Executor’s
built-in polling_loop() method with poll_manager=True (manager signals are not
checked by default). An alternative to the above simulation function may resemble:
def sim_func(H, persis_info, sim_specs, libE_info):
    input_param = str(int(H["x"][0][0]))
    exctr = libE_info["executor"]

    task = exctr.submit(
        app_name="sim1",
        num_procs=8,
        app_args=input_param,
        stdout="out.txt",
        stderr="err.txt",
    )

    timeout_sec = 600
    poll_delay_sec = 1

    # Blocks until the task finishes, times out, or a manager kill signal arrives
    calc_status = exctr.polling_loop(
        task, timeout=timeout_sec, delay=poll_delay_sec, poll_manager=True
    )

    print(task.state, calc_status)  # state may be FINISHED, FAILED, or USER_KILLED
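To make the behavior of polling_loop() concrete, here is a minimal standalone sketch of a poll-with-timeout loop built on subprocess.Popen from the standard library. It is not libEnsemble's implementation: the function name simple_polling_loop is hypothetical, the returned strings are only modeled on the task.state values listed further down, and manager signals are not modeled at all.

```python
import subprocess
import sys
import time


def simple_polling_loop(proc, timeout=None, delay=0.1):
    """Hypothetical sketch: poll a Popen object until it exits or times out.

    Returns a state string modeled on libEnsemble's task.state values.
    """
    start = time.monotonic()
    while proc.poll() is None:  # None means the process is still running
        if timeout is not None and time.monotonic() - start > timeout:
            proc.kill()  # Timed out: kill the process
            proc.wait()  # Reap the killed process
            return "USER_KILLED"
        time.sleep(delay)
    return "FINISHED" if proc.returncode == 0 else "FAILED"


quick = subprocess.Popen([sys.executable, "-c", "pass"])
print(simple_polling_loop(quick, timeout=30))  # FINISHED

slow = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
print(simple_polling_loop(slow, timeout=0.5))  # USER_KILLED
```

The design choice mirrors the built-in method: the loop sleeps for delay between polls rather than spinning, and it is the timeout, not the application, that decides when a long run is abandoned.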
The MPIExecutor autodetects system criteria such as the appropriate MPI launcher
and mechanisms to poll and kill tasks. It also has access to the resource manager,
which partitions resources among workers, ensuring that runs utilize different
resources (e.g., nodes). Furthermore, the MPIExecutor offers resilience via the
feature of re-launching tasks that fail to start because of system factors.
This module contains the classes Executor and Task. An executor
can create and manage multiple tasks. Task attributes are queried to
determine status.
The base Executor is only for running local serial-launched applications. To run MPI applications and use detected resources, use the MPIExecutor.
- class libensemble.executors.executor.Executor
The executor can create, poll and kill runnable tasks.
Class Attributes:
- Variables:
Executor – executor: The executor object is stored here and can be retrieved in user functions.
- __init__()
Instantiate a new Executor instance.
- Returns:
A new Executor object is created. This is typically created in the user calling script.
- Return type:
- register_app(full_path, app_name=None, calc_type=None, desc=None, precedent='')
Registers a user application to libEnsemble.
The full_path of the application must be supplied. Either app_name or calc_type can be used to identify the application in user scripts (in the submit function). app_name is recommended.
- Parameters:
full_path (str) – The full path of the user application to be registered
app_name (str, Optional) – Name to identify this application.
calc_type (str, Optional) – Calculation type: Set this application as the default ‘sim’ or ‘gen’ function.
desc (str, Optional) – Description of this application
precedent (str, Optional) – Any str that should directly precede the application full path. Supports the placeholder %LIBENSEMBLE_SIM_DIR%, which is replaced at runtime with the simulation directory as a relative path from where the executor was created. This is useful for container exec commands.
- Return type:
None
- manager_poll()
Polls for a manager signal
The executor manager_signal attribute will be updated.
- Return type:
int | None
- manager_kill_received()
Returns True if a kill signal was received from the manager.
- Return type:
bool
- polling_loop(task, timeout=None, delay=0.1, poll_manager=False)
Blocking, generic task status polling loop. Operates until the task finishes, times out, or is killed via a manager signal. On completion, returns a presumptive calc_status integer. Useful for running an application via the Executor until it stops without monitoring its intermediate output.
- Parameters:
task (object) – a Task object returned by the executor on submission
timeout (int, Optional) – Maximum number of seconds for the polling loop to run. Tasks that run longer than this limit are killed. Default: No timeout
delay (int or float, Optional) – Sleep duration in seconds between polling loop iterations. Default: 0.1 seconds
poll_manager (bool, Optional) – Whether to also poll the manager for ‘finish’ or ‘kill’ signals. If detected, the task is killed. Default: False.
- Returns:
calc_status – presumptive integer attribute describing the final status of a launched task
- Return type:
int
- submit(calc_type=None, app_name=None, app_args=None, stdout=None, stderr=None, dry_run=False, wait_on_start=False, env_script=None)
Create a new task and run as a local serial subprocess.
Returns task object.
- Parameters:
calc_type (str, Optional) – The calculation type: ‘sim’ or ‘gen’. Only used if app_name is not supplied, in which case the default sim or gen application is used.
app_name (str, Optional) – The application name. Must be supplied if calc_type is not.
app_args (str, Optional) – A str of the application arguments to be added to task submit command line
stdout (str, Optional) – A standard output filename
stderr (str, Optional) – A standard error filename
dry_run (bool, Optional) – Whether this is a dry run: no task will be launched; instead, the runline is printed to the logger (at INFO level)
wait_on_start (bool or int, Optional) – Whether to wait for task to be polled as RUNNING (or other active/end state) before continuing. If an integer N is supplied, wait at most N seconds.
env_script (str, Optional) – The full path of a shell script to set up the environment for the launched task. This will be run in the subprocess, and not affect the worker environment. The script should start with a shebang.
- Returns:
task – The launched task object
- Return type:
Task
Tasks are created and returned by the Executor’s submit(). Tasks
can be polled, killed, and waited on with the respective poll, kill, and wait functions.
Task information can be queried through instance attributes and query functions.
- class libensemble.executors.executor.Task(app=None, app_args=None, workdir=None, stdout=None, stderr=None, workerid=None, dry_run=False)
Manages the creation, configuration and status of a launchable task
- workdir_exists()
Returns True if the task’s workdir exists
- Return type:
bool | None
- file_exists_in_workdir(filename)
Returns True if the named file exists in the task’s workdir
- Parameters:
filename (str)
- Return type:
bool
- read_file_in_workdir(filename)
Opens and reads the named file in the task’s workdir
- Parameters:
filename (str)
- Return type:
str
- stdout_exists()
Returns True if the task’s stdout file exists in the workdir
- Return type:
bool
- read_stdout()
Opens and reads the task’s stdout file in the task’s workdir
- Return type:
str
- stderr_exists()
Returns True if the task’s stderr file exists in the workdir
- Return type:
bool
- read_stderr()
Opens and reads the task’s stderr file in the task’s workdir
- Return type:
str
- poll()
Polls and updates the status attributes of the task
- Return type:
None
- wait(timeout=None)
Waits for the task to complete, or raises a TimeoutExpired exception if the timeout expires first.
Status attributes of task are updated on completion.
- Parameters:
timeout (int or float, Optional) – Time in seconds after which a TimeoutExpired exception is raised. If not set, then simply waits until completion. Note that the task is not automatically killed on timeout.
- Return type:
None
- result(timeout=None)
Wrapper for task.wait() that also returns the task’s status on completion.
- Parameters:
timeout (int or float, Optional) – Time in seconds after which a TimeoutExpired exception is raised. If not set, then simply waits until completion. Note that the task is not automatically killed on timeout.
- Return type:
str
- exception(timeout=None)
Wrapper for task.wait() that instead returns the task’s error code on completion.
- Parameters:
timeout (int or float, Optional) – Time in seconds after which a TimeoutExpired exception is raised. If not set, then simply waits until completion. Note that the task is not automatically killed on timeout.
- running()
Return True if task is currently running.
- Return type:
bool
- done()
Return True if task is finished.
- Return type:
bool
- kill(wait_time=60)
Kills or cancels the supplied task
- Parameters:
wait_time (int, Optional) – Time in seconds to wait for termination after sending SIGTERM and before sending SIGKILL.
- Return type:
None
Sends SIGTERM, waits for a period of <wait_time> for graceful termination, then sends a hard kill with SIGKILL. If <wait_time> is 0, SIGKILL is sent immediately; if <wait_time> is None, SIGKILL is never sent.
- cancel()
Wrapper for task.kill() without waiting
- Return type:
None
- cancelled()
Return True if task successfully cancelled.
- Return type:
bool
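The kill(wait_time) escalation described above (SIGTERM, a grace period, then SIGKILL) and the note that wait() does not kill a task on timeout can both be illustrated with the standard library alone. The sketch below uses subprocess.Popen and subprocess.TimeoutExpired, not libEnsemble's own Task or exception classes, assumes a POSIX system, and terminate_then_kill is a hypothetical helper name.

```python
import subprocess
import sys


def terminate_then_kill(proc, wait_time=60):
    """Hypothetical sketch of kill(wait_time) semantics for a Popen object.

    wait_time=0 sends SIGKILL immediately; wait_time=None sends SIGTERM and
    waits indefinitely without escalating to SIGKILL.
    """
    if wait_time == 0:
        proc.kill()  # SIGKILL on POSIX
        return proc.wait()
    proc.terminate()  # SIGTERM on POSIX: request graceful shutdown
    if wait_time is None:
        return proc.wait()  # Never escalate
    try:
        return proc.wait(timeout=wait_time)
    except subprocess.TimeoutExpired:
        proc.kill()  # Grace period expired: hard kill
        return proc.wait()


proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
try:
    proc.wait(timeout=0.5)  # Like task.wait(timeout=...): raises on timeout
except subprocess.TimeoutExpired:
    print("still running:", proc.poll() is None)  # The timeout did not kill it
print("return code:", terminate_then_kill(proc, wait_time=5))  # Negative signal number on POSIX
```

As with task.wait(), catching the timeout leaves the process running; the caller decides whether to keep waiting or escalate to a kill.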
Note
These should not be set directly. Tasks are launched by the Executor, and task information can be queried through the task attributes below and the query functions.
- task.state:
(string) The task status. One of (“UNKNOWN”|”CREATED”|”WAITING”|”RUNNING”|”FINISHED”|”USER_KILLED”|”FAILED”|”FAILED_TO_START”)
- task.process:
(process obj) The process object used by the underlying process manager (e.g., return value of subprocess.Popen).
- task.errcode:
(int) The error code (or return code) used by the underlying process manager.
- task.finished:
(boolean) True means the task has finished running; it does not indicate whether it was successful.
- task.success:
(boolean) Did task complete successfully (e.g., the return code is zero)?
- task.runtime:
(int) Time in seconds that task has been running.
- task.submit_time:
(int) Time since epoch that task was submitted.
- task.total_time:
(int) Total time from task submission to completion (only available when task is finished).
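When deciding what to do after a task stops, it helps to separate end states from the states a task passes through while it is still active. The grouping below is a sketch based on the state list above; the names ALL_STATES, END_STATES, and describe_outcome are illustrative and not necessarily libEnsemble's own constants, and treating only FINISHED as success is an assumption consistent with the task.success description.

```python
# Illustrative grouping of the task.state values listed above (names are hypothetical).
ALL_STATES = ["UNKNOWN", "CREATED", "WAITING", "RUNNING", "FINISHED",
              "USER_KILLED", "FAILED", "FAILED_TO_START"]
END_STATES = {"FINISHED", "USER_KILLED", "FAILED", "FAILED_TO_START"}


def describe_outcome(state):
    """Summarize a task.state value after polling (sketch)."""
    if state not in ALL_STATES:
        raise ValueError(f"unrecognized task state: {state!r}")
    if state not in END_STATES:
        return "not in an end state"
    if state == "FINISHED":
        return "completed successfully"
    if state == "USER_KILLED":
        return "killed (e.g., timeout or manager signal)"
    return "failed"


for state in ("RUNNING", "FINISHED", "USER_KILLED", "FAILED_TO_START"):
    print(state, "->", describe_outcome(state))
```

In a user function, a check of this kind would typically run on task.state after task.wait() or a polling loop returns.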
Run configuration attributes - some will be autogenerated:
- task.workdir:
(string) Work directory for the task
- task.name:
(string) Name of the task (autogenerated)
- task.app:
(app obj) The application/executable, registered using exctr.register_app
- task.app_args:
(string) Application arguments as a string
- task.stdout:
(string) Name of file where the standard output of the task is written (in task.workdir)
- task.stderr:
(string) Name of file where the standard error of the task is written (in task.workdir)
- task.dry_run:
(boolean) True if task corresponds to dry run (no actual submission)
- task.runline:
(string) Complete, parameterized command to be subprocessed to launch app
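Since task.stdout names a file inside task.workdir, checks such as stdout_exists() and read_stdout() amount to simple file operations on that path. The sketch below reproduces the "Error" scan from the polling example using only pathlib, so it runs without libEnsemble; the helper name stdout_has_error and the default marker string are assumptions.

```python
import tempfile
from pathlib import Path


def stdout_has_error(workdir, stdout_name, marker="Error"):
    """Sketch: return True if the task's stdout file exists and contains marker."""
    path = Path(workdir) / stdout_name
    if not path.is_file():  # Analogous to task.stdout_exists()
        return False
    return marker in path.read_text()  # Analogous to task.read_stdout()


with tempfile.TemporaryDirectory() as workdir:
    print(stdout_has_error(workdir, "out.txt"))  # False: file not written yet
    Path(workdir, "out.txt").write_text("step 1 ok\nError: diverged\n")
    print(stdout_has_error(workdir, "out.txt"))  # True
```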
This module launches and controls the running of MPI applications.
In order to create an MPI executor, the script should contain:
from libensemble.executors.mpi_executor import MPIExecutor
exctr = MPIExecutor()
The MPIExecutor will use system resource information supplied by the libEnsemble resource manager when submitting tasks.
- class libensemble.executors.mpi_executor.MPIExecutor(custom_info={})
Bases: Executor
The MPI executor can create, poll and kill runnable MPI tasks.
- Parameters:
custom_info (dict, Optional) – Provide custom overrides to selected variables that are usually auto-detected. See below.
custom_info usage
The MPIExecutor automatically detects MPI runners and launch mechanisms. However, it is possible to override the detected information using the custom_info argument. This takes a dictionary of values.
The allowable fields are:
- 'mpi_runner' [string]: Select runner: 'mpich', 'openmpi', 'aprun', 'srun', 'jsrun', 'custom'. All except 'custom' relate to runner classes in libEnsemble. 'custom' allows the user to define their own run-lines but without parsing arguments or making use of auto-resources.
- 'runner_name' [string]: The literal string that appears at the front of the run command. This is typically 'mpirun', 'srun', etc., and can be a full path. Defaults exist for all runners except 'custom'.
- 'subgroup_launch' [bool]: Whether MPI runs should be initiated in a new process group. This needs to be correct for kills to work correctly. Use the standalone test at libensemble/tests/standalone_tests/kill_test to determine the correct value for a system.
For example:
customizer = {'mpi_runner': 'mpich', 'runner_name': 'wrapper -x mpich'}

from libensemble.executors.mpi_executor import MPIExecutor
exctr = MPIExecutor(custom_info=customizer)
- submit(calc_type=None, app_name=None, num_procs=None, num_nodes=None, procs_per_node=None, num_gpus=None, machinefile=None, app_args=None, stdout=None, stderr=None, stage_inout=None, hyperthreads=False, dry_run=False, wait_on_start=False, extra_args=None, auto_assign_gpus=False, match_procs_to_gpus=False, env_script=None, mpi_runner_type=None)
Creates a new task and either executes or schedules execution.
Returns task object. The user must supply either the app_name or calc_type arguments (app_name is recommended). All other arguments are optional.
- Parameters:
calc_type (str, Optional) – The calculation type: ‘sim’ or ‘gen’. Only used if app_name is not supplied, in which case the default sim or gen application is used.
app_name (str, Optional) – The application name. Must be supplied if calc_type is not.
num_procs (int, Optional) – The total number of processes (MPI ranks)
num_nodes (int, Optional) – The number of nodes
procs_per_node (int, Optional) – The processes per node
num_gpus (int, Optional) – The total number of GPUs
machinefile (str, Optional) – Name of a machinefile
app_args (str, Optional) – A string of the application arguments to be added to task submit command line
stdout (str, Optional) – A standard output filename
stderr (str, Optional) – A standard error filename
stage_inout (str, Optional) – A directory to copy files from; default will take from current directory
hyperthreads (bool, Optional) – Whether to submit MPI tasks to hyperthreads
dry_run (bool, Optional) – Whether this is a dry run: no task will be launched; instead, the runline is printed to the logger (at INFO level)
wait_on_start (bool, Optional) – Whether to wait for task to be polled as RUNNING (or other active/end state) before continuing.
extra_args (str, Optional) – Additional command line arguments to supply to MPI runner. If arguments are recognized as MPI resource configuration (num_procs, num_nodes, procs_per_node) they will be used in resources determination unless also supplied in the direct options.
auto_assign_gpus (bool, Optional) – Auto-assign GPUs available to this worker using either the method supplied in configuration or determined by detected environment. Default: False
match_procs_to_gpus (bool, Optional) – For use with auto_assign_gpus. Auto-assigns MPI processors to match the assigned GPUs. Default: False unless auto_assign_gpus is True and no other CPU configuration is supplied.
env_script (str, Optional) – The full path of a shell script to set up the environment for the launched task. This will be run in the subprocess, and not affect the worker environment. The script should start with a shebang.
mpi_runner_type ((str|dict), Optional) – An MPI runner to be used for this submit only. Supply either a string for the MPI runner type or a dictionary for detailed configuration (see custom_info on MPIExecutor constructor). This will not change the default MPI runner for the executor. Example string inputs are “mpich”, “openmpi”, “srun”, “jsrun”, “aprun”.
- Returns:
task – The launched task object
- Return type:
Task
Note that if some combination of num_procs, num_nodes, and procs_per_node is provided, these will be honored if possible. If resource detection is on and these are omitted, then the available resources will be divided among workers.
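The three resource options are related: with ranks spread evenly, num_procs = num_nodes * procs_per_node. The sketch below shows only that arithmetic, filling in the missing value when two are given. It is an assumption-laden illustration, not libEnsemble's resource logic, which also accounts for worker resource sets and detected resources; the function name complete_layout and the strict divisibility requirement are hypothetical.

```python
def complete_layout(num_procs=None, num_nodes=None, procs_per_node=None):
    """Sketch: infer the missing value of (num_procs, num_nodes, procs_per_node).

    Assumes ranks are spread evenly, so num_procs == num_nodes * procs_per_node.
    """
    given = [v is not None for v in (num_procs, num_nodes, procs_per_node)]
    if sum(given) < 2:
        raise ValueError("supply at least two of num_procs, num_nodes, procs_per_node")
    if num_procs is None:
        num_procs = num_nodes * procs_per_node
    elif num_nodes is None:
        if num_procs % procs_per_node:
            raise ValueError("num_procs is not divisible by procs_per_node")
        num_nodes = num_procs // procs_per_node
    elif procs_per_node is None:
        if num_procs % num_nodes:
            raise ValueError("num_procs is not divisible by num_nodes")
        procs_per_node = num_procs // num_nodes
    elif num_procs != num_nodes * procs_per_node:
        raise ValueError("inconsistent num_procs, num_nodes, procs_per_node")
    return num_procs, num_nodes, procs_per_node


print(complete_layout(num_procs=8, num_nodes=2))       # (8, 2, 4)
print(complete_layout(num_nodes=2, procs_per_node=4))  # (8, 2, 4)
```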
- manager_kill_received()
Returns True if a kill signal was received from the manager.
- Return type:
bool
- manager_poll()
Polls for a manager signal
The executor manager_signal attribute will be updated.
- Return type:
int | None
- polling_loop(task, timeout=None, delay=0.1, poll_manager=False)
Blocking, generic task status polling loop. Operates until the task finishes, times out, or is killed via a manager signal. On completion, returns a presumptive calc_status integer. Useful for running an application via the Executor until it stops without monitoring its intermediate output.
- Parameters:
task (object) – a Task object returned by the executor on submission
timeout (int, Optional) – Maximum number of seconds for the polling loop to run. Tasks that run longer than this limit are killed. Default: No timeout
delay (int or float, Optional) – Sleep duration in seconds between polling loop iterations. Default: 0.1 seconds
poll_manager (bool, Optional) – Whether to also poll the manager for ‘finish’ or ‘kill’ signals. If detected, the task is killed. Default: False.
- Returns:
calc_status – presumptive integer attribute describing the final status of a launched task
- Return type:
int
- register_app(full_path, app_name=None, calc_type=None, desc=None, precedent='')
Registers a user application to libEnsemble.
The full_path of the application must be supplied. Either app_name or calc_type can be used to identify the application in user scripts (in the submit function). app_name is recommended.
- Parameters:
full_path (str) – The full path of the user application to be registered
app_name (str, Optional) – Name to identify this application.
calc_type (str, Optional) – Calculation type: Set this application as the default ‘sim’ or ‘gen’ function.
desc (str, Optional) – Description of this application
precedent (str, Optional) – Any str that should directly precede the application full path. Supports the placeholder %LIBENSEMBLE_SIM_DIR%, which is replaced at runtime with the simulation directory as a relative path from where the executor was created. This is useful for container exec commands.
- Return type:
None
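The precedent option with the %LIBENSEMBLE_SIM_DIR% placeholder can be pictured as a string substitution performed when the run line is built. The sketch below is an assumption about that behavior based only on the parameter description above: it replaces the placeholder with the simulation directory expressed relative to the directory where the executor was created, then places the result in front of the application path. The function name build_runline, the container-exec command, and the paths are all hypothetical, and the exact spacing libEnsemble uses is not confirmed here.

```python
import os

PLACEHOLDER = "%LIBENSEMBLE_SIM_DIR%"


def build_runline(precedent, full_path, app_args, sim_dir, executor_dir):
    """Hypothetical sketch: assemble '<precedent> <full_path> <app_args>'."""
    rel_sim_dir = os.path.relpath(sim_dir, start=executor_dir)
    parts = [precedent.replace(PLACEHOLDER, rel_sim_dir), full_path, app_args]
    return " ".join(p for p in parts if p)  # Skip empty pieces


print(build_runline(
    precedent="container-exec --workdir %LIBENSEMBLE_SIM_DIR%",  # hypothetical command
    full_path="/path/to/my/exe",
    app_args="--input 3",
    sim_dir="/scratch/run/ensemble/sim0_worker1",
    executor_dir="/scratch/run",
))
# container-exec --workdir ensemble/sim0_worker1 /path/to/my/exe --input 3
```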
Class-specific attributes can be set directly to alter the behavior of the MPI Executor. However, they should be used with caution, because they may not be implemented in other executors.
- max_submit_attempts:
(int) Maximum number of launch attempts for a given task. Default: 5.
- fail_time:
(int or float) Only if wait_on_start is set. Maximum run time to failure in seconds that results in relaunch. Default: 2.
- retry_delay_incr:
(int or float) Delay increment between launch attempts in seconds. Default: 5 (i.e., the first retry is after 5 seconds, then 10 seconds, then 15, etc.).
Example: to increase resilience against submission failures:
from libensemble.executors.mpi_executor import MPIExecutor

taskctrl = MPIExecutor()
taskctrl.max_launch_attempts = 8
taskctrl.fail_time = 5
taskctrl.retry_delay_incr = 10
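The retry_delay_incr description implies a linearly growing back-off between launch attempts. The sketch below computes that schedule, assuming a delay precedes each retry and none follows the final attempt; the function name retry_delays and its parameter names are illustrative and are not MPIExecutor attributes.

```python
def retry_delays(max_attempts=5, delay_incr=5):
    """Sketch: delays (seconds) before each retry for a linear back-off."""
    # Attempt 1 starts immediately; retry k (k = 1, 2, ...) waits k * delay_incr.
    return [k * delay_incr for k in range(1, max_attempts)]


print(retry_delays())                              # [5, 10, 15, 20]
print(retry_delays(max_attempts=8, delay_incr=10))
# [10, 20, 30, 40, 50, 60, 70]
print(sum(retry_delays(max_attempts=8, delay_incr=10)))  # 280 seconds of waiting in total
```

Raising the increment together with the attempt count, as in the example above, quickly lengthens the worst-case wait before a task is reported as failing to start.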