__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import os
import sys
import time
import threading as mt
import radical.utils as ru
from . import utils as rpu
from . import states as rps
from . import constants as rpc
from . import task_description as rpcud
# bulk callbacks are implemented, but are currently not used nor exposed.
# opt-in switch: bulk callbacks are only used when the environment variable
# `RADICAL_PILOT_BULK_CB` is set to one of the accepted truthy spellings
_USE_BULK_CB = os.environ.get('RADICAL_PILOT_BULK_CB', '').lower() \
               in ['true', 'yes', '1']
# ------------------------------------------------------------------------------
#
# make sure deprecation warning is shown only once per type
#
_seen = list()


def _warn(old_type, new_type):
    """
    Write a deprecation notice to stderr, once per deprecated entity.

    `old_type` and `new_type` may be plain strings or callables (functions,
    bound methods, classes).  Callables are reported and de-duplicated by
    their qualified name: bound methods of different instances thus count as
    the same deprecated entity, the message stays readable, and `_seen` does
    not hold references to the objects the methods are bound to.
    """

    def _label(thing):
        # strings are used verbatim, everything else by (qualified) name
        if isinstance(thing, str):
            return thing
        return getattr(thing, '__qualname__', None) \
            or getattr(thing, '__name__',     None) \
            or str(thing)

    old_name = _label(old_type)
    if old_name in _seen:
        return

    _seen.append(old_name)
    sys.stderr.write('%s is deprecated - use %s\n'
                     % (old_name, _label(new_type)))
# ------------------------------------------------------------------------------
#
[docs]class TaskManager(rpu.Component):
"""
A TaskManager manages :class:`radical.pilot.Task` instances which
represent the **executable** workload in RADICAL-Pilot. A TaskManager
connects the Tasks with one or more :class:`Pilot` instances (which
represent the workload **executors** in RADICAL-Pilot) and a **scheduler**
which determines which :class:`Task` gets executed on which
:class:`Pilot`.
**Example**::
s = rp.Session(database_url=DBURL)
pm = rp.PilotManager(session=s)
pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16
p1 = pm.submit_pilots(pd) # create first pilot with 16 cores
p2 = pm.submit_pilots(pd) # create second pilot with 16 cores
# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
t = rp.TaskDescription()
t.executable = "/bin/sleep"
t.arguments = ['60']
tasks.append(t)
# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)
The task manager can issue notification on task state changes. Whenever
state notification arrives, any callback registered for that notification is
fired.
NOTE: State notifications can arrive out of order wrt the task state model!
"""
# --------------------------------------------------------------------------
#
def __init__(self, session, cfg='default', scheduler=None, uid=None):
    """
    Create a new TaskManager and attach it to the given session.

    **Arguments:**
        * session [:class:`radical.pilot.Session`]:
          The session instance to use.
        * cfg (`dict` or `string`):
          The configuration or name of configuration to use.
        * scheduler (`string`):
          The name of the scheduler plug-in to use.
        * uid (`string`):
          ID for task manager, to be used for reconnect

    **Returns:**
        * A new `TaskManager` object [:class:`radical.pilot.TaskManager`].
    """

    # a given uid signals a reconnect, otherwise a fresh ID is generated in
    # the namespace of the session
    self._reconnect = bool(uid)
    if self._reconnect:
        self._uid = uid
    else:
        self._uid = ru.generate_id('tmgr.%(item_counter)04d',
                                   ru.ID_CUSTOM, ns=session.uid)

    self._pilots      = dict()
    self._pilots_lock = mt.RLock()
    self._tasks       = dict()
    self._tasks_lock  = mt.RLock()
    self._tcb_lock    = mt.RLock()
    self._terminate   = mt.Event()
    self._closed      = False
    self._rec_id      = 0       # used for session recording

    # one (initially empty) callback registry per supported metric
    self._callbacks   = {metric: dict() for metric in rpc.TMGR_METRICS}

    # NOTE: `name` and `cfg` are overloaded, the user cannot point to
    #       a predefined config and name it at the same time.  This might
    #       be ok for the session, but introduces a minor API inconsistency.
    if isinstance(cfg, str):
        cfg_name, cfg_dict = cfg, None
    else:
        cfg_name, cfg_dict = None, cfg

    cfg = ru.Config('radical.pilot.tmgr', name=cfg_name, cfg=cfg_dict)

    cfg.uid            = self._uid
    cfg.owner          = self._uid
    cfg.sid            = session.uid
    cfg.base           = session.base
    cfg.path           = session.path
    cfg.dburl          = session.dburl
    cfg.heartbeat      = session.cfg.heartbeat
    cfg.client_sandbox = session._get_client_sandbox()

    if scheduler:
        # overwrite the scheduler from the config file
        cfg.scheduler = scheduler

    rpu.Component.__init__(self, cfg, session=session)
    self.start()

    self._log.info('started tmgr %s', self._uid)
    self._rep.info('<<create task manager')

    # create tmgr bridges and components
    self._cmgr = rpu.ComponentManager(self._cfg)
    self._cmgr.start_bridges()
    self._cmgr.start_components()

    # let session know we exist
    if not self._reconnect:
        self._session._register_tmgr(self)
    else:
        self._session._reconnect_tmgr(self)
        self._reconnect_tasks()

    # The output queue is used to forward submitted tasks to the
    # scheduling component.
    self.register_output(rps.TMGR_SCHEDULING_PENDING,
                         rpc.TMGR_SCHEDULING_QUEUE)

    # the tmgr will also collect tasks from the agent again, for output
    # staging and finalization
    self._has_sout = bool(self._cfg.bridges.tmgr_staging_output_queue)
    if self._has_sout:
        self.register_output(rps.TMGR_STAGING_OUTPUT_PENDING,
                             rpc.TMGR_STAGING_OUTPUT_QUEUE)

    # register the state notification pull cb, and the cb which pulls tasks
    # back from the agent
    # FIXME: both should be a tailing cursor in the update worker
    for pull_cb in (self._state_pull_cb, self._task_pull_cb):
        self.register_timed_cb(pull_cb,
                               timer=self._cfg['db_poll_sleeptime'])

    # also listen to the state pubsub for task state changes
    self.register_subscriber(rpc.STATE_PUBSUB, self._state_sub_cb)

    self._prof.prof('setup_done', uid=self._uid)
    self._rep.ok('>>ok\n')
