# pylint: disable=protected-access
__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import copy
import time
import radical.utils as ru
from . import states as rps
from . import constants as rpc
from .staging_directives import complete_url
# ------------------------------------------------------------------------------
#
[docs]class Pilot(object):
"""
A Pilot represents a resource overlay on a local or remote resource.
.. note:: A Pilot cannot be created directly. The factory method
:meth:`radical.pilot.PilotManager.submit_pilots` has to be
used instead.
**Example**::
pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()
pd.resource = "local.localhost"
pd.cores = 2
pd.runtime = 5 # minutes
pilot = pm.submit_pilots(pd)
"""
# --------------------------------------------------------------------------
# In terms of implementation, a Pilot is not much more than a dict whose
# contents are dynamically updated to reflect the state progression through
# the PMGR components. As a Pilot is always created via a PMGR, it is
# considered to *belong* to that PMGR, and all activities are actually
# implemented by that PMGR.
#
# Note that this implies that we could create Pilots before submitting them
# to a PMGR, w/o any problems. (FIXME?)
# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
#
def __init__(self, pmgr, descr):
    '''
    Set up the pilot facade for the given pilot manager and description.

    **Arguments:**
        * **pmgr**: the pilot manager this pilot belongs to.  Its
          `session`, `_log` and `check_uid()` are used here.
        * **descr**: the pilot description.  It must define a positive
          `runtime` and a `resource`, and provide `as_dict()`.

    **Raises:**
        * **ValueError** if `runtime` is unset or not positive, if
          `resource` is unset, or if a uid given in the description is
          rejected by `pmgr.check_uid()`.
    '''

    # sanity checks on description
    if not descr.runtime:
        raise ValueError('pilot runtime must be defined')

    if descr.runtime <= 0:
        raise ValueError('pilot runtime must be positive')

    if not descr.resource:
        raise ValueError('pilot target resource must be defined')

    # initialize state
    self._descr      = descr.as_dict()
    self._pmgr       = pmgr
    self._session    = self._pmgr.session
    self._prof       = self._session._prof
    self._uid        = self._descr.get('uid')
    self._state      = rps.NEW
    self._log        = pmgr._log
    self._pilot_dict = dict()
    self._callbacks  = dict()
    self._cache      = dict() # cache of SAGA dir handles
    self._cb_lock    = ru.RLock()

    # pilot failures can trigger app termination
    self._exit_on_error = self._descr.get('exit_on_error')

    # ensure uid is unique: a uid given in the description is checked with
    # the pmgr, otherwise a new one is generated in the session's namespace
    if self._uid:
        if not self._pmgr.check_uid(self._uid):
            raise ValueError('uid %s is not unique' % self._uid)
    else:
        self._uid = ru.generate_id('pilot.%(item_counter)04d', ru.ID_CUSTOM,
                                   ns=self._session.uid)

    # one callback registry (name -> {'cb', 'cb_data'}) per known metric
    for m in rpc.PMGR_METRICS:
        self._callbacks[m] = dict()

    # we always invoke the default state cb
    self._callbacks[rpc.PILOT_STATE][self._default_state_cb.__name__] = {
        'cb' : self._default_state_cb,
        'cb_data' : None}

    # `as_dict()` needs `pilot_dict` and other attributes. Those should all
    # be available at this point (apart from the sandboxes), so we now
    # query for those sandboxes.  The empty URLs below are placeholders
    # which let `as_dict()` succeed before the real values are known.
    self._pilot_jsurl      = ru.Url()
    self._pilot_jshop      = ru.Url()
    self._endpoint_fs      = ru.Url()
    self._resource_sandbox = ru.Url()
    self._session_sandbox  = ru.Url()
    self._pilot_sandbox    = ru.Url()
    self._client_sandbox   = ru.Url()

    pilot = self.as_dict()

    self._pilot_jsurl, self._pilot_jshop \
        = self._session._get_jsurl (pilot)
    self._endpoint_fs      = self._session._get_endpoint_fs (pilot)
    self._resource_sandbox = self._session._get_resource_sandbox(pilot)
    self._session_sandbox  = self._session._get_session_sandbox (pilot)
    self._pilot_sandbox    = self._session._get_pilot_sandbox (pilot)
    self._client_sandbox   = self._session._get_client_sandbox()

    # contexts for staging url expansion
    # NOTE: no task sandboxes defined!
    # NOTE: both contexts reference the very same URL objects whose paths
    #       are expanded further below; they differ only in 'pwd' (pilot
    #       sandbox for the remote side, client sandbox for the local side)
    self._rem_ctx = {'pwd' : self._pilot_sandbox,
                     'client' : self._client_sandbox,
                     'pilot' : self._pilot_sandbox,
                     'resource': self._resource_sandbox,
                     'session' : self._session_sandbox,
                     'endpoint': self._endpoint_fs}

    self._loc_ctx = {'pwd' : self._client_sandbox,
                     'client' : self._client_sandbox,
                     'pilot' : self._pilot_sandbox,
                     'resource': self._resource_sandbox,
                     'session' : self._session_sandbox,
                     'endpoint': self._endpoint_fs}

    # we need to expand placeholders in the sandboxes
    # FIXME: this code is a duplication from the pilot launcher code
    expand = dict()
    for k,v in pilot['description'].items():
        # unset description values expand to an empty string
        if v is None:
            v = ''
        expand['pd.%s' % k] = v
        # also provide upper- and lower-case variants of the key; for string
        # values the value itself is upper- / lower-cased along with the key
        if isinstance(v, str):
            expand['pd.%s' % k.upper()] = v.upper()
            expand['pd.%s' % k.lower()] = v.lower()
        else:
            expand['pd.%s' % k.upper()] = v
            expand['pd.%s' % k.lower()] = v

    # apply `%(pd.<key>)s` style expansion to the URL paths (the client
    # sandbox is not expanded)
    self._endpoint_fs .path = self._endpoint_fs .path % expand
    self._resource_sandbox.path = self._resource_sandbox.path % expand
    self._session_sandbox .path = self._session_sandbox .path % expand
    self._pilot_sandbox .path = self._pilot_sandbox .path % expand
