2. Obtaining Task Details

The previous chapter discussed the basic features of RP, how to submit a pilot, and how to submit tasks to that pilot for execution. Here, we show how an application can inspect the details of that execution, after the tasks complete.

Note that we capture the return value of submit_tasks(), which is in fact a list of Task instances. We use those instances for inspection later on, after we have waited for their completion. Inspection is also available earlier, but may then yield incomplete results. A task always has a state throughout its life span, according to the state model discussed in RADICAL-Pilot (RP) - Overview.

The code block below shows how to report information about task state, exit code, and standard output. Later, we will see that standard error is handled equivalently.

report.plain('  * %s: %s, exit: %3s, out: %s\n'
        % (task.uid, task.state[:4],
            task.exit_code, task.stdout.strip()[:35]))

Reporting standard output in this way is a convenience that cannot replace proper staging of output files. The string returned by task.stdout is shortened for very long outputs (longer than 1kB by default), and the example additionally limits it to its first 35 characters. It may also contain information from RP which is not part of the standard output of the application. The proper staging of output files will be discussed in a later example.
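The reporting line above can be wrapped in a small helper that tolerates a missing standard output. The following is a minimal sketch and not part of RP: format_task_line is a hypothetical helper, and SimpleNamespace objects stand in for real Task instances so that the snippet runs without a pilot. It relies only on the attributes uid, state, exit_code, and stdout used above. That stdout can be None before a task has run is an assumption.

```python
from types import SimpleNamespace


def format_task_line(task, width=35):
    """Return a one-line summary of a task-like object."""
    # stdout may be unset (assumed None) if the task has not run yet
    out = (task.stdout or '').strip()[:width]
    return '  * %s: %s, exit: %3s, out: %s' % (
        task.uid, task.state[:4], task.exit_code, out)


# stand-ins for rp.Task instances (illustrative values only)
done = SimpleNamespace(uid='task.000000', state='DONE', exit_code=0,
                       stdout='Tue Sep 27 23:39:54 CEST 2022\n')
new = SimpleNamespace(uid='task.000001', state='NEW', exit_code=None,
                      stdout=None)

for t in (done, new):
    print(format_task_line(t))
```

Keeping the truncation width as a parameter makes it easy to print more of the output while debugging, without changing the report format.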

2.1. Running the Example

We will load the dotenv IPython extension to read our environment variables from a .env file.

[1]:
%load_ext dotenv
%dotenv ../../../.env
Python-dotenv could not parse statement starting at line 6

We start by importing the radical.pilot module and initializing the reporter facility, which prints well-formatted runtime and progress information.

[2]:
import os
import sys

import radical.pilot as rp
import radical.utils as ru
report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)


================================================================================
 Getting Started (RP version 1.17.0)
================================================================================


We will set the resource value to ‘local.localhost’. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

[3]:
resource = 'local.localhost'

To create a new Session, you need to provide the URL of a MongoDB server, which we fetch from our .env file.

[4]:
session = rp.Session()
new session: [rp.session.rivendell.merzky.019262.0015]                         \
database   : [mongodb://localhost/am]                                         ok

In the create_pilot_description function, we create a dictionary to initialize the pilot description object.

[5]:
def create_pilot_description():
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')

    report.header('submit pilots')

    pd_init = {'resource'      : resource,
               'runtime'       : 15,
               'exit_on_error' : True,
               'project'       : config[resource].get('project', None),
               'queue'         : config[resource].get('queue', None),
               'access_schema' : config[resource].get('schema', None),
               'cores'         : config[resource].get('cores', 1),
               'gpus'          : config[resource].get('gpus', 0),
               }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc
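Note that the lookups config[resource] in the cell above raise a KeyError when the resource has no entry in the configuration file. A more forgiving lookup could be sketched as follows. pilot_settings is a hypothetical helper, and the inline configuration is invented for illustration; it does not reproduce the content of ../config.json.

```python
def pilot_settings(config, resource):
    """Collect pilot description fields for a resource, with defaults."""
    # fall back to an empty dict so an unknown resource yields defaults
    entry = config.get(resource, {})
    return {'resource'      : resource,
            'runtime'       : 15,
            'exit_on_error' : True,
            'project'       : entry.get('project'),
            'queue'         : entry.get('queue'),
            'access_schema' : entry.get('schema'),
            'cores'         : entry.get('cores', 1),
            'gpus'          : entry.get('gpus', 0)}


# invented configuration, for illustration only
config = {'local.localhost': {'cores': 4}}

known = pilot_settings(config, 'local.localhost')
unknown = pilot_settings(config, 'some.resource')
print(known['cores'], unknown['cores'])
```

The resulting dictionary has the same keys as pd_init above, so it could be passed to rp.PilotDescription in the same way.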

The launch_pilots function creates a PilotManager, which manages one or more pilots, and submits the pilot description to it.

[6]:
def launch_pilots(session, pdesc):
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots

In the submit_tasks function, we first register the pilot in a TaskManager object.

After this, we set the number of tasks (n=10) and create a Task description for each of them. We submit the previously created Task descriptions to the TaskManager. This will trigger the selected scheduler to start assigning Tasks to the Pilots.

We will use tmgr.wait_tasks() to wait for all tasks to reach a final state (DONE, CANCELED or FAILED).

[7]:
def submit_tasks(pilots):
    report.header('submit tasks')

    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilots)

    n = 10
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable = '/bin/date'
        tds.append(td)
        report.progress()

    report.ok('>>ok\n')
    tasks = tmgr.submit_tasks(tds)
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks
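Once wait_tasks() has returned, the task states can be tallied to check whether every task ended in DONE. The sketch below is illustrative only: summarize_states is a hypothetical helper, SimpleNamespace objects stand in for Task instances, and the final states are written as plain strings matching the names listed above, which is an assumption made to keep the snippet independent of RP.

```python
from collections import Counter
from types import SimpleNamespace

# final state names as listed in the text above; using plain strings here
# is an assumption made to keep the sketch independent of RP
FINAL_STATES = ('DONE', 'CANCELED', 'FAILED')


def summarize_states(tasks):
    """Count task-like objects per state and list those not yet final."""
    counts = Counter(t.state for t in tasks)
    pending = [t.uid for t in tasks if t.state not in FINAL_STATES]
    return counts, pending


# stand-ins for rp.Task instances (illustrative values only)
tasks = [SimpleNamespace(uid='task.%06d' % i, state='DONE') for i in range(3)]
tasks.append(SimpleNamespace(uid='task.000003', state='FAILED'))

counts, pending = summarize_states(tasks)
for state in FINAL_STATES:
    print('%-10s: %5d' % (state, counts[state]))
print('pending   : %s' % pending)
```

A non-empty pending list would indicate that the tasks were inspected before they completed, in which case the reported details may be incomplete.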

We create the report_task_progress function to report the status of each task and, for the first two tasks, the sandbox, pilot ID, exit code, and standard output.

[8]:
def report_task_progress(tasks):
    report.info('\n')
    for task in tasks:
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, task.stdout[:35]))

    task_dict = tasks[0].as_dict()
    report.plain("task workdir : %s\n" % task_dict['task_sandbox'])
    report.plain("pilot id     : %s\n" % task_dict['pilot'])
    report.plain("exit code    : %s\n" % task_dict['exit_code'])
    report.plain("stdout       : %s\n" % task_dict['stdout'])

    task_dict = tasks[1].as_dict()
    report.plain("task workdir : %s\n" % task_dict['task_sandbox'])
    report.plain("pilot id     : %s\n" % task_dict['pilot'])
    report.plain("exit code    : %s\n" % task_dict['exit_code'])
    report.plain("exit stdout  : %s\n" % task_dict['stdout'])
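The two per-task blocks in the cell above repeat the same four lookups, so they could be folded into a loop. The following is a minimal sketch under stated assumptions: describe_task and FakeTask are hypothetical names, FakeTask only mimics the as_dict() call used above, and the dictionary values are invented.

```python
from types import SimpleNamespace

# keys taken from the example above; the labels are purely cosmetic
FIELDS = (('task workdir', 'task_sandbox'),
          ('pilot id',     'pilot'),
          ('exit code',    'exit_code'),
          ('stdout',       'stdout'))


def describe_task(task_dict):
    """Format selected entries of a task dictionary, one per line."""
    return ['%-12s : %s' % (label, task_dict.get(key))
            for label, key in FIELDS]


class FakeTask(SimpleNamespace):
    """Stand-in for rp.Task that offers as_dict() (illustrative only)."""
    def as_dict(self):
        return dict(vars(self))


tasks = [FakeTask(task_sandbox='/tmp/task.%06d/' % i, pilot='pilot.0000',
                  exit_code=0, stdout='hello\n') for i in range(2)]

for task in tasks[:2]:
    for line in describe_task(task.as_dict()):
        print(line.rstrip())
    print()
```

Using dict.get() means a missing key is printed as None instead of raising a KeyError, which is convenient when inspecting tasks that have not finished yet.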

We put all function calls inside a try/except block. The finally clause always cleans up the session, whether or not an exception was caught. This will kill all the remaining pilots.

[ ]:
try:
    pdesc = create_pilot_description()
    pilots = launch_pilots(session, pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)
except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')
finally:
    report.header('finalize')
    session.close(cleanup=True)
report.header()
read config                                                                   ok

--------------------------------------------------------------------------------
submit pilots

create pilot manager                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost           1 cores       0 gpus           ok

--------------------------------------------------------------------------------
submit tasks

create task manager                                                           ok
create 10 task description(s)
        ..........                                                            ok
submit: ########################################################################

--------------------------------------------------------------------------------
gather results

wait  : ########################################################################
     DONE      :    10
                                                                              ok

  * task.000000: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000001: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000002: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000003: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000004: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000005: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000006: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000007: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000008: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

  * task.000009: DONE, exit:   0, out: Tue Sep 27 23:39:54 CEST 2022

task workdir : file://localhost/home/merzky/radical.pilot.sandbox/rp.session.rivendell.merzky.019262.0015/pilot.0000/task.000000/
pilot id     : pilot.0000
exit code    : 0
stdout       : Tue Sep 27 23:39:54 CEST 2022

task workdir : file://localhost/home/merzky/radical.pilot.sandbox/rp.session.rivendell.merzky.019262.0015/pilot.0000/task.000001/
pilot id     : pilot.0000
exit code    : 0
exit stdout  : Tue Sep 27 23:39:54 CEST 2022


--------------------------------------------------------------------------------
finalize

closing session rp.session.rivendell.merzky.019262.0015                        \
close task manager                                                            ok
close pilot manager                                                            \
wait for 1 pilot(s)