# --------------------------------------------------------------------------
#
def initialize(self):
    """
    Register the fork handlers of this manager via `ru.atfork`.
    """

    # the manager must not carry bridge and component handles across forks
    on_prepare = self._atfork_prepare
    on_parent  = self._atfork_parent
    on_child   = self._atfork_child

    ru.atfork(on_prepare, on_parent, on_child)
# --------------------------------------------------------------------------
#
# EnTK forks, make sure we don't carry traces of children across the fork
#
def _atfork_prepare(self): pass
def _atfork_parent(self) : pass
def _atfork_child(self) :
self._bridges = dict()
self._components = dict()
# --------------------------------------------------------------------------
#
def finalize(self):
self._cmgr.close()
# --------------------------------------------------------------------------
#
[docs] def close(self):
"""
Shut down the TaskManager and all its components.
"""
# we do not cancel tasks at this point, in case any component or pilot
# wants to continue to progress task states, which should indeed be
# independent from the tmgr life cycle.
if self._closed:
return
self._terminate.set()
self._rep.info('<<close task manager')
# disable callbacks during shutdown
with self._tcb_lock:
self._callbacks = dict()
for m in rpc.TMGR_METRICS:
self._callbacks[m] = dict()
self._cmgr.close()
self._log.info("Closed TaskManager %s." % self._uid)
self._closed = True
self._rep.ok('>>ok\n')
# --------------------------------------------------------------------------
#
[docs] def as_dict(self):
"""
Returns a dictionary representation of the TaskManager object.
"""
ret = {
'uid': self.uid,
'cfg': self.cfg
}
return ret
# --------------------------------------------------------------------------
#
def __str__(self):
"""
Returns a string representation of the TaskManager object.
"""
return str(self.as_dict())
# --------------------------------------------------------------------------
#
def _pilot_state_cb(self, pilots, state=None):
if self._terminate.is_set():
return False
# we register this callback for pilots added to this tmgr. It will
# specifically look out for pilots which complete, and will make sure
# that all tasks are pulled back into tmgr control if that happens
# prematurely.
#
# If we find tasks which have not completed the agent part of the task
# state model, we declare them FAILED. If they can be restarted, we
# resubmit an identical task, which then will get a new task ID. This
# avoids state model confusion (the state model is right now expected to
# be linear), but is not intuitive for the application (FIXME).
#
# FIXME: there is a race with the tmgr scheduler which may, just now,
# and before being notified about the pilot's demise, send new
# tasks to the pilot.
# we only look into pilot states when the tmgr is still active
# FIXME: note that there is a race in that the tmgr can be closed while
# we are in the cb.
# FIXME: `self._closed` is not an `mt.Event`!
if self._closed:
self._log.debug('tmgr closed, ignore pilot cb %s',
['%s:%s' % (p.uid, p.state) for p in pilots])
return True
if not isinstance(pilots, list):
pilots = [pilots]
for pilot in pilots:
pid = pilot.uid
state = pilot.state
if state in rps.FINAL:
self._log.debug('pilot %s is final - pull tasks', pilot.uid)
task_cursor = self.session._dbs._c.find({
'type' : 'task',
'pilot' : pilot.uid,
'tmgr' : self.uid,
'control' : {'$in' : ['agent_pending', 'agent']}})
if not task_cursor.count():
tasks = list()
else:
tasks = list(task_cursor)
self._log.debug("tasks pulled: %3d (pilot dead)", len(tasks))
if not tasks:
continue
# update the tasks to avoid pulling them again next time.
# NOTE: this needs not locking with the task pulling in the
# _task_pull_cb, as that will only pull tmgr_pending
# tasks.
uids = [task['uid'] for task in tasks]
self._session._dbs._c.update({'type' : 'task',
'uid' : {'$in' : uids}},
{'$set' : {'control' : 'tmgr'}},
multi=True)
to_restart = list()
for task in tasks:
task['exception'] = 'RuntimeError("pilot died")'
task['exception_detail'] = 'pilot %s is final' % pid
task['state'] = rps.FAILED
if not task['description'].get('restartable'):
self._log.debug('task %s not restartable', task['uid'])
continue
self._log.debug('task %s is restartable', task['uid'])
task['restarted'] = True
ud = rpcud.TaskDescription(task['description'])
to_restart.append(ud)
# FIXME: increment some restart counter in the description?
# FIXME: reference the resulting new uid in the old task.
if to_restart and not self._closed:
self._log.debug('restart %s tasks', len(to_restart))
restarted = self.submit_tasks(to_restart)
for u in restarted:
self._log.debug('restart task %s', u.uid)
# final tasks are not pushed
self.advance(tasks, publish=True, push=False)
# keep cb registered
return True
# --------------------------------------------------------------------------
#
def _state_pull_cb(self):
if self._terminate.is_set():
return False
# pull all task states from the DB, and compare to the states we know
# about. If any state changed, update the task instance and issue
# notification callbacks as needed. Do not advance the state (again).
# FIXME: we also pull for dead tasks. That is not efficient...
# FIXME: this needs to be converted into a tailed cursor in the update
# worker
tasks = self._session._dbs.get_tasks(tmgr_uid=self.uid)
self._update_tasks(tasks)
return True
# --------------------------------------------------------------------------
#
def _task_pull_cb(self):
if self._terminate.is_set():
return False
# pull tasks from the agent which are about to get back
# under tmgr control, and push them into the respective queues
# FIXME: this should also be based on a tailed cursor
# FIXME: Unfortunately, 'find_and_modify' is not bulkable, so we have
# to use 'find'. To avoid finding the same tasks over and over
# again, we update the 'control' field *before* running the next
# find -- so we do it right here.
task_cursor = self.session._dbs._c.find({'type' : 'task',
'tmgr' : self.uid,
'control' : 'tmgr_pending'})
if not task_cursor.count():
# no tasks whatsoever...
# self._log.info("tasks pulled: 0")
return True # this is not an error
# update the tasks to avoid pulling them again next time.
tasks = list(task_cursor)
uids = [task['uid'] for task in tasks]
self._log.info("tasks pulled: %d", len(uids))
for task in tasks:
task['control'] = 'tmgr'
self._session._dbs._c.update({'type' : 'task',
'uid' : {'$in' : uids}},
{'$set' : {'control' : 'tmgr'}},
multi=True)
self._log.info("tasks pulled: %4d", len(tasks))
self._prof.prof('get', msg="bulk size: %d" % len(tasks), uid=self.uid)
for task in tasks:
# we need to make sure to have the correct state:
uid = task['uid']
self._prof.prof('get', uid=uid)
old = task['state']
new = rps._task_state_collapse(task['states'])
if old != new:
self._log.debug("task pulled %s: %s / %s", uid, old, new)
task['state'] = new
# now we really own the CUs, and can start working on them (ie. push
# them into the pipeline).
to_stage = list()
to_finalize = list()
for task in tasks:
# only advance tasks to data stager if we need data staging
# = otherwise finalize them right away
if task['description'].get('output_staging'):
if task['target_state'] != rps.DONE:
if task['description']['stage_on_error']:
to_stage.append(task)
else:
to_finalize.append(task)
else:
to_stage.append(task)
else:
to_finalize.append(task)
# don't profile state transitions - those happened in the past
if to_stage:
if self._has_sout:
# normal route: needs data stager
self.advance(to_stage, publish=True, push=True, prof=False)
else:
self._log.error('output staging needed but not available!')
for task in to_stage:
task['target_state'] = rps.FAILED
to_finalize.append(task)
if to_finalize:
# shortcut, skip the data stager, but fake state transition
self.advance(to_finalize, state=rps.TMGR_STAGING_OUTPUT,
publish=True, push=False)
# move to final stata
for task in to_finalize:
target_state = task.get('target_state')
if not target_state:
target_state = rps.FAILED
task['state'] = target_state
self.advance(to_finalize, publish=True, push=False)
return True
# --------------------------------------------------------------------------
#
def _state_sub_cb(self, topic, msg):
if self._terminate.is_set():
return False
cmd = msg.get('cmd')
arg = msg.get('arg')
if cmd != 'update':
self._log.debug('ignore state cb msg with cmd %s', cmd)
return True
things = ru.as_list(arg)
tasks = [thing for thing in things if thing.get('type') == 'task']
self._update_tasks(tasks)
return True
# --------------------------------------------------------------------------
#
def _update_tasks(self, task_dicts):
# return information about needed callback and advance activities, so
# that we don't break bulks here.
# note however that individual task callbacks are still being called on
# each task (if any are registered), which can lead to arbitrary,
# application defined delays.
to_notify = list()
with self._tasks_lock:
for task_dict in task_dicts:
uid = task_dict['uid']
# we don't care about tasks we don't know
task = self._tasks.get(uid)
if not task:
self._log.debug('tmgr: task unknown: %s', uid)
continue
# only update on state changes
current = task.state
target = task_dict['state']
if current == target:
continue
target, passed = rps._task_state_progress(uid, current, target)
if target in [rps.CANCELED, rps.FAILED]:
# don't replay intermediate states
passed = passed[-1:]
for s in passed:
task_dict['state'] = s
self._tasks[uid]._update(task_dict)
to_notify.append([task, s])
if to_notify:
if _USE_BULK_CB:
self._bulk_cbs(set([task for task,_ in to_notify]))
else:
for task, state in to_notify:
self._task_cb(task, state)
# --------------------------------------------------------------------------
#
def _task_cb(self, task, state):
    """
    Invoke all `TASK_STATE` callbacks registered for the given task.

    Both wildcard callbacks and callbacks registered for the task's uid are
    called, as `cb(task, state)` or, if truthy callback data were
    registered, as `cb(task, state, cb_data)`.  Errors raised by a callback
    are logged and do not prevent the remaining callbacks from running.
    """

    with self._tcb_lock:

        uid      = task.uid
        metric   = rpc.TASK_STATE
        cb_dicts = list()

        # wildcard callbacks first, then task specific ones
        cb_dicts += self._callbacks[metric].get('*', {}).values()
        cb_dicts += self._callbacks[metric].get(uid, {}).values()

        for cb_dict in cb_dicts:

            cb      = cb_dict['cb']
            cb_data = cb_dict['cb_data']

            try:
                if cb_data: cb(task, state, cb_data)
                else      : cb(task, state)

            except Exception:
                # only application errors are swallowed - interpreter exit
                # and keyboard interrupts must propagate.  Not every
                # callable has a `__name__` (e.g. `functools.partial`).
                self._log.exception('cb error (%s)',
                                    getattr(cb, '__name__', repr(cb)))