# --------------------------------------------------------------------------
#
def __repr__(self):
return '<%s object, uid %s>' % (self.__class__.__qualname__, self._uid)
# --------------------------------------------------------------------------
#
def __str__(self):
return str([self.uid, self.resource, self.state])
# --------------------------------------------------------------------------
#
def _default_state_cb(self, pilot, state=None):
    '''
    Default state callback: logs the pilot state, and interrupts the main
    thread if the pilot failed and `exit_on_error` was set in the pilot
    description.

    Both arguments are ignored: uid and state are always read from this
    pilot instance.
    '''
    pilot_id = self.uid
    current  = self.state

    self._log.info("[Callback]: pilot %s state: %s.", pilot_id, current)

    # nothing else to do unless a failure is configured to be fatal
    if current != rps.FAILED or not self._exit_on_error:
        return

    self._log.error("[Callback]: pilot '%s' failed (exit)", pilot_id)

    # There are different ways to tell main - we use an interrupt
    ru.cancel_main_thread('int')
# --------------------------------------------------------------------------
#
def _update(self, pilot_dict):
    '''
    This will update the facade object after state changes etc, and is
    invoked by whatever component receiving that updated information.

    **Arguments:**
        * **pilot_dict** (dict): the updated pilot record.  It must hold
          the keys 'uid' and 'state'; a deep copy of it is kept.

    **Raises:**
        * **AssertionError** if the record's uid is not this pilot's uid.
        * **RuntimeError** if the target state is neither FAILED nor
          CANCELED and is not exactly one step ahead of the current state.

    The method does not return a value.
    '''

    self._log.debug('update %s', pilot_dict['uid'])

    if pilot_dict['uid'] != self.uid:
        self._log.error('invalid uid: %s / %s', pilot_dict['uid'], self.uid)

    assert pilot_dict['uid'] == self.uid, 'update called on wrong instance'

    # NOTE: this method relies on state updates to arrive in order and
    #       without gaps.
    current = self.state
    target  = pilot_dict['state']

    if target not in [rps.FAILED, rps.CANCELED]:

        # ensure valid state transition
        state_diff = rps._pilot_state_value(target) - \
                     rps._pilot_state_value(current)
        if state_diff != 1:
            # interpolate the message here: an exception constructor does
            # not apply printf-style formatting to its arguments
            raise RuntimeError('%s: invalid state transition %s -> %s'
                               % (self.uid, current, target))

    self._state = target

    # keep all information around
    self._pilot_dict = copy.deepcopy(pilot_dict)

    # invoke pilot specific callbacks.  We iterate over a snapshot taken
    # under the callback lock, so that concurrent (un)registration cannot
    # change the dict during iteration.  The callbacks themselves run
    # outside of the lock.
    with self._cb_lock:
        callbacks = list(self._callbacks[rpc.PILOT_STATE].values())

    for cb_val in callbacks:

        cb      = cb_val['cb']
        cb_data = cb_val['cb_data']

        self._log.debug('%s calls cb %s', self.uid, cb)

        # callbacks receive a single-element list holding this pilot;
        # `cb_data` is only passed along when it is truthy
        if cb_data: cb([self], cb_data)
        else      : cb([self])

    # ask pmgr to invoke any global callbacks
    self._pmgr._call_pilot_callbacks(self)
