3. Handle Failing Tasks
All applications can fail, often for reasons outside the user's control, and a Task is no different. Many non-trivial applications need a way to handle failing tasks. Detecting the failure is the first and necessary step, and RP makes that part easy: RP's task state model defines that a failing task immediately goes into the FAILED state, and that state is available as the task.state property.
The task also provides the task.stderr property for further inspection of the cause of the failure. That property is only available, though, if the task reached the EXECUTING state in the first place.
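As a minimal sketch of that detection step, the snippet below partitions a list of finished tasks by their final state. It deliberately does not import RP: FakeTask is a hypothetical stand-in that only mimics the uid, state, exit_code and stderr attributes used in this tutorial, and the string values 'DONE', 'FAILED' and 'CANCELED' are assumed to match the RP state constants.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

# Assumed final-state names; real code would use rp.DONE, rp.FAILED, rp.CANCELED.
DONE, FAILED, CANCELED = 'DONE', 'FAILED', 'CANCELED'


@dataclass
class FakeTask:
    """Hypothetical stand-in for an RP task (illustration only)."""
    uid: str
    state: str
    exit_code: Optional[int] = None
    stderr: Optional[str] = None


def split_by_outcome(tasks: List[FakeTask]) -> Tuple[List[FakeTask], List[FakeTask]]:
    """Return (succeeded, unsuccessful), based on task.state alone."""
    good = [t for t in tasks if t.state == DONE]
    bad = [t for t in tasks if t.state in (FAILED, CANCELED)]
    return good, bad


tasks = [
    FakeTask('task.000000', FAILED, 127, '/bin/data: not found'),
    FakeTask('task.000001', DONE, 0),
    # a task that failed before reaching EXECUTING has no stderr to inspect
    FakeTask('task.000002', FAILED),
]

good, bad = split_by_outcome(tasks)
for t in bad:
    # stderr may be missing, so fall back to a placeholder
    print('%s failed (exit %s): %s'
          % (t.uid, t.exit_code, t.stderr or '<no stderr>'))
```

Checking the state first and treating stderr as optional keeps the handler robust for tasks that never started executing.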
[1]:
%load_ext dotenv
%dotenv ../../../.env
cannot find .env file
We start by importing the radical.pilot module and initializing the reporter facility used for printing well-formatted runtime and progress information.
[2]:
import os
import sys
verbose = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose
import radical.pilot as rp
import radical.utils as ru
# we use a reporter class for nicer output
report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)
================================================================================
Getting Started (RP version 1.18.1)
================================================================================
We will set the resource value to ‘local.localhost’. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.
[3]:
resource = 'local.localhost'
To create a new Session, you need to provide the URL of a MongoDB server, which we fetch from our .env file. There is no need to wrap session creation in try/except: if it fails, there is not much we can do anyway.
[4]:
session = rp.Session()
new session: [rp.session.1fc3bee2-70bc-11ed-9394-0242ac110002] \
database : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km] ok
All other pilot code is wrapped in a try/except block. If an exception is caught, we can rely on the session object to exist and be valid, and we can thus tear the whole RP stack down via a session.close() call in the finally clause.
[5]:
def create_pilot_descriptions(resources):

    report.info('read config')
    config = ru.read_json('../config.json')
    # config = ru.read_json('%s/config.json' % os.path.dirname(os.path.abspath(__file__)))
    report.ok('>>ok\n')

    report.header('submit pilots')

    n = 1
    pdescs = list()
    for resource in resources:

        # Define an [n]-core local pilot that runs for [x] minutes
        # Here we use a dict to initialize the description object
        for i in range(n):
            pd_init = {
                'resource'      : resource,
                'runtime'       : 60,  # pilot runtime (min)
                'exit_on_error' : True,
                'project'       : config[resource].get('project', None),
                'queue'         : config[resource].get('queue', None),
                'access_schema' : config[resource].get('schema', None),
                'cores'         : config[resource].get('cores', 1),
                'gpus'          : config[resource].get('gpus', 0),
            }
            pdesc = rp.PilotDescription(pd_init)
            pdescs.append(pdesc)

    return pdescs
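The function above looks up each resource in config.json and falls back to defaults for missing keys. The sketch below illustrates that lookup in isolation. The config dict is a hypothetical example of what the file might contain (its actual contents are not shown in this tutorial), and pilot_init is an illustrative helper, not part of RP.

```python
# Hypothetical stand-in for the parsed config.json; only 'cores' is set,
# so every other key falls back to its default.
config = {
    'local.localhost': {'cores': 4},
}


def pilot_init(resource, runtime_min=60):
    """Build the dict that would be handed to rp.PilotDescription."""
    entry = config[resource]          # raises KeyError for unknown resources
    return {
        'resource'      : resource,
        'runtime'       : runtime_min,
        'exit_on_error' : True,
        'project'       : entry.get('project', None),
        'queue'         : entry.get('queue', None),
        'access_schema' : entry.get('schema', None),
        'cores'         : entry.get('cores', 1),
        'gpus'          : entry.get('gpus', 0),
    }


pd_init = pilot_init('local.localhost')
print(pd_init['cores'], pd_init['gpus'], pd_init['queue'])
```

Note that only the per-resource keys have defaults: a resource label that is absent from the config raises a KeyError before any pilot description is created.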
Add a PilotManager. PilotManagers manage one or more pilots.
[6]:
def launch_pilots(session, pdesc):

    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)

    return pilots
[7]:
def submit_tasks(pilots):

    report.header('submit tasks')

    # note: 'session' is the module-level session created above
    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilots)

    # Most tasks run '/bin/date'; every tenth task (starting with task 0)
    # gets a non-existent executable so that it fails.
    n = 10  # number of tasks to run
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):

        td = rp.TaskDescription()
        if i % 10:
            td.executable = '/bin/date'
        else:
            # trigger an error now and then
            td.executable = '/bin/data'  # does not exist
        tds.append(td)
        report.progress()

    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)

    # wait for all tasks to reach a final state (DONE, FAILED or CANCELED)
    report.header('gather results')
    tmgr.wait_tasks()

    return tasks
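The test `if i % 10:` relies on integer truthiness: the remainder is 0 (false) only for multiples of ten, so exactly those tasks receive the broken executable. A small RP-free sketch makes the selection explicit; pick_executable is a hypothetical helper introduced only for illustration.

```python
def pick_executable(i, every=10):
    """Return the valid path unless i is a multiple of 'every'."""
    # i % every is 0 (falsy) for i = 0, every, 2 * every, ...
    return '/bin/date' if i % every else '/bin/data'


executables = [pick_executable(i) for i in range(10)]
failing = [i for i, exe in enumerate(executables) if exe == '/bin/data']
print(failing)   # [0]
```

With ten tasks only index 0 is affected, which matches the single FAILED task in the run shown in this tutorial.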
We create the report_task_progress function to report the final state of each task.
[8]:
def report_task_progress(tasks):

    report.info('\n')
    for task in tasks:
        if task.state in [rp.FAILED, rp.CANCELED]:
            report.plain('  * %s: %s, exit: %5s, err: -%35s-'
                         % (task.uid, task.state[:4],
                            task.exit_code, task.stderr))
            report.error('>>err\n')
        else:
            report.plain('  * %s: %s, exit: %5s, out: %35s'
                         % (task.uid, task.state[:4],
                            task.exit_code, task.stdout))
            report.ok('>>ok\n')
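One detail of the format strings above is worth a quick sketch: `%35s` sets a minimum field width but never truncates, so a long stderr message is printed in full, while `state[:4]` shortens the state name by slicing. The snippet uses only plain Python string formatting; the sample strings are made up for illustration.

```python
short = 'ok'
long_ = 'x' * 50   # stands in for a long stderr message

padded    = '%35s' % short    # right-aligned, padded to 35 characters
overflow  = '%35s' % long_    # minimum width only: all 50 characters kept
truncated = '%.35s' % long_   # a precision would cut at 35 characters

print(len(padded), len(overflow), len(truncated))   # 35 50 35
print('FAILED'[:4], 'DONE'[:4])                     # FAIL DONE
```

If compact one-line reports matter more than complete error text, a precision such as `%.35s` is one option; the tutorial code keeps the full message.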
We put all function calls inside a try/except block. The finally clause always cleans up the session, whether or not an exception was caught. This kills all remaining pilots.
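The control flow described here can be sketched without RP. KeyboardInterrupt and SystemExit do not derive from Exception, so they need their own except clause, and the finally clause runs in every case. FakeSession and run are hypothetical names used only for this illustration; unlike the tutorial code, the sketch returns a string instead of re-raising so that all three paths can be observed.

```python
events = []


class FakeSession:
    """Hypothetical stand-in for rp.Session; only records that close() ran."""
    def close(self, cleanup=True):
        events.append('closed')


def run(workload):
    session = FakeSession()
    try:
        workload()
        return 'ok'
    except Exception as e:
        # ordinary errors raised by the workload
        return 'caught Exception: %s' % e
    except (KeyboardInterrupt, SystemExit):
        # not subclasses of Exception, hence the separate clause
        return 'exit requested'
    finally:
        # runs on success, on error and on exit requests alike
        session.close(cleanup=True)


def fails():
    raise ValueError('bad input')


def interrupted():
    raise KeyboardInterrupt()


results = [run(lambda: None), run(fails), run(interrupted)]
print(results)
print(events)
```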
[9]:
try:
    pdesc = create_pilot_descriptions(['local.localhost'])
    pilots = launch_pilots(session, pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)

except Exception as e:
    # Something unexpected happened in the pilot code above
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    # Neither of these derives from Exception, so they are handled here:
    # KeyboardInterrupt is raised on Ctrl-C, SystemExit is raised if
    # sys.exit() is called or the main thread exits for some other reason.
    report.warn('exit requested\n')

finally:
    # always clean up the session, no matter if we caught an exception or
    # not. This will kill all remaining pilots.
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config ok
--------------------------------------------------------------------------------
submit pilots
create pilot manager ok
submit 1 pilot(s)
pilot.0000 local.localhost 1 cores 0 gpus ok
--------------------------------------------------------------------------------
submit tasks
create task manager ok
create 10 task description(s)
.......... ok
submit: ########################################################################
--------------------------------------------------------------------------------
gather results
wait : ########################################################################
DONE : 9
FAILED : 1
ok
* task.000000: FAIL, exit: 127, err: -/home/docs/radical.pilot.sandbox/rp.session.1fc3bee2-70bc-11ed-9394-0242ac110002//pilot.0000//task.000000/task.000000.exec.sh: 37: /home/docs/radical.pilot.sandbox/rp.session.1fc3bee2-70bc-11ed-9394-0242ac110002//pilot.0000//task.000000/task.000000.exec.sh: /bin/data: not found
- err
* task.000001: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000002: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000003: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000004: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000005: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000006: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000007: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000008: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
* task.000009: DONE, exit: 0, out: Wed Nov 30 14:35:22 UTC 2022
ok
--------------------------------------------------------------------------------
finalize
closing session rp.session.1fc3bee2-70bc-11ed-9394-0242ac110002 \
close task manager ok
close pilot manager \
wait for 1 pilot(s)
0 timeout
ok
session lifetime: 64.8s ok
--------------------------------------------------------------------------------
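The failed task in the output above reports exit code 127, which is the status a POSIX shell returns when a command cannot be found. The sketch below reproduces that outside of RP by running the same non-existent path through a shell. It assumes a POSIX system with /bin/sh available and no file at /bin/data; the exact wording of the error message depends on the shell.

```python
import subprocess

# Run the non-existent executable via a shell and capture its output.
proc = subprocess.run(['/bin/sh', '-c', '/bin/data'],
                      capture_output=True, text=True)

print('exit code:', proc.returncode)
print('stderr   :', proc.stderr.strip())
```

Seeing 127 together with a "not found" message in task.stderr therefore points to a wrong executable path rather than to a failure inside the program itself.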