# --------------------------------------------------------------------------
#
def _bulk_cbs(self, tasks, metrics=None):
    """
    Invoke registered callbacks in bulk mode.

    Each registered callback is called once with the list of tasks it
    applies to: all given tasks for wildcard callbacks, or the matching
    tasks for callbacks registered on specific task uids.  The call is
    `cb(tasks)` or, if truthy callback data were registered,
    `cb(tasks, cb_data)`.

    **Arguments:**
        * tasks:   iterable of task objects (each providing `uid`).
        * metrics: metric or list of metrics, defaults to `TASK_STATE`.
    """

    if not metrics: metrics = [rpc.TASK_STATE]
    else          : metrics = ru.as_list(metrics)

    cbs = dict()  # bulked callbacks to call

    with self._tcb_lock:

        for metric in metrics:

            registered = self._callbacks[metric]

            # wildcard callbacks apply to all tasks - there may be none
            for cb_name, cb_dict in registered.get('*', {}).items():
                cbs[cb_name] = {'cb'     : cb_dict['cb'],
                                'cb_data': cb_dict['cb_data'],
                                'tasks'  : set(tasks)}

            # add task specific callbacks if needed
            for task in tasks:

                for cb_name, cb_dict in registered.get(task.uid, {}).items():

                    if cb_name in cbs:
                        cbs[cb_name]['tasks'].add(task)
                    else:
                        cbs[cb_name] = {'cb'     : cb_dict['cb'],
                                        'cb_data': cb_dict['cb_data'],
                                        'tasks'  : set([task])}

    for cb_name in cbs:

        cb      = cbs[cb_name]['cb']
        cb_data = cbs[cb_name]['cb_data']
        objs    = cbs[cb_name]['tasks']

        # as for individual task callbacks: a failing callback is logged
        # and must not keep the remaining callbacks from being called
        try:
            if cb_data: cb(list(objs), cb_data)
            else      : cb(list(objs))

        except Exception:
            self._log.exception('cb error (%s)',
                                getattr(cb, '__name__', repr(cb)))
