Source code for radical.pilot.task_description


# pylint: disable=access-member-before-definition
#
__copyright__ = 'Copyright 2013-2022, The RADICAL-Cybertools Team'
__license__   = 'MIT'

import radical.utils as ru

# task modes
TASK_EXECUTABLE  = 'task.executable'
TASK_FUNCTION    = 'task.function'
TASK_METHOD      = 'task.method'
TASK_EVAL        = 'task.eval'
TASK_EXEC        = 'task.exec'
TASK_PROC        = 'task.proc'
TASK_SHELL       = 'task.shell'
RAPTOR_MASTER    = 'raptor.master'
RAPTOR_WORKER    = 'raptor.worker'

# task description attributes
UID              = 'uid'
NAME             = 'name'
MODE             = 'mode'

# mode: TASK_EXECUTABLE
EXECUTABLE       = 'executable'
ARGUMENTS        = 'arguments'

# mode: TASK_METHOD  # FIXME
METHOD           = 'method'
ARGS             = 'args'
KWARGS           = 'kwargs'

# mode: TASK_FUNCTION
FUNCTION         = 'function'
ARGS             = 'args'
KWARGS           = 'kwargs'

# mode: TASK_EXEC, TASK_EVAL
CODE             = 'code'

# mode: TASK_PROC, TASK_SHELL
COMMAND          = 'command'

# environment
ENVIRONMENT      = 'environment'
NAMED_ENV        = 'named_env'
SANDBOX          = 'sandbox'

# resource requirements
RANKS            = 'ranks'                    # ranks
CORES_PER_RANK   = 'cores_per_rank'           # cores per rank
GPUS_PER_RANK    = 'gpus_per_rank'            # gpus per rank
THREADING_TYPE   = 'threading_type'           # OpenMP?
GPU_TYPE         = 'gpu_type'                 # CUDA / ROCm?
LFS_PER_RANK     = 'lfs_per_rank'             # disk space per rank
MEM_PER_RANK     = 'mem_per_rank'             # memory per rank

# deprecated
CPU_PROCESSES    = 'cpu_processes'            # ranks
CPU_PROCESS_TYPE = 'cpu_process_type'         # n/a
CPU_THREADS      = 'cpu_threads'              # cores per rank
CPU_THREAD_TYPE  = 'cpu_thread_type'          # OpenMP?

GPU_PROCESSES    = 'gpu_processes'            # gpus per rank
GPU_PROCESS_TYPE = 'gpu_process_type'         # CUDA?
GPU_THREADS      = 'gpu_threads'              # part of gpu?
GPU_THREAD_TYPE  = 'gpu_thread_type'          # ?

LFS_PER_PROCESS  = 'lfs_per_process'          # disk space per rank
MEM_PER_PROCESS  = 'mem_per_process'          # memory per rank

# task setup
INPUT_STAGING    = 'input_staging'
OUTPUT_STAGING   = 'output_staging'
STAGE_ON_ERROR   = 'stage_on_error'
PRE_LAUNCH       = 'pre_launch'
PRE_EXEC         = 'pre_exec'
PRE_EXEC_SYNC    = 'pre_exec_sync'
POST_LAUNCH      = 'post_launch'
POST_EXEC        = 'post_exec'
TIMEOUT          = 'timeout'
CLEANUP          = 'cleanup'
PILOT            = 'pilot'
STDOUT           = 'stdout'
STDERR           = 'stderr'
RESTARTABLE      = 'restartable'
SCHEDULER        = 'scheduler'
TAGS             = 'tags'
METADATA         = 'metadata'


