5. Selecting a Task Scheduler

We have seen in the previous examples how the radical.pilot.TaskManager matches submitted tasks to pilots for execution. When the task manager is constructed, it can be configured to use a specific scheduling policy for this matching. The following policies are implemented:

  • rp.SCHEDULER_ROUND_ROBIN: alternate tasks between all available pilots. This policy leads to a static and fair, but not necessarily load-balanced task assignment.

  • rp.SCHEDULER_BACKFILLING: dynamic task scheduling based on pilot capacity and availability. This is the most intelligent scheduler with good load balancing, but it comes with a certain scheduling overhead.

An important element to consider when discussing task scheduling is pilot startup time: pilot jobs can potentially sit in batch queues for a long time, or pass quickly, depending on their size and resource usage, resource policies, etc. Any static assignment of tasks will not be able to take that into account – and the first pilot may have finished all its work before a second pilot even came up.

This is what the backfilling scheduler tries to address: it only schedules tasks once a pilot is available, and only as many as that pilot can execute at any point in time. As this requires close communication between pilot and scheduler, the backfilling scheduler incurs a runtime overhead for each task. It is therefore advisable only for heterogeneous workloads and/or pilot setups, and for long-running tasks.
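
The difference between the two policies can be illustrated with a toy model. The sketch below does not use RADICAL-Pilot and is not how RP implements its schedulers: the function names, the pilot startup times and the one-task-at-a-time pilots are assumptions made purely for illustration. It compares a static round-robin assignment with a greedy, availability-aware assignment when the second pilot becomes active late.

```python
from itertools import cycle


def round_robin(tasks, pilots):
    """Statically assign tasks to pilots in turn, ignoring availability."""
    assignment = {p: [] for p in pilots}
    for task, pilot in zip(tasks, cycle(pilots)):
        assignment[pilot].append(task)
    return assignment


def backfill(tasks, startup, duration=1):
    """Toy model: give each task to the pilot that can start it earliest."""
    free_at = dict(startup)  # time at which each pilot can take its next task
    assignment = {p: [] for p in startup}
    for task in tasks:
        pilot = min(free_at, key=free_at.get)
        assignment[pilot].append(task)
        free_at[pilot] += duration
    return assignment


def makespan(assignment, startup, duration=1):
    """Time at which the last task finishes (one task at a time per pilot)."""
    return max((startup[p] + duration * len(ts)
                for p, ts in assignment.items() if ts), default=0)


tasks = list(range(10))
# Hypothetical startup times: the second pilot waits in the batch queue.
startup = {'pilot.0000': 0, 'pilot.0001': 8}

rr = round_robin(tasks, list(startup))
bf = backfill(tasks, startup)

print('round robin makespan:', makespan(rr, startup))
print('backfilling makespan:', makespan(bf, startup))
```

In this toy setup the static assignment leaves half of the tasks waiting for the late pilot, while the availability-aware assignment keeps the first pilot busy. The per-task scheduling overhead mentioned above is not modeled here.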

The example below shows a simple scheduler selector. It selects Round Robin scheduling for one or two pilots, and Backfilling for three or more.
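
As a minimal sketch, the selection rule can be written as a small standalone function. The two string constants are stand-ins for rp.SCHEDULER_ROUND_ROBIN and rp.SCHEDULER_BACKFILLING so that the snippet runs without RADICAL-Pilot installed; their values and the helper name select_scheduler are assumptions, not part of the RP API. Rejecting an empty pilot list is also a choice made for this sketch only.

```python
# Stand-ins for rp.SCHEDULER_ROUND_ROBIN / rp.SCHEDULER_BACKFILLING (assumed values).
ROUND_ROBIN = 'round_robin'
BACKFILLING = 'backfilling'


def select_scheduler(n_pilots):
    """Pick a scheduling policy from the number of pilots (hypothetical helper)."""
    if n_pilots < 1:
        raise ValueError('need at least one pilot')
    # One or two pilots: static round robin; three or more: backfilling.
    return ROUND_ROBIN if n_pilots <= 2 else BACKFILLING


for n in (1, 2, 3, 8):
    print(n, select_scheduler(n))
```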

Using multiple pilots is very powerful, but it becomes more powerful still if you allow RP to load-balance tasks between them. This section shows how to do just that.

We start by importing the radical.pilot module and initializing the reporter facility used for printing well formatted runtime and progress information.

[1]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

================================================================================
 Getting Started (RP version 1.18.1)
================================================================================


We will now import the dotenv module for fetching our environment variables. To create a new Session, you need to provide the URL of a MongoDB server which we will fetch from our .env file.

We will set the resource value to ‘local.localhost’. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

[2]:
from dotenv import load_dotenv
load_dotenv()

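RADICAL_PILOT_DBURL = os.getenv("RADICAL_PILOT_DBURL")
if not RADICAL_PILOT_DBURL:
    # os.environ only accepts strings, so fail early with a clear message
    raise RuntimeError('RADICAL_PILOT_DBURL is not set (check your .env file)')
os.environ['RADICAL_PILOT_DBURL'] = RADICAL_PILOT_DBURL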
resource = 'local.localhost'
session = rp.Session()
new session: [rp.session.6f53966c-70bc-11ed-a64d-0242ac110002]                 \
database   : [mongodb://kartikmodi:****@95.217.193.116:27017/rp_km]           ok

All remaining pilot code is run inside a try/except block. If an exception is caught, we can rely on the session object to exist and be valid, and we can thus tear the whole RP stack down via a ‘session.close()’ call in the ‘finally’ clause.
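
The cleanup pattern itself can be sketched without RADICAL-Pilot. In the snippet below, DemoSession is a hypothetical stand-in for rp.Session that only records whether close() was called, and run_workload and failing_workload are illustrative names. The point is only that the ‘finally’ clause runs whether or not the workload raises.

```python
class DemoSession:
    """Hypothetical stand-in for rp.Session; only tracks whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_workload(session, workload):
    """Run the workload and close the session afterwards, even on errors."""
    try:
        return workload()
    finally:
        session.close()


def failing_workload():
    # Simulated failure; a real workload would submit pilots and tasks here.
    raise RuntimeError('simulated failure')


demo_session = DemoSession()
try:
    run_workload(demo_session, failing_workload)
except RuntimeError as e:
    print('caught: %s' % e)

print('session closed:', demo_session.closed)
```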

[3]:
def initialize_desc_object(resources):
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')
    report.header('submit pilots')
    pdescs = list()
    for resource in resources:
        pd_init = {
                   'resource'      : resource,
                   'runtime'       : 15,  # pilot runtime (min)
                   'exit_on_error' : True,
                   'project'       : config[resource].get('project', None),
                   'queue'         : config[resource].get('queue', None),
                   'access_schema' : config[resource].get('schema', None),
                   'cores'         : config[resource].get('cores', 1),
                   'gpus'          : config[resource].get('gpus', 0),
                  }
        pdescs.append(rp.PilotDescription(pd_init))
    return pdescs

[4]:
def launch_pilots(session,pdesc):
    pmgr = rp.PilotManager(session=session)
    pilots = pmgr.submit_pilots(pdesc)
    return pilots

In the submit_tasks function we select a scheduler depending on the number of pilots. The call tmgr = rp.TaskManager(session=session, scheduler=SCHED) creates a TaskManager that uses the selected scheduler, and tmgr.add_pilots(pilots) registers the pilots with it. We then create a workload of tasks (n = 10), each with its own task description. Each task reports the ID of the pilot it runs on. We then submit the task descriptions to the TaskManager, which triggers the selected scheduler to start assigning tasks to the pilots. Finally, we use tmgr.wait_tasks() to wait for all tasks to reach a final state (DONE, CANCELED or FAILED).

[5]:
def submit_tasks(pilots):
    report.header('submit tasks')

    report.info('select scheduler')
    if len(pilots) in [1, 2]:
        SCHED = rp.SCHEDULER_ROUND_ROBIN
    else:
        SCHED = rp.SCHEDULER_BACKFILLING
    report.ok('>>%s\n' % SCHED)

    tmgr = rp.TaskManager(session=session, scheduler=SCHED)
    tmgr.add_pilots(pilots)


    n = 10  # number of tasks to run
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable = '/bin/echo'
        td.arguments  = ['$RP_PILOT_ID']

        tds.append(td)
        report.progress()
    report.ok('>>ok\n')


    tasks = tmgr.submit_tasks(tds)

    report.header('gather results')
    tmgr.wait_tasks()

    report.info('\n')
    return tasks

We create the report_task_progress function to report the status of each task and to count how many tasks ran on each pilot.

[6]:
def report_task_progress(tasks):
    counts = dict()
    for task in tasks:
        # stdout may be unset (e.g. for failed tasks), so fall back to ''
        out_str = (task.stdout or '').strip()[:35]
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, out_str))
        if out_str not in counts:
            counts[out_str] = 0
        counts[out_str] += 1

    report.info("\n")
    for out_str in counts:
        report.info("  * %-20s: %3d\n" % (out_str, counts[out_str]))
    report.info("  * %-20s: %3d\n" % ('total', sum(counts.values())))
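
The counting step can also be expressed with collections.Counter. This is an alternative sketch, not the tutorial's own code: it works on plain strings rather than on RP task objects, the helper name tally_outputs is hypothetical, and the sample outputs are made up for illustration.

```python
from collections import Counter


def tally_outputs(outputs, width=35):
    """Count how often each (stripped, truncated) output string occurs."""
    return Counter(out.strip()[:width] for out in outputs)


# Made-up task outputs: each task echoes the ID of the pilot it ran on.
outputs = ['pilot.0000\n'] * 7 + ['pilot.0001\n'] * 3

counts = tally_outputs(outputs)
for name, count in counts.items():
    print('  * %-20s: %3d' % (name, count))
print('  * %-20s: %3d' % ('total', sum(counts.values())))
```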

We put all function calls inside a try/except block. In the finally clause we always clean up the session, whether or not an exception was caught. This will kill all remaining pilots.

[7]:
try:
    pdesc = initialize_desc_object([resource])
    pilots = launch_pilots(session,pdesc)
    tasks = submit_tasks(pilots)
    report_task_progress(tasks)

except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')

finally:
    report.header('finalize')
    session.close(cleanup=True)

report.header()
read config                                                                   ok

--------------------------------------------------------------------------------
submit pilots

create pilot manager                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost           1 cores       0 gpus           ok

--------------------------------------------------------------------------------
submit tasks

select scheduler                                                     round_robin
create task manager                                                           ok
create 10 task description(s)
        ..........                                                            ok
submit: ########################################################################

--------------------------------------------------------------------------------
gather results

wait  : ########################################################################
     DONE      :    10
                                                                              ok

  * task.000000: DONE, exit:   0, out: pilot.0000
  * task.000001: DONE, exit:   0, out: pilot.0000
  * task.000002: DONE, exit:   0, out: pilot.0000
  * task.000003: DONE, exit:   0, out: pilot.0000
  * task.000004: DONE, exit:   0, out: pilot.0000
  * task.000005: DONE, exit:   0, out: pilot.0000
  * task.000006: DONE, exit:   0, out: pilot.0000
  * task.000007: DONE, exit:   0, out: pilot.0000
  * task.000008: DONE, exit:   0, out: pilot.0000
  * task.000009: DONE, exit:   0, out: pilot.0000

  * pilot.0000          :  10
  * total               :  10

--------------------------------------------------------------------------------
finalize

closing session rp.session.6f53966c-70bc-11ed-a64d-0242ac110002                \
close task manager                                                            ok
close pilot manager                                                            \
wait for 1 pilot(s)
        0                                                                timeout
                                                                              ok
session lifetime: 63.7s                                                       ok

--------------------------------------------------------------------------------