# --------------------------------------------------------------------------
#
# FIXME: this needs to go to the scheduler
def _default_wait_queue_size_cb(self, tmgr, wait_queue_size):
# FIXME: this needs to come from the scheduler?
if self._terminate.is_set():
return False
self._log.info("[Callback]: wait_queue_size: %s.", wait_queue_size)
# --------------------------------------------------------------------------
#
@property
def uid(self):
"""
Returns the unique id.
"""
return self._uid
# --------------------------------------------------------------------------
#
@property
def scheduler(self):
"""
Returns the scheduler name.
"""
return self._cfg.get('scheduler')
# --------------------------------------------------------------------------
#
[docs] def add_pilots(self, pilots):
"""
Associates one or more pilots with the task manager.
**Arguments:**
* **pilots** [:class:`radical.pilot.Pilot` or list of
:class:`radical.pilot.Pilot`]: The pilot objects that will be
added to the task manager.
"""
if not isinstance(pilots, list):
pilots = [pilots]
if len(pilots) == 0:
raise ValueError('cannot add no pilots')
pilot_docs = list()
with self._pilots_lock:
# sanity check, and keep pilots around for inspection
for pilot in pilots:
if isinstance(pilot, dict):
pilot_dict = pilot
else:
pilot_dict = pilot.as_dict()
# real object: subscribe for state updates
pilot.register_callback(self._pilot_state_cb)
pid = pilot_dict['uid']
if pid in self._pilots:
raise ValueError('pilot %s already added' % pid)
self._pilots[pid] = pilot_dict
pilot_docs.append(pilot_dict)
# publish to the command channel for the scheduler to pick up
self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'add_pilots',
'arg' : {'pilots': pilot_docs,
'tmgr' : self.uid}})
# --------------------------------------------------------------------------
#
[docs] def list_pilots(self):
"""
Lists the UIDs of the pilots currently associated with the task manager.
**Returns:**
* A list of :class:`radical.pilot.Pilot` UIDs [`string`].
"""
with self._pilots_lock:
return list(self._pilots.keys())
# --------------------------------------------------------------------------
#
[docs] def get_pilots(self):
"""
Get the pilots instances currently associated with the task manager.
**Returns:**
* A list of :class:`radical.pilot.Pilot` instances.
"""
with self._pilots_lock:
return list(self._pilots.values())
# --------------------------------------------------------------------------
#
[docs] def remove_pilots(self, pilot_ids, drain=False):
"""
Disassociates one or more pilots from the task manager.
After a pilot has been removed from a task manager, it won't process
any of the task manager's tasks anymore. Calling `remove_pilots`
doesn't stop the pilot itself.
**Arguments:**
* **drain** [`boolean`]: Drain determines what happens to the tasks
which are managed by the removed pilot(s). If `True`, all tasks
currently assigned to the pilot are allowed to finish execution.
If `False` (the default), then non-final tasks will be canceled.
"""
# TODO: Implement 'drain'.
# NOTE: the actual removal of pilots from the scheduler is asynchron!
if drain:
raise RuntimeError("'drain' is not yet implemented")
if not isinstance(pilot_ids, list):
pilot_ids = [pilot_ids]
if len(pilot_ids) == 0:
raise ValueError('cannot remove no pilots')
with self._pilots_lock:
# sanity check, and keep pilots around for inspection
for pid in pilot_ids:
if pid not in self._pilots:
raise ValueError('pilot %s not removed' % pid)
del self._pilots[pid]
# publish to the command channel for the scheduler to pick up
self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'remove_pilots',
'arg' : {'pids' : pilot_ids,
'tmgr' : self.uid}})
# --------------------------------------------------------------------------
#
def list_units(self):
    '''
    deprecated - use `list_tasks()`
    '''
    # pass stable names instead of bound methods: the warning is then
    # readable and issued once, not once per manager instance
    _warn('TaskManager.list_units', 'TaskManager.list_tasks')
    return self.list_tasks()