# ------------------------------------------------------------------------------
#
[docs]class TaskDescription(ru.TypedDict): """ A TaskDescription object describes the requirements and properties of a :class:`radical.pilot.Task` and is passed as a parameter to :meth:`radical.pilot.TaskManager.submit_tasks` to instantiate and run a new task. .. note:: A TaskDescription **MUST** define at least an `executable` -- all other elements are optional. .. py:attribute:: uid [type: `str` | default: `""`] A unique ID for the task. This attribute is optional, a unique ID will be assigned by RP if the field is not set. .. py:attribute:: name [type: `str` | default: `""`] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads. .. py:attribute:: mode [type: `str` | default: `"executable"`] The execution mode to be used for this task. The following modes are accepted: - TASK_EXECUTABLE: the task is spawned as an external executable via a resource specific launch method (srun, aprun, mpiexec, etc). required attributes: `executable` related attributes: `arguments` - TASK_FUNCTION: the task references a python function to be called. required attributes: `function` related attributes: `args` related attributes: `kwargs` - TASK_METHOD: the task references a raptor worker method to be called. required attributes: `method` related attributes: `args` related attributes: `kwargs` - TASK_EVAL: the task is a code snippet to be evaluated. required attributes: `code` - TASK_EXEC: the task is a code snippet to be `exec`'ed. required attributes: `code` - TASK_SHELL: the task is a shell command line to be run. required attributes: `command` - TASK_PROC: the task is a single core process to be executed. required attributes: `executable` related attributes: `arguments` There exists a certain overlap between `TASK_EXECUTABLE`, `TASK_SHELL` and `TASK_PROC` modes. As a general rule, `TASK_SHELL` and `TASK_PROC` should be used for short running tasks which require a single core and no additional resources (gpus, storage, memory). `TASK_EXECUTABLE` should be used for all other tasks and is in fact the default. `TASK_SHELL` should only be used if the command to be run requires shell specific functionality (pipes, I/O redirection) which cannot easily be mapped to other task attributes. .. py:attribute:: executable [type: `str` | default: `""`] The executable to launch. The executable is expected to be either available via `$PATH` on the target resource, or to be an absolute path. .. py:attribute:: arguments [type: `list` | default: `[]`] The command line arguments for the given `executable` (`list` of `strings`). .. py:attribute:: code [type: `str` | default: `""`] The code to run. This field is expected to contain valid python code which is executed when the task mode is `TASK_EXEC` or `TASK_EVAL`. .. py:attribute:: function [type: `str` | default: `""`] The function to run. This field is expected to contain a python function name which can be resolved in the scope of the respective RP worker implementation (see documentation there). The task mode must be set to `TASK_FUNCTION`. `args` and `kwargs` are passed as function parameters. .. py:attribute:: args [type: `list` | default: `[]`] Unnamed arguments to be passed to the `function` (see above). This field will be serialized with `msgpack` and can thus contain any serializable data types. .. py:attribute:: kwargs [type: `dict` | default: `{}`] Named arguments to be passed to the `function` (see above). This field will be serialized with `msgpack` and can thus contain any serializable data types. .. py:attribute:: command [type: `str` | default: `""`] A shell command to be executed. This attribute is used for the `TASK_SHELL` mode. .. py:attribute:: ranks [type: `int` | default: `1`] The number of application processes to start on CPU cores. For two ranks or more, an MPI communicator will be available to the processes. `ranks` replaces the deprecated attribute `cpu_processes`. The attribute `cpu_process_type` was previously used to signal the need for an MPI communicator - that attribute is now also deprecated and will be ignored. .. py:attribute:: cores_per_rank [type: `int` | default: `1`] The number of cpu cores each process will have available to start its own threads or processes on. By default, `core` refers to a physical CPU core - but if the pilot has been launched with SMT-settings > 1, `core` will refer to a virtual core or hyperthread instead (the name depends on the CPU vendor). `cores_per_rank` replaces the deprecated attribute `cpu_threads`. .. py:attribute:: threading_type [type: `str` | default: `""`] The thread type, influences startup and environment (`<empty>/POSIX`, `OpenMP`). `threading_type` replaces the deprecated attribute `cpu_thread_type`. .. py:attribute:: gpus_per_rank [type: `int` | default: `0`] The number of gpus made available to each rank. `gpus_per_rank` replaces the deprecated attribute `gpu_processes`. The attributes `gpu_threads` and `gpu_process_type` are also deprecated and will be ignored. .. py:attribute:: gpu_type [type: `str` | default: `""`] The type of GPU environment to provide to the ranks (`<empty>`, `CUDA`, `ROCm`). `gpu_type` replaces the deprecated attribute `gpu_thread_type`. .. py:attribute:: lfs_per_rank [type: `int` | default: `0`] Local File Storage per rank - amount of data (MB) required on the local file system of the node. `lfs_per_rank` replaces the deprecated attribute `lfs_per_process`. .. py:attribute:: mem_per_rank [type: `int` | default: `0`] Amount of physical memory required per rank. `mem_per_rank` replaces the deprecated attribute `mem_per_process`. .. py:attribute:: environment [type: `dict` | default: `{}`] Environment variables to set in the environment before the execution (launching picked `LaunchMethod`). .. py:attribute:: named_env [type: `str` | default: `""`] A named virtual environment as prepared by the pilot. The task will remain in `AGENT_SCHEDULING` state until that environment gets created. .. py:attribute:: sandbox [type: `str` | default: `""`] This specifies the working directory of the task. That directory *MUST* be relative to the pilot sandbox. It will be created if it does not exist. By default, the sandbox has the name of the task's uid. .. py:attribute:: stdout [type: `str` | default: `""`] The name of the file to store stdout. If not set then the name of the following format will be used: `<uid>.out`. .. py:attribute:: stderr [type: `str` | default: `""`] The name of the file to store stderr. If not set then the name of the following format will be used: `<uid>.err`. .. py:attribute:: input_staging [type: `list` | default: `[]`] The files that need to be staged before the execution (`list` of `staging directives`, see below). .. py:attribute:: output_staging [type: `list` | default: `[]`] The files that need to be staged after the execution (`list` of `staging directives`, see below). .. py:attribute:: stage_on_error [type: `bool` | default: `False`] Output staging is normally skipped on `FAILED` or `CANCELED` tasks, but if this flag is set, staging is attempted either way. This may though lead to additional errors if the tasks did not manage to produce the expected output files to stage. .. py:attribute:: pre_launch [type: `list` | default: `[]`] Actions (shell commands) to perform before launching (i.e., before LaunchMethod is submitted), potentially on a batch node which is different from the node the task is placed on. Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of CPU time or other resources! Deviating from that rule will likely result in reduced overall throughput. .. py:attribute:: post_launch [type: `list` | default: `[]`] Actions (shell commands) to perform after launching (i.e., after LaunchMethod is executed). Precautions are the same as for `pre_launch` actions. .. py:attribute:: pre_exec [type: `list` | default: `[]`] Actions (shell commands) to perform before the task starts (LaunchMethod is submitted, but no actual task running yet). Each item could be either a string (`str`), which represents an action applied to all ranks, or a dictionary (`dict`), which represents a list of actions applied to specified ranks (key is a rankID and value is a list of actions to be performed for this rank). The actions/commands are executed on the respective nodes where the ranks are placed, and the actual rank startup will be delayed until all `pre_exec` commands have completed. Precautions are the same as for `pre_launch` actions. No assumption should be made as to where these commands are executed (although RP attempts to perform them in the task's execution environment). No assumption should be made on the specific shell environment the commands are executed in other than a POSIX shell environment. Errors in executing these commands will result in the task to enter `FAILED` state, and no execution of the actual workload will be attempted. .. py:attribute:: pre_exec_sync [type: `bool` | default: `False`] Flag indicates necessary to sync ranks execution, which enforce to delay individual rank execution, until all `pre_exec` actions for all ranks are completed. .. py:attribute:: post_exec [type: `list` | default: `[]`] Actions (shell commands) to perform after the task finishes. The same remarks as on `pre_exec` apply, inclusive the point on error handling, which again will cause the task to fail, even if the actual execution was successful. .. py:attribute:: restartable [type: `bool` | default: `False`] If the task starts to execute on a pilot, but cannot finish because the pilot fails or is canceled, the task can be restarted. .. py:attribute:: scheduler [type: `str` | default: `""`] Request the task to be handled by a specific agent scheduler. .. py:attribute:: tags [type: `dict` | default: `{}`] Configuration specific tags, which influence task scheduling and execution (e.g., tasks co-location). .. py:attribute:: metadata [type: `ANY` | default: `None`] User defined metadata. .. py:attribute:: timeout [type: `float` | default: `0.0`] Any timeout larger than 0 will result in the task process to be killed after the specified amount of seconds. The task will then end up in `CANCELED` state. .. py:attribute:: cleanup [type: `bool` | default: `False`] If cleanup flag is set, the pilot will delete the entire task sandbox upon termination. This includes all generated output data in that sandbox. Output staging will be performed before cleanup. Note that task sandboxes are also deleted if the pilot's own `cleanup` flag is set. .. py:attribute:: pilot [type: `str` | default: `""`] Pilot `uid`, if specified, the task is submitted to the pilot with the given ID. If that pilot is not known to the TaskManager, an exception is raised. Staging Directives ================== The Staging Directives are specified using a dict in the following form:: staging_directive = { 'source' : None, # see 'Location' below 'target' : None, # see 'Location' below 'action' : None, # See 'Action operators' below 'flags' : None, # See 'Flags' below 'priority': 0 # Control ordering of actions (unused) } Locations --------- `source` and `target` locations can be given as strings or `ru.Url` instances. Strings containing `://` are converted into URLs immediately. Otherwise, they are considered absolute or relative paths and are then interpreted in the context of the client's working directory. Special URL schemas: * `client://` : relative to the client's working directory * `resource://` : relative to the RP sandbox on the target resource * `pilot://` : relative to the pilot sandbox on the target resource * `task://` : relative to the task sandbox on the target resource In all these cases, the `hostname` element of the URL is expected to be empty, and the path is *always* considered relative to the locations specified above (even though URLs usually don't have a notion of relative paths). Action operators ---------------- Action operators: * rp.TRANSFER : remote file transfer from `source` URL to `target` URL * rp.COPY : local file copy, i.e., not crossing host boundaries * rp.MOVE : local file move * rp.LINK : local file symlink Flags ----- Flags: * rp.CREATE_PARENTS : create the directory hierarchy for targets on the fly * rp.RECURSIVE : if `source` is a directory, handles it recursively Task Environment ================ RP tasks are expected to be executed in isolation, meaning that their runtime environment is completely independent from the environment of other tasks, independent from the launch mechanism used to start the task, and also independent from the environment of the RP stack itself. The task description provides several hooks to help setting up the environment in that context. It is important to understand the way those hooks interact with respect to the environments mentioned above. - `pre_launch` directives are set and executed before the task is passed on to the task launch method. As such, `pre_launch` usually executed on the node where RP's agent is running, and *not* on the tasks target node. Executing `pre_launch` directives for many tasks can thus negatively impact RP's performance (*). Note also that `pre_launch` directives can in some cases interfere with the launch method. Use `pre_launch` directives for rare, heavy-weight operations which prepare the runtime environment for multiple tasks: fetch data from a remote source, unpack input data, create global communication channels, etc. - `pre_exec` directives are set and executed *after* the launch method placed the task on the compute nodes and are thus running on the target node. Note that for MPI tasks, the `pre_exec` directives are executed once per rank. Running large numbers of `pre_exec` directives concurrently can lead to system performance degradation (*), for example when those directives concurrently hot the shared files system (for loading modules or Python virtualenvs etc). Use `pre_exec` directives for task environment setup such as `module load`, `virtualenv activate`, `export` whose effects are expected to be applied either to all task ranks or to specified ranks. Avoid file staging operations at this point (files would be redundantly staged multiple times - once per rank). (*) The performance impact of repeated concurrent access to the system's shared file system can be significant and can pose a major bottleneck for your application. Specifically `module load` and `virtualenv activate` operations and the like are heavy on file system I/O, and executing those for many tasks is ill advised. Having said that: RP attempts to optimize those operations: if it identifies that identical `pre_exec` directives are shared between multiple tasks, RP will execute the directives exactly *once* and will cache the resulting environment settings - those cached settings are then applied to all other tasks with the same directives, without executing the directives again. """ _schema = { UID : str , NAME : str , MODE : str , EXECUTABLE : str , ARGUMENTS : [str] , CODE : str , FUNCTION : str , ARGS : [None] , KWARGS : {str: None} , COMMAND : str , SANDBOX : str , ENVIRONMENT : {str: str} , NAMED_ENV : str , PRE_LAUNCH : [str] , PRE_EXEC : [None] , PRE_EXEC_SYNC : bool , POST_LAUNCH : [str] , POST_EXEC : [None] , STDOUT : str , STDERR : str , INPUT_STAGING : [None] , OUTPUT_STAGING : [None] , STAGE_ON_ERROR : bool , RANKS : int , CORES_PER_RANK : int , GPUS_PER_RANK : int , THREADING_TYPE : str , GPU_TYPE : str , LFS_PER_RANK : int , MEM_PER_RANK : int , # deprecated CPU_PROCESSES : int , # RANKS CPU_PROCESS_TYPE: str , # n/a CPU_THREADS : int , # CORES_PER_RANK CPU_THREAD_TYPE : str , # THREADING_TYPE GPU_PROCESSES : int , # GPUS_PER_RANK GPU_PROCESS_TYPE: str , # GPU_TYPE GPU_THREADS : int , # n/a GPU_THREAD_TYPE : str , # n/a LFS_PER_PROCESS : int , # LFS_PER_RANK MEM_PER_PROCESS : int , # MEM_PER_RANK RESTARTABLE : bool , SCHEDULER : str , TAGS : {None: None}, METADATA : None , TIMEOUT : float , CLEANUP : bool , PILOT : str , } _defaults = { UID : '' , NAME : '' , MODE : None , EXECUTABLE : '' , ARGUMENTS : list() , CODE : '' , FUNCTION : '' , ARGS : list() , KWARGS : dict() , COMMAND : '' , SANDBOX : '' , ENVIRONMENT : dict() , NAMED_ENV : '' , PRE_LAUNCH : list() , PRE_EXEC : list() , PRE_EXEC_SYNC : False , POST_LAUNCH : list() , POST_EXEC : list() , STDOUT : '' , STDERR : '' , INPUT_STAGING : list() , OUTPUT_STAGING : list() , STAGE_ON_ERROR : False , RANKS : 1 , CORES_PER_RANK : 1 , GPUS_PER_RANK : 0 , THREADING_TYPE : '' , GPU_TYPE : '' , LFS_PER_RANK : 0 , MEM_PER_RANK : 0 , # deprecated CPU_PROCESSES : 0 , CPU_PROCESS_TYPE: '' , CPU_THREADS : 0 , CPU_THREAD_TYPE : '' , GPU_PROCESSES : 0 , GPU_PROCESS_TYPE: '' , GPU_THREADS : 0 , GPU_THREAD_TYPE : '' , LFS_PER_PROCESS : 0 , MEM_PER_PROCESS : 0 , RESTARTABLE : False , SCHEDULER : '' , TAGS : dict() , METADATA : None , TIMEOUT : 0.0 , CLEANUP : False , PILOT : '' , } # -------------------------------------------------------------------------- # def __init__(self, from_dict=None): super().__init__(from_dict=from_dict) # -------------------------------------------------------------------------- # def _verify(self): if not self.get('mode'): self['mode'] = TASK_EXECUTABLE if self.mode in [TASK_EXECUTABLE, RAPTOR_MASTER, RAPTOR_WORKER]: if not self.get('executable'): raise ValueError("TASK_EXECUTABLE Task needs 'executable'") elif self.mode == TASK_FUNCTION: if not self.get('function'): raise ValueError("TASK_FUNCTION Task needs 'function'") elif self.mode == TASK_PROC: if not self.get('executable'): raise ValueError("TASK_PROC Task needs 'executable'") elif self.mode == TASK_EVAL: if not self.get('code'): raise ValueError("TASK_EVAL Task needs 'code'") elif self.mode == TASK_EXEC: if not self.get('code'): raise ValueError("TASK_EXEC Task needs 'code'") elif self.mode == TASK_SHELL: if not self.get('command'): raise ValueError("TASK_SHELL Task needs 'command'") # backward compatibility for deprecated attributes if self.cpu_processes: self.ranks = self.cpu_processes self.cpu_processes = 0 if self.cpu_threads: self.cores_per_rank = self.cpu_threads self.cpu_threads = 0 if self.cpu_thread_type: self.threading_type = self.cpu_thread_type self.cpu_thread_type = None if self.gpu_processes: self.gpus_per_rank = self.gpu_processes self.gpu_processes = 0 if self.gpu_process_type: self.gpu_type = self.gpu_process_type self.gpu_process_type = None if self.lfs_per_process: self.lfs_per_rank = self.lfs_per_process self.lfs_per_process = 0 if self.mem_per_process: self.mem_per_rank = self.mem_per_process self.mem_per_process = 0 # deprecated and ignored if self.cpu_process_type: pass if self.gpu_threads : pass if self.gpu_thread_type : pass
# if self.mode in [TASK_SHELL, TASK_PROC]: # # if self.get('cpu_processes', 1) * self.get('cpu_threads', 1) > 1: # raise ValueError("TASK_SHELL and TASK_PROC Tasks must be single core") # # if self.get('gpu_processes', 0) > 0: # raise ValueError("TASK_SHELL and TASK_PROC Tasks canont use GPUs") # ------------------------------------------------------------------------------