6. Staging Task Input Data
The vast majority of applications operate on data, and many of those read input data from files. Since RP provides an abstraction above the resource layer, it can run a task on any pilot the application created (see Selecting a Task Scheduler). To ensure that the task finds the data it needs on the resource where it runs, RP provides a mechanism to stage input data automatically.
If the source and target file names are the same, and the action is the default, rp.TRANSFER, you can specify task input data simply as a list of file names (more complex staging directives are discussed in a later example):
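import radical.pilot as rp

td = rp.TaskDescription()
td.executable    = '/usr/bin/wc'          # count the bytes of the input file
td.arguments     = ['-c', 'input.dat']
td.input_staging = ['input.dat']          # shorthand: same name, default rp.TRANSFER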
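The shorthand list is equivalent to spelling out a full staging directive for each file. The sketch below expands bare file names into directive dictionaries to make that equivalence concrete. `expand_staging` and the `TRANSFER` placeholder are hypothetical and not part of RP, which performs this expansion itself. The sketch assumes that a bare file name resolves to the client's working directory (`client://`) on the source side and to the task sandbox (`task://`) on the target side, matching the explicit directive used in the example application.

```python
# Hypothetical illustration only: RP expands staging shorthands internally.
TRANSFER = 'TRANSFER'  # assumption: stand-in for rp.TRANSFER, not its real value

def expand_staging(entries, action=TRANSFER):
    """Turn a mix of bare file names and full directives into full directives."""
    directives = []
    for entry in entries:
        if isinstance(entry, dict):
            # already a complete directive: keep it unchanged
            directives.append(entry)
        else:
            # bare name: same file name on both sides, default action
            directives.append({'source': 'client:///%s' % entry,
                               'target': 'task:///%s' % entry,
                               'action': action})
    return directives

print(expand_staging(['input.dat']))
```

A bare name is convenient when nothing needs renaming; the full dictionary form is only needed when source and target differ or a different action is wanted.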
6.1. Running the Example
Below is an example application that stages input.dat for each task. It uses the explicit form of the staging directive (source, target, and action), which is equivalent to the shorthand above; otherwise it differs from our earlier examples only in that it creates input.dat on the fly.
The result of this example's execution is straightforward, as expected, but it confirms that the file staging happened as planned. You will likely notice, though, that the code runs noticeably longer than the earlier examples because of the file staging overhead.
We start by importing the radical.pilot module and initializing the reporter facility used for printing well formatted runtime and progress information.
[1]:
import os
import sys
verbose = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose
import radical.pilot as rp
import radical.utils as ru
report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)
================================================================================
Getting Started (RP version 1.18.1)
================================================================================
We now use the dotenv module to load our environment variables. Creating a new Session requires the URL of a MongoDB server, which we read from our .env file.
We set the resource to ‘local.localhost’. Any resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.
[2]:
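from dotenv import load_dotenv
load_dotenv()  # read variables from a local .env file into os.environ
RADICAL_PILOT_DBURL = os.getenv("RADICAL_PILOT_DBURL")
if not RADICAL_PILOT_DBURL:
    # os.environ only accepts strings, so fail early with a clear message
    raise RuntimeError('RADICAL_PILOT_DBURL is not set (check your .env file)')
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL
resource = 'local.localhost'
session = rp.Session()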
new session: [rp.session.974ca4f6-70bc-11ed-bf55-0242ac110002] \
database : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km] ok
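If the variable is missing or mistyped, the session cannot reach its database. A slightly more defensive way to obtain the URL, and to print it without exposing the password (similar to the masked `****` in the session output above), might look like the sketch below. `require_dburl` and `masked` are hypothetical helpers, not part of RP or python-dotenv, and the accepted schemes are an assumption.

```python
import os
from urllib.parse import urlsplit

def require_dburl(var='RADICAL_PILOT_DBURL'):
    """Return the MongoDB URL from the environment, or fail with a clear message."""
    url = os.environ.get(var)
    if not url:
        raise RuntimeError('%s is not set; add it to your .env file' % var)
    scheme = urlsplit(url).scheme
    # assumption: only plain and SRV-style MongoDB URLs are accepted
    if scheme not in ('mongodb', 'mongodb+srv'):
        raise ValueError('%s must be a MongoDB URL, got scheme %r' % (var, scheme))
    return url

def masked(url):
    """Replace the password in user:password@host with ****, for logging."""
    parts = urlsplit(url)
    if parts.password:
        netloc = parts.netloc.replace(':%s@' % parts.password, ':****@', 1)
        return parts._replace(netloc=netloc).geturl()
    return url
```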
All remaining pilot code is wrapped in a try/except block. If an exception is caught, we can rely on the session object to exist and be valid, so we can tear down the whole RP stack with a ‘session.close()’ call in the ‘finally’ clause.
[3]:
def initialize_desc_object(resource):
    # read per-resource settings (project, queue, cores, ...) from config.json
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')
    report.header('submit pilots')
    pd_init = {
        'resource'      : resource,
        'runtime'       : 15,    # pilot runtime (min)
        'exit_on_error' : True,
        'project'       : config[resource].get('project', None),
        'queue'         : config[resource].get('queue', None),
        'access_schema' : config[resource].get('schema', None),
        'cores'         : config[resource].get('cores', 1),
        'gpus'          : config[resource].get('gpus', 0),
    }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc
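The pilot description falls back to defaults (one core, no GPUs, no project or queue) for any key the configuration does not provide. The sketch below reproduces the `pd_init` dictionary with plain dictionaries so the defaulting can be checked without RP. `build_pilot_dict` and the example configuration are hypothetical; unlike the function above, it also tolerates a resource that is missing from the configuration altogether, instead of raising `KeyError`.

```python
def build_pilot_dict(config, resource, runtime=15):
    """Mirror pd_init using plain dicts; missing keys fall back to defaults."""
    res_cfg = config.get(resource, {})  # tolerate an unknown resource
    return {
        'resource'      : resource,
        'runtime'       : runtime,        # pilot runtime (min)
        'exit_on_error' : True,
        'project'       : res_cfg.get('project', None),
        'queue'         : res_cfg.get('queue', None),
        'access_schema' : res_cfg.get('schema', None),
        'cores'         : res_cfg.get('cores', 1),
        'gpus'          : res_cfg.get('gpus', 0),
    }

# hypothetical config.json content for the local resource
example_config = {'local.localhost': {'schema': 'local', 'cores': 4}}
print(build_pilot_dict(example_config, 'local.localhost'))
```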
[4]:
def launch_pilots(session, pdesc):
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots
In this function, we first create a TaskManager and register the pilots with it. We then create a workload that counts the characters of a small file: the file is created right here on the client and staged as input data for each task.
We then set the number of tasks (n = 10) and create one task description per task. The task descriptions are submitted to the TaskManager, which triggers the selected scheduler to start assigning tasks to the pilots.
[5]:
def submit_tasks(pilots):
    report.header('submit tasks')
    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilots)

    # create the input file in the client's working directory
    os.system('hostname > input.dat')
    os.system('date >> input.dat')

    n = 10   # number of tasks
    report.info('create %d task description(s)\n\t' % n)
    tds = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable = '/usr/bin/wc'
        td.arguments = ['-c', 'input.dat']
        # stage input.dat from the client into the task sandbox
        td.input_staging = [{'source': 'client:///input.dat',
                             'target': 'task:///input.dat',
                             'action': rp.TRANSFER}]
        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks
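`wc -c` counts bytes, so every task that received the same staged file should report the same number (80 in the recorded run below). The sketch creates a comparable file in pure Python and computes what `wc -c` would print for it. `write_input` and `wc_c` are hypothetical helpers; note that `ctime()` formats the date differently from the `date` command, so the byte count will not match the shell version exactly.

```python
import datetime
import os
import socket

def write_input(path='input.dat'):
    """Python stand-in for `hostname > input.dat; date >> input.dat`."""
    with open(path, 'w') as fh:
        fh.write(socket.gethostname() + '\n')
        fh.write(datetime.datetime.now().ctime() + '\n')
    return path

def wc_c(path):
    """Return what `wc -c` reports: the number of bytes in the file."""
    with open(path, 'rb') as fh:
        return len(fh.read())

if __name__ == '__main__':
    p = write_input()
    print(wc_c(p), p)
    os.remove(p)
```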
The report_task_progress function prints the state, exit code, and (truncated) standard output of each task, then removes the local input file.
[6]:
def report_task_progress(tasks):
    report.info('\n')
    for task in tasks:
        # stdout may be empty for failed tasks, so guard against None
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                   task.exit_code, (task.stdout or '').strip()[:35]))
    os.remove('input.dat')   # clean up the local input file
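Beyond printing, it can be useful to check programmatically that staging worked, i.e. that every successful task counted the same number of bytes. The sketch below works on plain `(uid, state, exit_code, stdout)` tuples, which one could build from the `task.uid`, `task.state`, `task.exit_code`, and `task.stdout` attributes used above; `summarize` itself is a hypothetical helper, and it assumes the `wc -c` output format `<count> <file>`.

```python
from collections import Counter

def summarize(results):
    """results: list of (uid, state, exit_code, stdout) tuples."""
    states = Counter(state for _, state, _, _ in results)
    counts = set()
    for _, state, exit_code, stdout in results:
        if state == 'DONE' and exit_code == 0 and stdout:
            # `wc -c input.dat` prints "<bytes> input.dat"
            counts.add(int(stdout.split()[0]))
    return states, counts

results = [('task.000000', 'DONE', 0, '80 input.dat'),
           ('task.000001', 'DONE', 0, '80 input.dat')]
states, counts = summarize(results)
print(states, counts)
```

A single element in `counts` indicates that all successful tasks saw identical input data.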
We put all function calls inside a try/except block. The finally clause always closes the session, whether or not an exception was caught; this also kills all remaining pilots.
[7]:
try:
    pdesc = initialize_desc_object(resource)
    pilots = launch_pilots(session, pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)

except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')

finally:
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config ok
--------------------------------------------------------------------------------
submit pilots
create pilot manager ok
submit 1 pilot(s)
pilot.0000 local.localhost 1 cores 0 gpus ok
--------------------------------------------------------------------------------
submit tasks
create task manager ok
create 10 task description(s)
.......... ok
submit: ########################################################################
--------------------------------------------------------------------------------
gather results
wait : ########################################################################
DONE : 10
ok
* task.000000: DONE, exit: 0, out: 80 input.dat
* task.000001: DONE, exit: 0, out: 80 input.dat
* task.000002: DONE, exit: 0, out: 80 input.dat
* task.000003: DONE, exit: 0, out: 80 input.dat
* task.000004: DONE, exit: 0, out: 80 input.dat
* task.000005: DONE, exit: 0, out: 80 input.dat
* task.000006: DONE, exit: 0, out: 80 input.dat
* task.000007: DONE, exit: 0, out: 80 input.dat
* task.000008: DONE, exit: 0, out: 80 input.dat
* task.000009: DONE, exit: 0, out: 80 input.dat
--------------------------------------------------------------------------------
finalize
closing session rp.session.974ca4f6-70bc-11ed-bf55-0242ac110002 \
close task manager ok
close pilot manager \
wait for 1 pilot(s)
O/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\| 0 timeout
ok
session lifetime: 62.3s ok
--------------------------------------------------------------------------------
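The cleanup pattern in the last code cell can be exercised without a real pilot. In the sketch below, `FakeSession` is a hypothetical stand-in for `rp.Session` that only records whether `close()` ran, and `run` mirrors the try/except/finally structure. Since `KeyboardInterrupt` and `SystemExit` derive from `BaseException` rather than `Exception`, the first `except` clause does not swallow them, and in every case the `finally` clause closes the session.

```python
class FakeSession:
    """Hypothetical stand-in for rp.Session; records whether close() ran."""
    def __init__(self):
        self.closed = False

    def close(self, cleanup=True):
        self.closed = True

def run(session, work):
    try:
        work()
    except Exception as e:
        print('caught Exception: %s' % e)
        raise                       # re-raise after reporting
    except (KeyboardInterrupt, SystemExit):
        print('exit requested')     # swallowed: treated as a clean shutdown
    finally:
        session.close(cleanup=True)  # runs on success, error, or interrupt

def interrupted():
    raise KeyboardInterrupt

s = FakeSession()
run(s, interrupted)
print(s.closed)
```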