# --------------------------------------------------------------------------
#
[docs] def list_tasks(self):
"""
Returns the UIDs of the :class:`radical.pilot.Task` managed by
this task manager.
**Returns:**
* A list of :class:`radical.pilot.Task` UIDs [`string`].
"""
with self._pilots_lock:
return list(self._tasks.keys())
# --------------------------------------------------------------------------
#
def submit_units(self, descriptions):
    '''
    deprecated - use `submit_tasks()`
    '''
    # pass stable names instead of bound methods: the warning is then
    # readable and issued once, not once per manager instance
    _warn('TaskManager.submit_units', 'TaskManager.submit_tasks')
    return self.submit_tasks(descriptions=descriptions)
# --------------------------------------------------------------------------
#
def submit_tasks(self, descriptions):
    """
    Submits one or more :class:`radical.pilot.Task` instances to the
    task manager.

    **Arguments:**
        * **descriptions** [:class:`radical.pilot.TaskDescription`
          or list of :class:`radical.pilot.TaskDescription`]: The
          description of the task instance(s) to create.

    **Returns:**
        * A list of :class:`radical.pilot.Task` objects if a list of
          descriptions was given, a single task object otherwise.

    **Raises:**
        * `ValueError` if an empty list of descriptions is given.
    """

    from .task import Task

    # a single description is wrapped into a list, and unwrapped on return
    ret_list = isinstance(descriptions, list)
    if not ret_list:
        descriptions = [descriptions]

    if len(descriptions) == 0:
        raise ValueError('cannot submit no task descriptions')

    self._rep.progress_tgt(len(descriptions), label='submit')

    tasks = list()
    for descr in descriptions:

        task = Task(tmgr=self, descr=descr, origin='client')
        tasks.append(task)

        # keep tasks around
        with self._tasks_lock:
            self._tasks[task.uid] = task

        # session recording: dump each description of this batch
        if self._session._rec:
            ru.write_json(descr.as_dict(), "%s/%s.batch.%03d.json"
                    % (self._session._rec, task.uid, self._rec_id))

        self._rep.progress()

    self._rep.progress_done()

    if self._session._rec:
        self._rec_id += 1

    # insert tasks into the database, as a bulk.
    task_docs = [task.as_dict() for task in tasks]
    self._session._dbs.insert_tasks(task_docs)

    # Only after the insert can we hand the tasks over to the next
    # components (ie. advance state).
    self.advance(task_docs, rps.TMGR_SCHEDULING_PENDING,
                 publish=True, push=True)

    if not ret_list:
        return tasks[0]

    return tasks
