4. Use Multiple Pilots

The previous examples showed how an RP pilot acts as a container for multiple task executions. In principle, there is no limit on the number of pilots used to execute a workload, and the pilots do not need to run on the same resource.

The example below demonstrates this. Instead of creating a single pilot description, we create one for each resource specified as a command line parameter, whether or not those parameters point to the same resource target. The tasks are distributed over the resulting set of pilots according to a scheduling policy; the section Selecting a Task Scheduler discusses how an application can choose between different policies. The default policy used here is Round Robin.
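
To make the Round Robin idea concrete, the following is a minimal sketch in plain Python. It is not RP's scheduler implementation; the function name round_robin_assign and the sample task and pilot IDs are hypothetical and serve only to illustrate how tasks could be handed to pilots in turn.

```python
from itertools import cycle


def round_robin_assign(task_ids, pilot_ids):
    """Assign each task to the next pilot in turn, wrapping around.

    Illustration only: this is an assumption about what "round robin"
    means in general, not a copy of RP's internal scheduler.
    """
    if not pilot_ids:
        raise ValueError('need at least one pilot')

    assignment = {}
    # cycle() repeats the pilot list endlessly; zip() stops when tasks run out.
    for task_id, pilot_id in zip(task_ids, cycle(pilot_ids)):
        assignment[task_id] = pilot_id
    return assignment


# Hypothetical IDs, formatted like the ones RP prints in its reports.
tasks = ['task.%06d' % i for i in range(5)]
pilots = ['pilot.0000', 'pilot.0001']

assignment = round_robin_assign(tasks, pilots)
for task_id, pilot_id in assignment.items():
    print(task_id, '->', pilot_id)
```

With two pilots, consecutive tasks alternate between them, so the load stays balanced as long as the tasks are of similar size.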

4.1. Running the Example

We start by importing the radical.pilot module and initializing the reporter facility, which prints well-formatted runtime and progress information.

[1]:
%load_ext dotenv
%dotenv ../../../.env
cannot find .env file
[2]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

================================================================================
 Getting Started (RP version 1.18.1)
================================================================================


Next, we use the dotenv module to load our environment variables. Creating a new Session requires the URL of a MongoDB server, which we read from our .env file.

In this example, we request one pilot on local.localhost and one on xsede.stampede. Note that the cell then overrides the resource list with ['local.localhost'], so the run shown here submits a single local pilot:

[3]:
from dotenv import load_dotenv
load_dotenv()

sys.argv.append('local.localhost')
sys.argv.append('xsede.stampede')

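# Use the resources given as parameters, else fall back to localhost.
if len(sys.argv) >= 2:
    resources = sys.argv[1:]
else:
    resources = ['local.localhost']

session = rp.Session()

# TODO
# For now, override the list above and run on the local resource only.
resources = ['local.localhost']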
new session: [rp.session.47db751e-70bc-11ed-a526-0242ac110002]                 \
database   : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km]           ok

All remaining pilot code runs inside a try/except block. If an exception is caught, we can rely on the session object to exist and be valid, so we can tear down the whole RP stack with a session.close() call in the finally clause.
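
The cleanup pattern described above can be sketched without RP installed. In the sketch below, FakeSession, run_workload and guarded_run are hypothetical stand-ins and not part of the RP API; the point is only that the finally clause closes the session on both the success path and the failure path.

```python
class FakeSession:
    """Hypothetical stand-in for a session object; not part of RP."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_workload(session, fail=False):
    """Pretend to run a workload; optionally simulate a failure."""
    if fail:
        raise RuntimeError('simulated failure')
    return 'done'


def guarded_run(fail):
    # The session is created before the try block, so the finally
    # clause can always rely on it to exist.
    session = FakeSession()
    result = None
    try:
        result = run_workload(session, fail=fail)
    except RuntimeError as e:
        print('caught exception: %s' % e)
    finally:
        # Runs whether or not an exception was raised.
        session.close()
    return session, result


ok_session, ok_result = guarded_run(fail=False)
bad_session, bad_result = guarded_run(fail=True)
print(ok_session.closed, bad_session.closed)
```

Creating the session outside the try block is the design choice that matters here: if session creation itself failed, there would be nothing to close.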

[4]:
def initialize_desc_object(resources):
    print(resources)
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')
    report.header('submit pilots')
    pdescs = list()
    for resource in resources:
        pd_init = {
                   'resource'      : resource,
                   'runtime'       : 15,  # pilot runtime (min)
                   'exit_on_error' : True,
                   'project'       : config[resource].get('project', None),
                   'queue'         : config[resource].get('queue', None),
                   'access_schema' : config[resource].get('schema', None),
                   'cores'         : config[resource].get('cores', 1),
                   'gpus'          : config[resource].get('gpus', 0),
                  }
        pdescs.append(rp.PilotDescription(pd_init))
    return pdescs
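
The per-resource lookups in initialize_desc_object assume that every requested resource has an entry in config.json; a missing entry would raise a KeyError at config[resource]. The following is a minimal sketch of a more tolerant lookup, using a plain dict instead of rp.PilotDescription so that it runs without RP. The helper name build_pilot_settings, the contents of sample_config and the resource name not.in.config are assumptions made for illustration and do not reflect the actual config.json shipped with the examples.

```python
def build_pilot_settings(resource, config):
    """Return a plain dict of pilot settings for one resource.

    Unlike config[resource], config.get(resource, {}) tolerates
    resources that have no entry in the configuration.
    """
    rcfg = config.get(resource, {})
    return {
        'resource'      : resource,
        'runtime'       : 15,      # pilot runtime (min)
        'exit_on_error' : True,
        'project'       : rcfg.get('project'),
        'queue'         : rcfg.get('queue'),
        'access_schema' : rcfg.get('schema'),
        'cores'         : rcfg.get('cores', 1),
        'gpus'          : rcfg.get('gpus', 0),
    }


# Hypothetical configuration content, for illustration only.
sample_config = {
    'local.localhost': {'cores': 4},
}

settings = [build_pilot_settings(r, sample_config)
            for r in ['local.localhost', 'not.in.config']]

for s in settings:
    print(s['resource'], s['cores'], s['gpus'], s['project'])
```

Whether silently falling back to defaults is preferable to failing early depends on the application; for remote resources that need a project or queue, an explicit error may be the safer choice.
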
[5]:
def launch_pilots(session, pdesc):
    # The pilot manager submits one pilot per pilot description.
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots

In the submit_tasks function, we first register the pilots with a TaskManager object. We then set the number of tasks (n=10) and create one task description per task. Each task reports the ID of the pilot it runs on by echoing $RP_PILOT_ID.

[6]:
def submit_tasks(pilots):
    tasks = None
    for gen in range(1):
        report.header('submit tasks [%d]' % gen)
        # Note: relies on the module-level `session` created earlier.
        tmgr = rp.TaskManager(session=session)
        tmgr.add_pilots(pilots)

        n = 10
        report.info('create %d task description(s)\n\t' % n)

        tds = list()
        for i in range(0, n):
            td = rp.TaskDescription()
            td.executable = '/bin/echo'
            td.arguments  = ['$RP_PILOT_ID']

            tds.append(td)
            report.progress()
        report.ok('>>ok\n')
        tasks = tmgr.submit_tasks(tds)
        report.header('gather results')
        tmgr.wait_tasks()

    report.info('\n')
    return tasks

The report_task_progress function prints the state, exit code and output of each task, then summarizes how many tasks ran on each pilot.

[7]:
def report_task_progress(tasks):
    counts = dict()
    for task in tasks:
        out_str = task.stdout.strip()[:35]
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, out_str))
        if out_str not in counts:
            counts[out_str] = 0
        counts[out_str] += 1

    report.info("\n")
    for out_str in counts:
        report.info("  * %-20s: %3d\n" % (out_str, counts[out_str]))
    report.info("  * %-20s: %3d\n" % ('total', sum(counts.values())))
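
The per-pilot tally above can also be written with collections.Counter from the standard library. The sketch below works on a hypothetical list of output strings rather than real task objects. It additionally treats a missing output (None) as an empty string, on the assumption that a task may finish without any captured stdout.

```python
from collections import Counter


def tally_outputs(outputs, width=35):
    """Count how often each (stripped, truncated) output string occurs.

    A None entry is counted under the empty string; this is an
    assumption about tasks that produced no captured output.
    """
    cleaned = ((out or '').strip()[:width] for out in outputs)
    return Counter(cleaned)


# Hypothetical task outputs, shaped like the pilot IDs echoed by the tasks.
sample_outputs = ['pilot.0000\n', 'pilot.0001\n', 'pilot.0000\n', None]

counts = tally_outputs(sample_outputs)
for label, count in sorted(counts.items()):
    print('  * %-20s: %3d' % (label, count))
print('  * %-20s: %3d' % ('total', sum(counts.values())))
```

Counter removes the need for the explicit "initialize to zero" branch while producing the same counts.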

We put all function calls inside a try/except block. The finally clause always closes the session, whether or not an exception was caught, which also terminates any remaining pilots.

[8]:
try:
    pdesc = initialize_desc_object(resources)
    pilots = launch_pilots(session, pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)
except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')

finally:
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config                                                                   ok

--------------------------------------------------------------------------------
submit pilots

create pilot manager
['local.localhost']
                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost           1 cores       0 gpus           ok

--------------------------------------------------------------------------------
submit tasks [0]

create task manager                                                           ok
create 10 task description(s)
        ..........                                                            ok
submit: ########################################################################

--------------------------------------------------------------------------------
gather results

wait  : ########################################################################
     DONE      :    10
                                                                              ok

  * task.000000: DONE, exit:   0, out: pilot.0000
  * task.000001: DONE, exit:   0, out: pilot.0000
  * task.000002: DONE, exit:   0, out: pilot.0000
  * task.000003: DONE, exit:   0, out: pilot.0000
  * task.000004: DONE, exit:   0, out: pilot.0000
  * task.000005: DONE, exit:   0, out: pilot.0000
  * task.000006: DONE, exit:   0, out: pilot.0000
  * task.000007: DONE, exit:   0, out: pilot.0000
  * task.000008: DONE, exit:   0, out: pilot.0000
  * task.000009: DONE, exit:   0, out: pilot.0000

  * pilot.0000          :  10
  * total               :  10

--------------------------------------------------------------------------------
finalize

closing session rp.session.47db751e-70bc-11ed-a526-0242ac110002                \
close task manager                                                            ok
close pilot manager                                                            \
wait for 1 pilot(s)
        O/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|      0                                                          timeout
                                                                              ok
session lifetime: 63.9s                                                       ok

--------------------------------------------------------------------------------