5. API Reference
5.1. Sessions and Security Contexts
5.1.1. Sessions
- class radical.pilot.Session(dburl=None, uid=None, cfg=None, _primary=True, **close_options)
A Session is the root object of all RP objects in an application instance: it holds radical.pilot.PilotManager and radical.pilot.TaskManager instances, which in turn hold radical.pilot.Pilot and radical.pilot.Task instances, as well as several other components which operate on those stateful entities.
- __init__(dburl=None, uid=None, cfg=None, _primary=True, **close_options)
Creates a new session. A new Session instance is created and stored in the database.
- Arguments:
dburl (string): The MongoDB URL. If none is given, RP uses the environment variable RADICAL_PILOT_DBURL. If that is not set, an error will be raised.
cfg (str or dict): a named or instantiated configuration to be used for the session.
uid (string): Create a session with this UID. Session UIDs MUST be unique - otherwise they will lead to conflicts in the underlying database, resulting in undefined behaviours (or worse).
_primary (bool): Only sessions created by the original application process (via rp.Session()) will connect to the DB. Secondary session instances are instantiated internally in processes spawned (directly or indirectly) by the initial session, for example in some of its components. A secondary session inherits the original session ID, but does not attempt to create a new DB collection; if such a DB connection is needed, the component needs to establish it on its own.
If additional keyword arguments are provided, they will be used as the default arguments to Session.close(). (This can be useful when the Session is used as a Python context manager, such that close() is called automatically at the end of a with block.)
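To make the keyword-argument behaviour concrete, here is a minimal pure-Python sketch that only mimics it. SessionSketch is a hypothetical stand-in, not radical.pilot.Session, and it assumes that arguments passed explicitly to close() override the stored defaults. It also folds in the rule, documented for close() below, that cleanup implies terminate.

```python
class SessionSketch:
    """Hypothetical stand-in mimicking how rp.Session treats **close_options.

    This is NOT radical.pilot.Session; it only records the options that
    close() ends up using.
    """

    def __init__(self, **close_options):
        # Extra keyword arguments are stored as the defaults for close().
        self._close_defaults = dict(close_options)
        self.closed_with = None

    def close(self, **kwargs):
        # Assumption: explicit arguments override the stored defaults.
        options = {**self._close_defaults, **kwargs}
        # Per the close() documentation, cleanup implies terminate.
        if options.get("cleanup"):
            options["terminate"] = True
        self.closed_with = options

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Leaving the with block calls close() with the stored defaults.
        self.close()
        return False  # do not swallow exceptions


with SessionSketch(download=True) as session:
    pass  # application code would go here

print(session.closed_with)
```

With the real class, the corresponding pattern would be with rp.Session(download=True) as session: ..., which, per the description above, calls close(download=True) when the block ends.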
- close(**kwargs)
Closes the session. All subsequent attempts to access objects attached to the session will result in an error. If cleanup is set to True, the session data is removed from the database.
- Arguments:
cleanup (bool): Remove the session from MongoDB (implies terminate).
terminate (bool): Shut down all pilots associated with the session.
download (bool): Fetch pilot profiles and database entries.
- property closed
Returns the time the session was closed.
- property connected
Returns the time the session connected to the DB.
- property created
Returns the UTC date and time the session was created.
- get_pilot_managers(pmgr_uids=None)
Returns known PilotManager(s).
Arguments:
pmgr_uids [string]: The unique identifier of the PilotManager to return.
- Returns:
One or more radical.pilot.PilotManager objects.
- get_resource_config(resource, schema=None)
Returns the configuration of the requested resource as a dictionary.
- get_task_managers(tmgr_uids=None)
Returns known TaskManager(s).
Arguments:
tmgr_uids [string]: The unique identifier of the TaskManager to return.
- Returns:
One or more radical.pilot.TaskManager objects.
- inject_metadata(metadata)
Inserts (experiment) metadata into an active session. RP stack version information is always added.
- list_pilot_managers()
Lists the unique identifiers of all radical.pilot.PilotManager instances associated with this session.
- Returns:
A list of radical.pilot.PilotManager uids (list of strings).
- list_resources()
Returns a list of known resource labels which can be used in a pilot description.
- list_task_managers()
Lists the unique identifiers of all radical.pilot.TaskManager instances associated with this session.
- Returns:
A list of radical.pilot.TaskManager uids (list of strings).
5.1.2. Security Contexts
See also
5.2. Pilots and PilotManagers
5.2.1. PilotManagers
- class radical.pilot.PilotManager(session, uid=None, cfg='default')
A PilotManager manages rp.Pilot instances that are submitted via the radical.pilot.PilotManager.submit_pilots() method. It is possible to attach one or more resource configurations (see Using Local and Remote HPC Resources) to a PilotManager to outsource machine-specific configuration parameters to an external configuration file.
Example:
```python
import radical.pilot as rp

# The MongoDB URL is taken from the RADICAL_PILOT_DBURL environment variable.
s  = rp.Session()
pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "local.localhost"  # use a key from your resource configuration
pd.cores    = 16

p1 = pm.submit_pilots(pd)  # create first pilot with 16 cores
p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for _ in range(128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments  = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

tm.wait_tasks()  # block until all tasks reach a final state
s.close()
```
The pilot manager can issue notifications on pilot state changes. Whenever a state notification arrives, any callback registered for that notification is fired.
NOTE: State notifications can arrive out of order with respect to the pilot state model!
- __init__(session, uid=None, cfg='default')
Creates a new PilotManager and attaches it to the session.
- Arguments:
session [rp.Session]: The session instance to use.
uid (string): ID for the pilot manager, to be used for reconnecting.
cfg (dict or string): The configuration or name of configuration to use.
- Returns:
A new PilotManager object [rp.PilotManager].
- cancel_pilots(uids=None, _timeout=None)
Cancels one or more rp.Pilots.
- Arguments:
uids [string or list of strings]: The IDs of the pilot objects to cancel.
- close(terminate=True)
Shuts down the PilotManager and all its components.
- Arguments:
terminate [bool]: Cancel non-final pilots if True (default). NOTE: pilots cannot be reconnected to after termination.
- get_pilots(uids=None)
Returns one or more pilots identified by their IDs.
- Arguments:
uids [string or list of strings]: The IDs of the pilot objects to return.
- Returns:
A list of rp.Pilot objects.
- list_pilots()
Returns the UIDs of the rp.Pilots managed by this pilot manager.
- Returns:
A list of rp.Pilot UIDs [string].
- register_callback(cb, cb_data=None, metric='PILOT_STATE')
Registers a new callback function with the PilotManager. Manager-level callbacks get called if the specified metric changes. The default metric PILOT_STATE fires the callback if any of the Pilots managed by the PilotManager change their state.
All callback functions need to have the same signature:
def cb(obj, value, cb_data)
where obj is a handle to the object that triggered the callback, value is the metric value, and cb_data is the data provided on callback registration. In the example of PILOT_STATE above, the object would be the pilot in question, and the value would be the new state of the pilot.
Available metrics are:
PILOT_STATE: fires when the state of any of the pilots managed by this pilot manager instance changes. It communicates the pilot object instance and the pilot's new state.
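A callback following the three-argument signature above might look like the sketch below. make_state_recorder is a hypothetical helper, and the SimpleNamespace object and plain state strings stand in for real rp.Pilot objects and state constants. Giving cb_data a default of None is a defensive choice, so the same function also works where a callback is invoked without callback data (as with the two-argument form described for Pilot.register_callback()).

```python
from types import SimpleNamespace


def make_state_recorder():
    """Return (callback, history); the callback appends every notification."""
    history = []

    def pilot_state_cb(obj, value, cb_data=None):
        # obj:     the pilot whose state changed
        # value:   the metric value, i.e. the pilot's new state for PILOT_STATE
        # cb_data: whatever was passed as cb_data at registration time
        history.append((obj.uid, value, cb_data))

    return pilot_state_cb, history


# Simulated notifications; a real PilotManager would invoke the callback itself.
cb, history = make_state_recorder()
fake_pilot = SimpleNamespace(uid="pilot.0000")
cb(fake_pilot, "PMGR_ACTIVE", "run-1")
cb(fake_pilot, "DONE", "run-1")
print(history)
```

With a real manager, the callback would be registered via pmgr.register_callback(cb, cb_data='run-1'), where pmgr is a PilotManager instance. Since notifications can arrive out of order, the recorded history should not be assumed to follow the pilot state model.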
- submit_pilots(descriptions)
Submits one or more rp.Pilot instances to the pilot manager.
- Arguments:
descriptions [rp.PilotDescription or list of rp.PilotDescription]: The description of the pilot instance(s) to create.
- Returns:
A list of rp.Pilot objects.
- property uid
Returns the unique id.
- wait_pilots(uids=None, state=None, timeout=None)
Returns when one or more rp.Pilots reach a specific state.
If uids is None, wait_pilots returns when all Pilots reach the state defined in state. This may include pilots which have previously terminated or been waited upon.
Arguments:
uids [string or list of strings]: If uids is set, only the Pilots with the specified uids are considered. If uids is None (default), all Pilots are considered.
state [string]: The state that Pilots have to reach in order for the call to return.
By default wait_pilots waits for the Pilots to reach a terminal state, which can be one of the following:
rp.rps.DONE, rp.rps.FAILED, rp.rps.CANCELED
timeout [float]: Timeout in seconds before the call returns regardless of Pilot state changes. The default value None waits forever.
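The documented semantics (wait for a target state, default to the final states, never time out unless timeout is set) can be sketched as a simple polling loop. This is a hypothetical illustration, not RP's implementation: wait_for_states, the get_states callable and the plain state strings are assumptions, and the sketch also accepts a list of target states, as Pilot.wait() does.

```python
import time

# Final states from the text above, as plain strings for this sketch only.
FINAL_STATES = {"DONE", "FAILED", "CANCELED"}


def wait_for_states(get_states, state=None, timeout=None, poll=0.1):
    """Poll until every entity is in a target state or the timeout expires.

    get_states: zero-argument callable returning {uid: current_state}
    state:      target state (str), list of target states, or None for
                the final states
    timeout:    seconds to wait; None waits forever
    Returns the last observed {uid: state} mapping.
    """
    if state is None:
        targets = FINAL_STATES
    elif isinstance(state, str):
        targets = {state}
    else:
        targets = set(state)

    start = time.monotonic()
    while True:
        states = get_states()
        if all(s in targets for s in states.values()):
            return states
        if timeout is not None and time.monotonic() - start >= timeout:
            return states  # timed out: return whatever was observed last
        time.sleep(poll)


# A pilot that becomes DONE on the second poll.
observations = iter([{"pilot.0000": "PMGR_ACTIVE"}, {"pilot.0000": "DONE"}])
result = wait_for_states(lambda: next(observations), poll=0.01)
print(result)
```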
5.2.2. PilotDescription
- class radical.pilot.PilotDescription(from_dict=None)
A PilotDescription object describes the requirements and properties of a radical.pilot.Pilot and is passed as a parameter to radical.pilot.PilotManager.submit_pilots() to instantiate and run a new pilot.
Example:
```python
import radical.pilot

s  = radical.pilot.Session()
pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()
pd.resource = "local.localhost"
pd.cores    = 16
pd.runtime  = 5  # minutes
pilot = pm.submit_pilots(pd)
```
- uid
[type: str | default: None] A unique ID for the pilot. This attribute is optional; a unique ID will be assigned by RP if the field is not set.
- job_name
[type: str | default: None] The name of the job / pilot, which will be provided to radical.saga.job.Description. If not set, uid will be used instead.
- resource
[type: str | default: None] [mandatory] The key of a resource configuration entry (see Using Local and Remote HPC Resources). If the key exists, the machine-specific configuration is loaded from the config file once the PilotDescription is passed to radical.pilot.PilotManager.submit_pilots(). If the key doesn’t exist, a ValueError exception is raised.
- access_schema
[type: str | default: None] The key of the access mechanism to use. Valid access mechanisms are defined in the resource configuration (see Using Local and Remote HPC Resources). If none is specified, the first one defined there is used.
- runtime
[type: int | default: 10] [mandatory] The maximum run time (wall-clock time) in minutes of the pilot.
- sandbox
[type: str | default: None] The working (“sandbox”) directory of the pilot agent. This parameter is optional and if not set, it defaults to radical.pilot.sandbox in your home or login directory.
Warning
If you define a pilot on an HPC cluster and you want to set sandbox manually, make sure that it points to a directory on a shared filesystem that can be reached from all compute nodes.
- nodes
[type: int | default: 1] The number of nodes the pilot should allocate on the target resource. This parameter can be set instead of cores and gpus (and memory).
Note
If nodes is specified, gpus and cores must not be specified.
- cores
[type: int | default: 0] The number of cores the pilot should allocate on the target resource. This parameter can be set instead of nodes.
Note
For local pilots, you can set a number larger than the physical machine limit (corresponding resource configuration should have the attribute “fake_resources”).
Note
If cores is specified, nodes must not be specified.
- gpus
[type: int | default: 0] The number of GPUs the pilot should allocate on the target resource.
Note
If gpus is specified, nodes must not be specified.
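The mutual exclusion between nodes and cores/gpus can be checked before submission. The sketch below is a hypothetical helper operating on a plain dict of attribute names (as could be passed via from_dict); it only inspects attributes that were explicitly set to non-zero values and ignores any defaults the real PilotDescription may apply.

```python
def check_pilot_resources(descr):
    """Raise ValueError if 'nodes' is combined with 'cores' or 'gpus'.

    descr: plain dict using PilotDescription attribute names; only keys
    that are present and non-zero count as 'specified'.
    """
    nodes = descr.get("nodes")
    cores = descr.get("cores")
    gpus = descr.get("gpus")
    if nodes and (cores or gpus):
        raise ValueError("specify either 'nodes' or 'cores'/'gpus', not both")
    return descr


check_pilot_resources({"resource": "local.localhost", "cores": 16, "gpus": 2})
check_pilot_resources({"resource": "local.localhost", "nodes": 2})
try:
    check_pilot_resources({"resource": "local.localhost", "nodes": 2, "gpus": 4})
except ValueError as err:
    print("rejected:", err)
```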
- memory
[type: int | default: 0] The total amount of physical memory required by the pilot (and its associated job). This parameter translates into TotalPhysicalMemory in radical.saga.job.Description.
- queue
[type: str | default: None] The name of the job queue the pilot should be submitted to. If a queue is also set in the resource configuration (see resource), this value explicitly overrides it.
- project
[type: str | default: None] The name of the project / allocation to charge for used CPU time. If a project is also set in the resource configuration (see resource), this value explicitly overrides it.
- candidate_hosts
[type: list | default: []] The list of host names on which this pilot is allowed to start.
- app_comm
[type: list | default: []] The list of names is interpreted as communication channels to start within the pilot agent for the purpose of application communication, i.e., tasks running on that pilot are able to use those channels to communicate with each other.
The names are expected to end in _queue or _pubsub, indicating the type of channel to create. Once the channels are created, tasks will find environment variables named RP_%s_IN and RP_%s_OUT, where %s is replaced with the given channel name (uppercased), and IN/OUT indicate the respective endpoint addresses for the created channels.
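As a sketch of the naming rule, the helper below (hypothetical, not part of RP) derives the variable names a task would look up for a given channel. The channel name work_queue is made up for illustration, and the format of the endpoint addresses themselves is not covered here.

```python
import os


def app_comm_env_names(channel):
    """Return the (IN, OUT) environment variable names for an app_comm channel.

    Implements the naming rule above: RP_<NAME>_IN / RP_<NAME>_OUT with the
    channel name uppercased. Names must end in _queue or _pubsub.
    """
    if not channel.endswith(("_queue", "_pubsub")):
        raise ValueError(f"channel must end in _queue or _pubsub: {channel!r}")
    name = channel.upper()
    return f"RP_{name}_IN", f"RP_{name}_OUT"


# A pilot described with app_comm=['work_queue'] (hypothetical channel name).
in_var, out_var = app_comm_env_names("work_queue")
print(in_var, out_var)  # RP_WORK_QUEUE_IN RP_WORK_QUEUE_OUT

# Inside a task, the endpoint addresses would then be read from the
# environment; outside a pilot these variables are simply unset.
endpoint_in = os.environ.get(in_var)
```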
- input_staging
[type: list | default: []] The list of files to be staged into the pilot sandbox.
- output_staging
[type: list | default: []] The list of files to be staged from the pilot sandbox.
- cleanup
[type: bool | default: False] If cleanup is set to True, the pilot will delete its entire sandbox upon termination. This includes individual Task sandboxes and all generated output data. Only log files will remain in the sandbox directory.
- exit_on_error
[type: bool | default: True] Flag to trigger application termination in case of pilot failure.
- services
[type: [str] | default: []] [optional] A list of commands which get started on a separate service compute node right after bootstrapping, and before any RP task is launched. That service compute node will not be used for any other tasks.
- layout
[type: str or dict | default: “default”] Points to a JSON file or an explicit (dict) description of the pilot layout: number and size of partitions and their configuration.
5.2.3. Pilots
- class radical.pilot.Pilot(pmgr, descr)
A Pilot represents a resource overlay on a local or remote resource.
Note
A Pilot cannot be created directly. The factory method radical.pilot.PilotManager.submit_pilots() has to be used instead.
Example:
```python
import radical.pilot

s  = radical.pilot.Session()
pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()
pd.resource = "local.localhost"
pd.cores    = 2
pd.runtime  = 5  # minutes
pilot = pm.submit_pilots(pd)
```
- property description
Returns the description the pilot was started with, as a dictionary.
- Returns:
description (dict)
- property log
Returns a list of human-readable [timestamp, string] tuples describing various events during the pilot’s lifetime. Those strings are not normative, only informative!
- Returns:
log (list of [timestamp, string] tuples)
- property pilot_sandbox
Returns the full sandbox URL of this pilot, if that is already known, or ‘None’ otherwise.
- Returns:
A string
- property pmgr
Returns the pilot’s manager.
- Returns:
A PilotManager.
- prepare_env(env_name, env_spec)
Requests the preparation of a task or worker environment on the target resource. This call will block until the environment is created.
env_name: name of the environment to prepare (str)
env_spec: specification of the environment to prepare (dict), for example:
- {'type': 'venv', 'version': '3.7', 'pre_exec': ['module load python'], 'setup': ['radical.pilot==1.0', 'pandas']}
- {'type': 'conda', 'version': '3.8', 'setup': ['numpy']}
- {'type': 'conda', 'version': '3.8', 'path': '/path/to/ve', 'setup': ['numpy']}
where type specifies the environment type, version specifies the Python version to deploy, and setup specifies how the environment is to be prepared (e.g., the packages to install). If path is specified, the environment will be created at that path. If path is not specified, RP will place the named environment in the pilot sandbox (under env/named_env_<name>). If a virtual environment already exists at that path, it will be used as is (no update is performed). pre_exec commands are executed before environment creation and setup are attempted.
Note: the version specifier is only interpreted up to the minor version; subminor versions and below are ignored.
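The path and version rules above can be sketched as follows. resolve_env is a hypothetical helper, not part of RP; it assumes pilot_sandbox is a plain POSIX path (the pilot_sandbox property actually returns a URL), and the environment name ve_ml and sandbox path are placeholders.

```python
import posixpath


def resolve_env(env_name, env_spec, pilot_sandbox):
    """Hypothetical helper applying the prepare_env() rules described above.

    Returns (path, python_version) for a named environment.
    """
    version = env_spec.get("version")
    if version:
        # Only major.minor is interpreted, e.g. '3.8.10' -> '3.8'.
        version = ".".join(version.split(".")[:2])
    # Without an explicit path, the env goes under env/named_env_<name>.
    path = env_spec.get("path") or posixpath.join(
        pilot_sandbox, "env", f"named_env_{env_name}")
    return path, version


spec = {"type": "venv", "version": "3.8.10", "setup": ["numpy"]}
print(resolve_env("ve_ml", spec, "/scratch/pilot.0000"))
```

With a real pilot, the same specification would be passed as pilot.prepare_env('ve_ml', spec), and tasks would select the environment by setting named_env = 've_ml' in their TaskDescription.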
- register_callback(cb, metric='PILOT_STATE', cb_data=None)
Registers a callback function that is triggered every time the pilot’s state changes.
All callback functions need to have the same signature:
def cb(obj, state)
where obj is a handle to the object that triggered the callback and state is the new state of that object. If cb_data is given, then the cb signature changes to
def cb(obj, state, cb_data)
and cb_data is passed along.
- property resource
Returns the resource tag of this pilot.
- Returns:
A resource tag (string)
- property resource_details
Returns agent-level resource information.
- rpc(rpc, args)
Send a pilot command, wait for the response, and return the result. This is basically an RPC into the pilot.
- stage_in(sds)
Stages the content of the staging directives (sds) into the pilot’s staging area.
- property stderr
Returns a snapshot of the pilot’s STDERR stream.
If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.
- Returns:
stderr (string)
- property stdout
Returns a snapshot of the pilot’s STDOUT stream.
If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.
- Returns:
stdout (string)
- property uid
Returns the pilot’s unique identifier.
The uid identifies the pilot within a PilotManager.
- Returns:
A unique identifier (string).
- wait(state=None, timeout=None)
Returns when the pilot reaches a specific state or when an optional timeout is reached.
Arguments:
state [list of strings]: The state(s) that the pilot has to reach in order for the call to return.
By default wait waits for the pilot to reach a final state, which can be one of the following:
radical.pilot.states.DONE, radical.pilot.states.FAILED, radical.pilot.states.CANCELED
timeout [float]: Optional timeout in seconds before the call returns regardless of whether the pilot has reached the desired state or not. The default value None never times out.
5.3. Tasks and TaskManagers
5.3.1. TaskManager
- class radical.pilot.TaskManager(session, cfg='default', scheduler=None, uid=None)
A TaskManager manages radical.pilot.Task instances which represent the executable workload in RADICAL-Pilot. A TaskManager connects the Tasks with one or more Pilot instances (which represent the workload executors in RADICAL-Pilot) and a scheduler which determines which Task gets executed on which Pilot.
Example:
```python
import radical.pilot as rp

# The MongoDB URL is taken from the RADICAL_PILOT_DBURL environment variable.
s  = rp.Session()
pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "local.localhost"  # use a key from your resource configuration
pd.cores    = 16

p1 = pm.submit_pilots(pd)  # create first pilot with 16 cores
p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for _ in range(128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments  = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

tm.wait_tasks()  # block until all tasks reach a final state
s.close()
```
The task manager can issue notifications on task state changes. Whenever a state notification arrives, any callback registered for that notification is fired.
NOTE: State notifications can arrive out of order with respect to the task state model!
- __init__(session, cfg='default', scheduler=None, uid=None)
Creates a new TaskManager and attaches it to the session.
- Arguments:
session [radical.pilot.Session]: The session instance to use.
cfg (dict or string): The configuration or name of configuration to use.
scheduler (string): The name of the scheduler plug-in to use.
uid (string): ID for the task manager, to be used for reconnecting.
- Returns:
A new TaskManager object [radical.pilot.TaskManager].
- add_pilots(pilots)
Associates one or more pilots with the task manager.
Arguments:
pilots [radical.pilot.Pilot or list of radical.pilot.Pilot]: The pilot objects that will be added to the task manager.
- cancel_tasks(uids=None)
Cancels one or more radical.pilot.Tasks.
Note that cancellation of tasks is immediate, i.e., their state is immediately set to CANCELED, even if some RP component may still operate on the tasks. Specifically, other state transitions, including other final states (DONE, FAILED), can occur after cancellation. This is a side effect of an optimization: we consider this an acceptable tradeoff in the sense of “Oh, that task was DONE at the point of cancellation – ok, we can use the results, sure!”.
If that behavior is not wanted, set the environment variable:
export RADICAL_PILOT_STRICT_CANCEL=True
- Arguments:
uids [string or list of strings]: The IDs of the task objects to cancel.
- get_pilots()
Gets the pilot instances currently associated with the task manager.
- Returns:
A list of radical.pilot.Pilot instances.
- get_tasks(uids=None)
Returns one or more tasks identified by their IDs.
- Arguments:
uids [string or list of strings]: The IDs of the task objects to return.
- Returns:
A list of radical.pilot.Task objects.
- list_pilots()
Lists the UIDs of the pilots currently associated with the task manager.
- Returns:
A list of radical.pilot.Pilot UIDs [string].
- list_tasks()
Returns the UIDs of the radical.pilot.Tasks managed by this task manager.
- Returns:
A list of radical.pilot.Task UIDs [string].
- register_callback(cb, cb_data=None, metric=None, uid=None)
Registers a new callback function with the TaskManager. Manager-level callbacks get called if the specified metric changes. The default metric TASK_STATE fires the callback if any of the Tasks managed by the TaskManager change their state.
All callback functions need to have the same signature:
def cb(obj, value)
where obj is a handle to the object that triggered the callback and value is the metric value. In the example of TASK_STATE above, the object would be the task in question, and the value would be the new state of the task.
If cb_data is given, then the cb signature changes to
def cb(obj, value, cb_data)
and cb_data is passed unchanged.
If uid is given, the callback will be invoked only for the specified task.
Available metrics are:
TASK_STATE: fires when the state of any of the tasks managed by this task manager instance changes. It communicates the task object instance and the task's new state.
WAIT_QUEUE_SIZE: fires when the number of unscheduled tasks (i.e., tasks which have not yet been assigned to a pilot for execution) changes.
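Because cancellation can be followed by other final states (see cancel_tasks() above) and notifications may arrive out of order, a TASK_STATE callback that tracks finished tasks is safer when keyed by task uid than when it simply counts notifications. The sketch below is hypothetical: make_final_state_tracker is an illustrative helper, and SimpleNamespace objects and plain state strings stand in for real radical.pilot.Task objects and state constants.

```python
from types import SimpleNamespace

# Final task states as plain strings (assumption for this sketch).
FINAL_STATES = {"DONE", "FAILED", "CANCELED"}


def make_final_state_tracker():
    """Return (callback, final), where final maps task uid -> final state."""
    final = {}

    def task_state_cb(task, value, cb_data=None):
        # Ignore intermediate states; keep only final ones.
        if value in FINAL_STATES:
            # Keying by uid avoids double counting when a task reports more
            # than one final state (e.g. CANCELED followed by DONE); the
            # last reported final state wins.
            final[task.uid] = value

    return task_state_cb, final


cb, final = make_final_state_tracker()
t0 = SimpleNamespace(uid="task.000000")
t1 = SimpleNamespace(uid="task.000001")
cb(t0, "AGENT_EXECUTING")
cb(t0, "DONE")
cb(t1, "CANCELED")
cb(t1, "DONE")
print(final)
```

With a real manager, the callback would be registered with tmgr.register_callback(cb), or with uid set to track a single task (tmgr being a TaskManager instance).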
- remove_pilots(pilot_ids, drain=False)
Disassociates one or more pilots from the task manager.
After a pilot has been removed from a task manager, it won’t process any of the task manager’s tasks anymore. Calling remove_pilots doesn’t stop the pilot itself.
Arguments:
drain [boolean]: Drain determines what happens to the tasks which are managed by the removed pilot(s). If True, all tasks currently assigned to the pilot are allowed to finish execution. If False (the default), then non-final tasks will be canceled.
- property scheduler
Returns the scheduler name.
- submit_tasks(descriptions)
Submits one or more radical.pilot.Task instances to the task manager.
- Arguments:
descriptions [radical.pilot.TaskDescription or list of radical.pilot.TaskDescription]: The description of the task instance(s) to create.
- Returns:
A list of radical.pilot.Task objects.
- property uid
Returns the unique id.
- wait_tasks(uids=None, state=None, timeout=None)
Returns when one or more radical.pilot.Tasks reach a specific state.
If uids is None, wait_tasks returns when all Tasks reach the state defined in state. This may include tasks which have previously terminated or been waited upon.
Arguments:
uids [string or list of strings]: If uids is set, only the Tasks with the specified uids are considered. If uids is None (default), all Tasks are considered.
state [string]: The state that Tasks have to reach in order for the call to return.
By default wait_tasks waits for the Tasks to reach a terminal state, which can be one of the following:
radical.pilot.rps.DONE, radical.pilot.rps.FAILED, radical.pilot.rps.CANCELED
timeout [float]: Timeout in seconds before the call returns regardless of Task state changes. The default value None waits forever.
5.3.2. TaskDescription
- class radical.pilot.TaskDescription(from_dict=None)
A TaskDescription object describes the requirements and properties of a radical.pilot.Task and is passed as a parameter to radical.pilot.TaskManager.submit_tasks() to instantiate and run a new task.
Note
A TaskDescription MUST define at least an executable – all other elements are optional.
- uid
[type: str | default: “”] A unique ID for the task. This attribute is optional; a unique ID will be assigned by RP if the field is not set.
- name
[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads.
- mode
[type: str | default: “executable”] The execution mode to be used for this task. The following modes are accepted:
TASK_EXECUTABLE: the task is spawned as an external executable via a resource-specific launch method (srun, aprun, mpiexec, etc.). Required attributes: executable. Related attributes: arguments.
TASK_FUNCTION: the task references a Python function to be called. Required attributes: function. Related attributes: args, kwargs.
TASK_METHOD: the task references a raptor worker method to be called. Required attributes: method. Related attributes: args, kwargs.
TASK_EVAL: the task is a code snippet to be evaluated. Required attributes: code.
TASK_EXEC: the task is a code snippet to be exec’ed. Required attributes: code.
TASK_SHELL: the task is a shell command line to be run. Required attributes: command.
TASK_PROC: the task is a single-core process to be executed. Required attributes: executable. Related attributes: arguments.
There exists a certain overlap between TASK_EXECUTABLE, TASK_SHELL and TASK_PROC modes. As a general rule, TASK_SHELL and TASK_PROC should be used for short running tasks which require a single core and no additional resources (gpus, storage, memory). TASK_EXECUTABLE should be used for all other tasks and is in fact the default. TASK_SHELL should only be used if the command to be run requires shell specific functionality (pipes, I/O redirection) which cannot easily be mapped to other task attributes.
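The required attribute listed for each mode lends itself to a pre-submission check. The hypothetical check_mode helper below works on plain dicts and uses the mode constant names as keys (their actual string values are not assumed); it does not reproduce any validation RP itself may perform.

```python
# Required attribute per mode, transcribed from the list above.
REQUIRED_ATTRIBUTE = {
    "TASK_EXECUTABLE": "executable",
    "TASK_FUNCTION": "function",
    "TASK_METHOD": "method",
    "TASK_EVAL": "code",
    "TASK_EXEC": "code",
    "TASK_SHELL": "command",
    "TASK_PROC": "executable",
}


def check_mode(mode, descr):
    """Raise ValueError if descr lacks the attribute its mode requires."""
    if mode not in REQUIRED_ATTRIBUTE:
        raise ValueError(f"unknown mode: {mode}")
    required = REQUIRED_ATTRIBUTE[mode]
    if not descr.get(required):
        raise ValueError(f"{mode} requires '{required}'")


check_mode("TASK_SHELL", {"command": "ls -l | wc -l"})  # a pipe, hence TASK_SHELL
check_mode("TASK_EXECUTABLE", {"executable": "/bin/date"})
```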
- executable
[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.
- arguments
[type: list | default: []] The command line arguments for the given executable (list of strings).
- code
[type: str | default: “”] The code to run. This field is expected to contain valid python code which is executed when the task mode is TASK_EXEC or TASK_EVAL.
- function
[type: str | default: “”] The function to run. This field is expected to contain a python function name which can be resolved in the scope of the respective RP worker implementation (see documentation there). The task mode must be set to TASK_FUNCTION. args and kwargs are passed as function parameters.
- args
[type: list | default: []] Unnamed arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.
- kwargs
[type: dict | default: {}] Named arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.
- command
[type: str | default: “”] A shell command to be executed. This attribute is used for the TASK_SHELL mode.
- ranks
[type: int | default: 1] The number of application processes to start on CPU cores. For two ranks or more, an MPI communicator will be available to the processes.
ranks replaces the deprecated attribute cpu_processes. The attribute cpu_process_type was previously used to signal the need for an MPI communicator - that attribute is now also deprecated and will be ignored.
- cores_per_rank
[type: int | default: 1] The number of cpu cores each process will have available to start its own threads or processes on. By default, core refers to a physical CPU core - but if the pilot has been launched with SMT-settings > 1, core will refer to a virtual core or hyperthread instead (the name depends on the CPU vendor).
cores_per_rank replaces the deprecated attribute cpu_threads.
- threading_type
[type: str | default: “”] The thread type, influences startup and environment (<empty>/POSIX, OpenMP).
threading_type replaces the deprecated attribute cpu_thread_type.
- gpus_per_rank
[type: int | default: 0] The number of gpus made available to each rank.
gpus_per_rank replaces the deprecated attribute gpu_processes. The attributes gpu_threads and gpu_process_type are also deprecated and will be ignored.
- gpu_type
[type: str | default: “”] The type of GPU environment to provide to the ranks (<empty>, CUDA, ROCm).
gpu_type replaces the deprecated attribute gpu_thread_type.
- lfs_per_rank
[type: int | default: 0] Local File Storage per rank - amount of data (MB) required on the local file system of the node.
lfs_per_rank replaces the deprecated attribute lfs_per_process.
- mem_per_rank
[type: int | default: 0] Amount of physical memory required per rank.
mem_per_rank replaces the deprecated attribute mem_per_process.
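The renamed per-rank attributes map one-to-one onto their deprecated predecessors, and the totals a task requests follow from multiplying each per-rank value by ranks (for example, 4 ranks with 2 cores_per_rank request 8 cores). The sketch below shows both with hypothetical helpers on plain dicts, using the defaults from the attribute list above; it assumes the old values carry over unchanged under the new names, and ./sim is a placeholder executable.

```python
# Deprecated attribute -> replacement, as listed above.
RENAMED = {
    "cpu_processes": "ranks",
    "cpu_threads": "cores_per_rank",
    "cpu_thread_type": "threading_type",
    "gpu_processes": "gpus_per_rank",
    "gpu_thread_type": "gpu_type",
    "lfs_per_process": "lfs_per_rank",
    "mem_per_process": "mem_per_rank",
}
# Deprecated attributes that are ignored without replacement.
IGNORED = {"cpu_process_type", "gpu_threads", "gpu_process_type"}


def migrate(old):
    """Translate a dict using deprecated names into the current names."""
    return {RENAMED.get(key, key): value
            for key, value in old.items() if key not in IGNORED}


def total_resources(descr):
    """Total cores and GPUs a task asks for: per-rank values times ranks."""
    ranks = descr.get("ranks", 1)
    return {"cores": ranks * descr.get("cores_per_rank", 1),
            "gpus": ranks * descr.get("gpus_per_rank", 0)}


legacy = {"executable": "./sim", "cpu_processes": 4, "cpu_threads": 2,
          "cpu_process_type": "MPI", "gpu_processes": 1}
current = migrate(legacy)
print(current)
print(total_resources(current))  # {'cores': 8, 'gpus': 4}
```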
- environment
[type: dict | default: {}] Environment variables to set in the environment before execution (i.e., before the selected LaunchMethod launches the task).
- named_env
[type: str | default: “”] A named virtual environment as prepared by the pilot. The task will remain in AGENT_SCHEDULING state until that environment gets created.
- sandbox
[type: str | default: “”] This specifies the working directory of the task. That directory MUST be relative to the pilot sandbox. It will be created if it does not exist. By default, the sandbox has the name of the task’s uid.
- stdout
[type: str | default: “”] The name of the file to store stdout. If not set, a name of the format <uid>.out is used.
- stderr
[type: str | default: “”] The name of the file to store stderr. If not set, a name of the format <uid>.err is used.
- input_staging
[type: list | default: []] The files that need to be staged before the execution (list of staging directives, see below).
- output_staging
[type: list | default: []] The files that need to be staged after the execution (list of staging directives, see below).
- stage_on_error
[type: bool | default: False] Output staging is normally skipped on FAILED or CANCELED tasks, but if this flag is set, staging is attempted either way. This may, however, lead to additional errors if the tasks did not manage to produce the expected output files to stage.
- pre_launch
[type: list | default: []] Actions (shell commands) to perform before launching (i.e., before the LaunchMethod is submitted), potentially on a batch node which is different from the node the task is placed on.
Note that the shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of CPU time or other resources! Deviating from that rule will likely result in reduced overall throughput.
- post_launch
[type: list | default: []] Actions (shell commands) to perform after launching (i.e., after the LaunchMethod is executed).
Precautions are the same as for pre_launch actions.
- pre_exec
[type: list | default: []] Actions (shell commands) to perform before the task starts (the LaunchMethod is submitted, but no actual task is running yet). Each item can be either a string (str), which represents an action applied to all ranks, or a dictionary (dict), which represents a list of actions applied to specified ranks (the key is a rank ID and the value is a list of actions to be performed for this rank).
The actions/commands are executed on the respective nodes where the ranks are placed, and the actual rank startup will be delayed until all pre_exec commands have completed.
Precautions are the same as for pre_launch actions.
No assumption should be made as to where these commands are executed (although RP attempts to perform them in the task’s execution environment).
No assumption should be made on the specific shell environment the commands are executed in other than a POSIX shell environment.
Errors in executing these commands will cause the task to enter the FAILED state, and no execution of the actual workload will be attempted.
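To illustrate the mixed string/dict form of pre_exec, the hypothetical helper below computes which commands a given rank would run. It is a sketch under two assumptions not stated above: rank IDs are integer keys, and commands keep the order in which they appear in the list. The module name and variable values are placeholders.

```python
def pre_exec_for_rank(pre_exec, rank):
    """Expand a pre_exec list into the commands that apply to one rank.

    Strings apply to all ranks; dicts map a rank ID to that rank's commands.
    Assumes rank IDs are integers and that list order is preserved.
    """
    commands = []
    for item in pre_exec:
        if isinstance(item, str):
            commands.append(item)
        else:
            commands.extend(item.get(rank, []))
    return commands


pre_exec = [
    "module load gcc",
    {0: ["echo preparing input on rank 0"]},
    "export OMP_NUM_THREADS=2",
]
print(pre_exec_for_rank(pre_exec, 0))
print(pre_exec_for_rank(pre_exec, 1))
```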
- pre_exec_sync
[type: bool | default: False] If set, rank execution is synchronized: the execution of each individual rank is delayed until the pre_exec actions of all ranks have completed.
- post_exec
[type: list | default: []] Actions (shell commands) to perform after the task finishes. The same remarks as for pre_exec apply, including the point on error handling: a failing post_exec command will again cause the task to fail, even if the actual execution was successful.
- restartable
[type: bool | default: False] If set, a task which started to execute on a pilot but could not finish because the pilot failed or was canceled can be restarted.
- scheduler
[type: str | default: “”] Request the task to be handled by a specific agent scheduler.
- tags
[type: dict | default: {}] Configuration-specific tags which influence task scheduling and execution (e.g., task co-location).
- metadata
[type: ANY | default: None] User-defined metadata.
- timeout
[type: float | default: 0.0] Any timeout larger than 0 will cause the task process to be killed after the specified number of seconds. The task will then end up in the CANCELED state.
- cleanup
[type: bool | default: False] If the cleanup flag is set, the pilot will delete the entire task sandbox upon termination. This includes all generated output data in that sandbox. Output staging will be performed before cleanup. Note that task sandboxes are also deleted if the pilot’s own cleanup flag is set.
- pilot
[type: str | default: “”] Pilot UID. If specified, the task is submitted to the pilot with the given ID. If that pilot is not known to the TaskManager, an exception is raised.
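The mixed str/dict format of pre_exec and the effect of pre_exec_sync, both described above, can be modelled in plain Python. This is a toy sketch and not RP's agent code. commands_for_rank and run_ranks are hypothetical helpers. Rank IDs are assumed to be integer dict keys, so check what your RP version expects. The commands are only recorded and never executed.

```python
import threading


def commands_for_rank(pre_exec, rank):
    """Hypothetical helper: collect the pre_exec commands that apply to one rank.

    Strings apply to every rank; dict items map a rank ID (assumed here to be
    an int) to the list of commands for that rank only.
    """
    cmds = []
    for item in pre_exec:
        if isinstance(item, str):
            cmds.append(item)
        elif isinstance(item, dict):
            cmds.extend(item.get(rank, []))
        else:
            raise TypeError(f"unsupported pre_exec item: {item!r}")
    return cmds


def run_ranks(pre_exec, n_ranks, sync=False):
    """Toy model of rank startup (hypothetical, not RP's implementation).

    Each rank 'runs' its pre_exec commands (only recorded in the log), then
    starts its workload.  With sync=True, a barrier holds every rank back
    until all ranks have finished their pre_exec actions.
    """
    log = []
    lock = threading.Lock()
    barrier = threading.Barrier(n_ranks) if sync else None

    def rank_main(rank):
        for cmd in commands_for_rank(pre_exec, rank):
            with lock:
                log.append(('pre_exec', rank, cmd))
        if barrier is not None:
            barrier.wait()          # wait until every rank has reached this point
        with lock:
            log.append(('exec', rank, None))

    threads = [threading.Thread(target=rank_main, args=(r,)) for r in range(n_ranks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


pre_exec = [
    'module load gcc',                  # applied to all ranks
    {0: ['export ROLE=leader'],         # rank 0 only
     1: ['export ROLE=worker']},        # rank 1 only
]

print(commands_for_rank(pre_exec, 0))   # ['module load gcc', 'export ROLE=leader']
print(commands_for_rank(pre_exec, 2))   # ['module load gcc']

log = run_ranks(pre_exec, n_ranks=3, sync=True)
```

Without sync=True, the barrier is skipped. A fast rank can then start its workload while other ranks are still running their pre_exec commands.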
5. Staging Directives
The Staging Directives are specified using a dict in the following form:
```python
staging_directive = {
    'source'  : None,   # see 'Locations' below
    'target'  : None,   # see 'Locations' below
    'action'  : None,   # see 'Action operators' below
    'flags'   : None,   # see 'Flags' below
    'priority': 0       # control ordering of actions (unused)
}
```
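A staging directive is just a dict, so a small factory can fill in the template above and catch a misspelled action early. The helper make_directive below is hypothetical. The strings 'TRANSFER', 'COPY', 'MOVE' and 'LINK' are placeholders for RP's action constants (rp.TRANSFER etc.), whose actual values are not given here. The sketch also assumes that flags are passed as a list.

```python
# Template mirroring the staging directive layout shown above.
DIRECTIVE_TEMPLATE = {
    'source'  : None,
    'target'  : None,
    'action'  : None,
    'flags'   : None,
    'priority': 0,
}

# Placeholder strings standing in for rp.TRANSFER, rp.COPY, rp.MOVE, rp.LINK.
KNOWN_ACTIONS = {'TRANSFER', 'COPY', 'MOVE', 'LINK'}


def make_directive(source, target, action='TRANSFER', flags=None):
    """Hypothetical helper: build a complete staging directive dict."""
    if action not in KNOWN_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    sd = dict(DIRECTIVE_TEMPLATE)          # start from the template defaults
    sd.update(source=source,
              target=target,
              action=action,
              flags=list(flags or []))     # assumption: flags given as a list
    return sd


# Usage: stage an input file from the client into the task sandbox.
sd = make_directive('client:///input.dat', 'task:///input.dat')
```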
5. Locations
source and target locations can be given as strings or ru.Url instances. Strings containing :// are converted into URLs immediately. Otherwise, they are considered absolute or relative paths and are then interpreted in the context of the client’s working directory.
Special URL schemas:
- client:// : relative to the client’s working directory
- resource:// : relative to the RP sandbox on the target resource
- pilot:// : relative to the pilot sandbox on the target resource
- task:// : relative to the task sandbox on the target resource
In all these cases, the hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).
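The resolution rules above can be sketched with the standard library's urllib.parse. resolve_location is a hypothetical helper and not RP's own logic, since RP works with ru.Url objects. It accepts only strings and returns a (schema, path) pair. It uses the made-up schema name 'file' for plain local paths.

```python
import posixpath
from urllib.parse import urlparse

# Special schemas listed above; their paths are relative to the client
# working directory or to the RP, pilot or task sandbox respectively.
SPECIAL_SCHEMAS = {'client', 'resource', 'pilot', 'task'}


def resolve_location(loc, client_cwd):
    """Hypothetical helper mimicking the location rules (strings only).

    Returns (schema, path).  Strings without '://' are treated as local paths
    and resolved against the client's working directory; 'file' is this
    sketch's own label for them.
    """
    if '://' not in loc:
        return 'file', posixpath.normpath(posixpath.join(client_cwd, loc))

    url = urlparse(loc)
    if url.scheme in SPECIAL_SCHEMAS:
        if url.netloc:
            raise ValueError(f"expected an empty hostname in {loc!r}")
        # The path is always taken as relative to the schema's base location.
        return url.scheme, url.path.lstrip('/')

    # Any other URL (e.g. sftp://host/path) is kept as a full URL.
    return url.scheme, loc
```

For example, 'task:///out.txt' resolves to ('task', 'out.txt'), and 'data/in.txt' with a client working directory of '/home/user' resolves to ('file', '/home/user/data/in.txt').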
5. Action operators
Action operators:
- rp.TRANSFER : remote file transfer from source URL to target URL
- rp.COPY : local file copy, i.e., not crossing host boundaries
- rp.MOVE : local file move
- rp.LINK : local file symlink
5. Flags
Flags:
- rp.CREATE_PARENTS : create the directory hierarchy for targets on the fly
- rp.RECURSIVE : if source is a directory, handles it recursively
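The local operators and flags above map closely onto Python's os and shutil modules. The sketch below is hypothetical. apply_local_action is not an RP function, and plain strings stand in for rp.COPY, rp.MOVE, rp.LINK, rp.CREATE_PARENTS and rp.RECURSIVE. The sketch does not cover rp.TRANSFER, which crosses host boundaries.

```python
import os
import shutil
import tempfile


def apply_local_action(source, target, action, flags=()):
    """Hypothetical local implementation of COPY, MOVE and LINK.

    Plain strings stand in for the rp.* action and flag constants.
    """
    if 'CREATE_PARENTS' in flags:
        # Create the directory hierarchy for the target on the fly.
        os.makedirs(os.path.dirname(os.path.abspath(target)), exist_ok=True)

    if action == 'COPY':
        if os.path.isdir(source):
            if 'RECURSIVE' not in flags:
                raise IsADirectoryError(f"{source} is a directory and RECURSIVE is not set")
            shutil.copytree(source, target)
        else:
            shutil.copy2(source, target)
    elif action == 'MOVE':
        shutil.move(source, target)          # file or directory move
    elif action == 'LINK':
        os.symlink(os.path.abspath(source), target)
    else:
        raise ValueError(f"unsupported local action: {action!r}")


# Usage: copy a file into a not-yet-existing directory, then symlink it.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'input.dat')
    with open(src, 'w') as f:
        f.write('42\n')
    dst = os.path.join(tmp, 'sandbox', 'sub', 'input.dat')
    apply_local_action(src, dst, 'COPY', flags={'CREATE_PARENTS'})
    lnk = os.path.join(tmp, 'sandbox', 'link.dat')
    apply_local_action(src, lnk, 'LINK')
    with open(dst) as f:
        copied = f.read()
    linked = os.path.islink(lnk)
```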
5. Task Environment
RP tasks are expected to be executed in isolation, meaning that their runtime environment is completely independent from the environment of other tasks, independent from the launch mechanism used to start the task, and also independent from the environment of the RP stack itself.
The task description provides several hooks to help set up the environment in that context. It is important to understand how those hooks interact with the environments mentioned above.
pre_launch directives are set and executed before the task is passed on to the task launch method. As such, pre_launch directives are usually executed on the node where RP’s agent is running, and not on the task’s target node. Executing pre_launch directives for many tasks can thus negatively impact RP’s performance (*). Note also that pre_launch directives can in some cases interfere with the launch method.
Use pre_launch directives for rare, heavy-weight operations which prepare the runtime environment for multiple tasks: fetch data from a remote source, unpack input data, create global communication channels, etc.
pre_exec directives are set and executed after the launch method has placed the task on the compute nodes, and thus run on the target node. Note that for MPI tasks, the pre_exec directives are executed once per rank. Running large numbers of pre_exec directives concurrently can lead to system performance degradation (*), for example when those directives concurrently hit the shared file system (for loading modules or Python virtualenvs, etc.).
Use pre_exec directives for task environment setup, such as module load, virtualenv activate or export, whose effects are expected to apply either to all task ranks or to specified ranks. Avoid file staging operations at this point (files would be redundantly staged multiple times, once per rank).
(*) The performance impact of repeated concurrent access to the system’s shared file system can be significant and can pose a major bottleneck for your application. Specifically, module load and virtualenv activate operations and the like are heavy on file system I/O, and executing those for many tasks is ill-advised. Having said that, RP attempts to optimize those operations: if it identifies that identical pre_exec directives are shared between multiple tasks, RP will execute the directives exactly once and cache the resulting environment settings. Those cached settings are then applied to all other tasks with the same directives, without executing the directives again.
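The caching behaviour described in the footnote can be illustrated with a small memoizing helper. This is a hypothetical sketch and not RP's code. env_for runs a list of directives once in /bin/sh and captures the resulting environment with the current Python interpreter. Later requests for the same directives are served from an in-memory dict.

```python
import json
import shlex
import subprocess
import sys

_env_cache = {}   # hypothetical cache: tuple of directives -> environment dict
run_count = 0     # how often directives were actually executed


def env_for(directives):
    """Return the environment produced by `directives`, executing them once."""
    global run_count
    key = tuple(directives)
    if key not in _env_cache:
        run_count += 1
        # Dump the resulting environment as one line of JSON at the end.
        dump = (f"{shlex.quote(sys.executable)} -c "
                "'import json, os; print(json.dumps(dict(os.environ)))'")
        # 'set -e' makes a failing directive abort the script, mirroring the
        # rule that pre_exec errors fail the task.
        script = '\n'.join(['set -e', *directives, dump])
        out = subprocess.run(['/bin/sh', '-c', script], capture_output=True,
                             text=True, check=True).stdout
        # Only the last line is the JSON dump; directives may print too.
        _env_cache[key] = json.loads(out.strip().splitlines()[-1])
    return dict(_env_cache[key])   # copy, so callers cannot alter the cache


env1 = env_for(['export FOO=bar'])
env2 = env_for(['export FOO=bar'])   # served from the cache, not re-executed
```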
5.3.3. Task
- class radical.pilot.Task(tmgr, descr, origin)
A Task represents a ‘task’ that is executed on a Pilot. Task objects allow the application to control and query the state of that task.
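Note
A task cannot be created directly. The factory method rp.TaskManager.submit_tasks() has to be used instead.
Example:
```python
import radical.pilot as rp

s = rp.Session()                   # needs RADICAL_PILOT_DBURL or an explicit dburl
tmgr = rp.TaskManager(session=s)
ud = rp.TaskDescription()
ud.executable = "/bin/date"
task = tmgr.submit_tasks(ud)
s.close()
```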
- property description
Returns the description the task was started with, as a dictionary.
- Returns:
description (dict)
- property exception
Returns a string representation (__repr__) of the exception which caused the task’s FAILED state, if such an exception was raised while managing or executing the task.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.
- Returns:
str
- property exception_detail
Returns additional information about the exception which caused this task to enter FAILED state.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.
- Returns:
str
- property exit_code
Returns the exit code of the task, if that is already known, or ‘None’ otherwise.
- Returns:
exit code (int)
- property metadata
Returns the metadata field of the task’s description.
- property mode
Returns the task mode.
- Returns:
A mode (string).
- property name
Returns the task’s application-specified name.
- Returns:
A name (string).
- property origin
Indicates where the task was created.
- Returns:
string
- property pilot
Returns the pilot ID of this task, if that is already known, or ‘None’ otherwise.
- Returns:
A pilot ID (string)
- register_callback(cb, cb_data=None, metric=None)
Registers a callback function that is triggered every time the task’s state changes.
All callback functions need to have the same signature:
def cb(obj, state)
where obj is a handle to the object that triggered the callback and state is the new state of that object. If cb_data is given, then the cb signature changes to
def cb(obj, state, cb_data)
and cb_data is passed unchanged.
- property return_value
Returns the return value for tasks which represent a function call (or None otherwise).
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.
- Returns:
Any
- property state
Returns the current state of the task.
- Returns:
state (string enum)
- property stderr
Returns a snapshot of the executable’s STDERR stream.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.
- Returns:
stderr (string)
- property stdout
Returns a snapshot of the executable’s STDOUT stream.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.
- Returns:
stdout (string)
- property task_sandbox
Returns the full sandbox URL of this task, if that is already known, or ‘None’ otherwise.
- Returns:
A URL (radical.utils.Url).
- property tmgr
Returns the task’s manager.
- Returns:
A TaskManager.
- property uid
Returns the task’s unique identifier.
The uid identifies the task within a TaskManager.
- Returns:
A unique identifier (string).
- wait(state=None, timeout=None)
Returns when the task reaches a specific state or when an optional timeout is reached.
Arguments:
state [list of strings] The state(s) that the task has to reach in order for the call to return.
By default, wait waits for the task to reach a final state, which can be one of the following:
- rp.states.DONE
- rp.states.FAILED
- rp.states.CANCELED
timeout [float] Optional timeout in seconds before the call returns, regardless of whether the task has reached the desired state or not. The default value None never times out.
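The callback signatures of register_callback and the default behaviour of wait can be illustrated with a toy stand-in. ToyTask is hypothetical and is not radical.pilot.Task. The state strings are placeholders for rp.states.DONE, rp.states.FAILED and rp.states.CANCELED. _advance simulates RP driving a state change, and returning the current state from wait is this sketch's own choice.

```python
import threading

# Placeholders for rp.states.DONE, rp.states.FAILED, rp.states.CANCELED.
DONE, FAILED, CANCELED = 'DONE', 'FAILED', 'CANCELED'
FINAL = [DONE, FAILED, CANCELED]


class ToyTask:
    """Hypothetical stand-in mimicking the documented callback/wait semantics."""

    def __init__(self):
        self.state = 'NEW'
        self._cbs = []
        self._cond = threading.Condition()

    def register_callback(self, cb, cb_data=None):
        self._cbs.append((cb, cb_data))

    def _advance(self, state):
        # Simulates a state change driven by RP.
        with self._cond:
            self.state = state
            self._cond.notify_all()
        for cb, cb_data in self._cbs:
            # cb(obj, state) by default; cb(obj, state, cb_data) if cb_data given
            if cb_data is None:
                cb(self, state)
            else:
                cb(self, state, cb_data)

    def wait(self, state=None, timeout=None):
        # state: list of target states; None means "any final state".
        targets = FINAL if state is None else list(state)
        with self._cond:
            self._cond.wait_for(lambda: self.state in targets, timeout=timeout)
            return self.state        # toy choice: report the current state


seen = []

def cb(obj, state, cb_data):
    cb_data.append(state)

task = ToyTask()
task.register_callback(cb, cb_data=seen)
timer = threading.Timer(0.1, task._advance, args=(DONE,))
timer.start()
final = task.wait(timeout=5)              # blocks until a final state is reached
timer.join()                              # make sure the callback has run

timed_out_state = ToyTask().wait(timeout=0.05)   # returns after the timeout
```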