8. Sharing Task Input Data

RP supports the concurrent execution of many tasks, and these tasks often share some or all of their input data, i.e., files. As we have seen earlier, input staging can incur a significant runtime overhead. That overhead can be substantially reduced by avoiding redundant file staging operations.
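To make the saving concrete, here is a back-of-the-envelope sketch in plain Python (not part of the RP API; the function name and file size are hypothetical). It counts the bytes moved for a set of tasks that all read the same file, and it ignores per-transfer latency, which for small files often dominates the cost.

```python
def staged_bytes(n_tasks: int, file_size: int, shared: bool) -> int:
    """Return the total number of bytes transferred to the resource.

    Without sharing, every task stages its own copy of the file.
    With sharing, the file is transferred once and each task only
    gets a symlink, which moves no file data.
    """
    if n_tasks < 0 or file_size < 0:
        raise ValueError("n_tasks and file_size must be non-negative")
    if shared:
        return file_size if n_tasks > 0 else 0
    return n_tasks * file_size


if __name__ == "__main__":
    size = 100 * 1024 * 1024   # a hypothetical 100 MiB input file
    print(staged_bytes(128, size, shared=False) // 2**20, "MiB")  # 12800 MiB
    print(staged_bytes(128, size, shared=True) // 2**20, "MiB")   # 100 MiB
```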

Each RP pilot manages a shared data space in which to store tasks’ input files. First, RP can stage input files into the shared data space of a pilot. Second, that pilot can create symbolic links (symlinks) in the work directory of each task to any file in the shared data space. In this way, a set of tasks can access the same file, avoiding costly staging and replication operations.

Stage shared data from the current working directory to the pilot’s shared data space:

import os
import radical.pilot as rp
# `pilot` is an already submitted rp.Pilot instance
pilot.stage_in({'source': 'file://%s/input.dat' % os.getcwd(),
                'target': 'pilot:///input.dat',
                'action': rp.TRANSFER})

Create a symlink to input.dat in the work directory of each task:

tds = list()
for i in range(0, n):
    td = rp.TaskDescription()

    td.executable     = '/usr/bin/wc'
    td.arguments      = ['-c', 'input.dat']
    td.input_staging  = {'source': 'pilot:///input.dat',
                         'target': 'input.dat',
                         'action': rp.LINK}
    tds.append(td)

# `n` is the number of tasks, `tmgr` an rp.TaskManager that uses `pilot`
tasks = tmgr.submit_tasks(tds)

The rp.LINK staging action creates a symlink, avoiding the copy operation used by the rp.TRANSFER action.
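The difference between linking and copying can be illustrated locally with the Python standard library. This sketch does not use RP: it only mimics what a symlink (as created by rp.LINK) versus an independent copy would leave in a task’s work directory. The directory layout and the function name are illustrative assumptions.

```python
import os
import shutil
import tempfile


def demo_link_vs_copy():
    """Compare a symlinked and a copied input file in two task directories.

    Returns (link_is_symlink, link_is_same_file,
             copy_is_symlink, copy_is_same_file).
    """
    with tempfile.TemporaryDirectory() as root:
        # Stand-in for the pilot's shared data space.
        shared = os.path.join(root, "shared")
        os.makedirs(shared)
        src = os.path.join(shared, "input.dat")
        with open(src, "w") as f:
            f.write("some input\n")

        # Task A: a symlink to the shared file (what rp.LINK creates).
        task_a = os.path.join(root, "task.000000")
        os.makedirs(task_a)
        link = os.path.join(task_a, "input.dat")
        os.symlink(src, link)

        # Task B: an independent copy of the file's data.
        task_b = os.path.join(root, "task.000001")
        os.makedirs(task_b)
        copy = os.path.join(task_b, "input.dat")
        shutil.copy(src, copy)

        # Check inside the block, before the temporary tree is deleted.
        return (os.path.islink(link), os.path.samefile(link, src),
                os.path.islink(copy), os.path.samefile(copy, src))


print(demo_link_vs_copy())  # (True, True, False, False)
```

The link resolves to the very same file as the shared copy, so N tasks cost one transfer; the copy duplicates the data once per task.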

Note: Unlike other methods in RP, the pilot.stage_in method is synchronous, i.e., it only returns once the transfer is completed. This may change in a future version of RP.

8.1. Running the Example

The output of the example below is the same as in section 3.6, but the script should run significantly faster because redundant staging is removed, especially for non-local pilots.

[1]:
from dotenv import load_dotenv
load_dotenv()

[1]:
False

We start by importing the radical.pilot module and initializing the reporter facility used for printing well-formatted runtime and progress information.

[2]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

================================================================================
 Getting Started (RP version 1.18.1)
================================================================================


In the first cell, we used the dotenv module to load our environment variables from a .env file. To create a new Session, you need to provide the URL of a MongoDB server, which we read from the RADICAL_PILOT_DBURL environment variable.

We will set the resource value to ‘local.localhost’. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

[3]:
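RADICAL_PILOT_DBURL = os.getenv("RADICAL_PILOT_DBURL")
# fail early with a clear message instead of a TypeError on assignment
if not RADICAL_PILOT_DBURL:
    raise RuntimeError('RADICAL_PILOT_DBURL is not set')
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL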
resource = 'local.localhost'
session = rp.Session()
new session: [rp.session.00f1de08-70bd-11ed-99a4-0242ac110002]                 \
database   : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km]           ok

The create_pilot_description function reads the resource configuration and builds the dictionary used to initialize the pilot description object.
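The defaulting logic can be tried out without RP or a config file. The sketch below assumes a hypothetical configuration with the same shape as ../config.json (one entry per resource key; the "example.cluster" entry is invented) and returns a plain dictionary that could be passed to rp.PilotDescription.

```python
# Hypothetical configuration with the same shape the notebook reads
# from ../config.json (one entry per resource key).
CONFIG = {
    "local.localhost": {"cores": 4},
    "example.cluster": {"project": "abc123", "queue": "debug",
                        "schema": "ssh", "cores": 32, "gpus": 2},
}


def pilot_description_dict(config, resource, runtime=15):
    """Build the pilot description dictionary, falling back to defaults."""
    entry = config[resource]          # KeyError for unknown resources
    return {
        "resource"      : resource,
        "runtime"       : runtime,    # pilot runtime in minutes
        "exit_on_error" : True,
        "project"       : entry.get("project", None),
        "queue"         : entry.get("queue", None),
        "access_schema" : entry.get("schema", None),
        "cores"         : entry.get("cores", 1),
        "gpus"          : entry.get("gpus", 0),
    }


print(pilot_description_dict(CONFIG, "local.localhost"))
```

Note that the config key is called schema while the pilot description field is access_schema; keeping that mapping in one place avoids silently dropping the value.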

[4]:
def create_pilot_description(resource):
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')

    report.header('submit pilots')

    pmgr = rp.PilotManager(session=session)

    pd_init = {
               'resource'      : resource,
               'runtime'       : 15,
               'exit_on_error' : True,
               'project'       : config[resource].get('project', None),
               'queue'         : config[resource].get('queue', None),
               'access_schema' : config[resource].get('schema', None),
               'cores'         : config[resource].get('cores', 1),
               'gpus'          : config[resource].get('gpus', 0),
              }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc

launch_pilots creates a PilotManager, which manages one or more pilots, and submits the pilot description to it.

[5]:
def launch_pilots(session,pdesc):
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots

Our workload reads a simple file. We first create the file locally, then stage it into the pilot’s shared data space (pilot:///). Since pilot.stage_in is synchronous, the function returns only after the transfer has completed.
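The cell below writes input.dat by shelling out to hostname and date. A pure-Python alternative is sketched here; the function name is hypothetical, and the date line only approximates the default output of date (it omits the time zone).

```python
import datetime
import socket


def write_input_file(path="input.dat"):
    """Write the host name and current local time to `path`, one per line.

    A pure-Python stand-in for `hostname > input.dat; date >> input.dat`.
    Returns the lines that were written (without newlines).
    """
    lines = [socket.gethostname(),
             datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y")]
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
    return lines
```

This avoids spawning shells and works the same way on any platform that Python runs on.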

[6]:
def create_and_stage_input_data(pilot):
    os.system('hostname >  input.dat')
    os.system('date     >> input.dat')

    report.info('stage in shared data')
    pilot.stage_in({'source': 'client:///input.dat',
                    'target': 'pilot:///input.dat',
                    'action': rp.TRANSFER})
    report.ok('>>ok\n')

In the submit_tasks function, we first create a TaskManager object and add the pilot to it.

We then set the number of tasks (n=10) and create one task description per task. Each task links input.dat from the pilot’s shared data space into its own sandbox, runs /bin/cat on it, and copies its STDOUT file back into the shared data space under a unique name. We submit the task descriptions to the TaskManager, which triggers its scheduler to start assigning tasks to the pilot.

We use tmgr.wait_tasks() to wait for all tasks to reach a final state (DONE, CANCELED or FAILED).
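The per-task staging directives used below can be built and inspected without RP. In this sketch LINK and COPY are placeholder strings (an assumption so the code runs standalone; a real script uses rp.LINK and rp.COPY), and the helper name is hypothetical.

```python
# Placeholder action values so the sketch runs without RP installed;
# in a real script use rp.LINK and rp.COPY instead.
LINK = "LINK"
COPY = "COPY"


def staging_directives(i):
    """Return (input_staging, output_staging) for the i-th task."""
    # Link the shared input file into the task sandbox: no data is copied.
    input_staging = {"source": "pilot:///input.dat",
                     "target": "task:///input.dat",
                     "action": LINK}
    # Copy the task's STDOUT into the shared space under a unique,
    # zero-padded name so that outputs of different tasks do not collide.
    output_staging = {"source": "task:///STDOUT",
                      "target": "pilot:///STDOUT.%06d" % i,
                      "action": COPY}
    return input_staging, output_staging


for i in range(3):
    print(staging_directives(i)[1]["target"])
# pilot:///STDOUT.000000
# pilot:///STDOUT.000001
# pilot:///STDOUT.000002
```

Every task shares the same input directive, while the output target differs per task; that asymmetry is what lets one transfer serve all tasks without their results overwriting each other.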

[7]:
def submit_tasks(pilot):
    report.header('submit tasks')

    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilot)

    n = 10
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    outs = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable     = '/bin/cat'
        td.arguments      = ['input.dat']
        td.stdout         = 'STDOUT'
        td.input_staging  = {'source': 'pilot:///input.dat',
                             'target': 'task:///input.dat',
                             'action': rp.LINK}
        td.output_staging = {'source': 'task:///STDOUT',
                             'target': 'pilot:///STDOUT.%06d' % i,
                             'action': rp.COPY}
        outs.append('STDOUT.%06d' % i)
        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks,outs

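The report_task_progress function prints the state, exit code, and the first 35 characters of each task’s output. It then removes the local input file and stages the per-task output files from the pilot’s shared data space back to the client.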
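The report line format can be checked in isolation. The sketch uses a hypothetical FakeTask stand-in that carries only the attributes the format string needs; unlike the cell below, it also tolerates a task whose stdout is None, which is an added safeguard rather than RP behavior.

```python
from collections import namedtuple

# Hypothetical stand-in for an RP Task, with only the attributes used below.
FakeTask = namedtuple("FakeTask", "uid state exit_code stdout")


def format_task_line(task):
    """Format one status line: uid, 4-char state, exit code, output prefix."""
    out = (task.stdout or "").strip()[:35]   # guard against missing output
    return '  * %s: %s, exit: %3s, out: %s\n' % (
        task.uid, task.state[:4], task.exit_code, out)


print(format_task_line(FakeTask("task.000000", "DONE", 0, "host\n")), end="")
# prints "  * task.000000: DONE, exit:   0, out: host"
```

The 35-character cut explains why the output further down shows only a truncated host name: the host name alone is longer than the limit, so the date line never appears.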

[8]:
def report_task_progress(tasks,pilot,outs):
    report.info('\n')
    for task in tasks:
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, task.stdout.strip()[:35]))

    os.system('rm input.dat')

    report.info('stage out shared data')
    pilot.stage_out([{'source': 'pilot:///%s'  % fname,
                      'target': 'client:///%s' % fname,
                      'action': rp.TRANSFER} for fname in outs])
    report.ok('>>ok\n')

We wrap all function calls in a try/except block. The finally clause always closes the session, whether or not an exception was caught; this also terminates all remaining pilots.
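The cleanup guarantee of this pattern can be verified without RP. The sketch below uses a hypothetical FakeSession in place of rp.Session and a run() helper that mirrors the structure of the next cell; it shows that close() runs on success, on an ordinary exception, and on an interrupt.

```python
class FakeSession:
    """Hypothetical stand-in for rp.Session that records close() calls."""

    def __init__(self):
        self.closed = False

    def close(self, cleanup=False):
        self.closed = True


def run(session, work):
    """Run `work()` and always close `session`, mirroring the cell below."""
    try:
        work()
    except Exception as e:
        print('caught Exception: %s' % e)
        raise                        # re-raise after reporting
    except (KeyboardInterrupt, SystemExit):
        # These derive from BaseException, not Exception, so they are
        # not caught by the clause above.
        print('exit requested')
    finally:
        session.close(cleanup=True)  # runs on success, error, or interrupt


s = FakeSession()
run(s, lambda: None)
print(s.closed)  # True
```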

[9]:
try:
    pdesc = create_pilot_description(resource)
    pilots = launch_pilots(session,pdesc)
    create_and_stage_input_data(pilots)
    tasks,outs = submit_tasks(pilots)
    report_task_progress(tasks,pilots,outs)
except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise
except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')
finally:
    report.header('finalize')
    session.close(cleanup=True)
report.header()
read config                                                                   ok

--------------------------------------------------------------------------------
submit pilots

create pilot manager                                                          ok
create pilot manager                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost           1 cores       0 gpus           ok
stage in shared data                                                          ok

--------------------------------------------------------------------------------
submit tasks

create task manager                                                           ok
create 10 task description(s)
        ..........                                                            ok
submit: ########################################################################

--------------------------------------------------------------------------------
gather results

wait  : ########################################################################
     DONE      :    10
                                                                              ok

  * task.000000: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000001: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000002: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000003: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000004: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000005: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000006: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000007: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000008: DONE, exit:   0, out: build-18779102-project-707762-radic
  * task.000009: DONE, exit:   0, out: build-18779102-project-707762-radic
stage out shared data                                                         ok

--------------------------------------------------------------------------------
finalize

closing session rp.session.00f1de08-70bd-11ed-99a4-0242ac110002                \
close task manager                                                            ok
close pilot manager                                                            \
wait for 0 pilot(s)
        O      0                                                               ok
                                                                              ok
close pilot manager                                                            \
wait for 1 pilot(s)
        O      0                                                          timeout
                                                                              ok
session lifetime: 73.2s                                                       ok

--------------------------------------------------------------------------------

