7. Staging Task Output Data

Upon completion, tasks often create some amount of data. We have seen in Obtaining Task Details how we can inspect the task’s stdout string, but that will not be useful beyond the most trivial workloads. This section shows how to stage the output data of tasks back to the RP application, and/or to arbitrary storage locations and devices.

In principle, output staging is specified as the input staging discussed in the previous section:

  • source: what files need to be staged from the context of the task that terminated execution

  • target: where should the files be staged to

  • action: how should files be staged.

Note that in this example we specify the output file name to be changed to a unique name during staging:

for i in range(0, n):
    cud.executable     = '/bin/cp'
    cud.arguments      = ['-v', 'input.dat', 'output.dat']
    cud.input_staging  = ['input.dat']
    cud.output_staging = {'source': 'output.dat',
                          'target': 'output_%03d.dat' % i,
                          'action': rp.TRANSFER}

7.1. Running the Example

Below is an example application which uses the code block above.

We start by importing the radical.pilot module and initializing the reporter facility used for printing well formatted runtime and progress information.

[1]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

================================================================================
 Getting Started (RP version 1.18.1)
================================================================================


We will now import the dotenv module for fetching our environment variables. To create a new Session, you need to provide the URL of a MongoDB server which we will fetch from our .env file.

We will set the resource value to ‘local.localhost’. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

[2]:
from dotenv import load_dotenv
load_dotenv()

RADICAL_PILOT_DBURL = os.getenv("RADICAL_PILOT_DBURL")
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL
resource = 'local.localhost'
session = rp.Session()
new session: [rp.session.bdcacd4c-70bc-11ed-a377-0242ac110002]                 \
database   : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km]           ok

In the create_pilot_description function, we create a dictionary to initialize the pilot description object.

[3]:
def create_pilot_description(resources):
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')
    report.header('submit pilots')
    pdescs = list()
    pd_init = {
               'resource'      : resource,
               'runtime'       : 15,  # pilot runtime (min)
               'exit_on_error' : True,
               'project'       : config[resource].get('project', None),
               'queue'         : config[resource].get('queue', None),
               'access_schema' : config[resource].get('schema', None),
               'cores'         : config[resource].get('cores', 1),
               'gpus'          : config[resource].get('gpus', 0),
              }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc

launch_pilots adds a PilotManager for managing one or more pilots.

[4]:
def launch_pilots(session,pdesc):
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots

In the submit_tasks function, we first register the pilot in a TaskManager object. We then create a workload of char-counting a simple file. For this we create a the file right here and then use it as task input data for each task.

After this we initialize the number of tasks(n=128) and create a new Task description. We submit the previously created Task descriptions to the PilotManager. This will trigger the selected scheduler to start assigning Tasks to the Pilots.

We will use tmgr.wait_tasks()to wait for all tasks to reach a final (DONE, CANCELED or FAILED).

[5]:
def submit_tasks(pilots):
    report.header('submit tasks')

    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilots)


    os.system('hostname >  input.dat')
    os.system('date     >> input.dat')

    n = 128
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable     = '/bin/cp'
        td.arguments      = ['-v', 'input.dat', 'output.dat']
        td.input_staging  = ['input.dat']
        td.output_staging = {'source': 'task:///output.dat',
                              'target': 'client:///output_%03d.dat' % i,
                              'action': rp.TRANSFER}

        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)

    report.header('gather results')
    tmgr.wait_tasks()
    return tasks

We create the report_task_progress function to report the task status of each task

[6]:
def report_task_progress(tasks):
    counts = dict()
    for task in tasks:
        out_str = task.stdout.strip()[:35]
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, out_str))
        if out_str not in counts:
            counts[out_str] = 0
        counts[out_str] += 1

    report.info("\n")
    for out_str in counts:
        report.info("  * %-20s: %3d\n" % (out_str, counts[out_str]))
    report.info("  * %-20s: %3d\n" % ('total', sum(counts.values())))

We put all function calls inside a try except block. Finally, always clean up the session no matter if we caught an exception or not. This will kill all the remaining pilots.

[7]:
try:
    pdesc = create_pilot_description(resource)
    pilots = launch_pilots(session,pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)
except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')

finally:
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config                                                                   ok

--------------------------------------------------------------------------------
submit pilots

create pilot manager                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost           1 cores       0 gpus           ok

--------------------------------------------------------------------------------
submit tasks

create task manager                                                           ok
create 128 task description(s)
        ........................................................................
        ........................................................              ok
submit: ########################################################################

--------------------------------------------------------------------------------
gather results

wait  : ########################################################################
     DONE      :   128
                                                                              ok
  * task.000000: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000001: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000002: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000003: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000004: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000005: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000006: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000007: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000008: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000009: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000010: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000011: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000012: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000013: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000014: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000015: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000016: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000017: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000018: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000019: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000020: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000021: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000022: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000023: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000024: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000025: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000026: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000027: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000028: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000029: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000030: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000031: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000032: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000033: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000034: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000035: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000036: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000037: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000038: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000039: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000040: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000041: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000042: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000043: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000044: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000045: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000046: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000047: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000048: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000049: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000050: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000051: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000052: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000053: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000054: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000055: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000056: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000057: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000058: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000059: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000060: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000061: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000062: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000063: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000064: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000065: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000066: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000067: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000068: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000069: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000070: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000071: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000072: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000073: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000074: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000075: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000076: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000077: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000078: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000079: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000080: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000081: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000082: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000083: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000084: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000085: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000086: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000087: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000088: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000089: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000090: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000091: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000092: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000093: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000094: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000095: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000096: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000097: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000098: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000099: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000100: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000101: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000102: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000103: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000104: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000105: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000106: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000107: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000108: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000109: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000110: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000111: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000112: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000113: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000114: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000115: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000116: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000117: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000118: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000119: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000120: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000121: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000122: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000123: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000124: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000125: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000126: DONE, exit:   0, out: 'input.dat' -> 'output.dat'
  * task.000127: DONE, exit:   0, out: 'input.dat' -> 'output.dat'

  * 'input.dat' -> 'output.dat': 128
  * total               : 128

--------------------------------------------------------------------------------
finalize

closing session rp.session.bdcacd4c-70bc-11ed-a377-0242ac110002                \
close task manager                                                            ok
close pilot manager                                                            \
wait for 1 pilot(s)
        O/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|/-\|      0                                                          timeout
                                                                              ok
session lifetime: 110.0s                                                      ok

--------------------------------------------------------------------------------