# --------------------------------------------------------------------------
#
def _reconnect_tasks(self):
    '''
    On reconnect, recreate the task instances for all tasks the DB lists
    for this tmgr, and register them with the manager.
    '''

    from .task             import Task
    from .task_description import TaskDescription

    docs = self._session._dbs.get_tasks(tmgr_uid=self.uid)

    with self._tasks_lock:

        for task_doc in docs:

            # the task instance has to carry the uid found in the DB
            td     = TaskDescription(task_doc['description'])
            td.uid = task_doc['uid']

            restored = Task(tmgr=self, descr=td, origin='client')
            restored._update(task_doc, reconnect=True)

            self._tasks[restored.uid] = restored
# --------------------------------------------------------------------------
#
def get_units(self, uids=None):
    '''
    deprecated - use `get_tasks()`
    '''
    # pass stable names instead of bound methods: the warning is then
    # readable and issued once, not once per manager instance
    _warn('TaskManager.get_units', 'TaskManager.get_tasks')
    return self.get_tasks(uids=uids)
# --------------------------------------------------------------------------
#
[docs] def get_tasks(self, uids=None):
"""Returns one or more tasks identified by their IDs.
**Arguments:**
* **uids** [`string` or `list of strings`]: The IDs of the
task objects to return.
**Returns:**
* A list of :class:`radical.pilot.Task` objects.
"""
if not uids:
with self._tasks_lock:
ret = list(self._tasks.values())
return ret
ret_list = True
if (not isinstance(uids, list)) and (uids is not None):
ret_list = False
uids = [uids]
ret = list()
with self._tasks_lock:
for uid in uids:
if uid not in self._tasks:
raise ValueError('task %s not known' % uid)
ret.append(self._tasks[uid])
if ret_list: return ret
else : return ret[0]
# --------------------------------------------------------------------------
#
def wait_units(self, uids=None, state=None, timeout=None):
    '''
    deprecated - use `wait_tasks()`
    '''
    # pass stable names instead of bound methods: the warning is then
    # readable and issued once, not once per manager instance
    _warn('TaskManager.wait_units', 'TaskManager.wait_tasks')
    return self.wait_tasks(uids=uids, state=state, timeout=timeout)
