9. Set Up the Task Environment
Different applications come with different requirements for the runtime environment. This section describes how the shell environment for a task can be configured.
The task environment is defined via a Python dictionary, as part of the task description:
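import radical.pilot as rp

td = rp.TaskDescription()
td.executable  = '/bin/echo'
td.arguments   = ['$RP_TASK_ID greets $TEST']   # variables are expanded when the task runs
td.environment = {'TEST' : 'jabberwocky'}       # exported into the task's environment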
This makes the environment variable TEST available during task execution. Other variables, such as RP_TASK_ID above, are set internally by RP; RP_TASK_ID is used here only for demonstration.
Warning: Always call radical.pilot.Session.close() before your application terminates, so that all lingering pilots are terminated. Pass the argument cleanup=True to delete the session's entries from the database. If you need to retain that data, pass download=True.
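The task output at the end of this section shows that both $RP_TASK_ID and $TEST are expanded when the task runs. The sketch below imitates that substitution locally with Python's string.Template, so you can see which string the echo command ends up receiving. It makes two assumptions that are not RP's implementation. First, RP-provided and user-supplied variables are merged into one mapping. Second, the RP_TASK_ID value is a made-up example.

```python
from string import Template


def render_arguments(arguments, task_env, rp_env):
    """Imitate shell-style $VAR expansion of task arguments (illustration only).

    Assumption: RP-provided variables (rp_env) and the task description's
    environment (task_env) are merged; in this sketch task_env wins on conflict.
    Unlike a real shell, unknown names are left unexpanded instead of
    becoming empty strings.
    """
    env = dict(rp_env)
    env.update(task_env)
    return [Template(arg).safe_substitute(env) for arg in arguments]


arguments   = ['$RP_TASK_ID greets $TEST']
environment = {'TEST': 'jabberwocky'}
rp_provided = {'RP_TASK_ID': 'task.000000'}   # hypothetical value for illustration

print(render_arguments(arguments, environment, rp_provided))
```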
9.1. Running the Example
Below is an example that uses the code above to run a bag of echo commands.
As usual, we start by importing the radical.pilot module and initializing the reporter facility, which prints well-formatted runtime and progress information.
import os
import sys
verbose = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose
import radical.pilot as rp
import radical.utils as ru
report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)
================================================================================
Getting Started (RP version 1.18.1)
================================================================================
Next, we use the dotenv module (from the python-dotenv package) to load environment variables from a .env file. Creating a new Session requires the URL of a MongoDB server, which we read from the RADICAL_PILOT_DBURL variable in that file.
We set the resource to 'local.localhost'. Using any other resource key implicitly tells RADICAL-Pilot that it is targeting a remote resource.
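from dotenv import load_dotenv
# Load variables from the .env file into os.environ (existing values are not overridden)
load_dotenv()
# Fail early with a clear message if the MongoDB URL is missing
RADICAL_PILOT_DBURL = os.getenv('RADICAL_PILOT_DBURL')
if not RADICAL_PILOT_DBURL:
    raise RuntimeError('RADICAL_PILOT_DBURL is not set; add it to your .env file')
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL

resource = 'local.localhost'
session  = rp.Session()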
new session: [rp.session.2e03c8f2-70bd-11ed-9f46-0242ac110002] \
database : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km] ok
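The .env file holds plain KEY=VALUE lines. The parser below is a deliberately simplified stand-in that illustrates the format read by load_dotenv(). It is not python-dotenv's implementation: it ignores `export` prefixes, multi-line values, and variable interpolation. The MongoDB URL in the usage line is a placeholder.

```python
def parse_dotenv(text):
    """Parse simple KEY=VALUE lines from .env-style text (simplified sketch)."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments and lines without an assignment
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        key, value = key.strip(), value.strip()
        # Drop one pair of matching surrounding quotes, if present
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        values[key] = value
    return values


example = '''
# MongoDB used by RADICAL-Pilot (placeholder credentials)
RADICAL_PILOT_DBURL="mongodb://user:secret@localhost:27017/rp"
'''
print(parse_dotenv(example))
```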
The remaining pilot code runs inside a try/except block. Because the session was created before that block, it is guaranteed to exist and be valid if an exception is caught, so the whole RP stack can be torn down by calling session.close() in the finally clause.
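The control flow described above can be checked without a running pilot. The sketch below uses a hypothetical StubSession class in place of rp.Session; it only records close() calls. It shows three cases. On success, the session is closed after the work returns. On an ordinary exception, the session is closed and the exception is re-raised. On KeyboardInterrupt or SystemExit, which derive from BaseException rather than Exception, the session is still closed.

```python
class StubSession:
    """Hypothetical stand-in for rp.Session; it only records close() calls."""

    def __init__(self):
        self.close_calls = []

    def close(self, cleanup=False):
        self.close_calls.append(cleanup)


def run_with_teardown(session, work):
    """Run work() and always close the session, mirroring the notebook's layout."""
    try:
        return work()
    except Exception as e:
        # Report, then re-raise so the caller still sees the failure
        print('caught Exception: %s' % e)
        raise
    except (KeyboardInterrupt, SystemExit):
        # Not subclasses of Exception, so the clause above does not catch them;
        # here they are treated as a shutdown request and swallowed
        print('exit requested')
        return None
    finally:
        # Runs on success, on re-raised errors and on exit requests alike
        session.close(cleanup=True)


s = StubSession()
print(run_with_teardown(s, lambda: 42), s.close_calls)
```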
def create_pilot_descriptions(resource):
    # read the config used for resource details
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')

    report.header('submit pilots')

    # Define an [n]-core local pilot that runs for [x] minutes.
    # Here we use a dict to initialize the description object.
    pd_init = {
        'resource'      : resource,
        'runtime'       : 15,   # pilot runtime (min)
        'exit_on_error' : True,
        'project'       : config[resource].get('project', None),
        'queue'         : config[resource].get('queue', None),
        'access_schema' : config[resource].get('schema', None),
        'cores'         : config[resource].get('cores', 1),
        'gpus'          : config[resource].get('gpus', 0),
    }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc
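create_pilot_descriptions() reads one entry per resource key from ../config.json and falls back to defaults for missing fields. The sketch below rebuilds the same dictionary from a config snippet, so you can see which defaults apply. The config contents and the "example.cluster" key are hypothetical and are not the file shipped with RP. The real dictionary is then passed to rp.PilotDescription, which this sketch does not call.

```python
import json

# Hypothetical config contents: one entry per resource key; any field may be omitted.
CONFIG_TEXT = """
{
    "local.localhost": {"cores": 4},
    "example.cluster": {"project": "abc123", "queue": "debug",
                        "schema": "ssh", "cores": 32, "gpus": 2}
}
"""


def pilot_settings(config, resource):
    """Build the same dict that create_pilot_descriptions() uses (sketch)."""
    entry = config[resource]   # raises KeyError for an unknown resource, as in the notebook
    return {
        'resource'      : resource,
        'runtime'       : 15,   # pilot runtime (min)
        'exit_on_error' : True,
        'project'       : entry.get('project', None),
        'queue'         : entry.get('queue', None),
        'access_schema' : entry.get('schema', None),
        'cores'         : entry.get('cores', 1),
        'gpus'          : entry.get('gpus', 0),
    }


config = json.loads(CONFIG_TEXT)
print(pilot_settings(config, 'local.localhost'))
```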
def launch_pilots(session, pdesc):
    # Add a PilotManager. Pilot managers manage one or more pilots.
    pmgr = rp.PilotManager(session=session)
    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)
    return pilot
def submit_tasks(pilot):
    report.header('submit tasks')

    # Register the pilot in a TaskManager object.
    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilot)

    # Create a workload of tasks.
    # Each task runs a specific `echo` command.
    n = 10   # number of tasks to run
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):
        # Create a new task description and fill it.
        # Here we don't use dict initialization.
        td = rp.TaskDescription()
        td.environment = {'TEST' : 'jabberwocky'}
        td.executable  = '/bin/echo'
        td.arguments   = ['$RP_TASK_ID greets $TEST']
        tds.append(td)
        report.progress()

    report.ok('>>ok\n')

    # Submit the previously created task descriptions to the
    # TaskManager. This triggers the selected scheduler to start
    # assigning tasks to the pilot.
    tasks = tmgr.submit_tasks(tds)

    # Wait for all tasks to reach a final state (DONE, CANCELED or FAILED).
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks
def report_task_progress(tasks):
    report.info('\n')
    for task in tasks:
        report.plain(' * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                   task.exit_code, task.stdout.strip()[:35]))
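report_task_progress() reads four attributes from each task: uid, state, exit_code and stdout. The sketch below applies the same format string to a hypothetical FakeTask namedtuple, so you can try the formatting offline. It also tallies states with collections.Counter, in the spirit of the "DONE : 10" summary. Two details are assumptions rather than RP behavior. The `or ''` guard treats missing output as empty. The %3s field pads the exit code to three characters.

```python
from collections import Counter, namedtuple

# Hypothetical stand-in exposing the attributes report_task_progress() reads.
FakeTask = namedtuple('FakeTask', ['uid', 'state', 'exit_code', 'stdout'])


def format_task_line(task):
    """Same format as report_task_progress(), without the trailing newline."""
    stdout = task.stdout or ''   # defensive: treat missing output as empty
    return ' * %s: %s, exit: %3s, out: %s' % (
        task.uid, task.state[:4], task.exit_code, stdout.strip()[:35])


def count_states(tasks):
    """Tally how many tasks ended in each state."""
    return Counter(task.state for task in tasks)


tasks = [FakeTask('task.%06d' % i, 'DONE', 0, 'task.%06d greets jabberwocky\n' % i)
         for i in range(3)]
for t in tasks:
    print(format_task_line(t))
print(count_states(tasks))
```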
try:
    pdesc = create_pilot_descriptions(resource)
    pilot = launch_pilots(session, pdesc)
    tasks = submit_tasks(pilot)
    report_task_progress(tasks)

except Exception as e:
    # Something unexpected happened in the pilot code above
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    # KeyboardInterrupt (e.g. Ctrl-C) and SystemExit (raised by sys.exit())
    # derive from BaseException rather than Exception, so they are caught
    # here and lead to an orderly shutdown.
    report.warn('exit requested\n')

finally:
    # Always clean up the session, whether or not an exception was caught.
    # This terminates all remaining pilots.
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config ok
--------------------------------------------------------------------------------
submit pilots
create pilot manager ok
submit 1 pilot(s)
pilot.0000 local.localhost 1 cores 0 gpus ok
--------------------------------------------------------------------------------
submit tasks
create task manager ok
create 10 task description(s)
.......... ok
submit: ########################################################################
--------------------------------------------------------------------------------
gather results
wait : ########################################################################
DONE : 10
ok
* task.000000: DONE, exit: 0, out: task.000000 greets jabberwocky
* task.000001: DONE, exit: 0, out: task.000001 greets jabberwocky
* task.000002: DONE, exit: 0, out: task.000002 greets jabberwocky
* task.000003: DONE, exit: 0, out: task.000003 greets jabberwocky
* task.000004: DONE, exit: 0, out: task.000004 greets jabberwocky
* task.000005: DONE, exit: 0, out: task.000005 greets jabberwocky
* task.000006: DONE, exit: 0, out: task.000006 greets jabberwocky
* task.000007: DONE, exit: 0, out: task.000007 greets jabberwocky
* task.000008: DONE, exit: 0, out: task.000008 greets jabberwocky
* task.000009: DONE, exit: 0, out: task.000009 greets jabberwocky
--------------------------------------------------------------------------------
finalize
closing session rp.session.2e03c8f2-70bd-11ed-9f46-0242ac110002 \
close task manager ok
close pilot manager \
wait for 1 pilot(s)
O/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\| 0 timeout
ok
session lifetime: 63.7s ok
--------------------------------------------------------------------------------