9. Set Up Task Environment

Different applications come with different requirements for the runtime environment. This section describes how the shell environment for a task can be configured.

The task environment is defined via a Python dictionary, as part of the task description:

td = rp.TaskDescription()

td.executable  = '/bin/echo'
td.arguments   = ['$RP_TASK_ID greets $TEST']
td.environment = {'TEST' : 'jabberwocky'}

This makes the environment variable TEST available during task execution. Some other variables, such as RP_TASK_ID above, are set internally by RP and are used here for demonstration.

Warning: Always call radical.pilot.Session.close() before your application terminates, so that all lingering pilots are terminated. You can pass the argument cleanup=True to delete the entries of the session from the database. If you need to retain that data, use the argument download=True.
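
The effect of the environment dictionary can be illustrated outside of RP with a minimal sketch that uses only the Python standard library. This is not how RP launches tasks. It only assumes that the task arguments are expanded by a POSIX shell (`/bin/sh`) that sees the variables from the dictionary. The helper `run_with_env` is hypothetical, and `RP_TASK_ID` is set by hand here, whereas RP sets it internally.

```python
import subprocess

def run_with_env(command, environment):
    """Run `command` in /bin/sh with only the given variables set."""
    result = subprocess.run(['/bin/sh', '-c', command],
                            env=environment,
                            capture_output=True, text=True, check=True)
    return result.stdout.strip()

# RP_TASK_ID is filled in by hand for this illustration only
env = {'TEST': 'jabberwocky', 'RP_TASK_ID': 'task.000000'}
out = run_with_env('/bin/echo "$RP_TASK_ID greets $TEST"', env)
print(out)
```

A variable that is missing from the dictionary expands to an empty string in the shell, which is a common cause of silently wrong task arguments.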

9.1. Running the Example

Below is an example that uses the code above to run a bag of echo commands.

As usual, we start by importing the radical.pilot module and initializing the reporter facility, which prints well-formatted runtime and progress information.

[1]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

================================================================================
 Getting Started (RP version 1.18.1)
================================================================================


Next, we import the dotenv module to load environment variables from a .env file. Creating a new Session requires the URL of a MongoDB server, which we read from that file.

We set the resource value to 'local.localhost'. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

[2]:
from dotenv import load_dotenv
load_dotenv()

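RADICAL_PILOT_DBURL = os.getenv("RADICAL_PILOT_DBURL")
if not RADICAL_PILOT_DBURL:
    # assigning None to os.environ would raise a TypeError
    raise RuntimeError('RADICAL_PILOT_DBURL is not set (check the .env file)')
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL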
resource = 'local.localhost'
session = rp.Session()
new session: [rp.session.2e03c8f2-70bd-11ed-9f46-0242ac110002]                 \
database   : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km]           ok

All remaining pilot code is wrapped in a try/except block. If an exception is caught, we can rely on the session object to exist and be valid, so we can tear down the whole RP stack with a session.close() call in the finally clause.

[3]:
def create_pilot_descriptions(resource):
    # read the config used for resource details
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')

    report.header('submit pilots')

    # Define an [n]-core local pilot that runs for [x] minutes
    # Here we use a dict to initialize the description object
    pd_init = {
               'resource'      : resource,
               'runtime'       : 15,  # pilot runtime (min)
               'exit_on_error' : True,
               'project'       : config[resource].get('project', None),
               'queue'         : config[resource].get('queue', None),
               'access_schema' : config[resource].get('schema', None),
               'cores'         : config[resource].get('cores', 1),
               'gpus'          : config[resource].get('gpus', 0),
              }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc
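
The function above reads per-resource settings from config.json and falls back to defaults for missing keys. The following minimal sketch shows that lookup with the standard library only. The JSON content, the resource name `some.cluster`, and the helper `pilot_settings` are hypothetical. Unlike the function above, the sketch also tolerates a resource that has no entry in the config at all.

```python
import json

# Hypothetical content of config.json; the keys mirror those read above
config_text = '''
{
    "local.localhost": {"cores": 4},
    "some.cluster":    {"project": "abc123", "queue": "debug",
                        "schema": "ssh", "cores": 64, "gpus": 2}
}
'''
config = json.loads(config_text)

def pilot_settings(config, resource):
    # fall back to an empty entry so unknown resources get the defaults
    entry = config.get(resource, {})
    return {
        'resource'      : resource,
        'runtime'       : 15,  # pilot runtime (min)
        'exit_on_error' : True,
        'project'       : entry.get('project'),
        'queue'         : entry.get('queue'),
        'access_schema' : entry.get('schema'),
        'cores'         : entry.get('cores', 1),
        'gpus'          : entry.get('gpus', 0),
    }

local = pilot_settings(config, 'local.localhost')
print(local['cores'], local['gpus'], local['queue'])
```

A dictionary of this shape is what the cell above passes to rp.PilotDescription.
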
[4]:
def launch_pilots(session,pdesc):
    # Add a Pilot Manager. Pilot managers manage one or more Pilots.
    pmgr = rp.PilotManager(session=session)
    # Launch the pilot.
    pilot = pmgr.submit_pilots(pdesc)
    return pilot

[5]:
def submit_tasks(pilot):
    report.header('submit tasks')

    # Register the Pilot in a TaskManager object.
    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilot)

    # Create a workload of Tasks.
    # Each task runs a specific `echo` command

    n = 10   # number of tasks to run
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):

        # create a new Task description, and fill it.
        # Here we don't use dict initialization.
        td = rp.TaskDescription()
        td.environment = {'TEST' : 'jabberwocky'}
        td.executable  = '/bin/echo'
        td.arguments   = ['$RP_TASK_ID greets $TEST']

        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    # Submit the previously created Task descriptions to the
    # TaskManager. This will trigger the selected scheduler to start
    # assigning Tasks to the Pilots.
    tasks = tmgr.submit_tasks(tds)

    # Wait for all tasks to reach a final state (DONE, CANCELED or FAILED).
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks
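
All tasks above share the same environment. If each task needs its own values, the settings can be prepared as plain dictionaries first, as in the following sketch. The variable `TEST_INDEX` and the helper `make_task_dicts` are hypothetical, and the sketch stops short of creating RP objects. Whether such a dictionary can be passed directly to rp.TaskDescription, the way the pilot description is initialized from a dict earlier, is an assumption that should be checked against the installed RP version.

```python
def make_task_dicts(n, base_env):
    """Build n plain dicts, each with its own copy of the environment."""
    tds = []
    for i in range(n):
        env = dict(base_env)          # copy, so tasks do not share state
        env['TEST_INDEX'] = str(i)    # shell variables are strings
        tds.append({
            'executable' : '/bin/echo',
            'arguments'  : ['$RP_TASK_ID greets $TEST (#$TEST_INDEX)'],
            'environment': env,
        })
    return tds

task_dicts = make_task_dicts(10, {'TEST': 'jabberwocky'})
print(task_dicts[3]['environment'])
```

Copying the base dictionary for every task avoids the situation where a later change to one task's environment silently affects all others.
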
[6]:
def report_task_progress(tasks):
    report.info('\n')
    for task in tasks:
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, task.stdout.strip()[:35]))
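
The format string above truncates the state to four characters and the output to 35 characters. The sketch below reproduces that formatting with a stand-in object, so it can be tried without a running pilot. `FakeTask` and `format_task` are hypothetical names, and the guard for a missing stdout is an added assumption about tasks that produce no output.

```python
from collections import namedtuple

# Stand-in for an RP task, holding only the attributes that are printed
FakeTask = namedtuple('FakeTask', 'uid state exit_code stdout')

def format_task(task):
    # guard against tasks that produced no stdout (assumption)
    out = (task.stdout or '').strip()[:35]
    return '  * %s: %s, exit: %3s, out: %s' % (
        task.uid, task.state[:4], task.exit_code, out)

done   = FakeTask('task.000000', 'DONE', 0, 'task.000000 greets jabberwocky\n')
failed = FakeTask('task.000001', 'FAILED', 1, None)
print(format_task(done))
print(format_task(failed))
```
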
[7]:
try:
    pdesc = create_pilot_descriptions(resource)
    pilots = launch_pilots(session, pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)

except Exception as e:
    # Something unexpected happened in the pilot code above
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    # A callback called sys.exit(), or the user interrupted the run.
    # Neither KeyboardInterrupt nor SystemExit derives from Exception,
    # so both are caught here to allow an orderly shutdown.
    report.warn('exit requested\n')

finally:
    # always clean up the session, no matter if we caught an exception or
    # not.  This will kill all remaining pilots.
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config                                                                   ok

--------------------------------------------------------------------------------
submit pilots

create pilot manager                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost           1 cores       0 gpus           ok

--------------------------------------------------------------------------------
submit tasks

create task manager                                                           ok
create 10 task description(s)
        ..........                                                            ok
submit: ########################################################################

--------------------------------------------------------------------------------
gather results

wait  : ########################################################################
     DONE      :    10
                                                                              ok

  * task.000000: DONE, exit:   0, out: task.000000 greets jabberwocky
  * task.000001: DONE, exit:   0, out: task.000001 greets jabberwocky
  * task.000002: DONE, exit:   0, out: task.000002 greets jabberwocky
  * task.000003: DONE, exit:   0, out: task.000003 greets jabberwocky
  * task.000004: DONE, exit:   0, out: task.000004 greets jabberwocky
  * task.000005: DONE, exit:   0, out: task.000005 greets jabberwocky
  * task.000006: DONE, exit:   0, out: task.000006 greets jabberwocky
  * task.000007: DONE, exit:   0, out: task.000007 greets jabberwocky
  * task.000008: DONE, exit:   0, out: task.000008 greets jabberwocky
  * task.000009: DONE, exit:   0, out: task.000009 greets jabberwocky

--------------------------------------------------------------------------------
finalize

closing session rp.session.2e03c8f2-70bd-11ed-9f46-0242ac110002                \
close task manager                                                            ok
close pilot manager                                                            \
wait for 1 pilot(s)
        O/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|      0                                                          timeout
                                                                              ok
session lifetime: 63.7s                                                       ok

--------------------------------------------------------------------------------
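
The try/except/finally pattern used in cell [7] can be tried in isolation with the following sketch. It uses a stand-in session class rather than RP, and all names in it are hypothetical. Unlike the cell above, it returns a short status string instead of re-raising, which makes the three code paths easy to compare.

```python
class FakeSession:
    """Stand-in for a session; records whether close() was called."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def run(workload, session):
    try:
        workload()
        return 'done'
    except Exception as e:
        # ordinary errors raised by the workload
        return 'error: %s' % e
    except (KeyboardInterrupt, SystemExit):
        # not subclasses of Exception, so they need their own clause
        return 'exit requested'
    finally:
        # runs on every path, including the early returns above
        session.close()

def fail():
    raise ValueError('boom')

def interrupt():
    raise KeyboardInterrupt()

results = []
for workload in (lambda: None, fail, interrupt):
    session = FakeSession()
    results.append((run(workload, session), session.closed))
print(results)
```

In all three cases the session ends up closed, which is the property the warning at the top of this section asks for.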