# --------------------------------------------------------------------------
#
def as_dict(self):
    '''
    Returns a Python dictionary representation of the object.

    URL valued attributes (sandboxes, endpoint, job service URLs) are
    exported in their string form.
    '''
    ret = dict()

    # identity and state
    ret['session'] = self.session.uid
    ret['pmgr']    = self.pmgr.uid
    ret['uid']     = self.uid
    ret['type']    = 'pilot'
    ret['state']   = self.state

    # plain properties are exported as they are
    for key in ('log', 'stdout', 'stderr', 'resource'):
        ret[key] = getattr(self, key)

    # URLs are exported as strings
    urls = (('endpoint_fs',      self._endpoint_fs),
            ('resource_sandbox', self._resource_sandbox),
            ('session_sandbox',  self._session_sandbox),
            ('pilot_sandbox',    self._pilot_sandbox),
            ('client_sandbox',   self._client_sandbox),
            ('js_url',           self._pilot_jsurl),
            ('js_hop',           self._pilot_jshop))
    for key, url in urls:
        ret[key] = str(url)

    ret['description']      = self.description  # this is a deep copy
    ret['resource_details'] = self.resource_details

    return ret
# --------------------------------------------------------------------------
#
@property
def session(self):
    '''
    Returns the pilot's session.

    This is the session of the pilot manager the pilot was created with.

    **Returns:**
        * A :class:`Session`.
    '''
    return self._session
# --------------------------------------------------------------------------
#
@property
def pmgr(self):
    '''
    Returns the pilot's manager.

    This is the manager instance which was passed to the constructor.

    **Returns:**
        * A :class:`PilotManager`.
    '''
    return self._pmgr
# -------------------------------------------------------------------------
#
@property
def resource_details(self):
    '''
    Returns agent level resource information.

    The value is read from the most recent pilot record received via
    `_update()`.

    **Returns:**
        * the 'resource_details' entry of that record, or `None` if the
          record holds no such entry (e.g. before the first update).
    '''
    return self._pilot_dict.get('resource_details')
# --------------------------------------------------------------------------
#
@property
def uid(self):
    '''
    Returns the pilot's unique identifier.

    The uid identifies the pilot within a :class:`PilotManager`.  It is
    either taken from the pilot description or generated at construction.

    **Returns:**
        * A unique identifier (string).
    '''
    return self._uid
# --------------------------------------------------------------------------
#
@property
def state(self):
    '''
    Returns the current :py:mod:`state <radical.pilot.states>` of the pilot.

    The state starts out as `NEW` and is changed by `_update()`.

    **Returns:**
        * state (string enum)
    '''
    return self._state
# --------------------------------------------------------------------------
#
@property
def log(self):
    '''
    Returns a list of human readable [timestamp, string] tuples describing
    various events during the pilot's lifetime.  Those strings are not
    normative, only informative!

    The value is read from the most recent pilot record received via
    `_update()`.

    **Returns:**
        * log (list of [timestamp, string] tuples), or `None` if that
          record holds no 'log' entry.
    '''
    return self._pilot_dict.get('log')
# --------------------------------------------------------------------------
#
@property
def stdout(self):
    '''
    Returns a snapshot of the pilot's STDOUT stream.

    The value is read from the most recent pilot record received via
    `_update()`; `None` is returned if that record holds no 'stdout'
    entry.  NOTE(review): earlier documentation states that this is the
    case until the pilot reached 'DONE' or 'FAILED' state - confirm.

    .. warning:: This can be inefficient.  Output may be incomplete and/or
       filtered.

    **Returns:**
        * stdout (string)
    '''
    return self._pilot_dict.get('stdout')