# --------------------------------------------------------------------------
#
def wait_tasks(self, uids=None, state=None, timeout=None):
    """
    Returns when one or more :class:`radical.pilot.Tasks` reach a
    specific state.

    If `uids` is `None`, `wait_tasks` returns when **all** tasks which are
    not yet in a final state reach the state defined in `state`.

    **Arguments:**

        * **uids** [`string` or `list of strings`]
          If uids is set, only the Tasks with the specified
          uids are considered. If uids is `None` (default), all
          Tasks are considered.

        * **state** [`string` or `list of strings`]
          The state that Tasks have to reach in order for the call
          to return.

          By default `wait_tasks` waits for the Tasks to
          reach a terminal state, which can be one of the following:

          * :data:`radical.pilot.rps.DONE`
          * :data:`radical.pilot.rps.FAILED`
          * :data:`radical.pilot.rps.CANCELED`

        * **timeout** [`float`]
          Timeout in seconds before the call returns regardless of Task
          state changes. The default value **None** waits forever.

    **Returns:**

        * The list of current states of the tasks waited for, or a single
          state if a single uid (not a list) was given.
    """

    if not uids:
        with self._tasks_lock:
            uids = [uid for uid, task in self._tasks.items()
                        if  task.state not in rps.FINAL]

    if   not state                  : states = rps.FINAL
    elif not isinstance(state, list): states = [state]
    else                            : states =  state

    # we simplify state check by waiting for the *earliest* of the given
    # states - if the task happens to be in any later state, we are sure the
    # earliest has passed as well.
    check_state_val = rps._task_state_values[rps.FINAL[-1]]
    for wait_state in states:
        check_state_val = min(check_state_val,
                              rps._task_state_values[wait_state])

    ret_list = True
    if not isinstance(uids, list):
        ret_list = False
        uids = [uids]

    start = time.time()
    with self._tasks_lock:
        to_check = [self._tasks[uid] for uid in uids]

    # We don't want to iterate over all tasks again and again, as that would
    # duplicate checks on tasks which were found in matching states.  So we
    # create a list from which we drop the tasks as we find them in
    # a matching state
    self._rep.progress_tgt(len(to_check), label='wait')
    while to_check and not self._terminate.is_set():

        check_again = list()
        for task in to_check:

            # we actually don't check if a task is in a specific (set of)
            # state(s), but rather check if it ever *has been* in any of
            # those states
            if task.state not in rps.FINAL and \
                rps._task_state_values[task.state] < check_state_val:
                # this task does not match the wait criteria
                check_again.append(task)

            else:
                # stop watching this task
                self._rep.progress()

        to_check = check_again

        # states are checked *before* sleeping, so that tasks which match
        # already do not cost us a sleep interval
        if not to_check:
            break

        # check timeout
        if timeout and (timeout <= (time.time() - start)):
            self._log.debug("wait timed out")
            break

        time.sleep(0.1)

    self._rep.progress_done()

    # grab the current states to return
    with self._tasks_lock:
        states = [self._tasks[uid].state for uid in uids]

    sdict = {s: states.count(s) for s in set(states)}
    for s in sorted(sdict):
        self._rep.info('\t%-10s: %5d\n' % (s, sdict[s]))

    # tasks are also left over if the manager terminated while waiting
    if to_check: self._rep.warn('>>timeout\n')
    else       : self._rep.ok  ('>>ok\n')

    # done waiting
    if ret_list: return states
    else       : return states[0]
# --------------------------------------------------------------------------
#
def cancel_units(self, uids=None):
    '''
    deprecated - use `cancel_tasks()`
    '''
    # pass stable names instead of bound methods: the warning is then
    # readable and issued once, not once per manager instance
    _warn('TaskManager.cancel_units', 'TaskManager.cancel_tasks')
    return self.cancel_tasks(uids=uids)
# --------------------------------------------------------------------------
#
def cancel_tasks(self, uids=None):
    """
    Cancel one or more :class:`radical.pilot.Tasks`.

    Note that cancellation of tasks is *immediate*, i.e. their state is
    immediately set to `CANCELED`, even if some RP component may still
    operate on the tasks.  Specifically, other state transitions, including
    other final states (`DONE`, `FAILED`) can occur *after* cancellation.
    This is a side effect of an optimization: we consider this
    acceptable tradeoff in the sense "Oh, that task was DONE at point of
    cancellation -- ok, we can use the results, sure!".

    If that behavior is not wanted, set the environment variable:

        export RADICAL_PILOT_STRICT_CANCEL=True

    The call does not block until the tasks are marked as canceled.

    **Arguments:**
        * **uids** [`string` or `list of strings`]: The IDs of the
          tasks objects to cancel.  If no IDs are given, all tasks of this
          task manager are canceled.
    """

    if uids:
        if not isinstance(uids, list):
            uids = [uids]
    else:
        with self._tasks_lock:
            uids = list(self._tasks.keys())

    # NOTE: We advance all tasks to cancelled, and send a cancellation
    #       control command.  If that command is picked up *after* some
    #       state progression, we'll see state transitions after cancel.
    #       For non-final states that is not a problem, as it is equivalent
    #       with a state update message race, which our state collapse
    #       mechanism accounts for.  For an eventual non-canceled final
    #       state, we do get an invalid state transition.  That is also
    #       corrected eventually in the state collapse, but the point
    #       remains, that the state model is temporarily violated.  We
    #       consider this a side effect of the fast-cancel optimization.
    #
    #       The env variable 'RADICAL_PILOT_STRICT_CANCEL == True' will
    #       disable this optimization.
    #
    # FIXME: the effect of the env var is not well tested
    strict_cancel = 'RADICAL_PILOT_STRICT_CANCEL' in os.environ
    if not strict_cancel:
        with self._tasks_lock:
            to_cancel = [self._tasks[uid] for uid in uids]
        cancel_docs = [task.as_dict() for task in to_cancel]
        self.advance(cancel_docs, state=rps.CANCELED, publish=True, push=True)

    # we *always* issue the cancellation command to the local components
    cancel_cmd = {'cmd' : 'cancel_tasks',
                  'arg' : {'uids' : uids,
                           'tmgr' : self.uid}}
    self.publish(rpc.CONTROL_PUBSUB, cancel_cmd)

    # we also inform all pilots about the cancelation request
    self._session._dbs.pilot_command(cmd='cancel_tasks', arg={'uids':uids})

    # In the default case of calling 'advance' above, we just set the state,
    # so we *know* tasks are canceled.
    #
    # We do not wait and block the call until all the tasks are marked
    # cancelled.  This means when inspecting for state just after a state
    # change, we may observe an old state, instead of CANCELED.
    #
    # This is done so that cyclic state changes do not hang.  Example: if
    # a task is changing state and the user requests the task to be
    # cancelled, the cancellation would hang because a previous state change
    # operation is ongoing.
