Source code for radical.pilot.utils.misc


# pylint: disable=global-statement

import os
import time

from typing import Union

import radical.utils as ru

# max number of output/error characters to keep when shortening text with
# `tail()` (used as the default for its `maxlen` argument)
MAX_IO_LOGLENGTH = 1024

# cache for resource configs: `None` until `get_resource_configs()` populates
# it on first use
_rcfgs = None


# ------------------------------------------------------------------------------
#
def tail(txt: str, maxlen: int = MAX_IO_LOGLENGTH) -> str:
    '''
    Shorten the given string to its last `maxlen` characters.

    If the string is shortened, a notification line is prepended to the
    result.  This is used to keep logging information in mongodb manageable
    (the size of mongodb documents is limited).

    Args:
        txt (:obj:`str`): text to shorten.  An empty string or `None` results
            in an empty string.
        maxlen (:obj:`int`, optional): maximum number of trailing characters
            to keep.  Values smaller than zero are treated as zero.

    Returns:
        :obj:`str`: `txt` itself if it is not longer than `maxlen`, otherwise
        the notification line followed by the last `maxlen` characters.
    '''

    if not txt:
        return ''

    keep = max(maxlen, 0)

    if len(txt) <= keep:
        return txt

    # NOTE: slice with an explicit start index instead of `txt[-keep:]`: for
    #       `keep == 0` the latter is `txt[0:]`, i.e., the *complete* string.
    return "[... CONTENT SHORTENED ...]\n%s" % txt[len(txt) - keep:]


# ------------------------------------------------------------------------------
#
def get_rusage() -> str:
    '''
    Return a one-line, human readable summary of the resource usage of this
    process and its child processes.

    The values reported by `resource.getrusage()` for `RUSAGE_SELF` and
    `RUSAGE_CHILDREN` are added up for user time, system time and maximum
    resident set size.

    Returns:
        :obj:`str`: a string of the form
        `real <t> sec | user <t> sec | system <t> sec | mem <n> kB`.

    Note:
        - `real` is the current timestamp as returned by `time.time()`, not
          an elapsed time.
        - NOTE(review): the unit of `ru_maxrss` is platform dependent; the
          `kB` label presumably assumes Linux - confirm for other platforms.
    '''

    # `resource` is only available on Unix: import lazily so that importing
    # this module does not fail elsewhere
    import resource

    self_usage  = resource.getrusage(resource.RUSAGE_SELF)
    child_usage = resource.getrusage(resource.RUSAGE_CHILDREN)

    rtime = time.time()
    utime = self_usage.ru_utime  + child_usage.ru_utime
    stime = self_usage.ru_stime  + child_usage.ru_stime
    rss   = self_usage.ru_maxrss + child_usage.ru_maxrss

    # all times are reported with millisecond precision (the original `%3f`
    # for the `real` field was a typo for `%.3f`)
    return "real %.3f sec | user %.3f sec | system %.3f sec | mem %.2f kB" \
         % (rtime, utime, stime, rss)


# ------------------------------------------------------------------------------
#
def create_tar(tgt: str, dnames: list) -> None:
    '''
    Create a tarball on the file system which contains all given directories.

    The tarball is written by hand: for each directory name, a single 512 byte
    tar header block of type `5` (directory) is written.  No file content and
    no trailing end-of-archive blocks are written.

    Args:
        tgt (:obj:`str`): path of the tarball to create.  An existing file is
            overwritten.
        dnames (:obj:`list`): names of the directories to add.  This needs to
            be an iterable of strings - a single string would be iterated
            character by character.

    Note:
        Names are stored in the 100 character `name` field of the header,
        longer names are not supported by this implementation.
    '''

    uid   = os.getuid()
    gid   = os.getgid()
    mode  = 16893               # == 0o40775: directory, rwxrwxr-x
    mtime = int(time.time())

    def rpad(s, size):
        # pad `s` with NUL characters up to `size` characters
        return s + (size - len(s)) * '\0'

    def dir_header(path):
        # header fields: name, mode, uid, gid, size (0), mtime, chksum, type
        data  = rpad(path, 100) \
              + rpad('%o' % mode,   8) \
              + rpad('%o' % uid,    8) \
              + rpad('%o' % gid,    8) \
              + rpad('%o' % 0,     12) \
              + rpad('%o' % mtime, 12) \
              + 8 * '\0' + '5'

        # the checksum is the sum over all header characters, where the
        # checksum field itself counts as 8 spaces (8 * 32 == 256)
        cksum = 256 + sum(ord(h) for h in data)

        # pad to the full block size and patch the checksum into its field
        # (which starts at offset 148 == 512 - 364)
        data  = rpad(data, 512)
        data  = data[:-364] + '%06o\0' % cksum + data[-357:]
        return data

    # make sure the file handle gets closed even if writing a header fails
    with open(tgt, 'wb') as fout:
        for dname in dnames:
            fout.write(ru.as_bytes(dir_header(dname)))