# --------------------------------------------------------------------------
#
@property
def stderr(self):
    '''
    Returns a snapshot of the pilot's STDERR stream.

    The value is read from the most recent pilot record received via
    `_update()`; `None` is returned if that record holds no 'stderr'
    entry.  NOTE(review): earlier documentation states that this is the
    case until the pilot reached 'DONE' or 'FAILED' state - confirm.

    .. warning:: This can be inefficient.  Output may be incomplete and/or
       filtered.

    **Returns:**
        * stderr (string)
    '''
    return self._pilot_dict.get('stderr')
# --------------------------------------------------------------------------
#
@property
def resource(self):
    '''
    Returns the resource tag of this pilot.

    The tag is the 'resource' entry of the pilot description.

    **Returns:**
        * A resource tag (string)
    '''
    return self._descr.get('resource')
# --------------------------------------------------------------------------
#
@property
def pilot_sandbox(self):
    '''
    Returns the full sandbox URL of this pilot in string form, if that is
    already known, or `None` otherwise.

    **Returns:**
        * A string (or `None`)
    '''
    # NOTE: The sandbox URL held by the pilot is the full sandbox path which
    #       the pmgr uses to stage data back and forth.  That path, as seen
    #       from the pmgr side, is not necessarily what the agent sees,
    #       specifically on non-shared filesystems (OSG).  The agent thus
    #       uses `$PWD` as sandbox, assuming that this gets mapped to
    #       whatever is returned here as sandbox URL.
    #
    #       RP client and RP agent thus implicitly share the knowledge that
    #       `$PWD` *is* the sandbox!  The same implicitly holds for the
    #       staging area, which is relative to the pilot sandbox.
    sandbox = self._pilot_sandbox

    if not sandbox:
        return None

    return str(sandbox)
@property
def endpoint_fs(self):
    '''
    Returns the filesystem endpoint of this pilot as obtained from the
    session at construction time, with `pd.*` placeholders in its path
    expanded.  The URL object itself is returned (presumably an `ru.Url` -
    TODO confirm), not its string form.
    '''
    return self._endpoint_fs
@property
def resource_sandbox(self):
    '''
    Returns the resource sandbox of this pilot as obtained from the
    session at construction time, with `pd.*` placeholders in its path
    expanded.  The URL object itself is returned (presumably an `ru.Url` -
    TODO confirm), not its string form.
    '''
    return self._resource_sandbox
@property
def session_sandbox(self):
    '''
    Returns the session sandbox of this pilot as obtained from the
    session at construction time, with `pd.*` placeholders in its path
    expanded.  The URL object itself is returned (presumably an `ru.Url` -
    TODO confirm), not its string form.
    '''
    return self._session_sandbox
@property
def client_sandbox(self):
    '''
    Returns the client sandbox as obtained from the session at construction
    time.  The object is returned as provided by the session (not
    converted to a string); unlike the other sandboxes, its path is not
    subject to placeholder expansion.
    '''
    return self._client_sandbox
# --------------------------------------------------------------------------
#
@property
def description(self):
    '''
    Returns the description the pilot was started with, as a dictionary.

    A deep copy is returned on each access, so changes to the returned
    dictionary do not affect the pilot.

    **Returns:**
        * description (dict)
    '''
    return copy.deepcopy(self._descr)
# --------------------------------------------------------------------------
#
def register_callback(self, cb, metric=rpc.PILOT_STATE, cb_data=None):
    '''
    Registers a callback function for the given metric (by default: pilot
    state changes).

    Callbacks are stored under their `__name__`: registering a second
    callable with the same name for the same metric replaces the first.

    **Arguments:**
        * **cb**: the callable to register.
        * **metric**: one of the pilot manager metrics.
        * **cb_data**: optional data stored alongside the callback.

    **Raises:**
        * **ValueError** if `metric` is not a known pilot manager metric.

    NOTE(review): this method was documented with the callback signatures
    `cb(obj, state)` and `cb(obj, state, cb_data)`.  `_update()` of this
    class however invokes state callbacks as `cb([pilot])`, or as
    `cb([pilot], cb_data)` if `cb_data` is truthy - confirm the intended
    contract.
    '''
    if metric not in rpc.PMGR_METRICS:
        raise ValueError ("invalid pmgr metric '%s'" % metric)

    entry = {'cb'      : cb,
             'cb_data' : cb_data}

    with self._cb_lock:
        self._callbacks[metric][cb.__name__] = entry