# --------------------------------------------------------------------------
#
# TODO: `metric` -> `metrics`, for consistency with `unregister_callback()`
#
def register_callback(self, cb, cb_data=None, metric=None, uid=None):
    """
    Registers a new callback function with the TaskManager.  Manager-level
    callbacks get called if the specified metric changes.  The default
    metric `TASK_STATE` fires the callback if any of the Tasks
    managed by the TaskManager change their state.

    All callback functions need to have the same signature::

        def cb(obj, value)

    where ``obj`` is a handle to the object that triggered the callback
    and ``value`` is the metric.  In the example of `TASK_STATE` above, the
    object would be the task in question, and the value would be the new
    state of the task.

    If 'cb_data' is given, then the 'cb' signature changes to

        def cb(obj, state, cb_data)

    and 'cb_data' are passed unchanged.

    If 'uid' is given, the callback will be invoked only for the specified
    task.

    Available metrics are:

      * `TASK_STATE`: fires when the state of any of the tasks which are
        managed by this task manager instance is changing.  It communicates
        the task object instance and the tasks new state.

      * `WAIT_QUEUE_SIZE`: fires when the number of unscheduled tasks (i.e.
        of tasks which have not been assigned to a pilot for execution)
        changes.

    **Raises:**

      * `ValueError` on an unknown task uid or an invalid metric.  In that
        case the callback is not registered for any of the given metrics.
    """

    # FIXME: the signature should be (self, metrics, cb, cb_data)

    if not metric:
        metric = rpc.TASK_STATE

    metrics = ru.as_list(metric)

    if not uid:
        uid = '*'

    elif uid not in self._tasks:
        raise ValueError('no such task %s' % uid)

    # validate all metrics before touching the registry, so that a failing
    # call does not leave the callback registered for a subset of metrics
    for m in metrics:
        if m not in rpc.TMGR_METRICS:
            raise ValueError("invalid tmgr metric '%s'" % m)

    cb_id = id(cb)

    with self._tcb_lock:

        for m in metrics:

            per_metric = self._callbacks.setdefault(m,   dict())
            per_uid    = per_metric     .setdefault(uid, dict())

            per_uid[cb_id] = {'cb'      : cb,
                              'cb_data' : cb_data}
# --------------------------------------------------------------------------
#
def unregister_callback(self, cb=None, metrics=None, uid=None):
    """
    Unregister callbacks which were registered via `register_callback()`.

    **Arguments:**

      * **cb**: the callback to remove.  If not given, all callbacks
        registered for the given metrics and uid are removed.
      * **metrics**: metric or list of metrics, defaults to `TASK_STATE`.
      * **uid**: the task uid the callback was registered for; if not
        given, the wildcard registration is addressed.

    **Raises:**

      * `ValueError` on an unknown task uid, an invalid metric, or if the
        given callback is not registered for the given metric and uid.
    """

    if not metrics:
        metrics = rpc.TASK_STATE

    metrics = ru.as_list(metrics)

    if not uid:
        uid = '*'

    elif uid not in self._tasks:
        raise ValueError('no such task %s' % uid)

    for metric in metrics:
        if metric not in rpc.TMGR_METRICS:
            raise ValueError("invalid tmgr metric '%s'" % metric)

    with self._tcb_lock:

        for metric in metrics:

            if metric not in self._callbacks:
                raise ValueError("cb metric '%s' invalid" % metric)

            if uid not in self._callbacks[metric]:
                raise ValueError("cb target '%s' invalid" % uid)

            registered = self._callbacks[metric][uid]

            if cb:
                # match on the callable itself, not on `id(cb)`: every
                # access to `obj.method` creates a new bound method object
                # with a new id, which however compares equal to the
                # registered one
                to_delete = [cb_id for cb_id, entry in registered.items()
                                   if  entry['cb'] is cb
                                   or  entry['cb'] == cb]
                if not to_delete:
                    raise ValueError("cb %s not registered" % id(cb))
            else:
                to_delete = list(registered.keys())

            for cb_id in to_delete:
                del registered[cb_id]
# ------------------------------------------------------------------------------