# ------------------------------------------------------------------------------
#
def get_resource_configs() -> ru.Config:
    '''
    Return all resource configurations used by `radical.pilot`.

    The configurations are loaded once and are then cached on module level;
    each call hands out a new `Config` instance created from that cache.

    Configurations for the individual resources are organized as sites and
    resources:

        cfgs = get_resource_configs()
        sites = cfgs.keys()
        for site in sites:
            resource_names = cfgs[site].keys()

    Returns:
        :obj:`radical.utils.Config`: the resource configurations
    '''

    global _rcfgs

    # load the configs on first use (or if the cache is empty)
    cache_is_empty = not _rcfgs
    if cache_is_empty:
        _rcfgs = ru.Config('radical.pilot.resource', name='*', expand=False)

    # hand out a copy so that callers do not operate on the cache itself
    cfgs_copy = ru.Config(from_dict=_rcfgs)
    return cfgs_copy
# ------------------------------------------------------------------------------ #
def get_resource_config(resource: str) -> Union[None, ru.Config]:
    '''
    For the given resource label, return the resource configuration used by
    `radical.pilot`.

    Args:
        resource (:obj:`str`): resource label for which to return the cfg.
            Labels have the form `<site>.<host>`.

    Returns:
        :obj:`radical.utils.Config`: the resource configuration

        The method returns `None` if no resource config is found for the
        specified resource label.  That includes labels which do not have the
        form `<site>.<host>`.
    '''

    # populate cache
    if not _rcfgs:
        get_resource_configs()

    # split the label at the first dot.  A label without any dot cannot match
    # a config: return `None` as documented instead of failing on the
    # tuple unpacking
    site, sep, host = resource.partition('.')
    if not sep:
        return None

    if site not in _rcfgs:
        return None

    if host not in _rcfgs[site]:
        return None

    # return a deep copy
    return ru.Config(cfg=_rcfgs[site][host])
# ------------------------------------------------------------------------------ #
def get_resource_fs_url(resource: str,
                        schema  : Union[None, str] = None
                       ) -> Union[None, ru.Url]:
    '''
    For the given resource label, return the contact URL of the resource's
    file system.  This corresponds to the `filesystem_endpoint` setting in the
    resource config.

    For example,
    `rs.filesystem.directory(get_resource_fs_url(...)).change_dir('/')`
    is equivalent to the base ``endpoint:///`` URL available for use in a
    `staging_directive`.

    Args:
        resource (:obj:`str`): resource label for which to return the url

        schema (:obj:`str`, optional): access schema to use for resource
            access.  Defaults to the default access schema as defined in the
            resources config files.

    Returns:
        :obj:`radical.utils.Url`: the file system URL

        The method returns `None` if no resource config is found for the
        specified resource label and access schema.
    '''

    rcfg = get_resource_config(resource)

    # unknown resource label: return `None` as documented instead of failing
    # on the lookups below
    if not rcfg:
        return None

    # fall back to the first listed schema, which is the default one
    if not schema:
        schema = rcfg['schemas'][0]

    # unknown access schema for this resource
    if schema not in rcfg:
        return None

    # return a deep copy
    return ru.Url(rcfg[schema]['filesystem_endpoint'])
# ------------------------------------------------------------------------------ #
def get_resource_job_url(resource: str,
                         schema  : Union[None, str] = None
                        ) -> Union[None, ru.Url]:
    '''
    For the given resource label, return the contact URL of the resource's
    job manager.  This corresponds to the `job_manager_endpoint` setting in
    the resource config.

    Args:
        resource (:obj:`str`): resource label for which to return the url

        schema (:obj:`str`, optional): access schema to use for resource
            access.  Defaults to the default access schema as defined in the
            resources config files.

    Returns:
        :obj:`radical.utils.Url`: the job manager URL

        The method returns `None` if no resource config is found for the
        specified resource label and access schema.
    '''

    rcfg = get_resource_config(resource)

    # unknown resource label: return `None` as documented instead of failing
    # on the lookups below
    if not rcfg:
        return None

    # fall back to the first listed schema, which is the default one
    if not schema:
        schema = rcfg['schemas'][0]

    # unknown access schema for this resource
    if schema not in rcfg:
        return None

    # return a deep copy
    return ru.Url(rcfg[schema]['job_manager_endpoint'])
# ------------------------------------------------------------------------------