# --------------------------------------------------------------------------
#
def unregister_callback(self, cb, metric=rpc.PILOT_STATE):
    '''
    Removes a previously registered callback.

    **Arguments:**
        * **cb**: the callable to remove (looked up by its `__name__`).
          If `cb` is falsy, *all* callbacks registered for the selected
          metric(s) are removed.
        * **metric**: a metric, a list (or tuple) of metrics, or a falsy
          value to select all pilot manager metrics.

    **Raises:**
        * **ValueError** if a given metric is not a known pilot manager
          metric, or if `cb` is not registered for a selected metric.
    '''
    # normalize the metric argument *before* validation, so that lists of
    # metrics are checked element by element
    if not metric:
        metrics = rpc.PMGR_METRICS
    elif isinstance(metric, (list, tuple)):
        metrics = metric
    else:
        metrics = [metric]

    for name in metrics:
        if name not in rpc.PMGR_METRICS:
            raise ValueError ("invalid pmgr metric '%s'" % name)

    with self._cb_lock:

        for name in metrics:

            registered = self._callbacks[name]

            if cb: to_delete = [cb.__name__]
            else : to_delete = list(registered.keys())

            for cb_name in to_delete:

                if cb_name not in registered:
                    raise ValueError("unknown callback '%s'" % cb_name)

                del registered[cb_name]
# --------------------------------------------------------------------------
#
def wait(self, state=None, timeout=None):
    '''
    Returns when the pilot reaches a specific state or
    when an optional timeout is reached.

    **Arguments:**

        * **state** [`string` or `list of strings`]
          The :py:mod:`state(s) <radical.pilot.states>` that pilot has to
          reach in order for the call to return.

          By default `wait` waits for the pilot to reach a **final**
          state, which can be one of the following:

          * :data:`radical.pilot.states.DONE`
          * :data:`radical.pilot.states.FAILED`
          * :data:`radical.pilot.states.CANCELED`

        * **timeout** [`float`]
          Optional timeout in seconds before the call returns regardless
          whether the pilot has reached the desired state or not.  The
          default value **None** never times out; a timeout of `0` only
          checks the current state and returns without waiting.

    **Returns:**
        * the pilot state at the time the call returns (string).  This is
          not necessarily one of the requested states: the call also
          returns on timeout, on pilot manager termination, and when the
          pilot already is in a final state.
    '''
    if not state:
        states = rps.FINAL
    elif isinstance(state, (list, tuple, set)):
        states = state
    else:
        states = [state]

    if self.state in rps.FINAL:
        # we will never see another state progression, so waiting is
        # pointless - return the final state, no matter if the caller
        # waited for it or not.
        # FIXME: do we want a raise here when waiting for another state?
        #        That would introduce a race on application level.
        return self.state

    start_wait = time.time()
    while self.state not in states:

        # `None` means 'no timeout'; any number (including 0) is honored
        if timeout is not None and (time.time() - start_wait) >= timeout:
            break

        if self._pmgr._terminate.is_set():
            break

        time.sleep(0.1)

    return self.state
# --------------------------------------------------------------------------
#
def cancel(self):
    '''
    Cancel the pilot.

    All cached connection handles are closed first (best effort: a handle
    which fails to close is logged and skipped), then the pilot manager is
    asked to cancel this pilot.
    '''
    # clean connection cache: detach it first so that it is reset in any
    # case, then close every handle individually so that one failure does
    # not prevent the remaining handles from being closed
    handles, self._cache = self._cache, dict()

    for key, handle in handles.items():
        try:
            handle.close()
        except Exception as e:
            # cleanup is best effort and must not prevent cancellation
            self._log.warning('failed to close cached handle %s: %s', key, e)

    self._pmgr.cancel_pilots(self.uid)
# --------------------------------------------------------------------------
#
def prepare_env(self, env_name, env_spec):
    '''
    Request the preparation of a task or worker environment on the target
    resource.  The request is sent via `rpc()` as 'prepare_env' command;
    the reply is not returned.

    **Arguments:**

        * **env_name** (str): name of the environment to prepare
        * **env_spec** (dict): specification of the environment to
          prepare, like::

            {'type'    : 'venv',
             'version' : '3.7',
             'pre_exec': ['module load python'],
             'setup'   : ['radical.pilot==1.0', 'pandas']},

            {'type'    : 'conda',
             'version' : '3.8',
             'setup'   : ['numpy']}

            {'type'    : 'conda',
             'version' : '3.8',
             'path'    : '/path/to/ve',
             'setup'   : ['numpy']}

    Here `type` specifies the environment type, `version` specifies the
    Python version to deploy, and `setup` specifies how the environment is
    to be prepared.  If `path` is specified the env will be created at that
    path.  If `path` is not specified, RP will place the named env in the
    pilot sandbox (under `env/named_env_<name>`).  If a VE exists at that
    path, it will be used as is (an update is not performed).  `pre_exec`
    commands are executed before env creation and setup are attempted.

    Note: the `version` specifier is only interpreted up to minor version,
    subminor and less are ignored.
    '''
    request = {'env_name': env_name,
               'env_spec': env_spec}

    self.rpc('prepare_env', request)
# --------------------------------------------------------------------------
#
def rpc(self, rpc, args):
    '''
    Send a pilot command, wait for the response, and return the result.
    This is basically an RPC into the pilot.

    **Arguments:**
        * **rpc**: name of the command to send
        * **args**: arguments for that command

    **Returns:**
        * whatever the session's `pilot_rpc()` call returns for this pilot
    '''
    # NOTE: within this method, the `rpc` parameter hides any module level
    #       name `rpc`
    dbs = self._session._dbs
    return dbs.pilot_rpc(self.uid, rpc, args)
# --------------------------------------------------------------------------
#
def stage_in(self, sds):
    '''
    Stages the content of the :py:mod:`~radical.pilot.staging_directives`
    into the pilot's staging area.

    Each directive is tagged with this pilot's uid ('prof_id'); its source
    is completed against the local (client side) context, its target
    against the remote (pilot side) context.  The directives are changed
    in place.

    **Arguments:**
        * **sds**: a staging directive or a list of staging directives

    **Returns:**
        * the list of target URLs (strings), in directive order
    '''
    directives = ru.as_list(sds)

    for directive in directives:

        directive['prof_id'] = self.uid

        src = complete_url(directive['source'], self._loc_ctx, self._log)
        directive['source'] = str(src)

        tgt = complete_url(directive['target'], self._rem_ctx, self._log)
        directive['target'] = str(tgt)

    # ask the pmgr to send the staging requests to the stager
    self._pmgr._pilot_staging_input(directives)

    targets = list()
    for directive in directives:
        targets.append(directive['target'])

    return targets
# --------------------------------------------------------------------------
#
def stage_out(self, sds=None):
    '''
    Fetch files (default: `staging_output.tgz`) from the pilot sandbox.
    See :py:mod:`radical.pilot.staging_directives`.

    Each directive is tagged with this pilot's uid ('prof_id'); its source
    is completed against the remote (pilot side) context, its target
    against the local (client side) context.  Given directives are
    changed in place.

    **Arguments:**
        * **sds**: a staging directive or a list of staging directives.
          If none are given, `pilot:///staging_output.tgz` is transferred
          to `client:///staging_output.tgz`.

    **Returns:**
        * the list of target URLs (strings), in directive order
    '''
    directives = ru.as_list(sds)

    if not directives:
        default_sd = {'source': 'pilot:///staging_output.tgz',
                      'target': 'client:///staging_output.tgz',
                      'action': rpc.TRANSFER}
        directives = [default_sd]

    # first pass: tag all directives
    for directive in directives:
        directive['prof_id'] = self.uid

    # second pass: complete source and target URLs
    for directive in directives:

        src = complete_url(directive['source'], self._rem_ctx, self._log)
        directive['source'] = str(src)

        tgt = complete_url(directive['target'], self._loc_ctx, self._log)
        directive['target'] = str(tgt)

    # ask the pmgr to send the staging requests to the stager
    self._pmgr._pilot_staging_output(directives)

    targets = list()
    for directive in directives:
        targets.append(directive['target'])

    return targets
# ------------------------------------------------------------------------------