#!/usr/bin/python3
#
# pScheduler Run Scheduler
#

import copy
import datetime
import errno
import optparse
import os
import pscheduler
import random
import select
import threading
import time


pscheduler.set_graceful_exit()

# Seed the generator used for randomized slip and retry backoff.
random.seed()


# Gargle the arguments

opt_parser = optparse.OptionParser()

# Program options

opt_parser.add_option("-d", "--dsn",
                      help="Database connection string",
                      action="store", type="string", dest="dsn",
                      default="dbname=pscheduler")
opt_parser.add_option("--first-run-offset-multi",
                      help="Time offset for first runs in multi-participant tests (ISO8601)",
                      action="store", type="string", dest="firstoffsetmulti",
                      default="PT10S")
opt_parser.add_option("--first-run-offset-single",
                      help="Time offset for first runs in single-participant tests (ISO8601)",
                      action="store", type="string", dest="firstoffsetsingle",
                      default="PT5S")
# This value caps the number of tasks being scheduled at once (it
# becomes max_workers in main_program()), so describe it that way.
opt_parser.add_option("-m", "--max-parallel",
                      help="Maximum concurrent scheduling workers",
                      action="store", type="int", dest="max_parallel",
                      default=100)
opt_parser.add_option("-r", "--refresh",
                      help="Forced refresh interval (ISO8601)",
                      action="store", type="string", dest="refresh",
                      default="PT10S")
opt_parser.add_option("-R", "--retries",
                      help="Number of times to retry after a conflict",
                      action="store", type="int", dest="retries",
                      default=5)
opt_parser.add_option("--retry-backoff",
                      help="Maximum random backoff between retries (ISO8601)",
                      action="store", type="string", dest="retrybackoff",
                      default="PT10S")
opt_parser.add_option("-t", "--timeout",
                      help="Communication timeout (ISO8601)",
                      action="store", type="string", dest="timeout",
                      default="PT10S")

opt_parser.add_option("--worker-idle",
                      help="Idle time before worker processes exit (ISO8601)",
                      action="store", type="string", dest="worker_idle",
                      default="PT1M")
opt_parser.add_option("--worker-threads",
                      help="Maximum threads per worker process",
                      action="store", type="int", dest="worker_threads",
                      default=20)

opt_parser.add_option("-v", "--verbose", action="store_true", dest="verbose")
opt_parser.add_option("--debug", action="store_true", dest="debug")

(options, args) = opt_parser.parse_args()

if options.max_parallel < 1:
    opt_parser.error("Number of concurrent scheduling workers must be positive.")


# Minimum amount of time from now when the first run of a task can be
# scheduled.  This prevents "start now" tasks from being scheduled for
# a time before the participants can prepare for them.
# TODO: Potential race condition?  Yep.
try:
    first_run_offset_multi = pscheduler.iso8601_as_timedelta(options.firstoffsetmulti)
except ValueError:
    opt_parser.error('Invalid first offset for multi "' + options.firstoffsetmulti + '"')
try:
    first_run_offset_single = pscheduler.iso8601_as_timedelta(options.firstoffsetsingle)
except ValueError:
    opt_parser.error('Invalid first offset for single "' + options.firstoffsetsingle + '"')

# How long the main loop waits for a task change before looking for
# work anyway.
try:
    refresh = pscheduler.iso8601_as_timedelta(options.refresh)
except ValueError:
    opt_parser.error('Invalid refresh interval "' + options.refresh + '"')
if pscheduler.timedelta_as_seconds(refresh) == 0:
    opt_parser.error("Refresh interval must be calculable as seconds.")

conflict_retries = options.retries
if conflict_retries < 0:
    opt_parser.error("Retries cannot be negative.")

# Upper bound, in seconds, on the random sleep between conflict retries.
try:
    conflict_retry_backoff = pscheduler.timedelta_as_seconds(
        pscheduler.iso8601_as_timedelta(options.retrybackoff)
    )
except ValueError:
    opt_parser.error('Invalid retry backoff "%s"' % options.retrybackoff)


# Communication timeout, in seconds.
try:
    timeout_td = pscheduler.iso8601_as_timedelta(options.timeout)
except ValueError:
    opt_parser.error('Invalid timeout "' + options.timeout + '"')
timeout = pscheduler.timedelta_as_seconds(timeout_td)


# Handle an invalid duration the same way as the other options:  the
# parser is treated everywhere else in this file as raising ValueError,
# so catch that here instead of letting it escape as a traceback.  The
# None check is retained in case the parser ever reports failure that
# way.
try:
    worker_idle_td = pscheduler.iso8601_as_timedelta(options.worker_idle)
except ValueError:
    worker_idle_td = None
if worker_idle_td is None:
    opt_parser.error('Invalid worker idle time "' + options.worker_idle + '"')
worker_idle = pscheduler.timedelta_as_seconds(worker_idle_td)

worker_threads = options.worker_threads
if worker_threads < 1:
    opt_parser.error("Worker threads must be positive.")


# Logger shared by the main program and the worker code below.
log = pscheduler.Log(verbose=options.verbose, debug=options.debug, propagate=True)

# Run the --dsn option through pscheduler.string_from_file().
# NOTE(review): presumably this lets the option name a file that holds
# the connection string -- confirm against the pscheduler library.
dsn = pscheduler.string_from_file(options.dsn)

# Zero-length interval, used as the default and as the floor for slip.
TIMEDELTA_ZERO = datetime.timedelta()


#
# Globals for use by worker pool processes
#


# Database connection pool for the current worker process.  Filled in
# by worker_process_setup(); None until then.
dbpool = None

def worker_process_setup(args):
    """
    Set up everything globally for a worker process.

    args is a (dsn, max_size) tuple:  the database connection string
    and the value handed to the connection pool as its second
    argument (presumably its size limit -- confirm against
    pscheduler.DBConnectionPool).

    The pool is stored in the module-level 'dbpool'.
    """
    global dbpool
    pool_dsn, max_size = args
    # Use the size the caller passed in rather than reaching around it
    # to the global options; the caller passes options.worker_threads.
    dbpool = pscheduler.DBConnectionPool(pool_dsn,
                                         max_size,
                                         name='runner-%s' % (os.getpid()),
                                         log_callback=log.warning)


def worker_process_teardown(*args):
    """Tear down everything globally for a worker process."""
    # The connection pool disappears along with the process, so there
    # is nothing that needs to be released explicitly.
    return None




#
# Range Class and Functions
#

class Range(object):
    """
    A span between two values of any type that supports ordering
    comparisons.  The bounds are stored in order no matter which way
    around they are supplied.
    """

    def __init__(self, lower, upper):
        # Normalize so .lower never exceeds .upper.
        self.lower, self.upper = min(lower, upper), max(lower, upper)

    def __repr__(self):
        bounds = (self.lower, self.upper)
        return "R(%s..%s)" % bounds

    # Deliberately not __len__, which is required to return an integer.
    def length(self):
        """Return the distance between the bounds."""
        return self.upper - self.lower

    def __lt__(self, rhs):
        """Order by lower bound, breaking ties on the upper bound."""
        if self.lower < rhs.lower:
            return True
        if self.lower == rhs.lower:
            return self.upper < rhs.upper
        return False

    def __or__(self, rhs):
        """
        Return the Range where this one and rhs overlap, or None if
        they don't.  Ranges that merely touch produce a zero-length
        Range.
        """
        assert isinstance(rhs, type(self)), "Wrong type"

        if not (self.lower > rhs.upper or self.upper < rhs.lower):
            return Range(max(self.lower, rhs.lower),
                         min(self.upper, rhs.upper))
        return None

    def overlaps(self, candidates):
        """
        Return a list of the overlaps between this range and each of
        the candidate ranges, leaving out candidates with none.
        """
        assert isinstance(candidates, list), "Wrong type"
        found = []
        for candidate in candidates:
            common = self | candidate
            if common is not None:
                found.append(common)
        return found


def find_overlaps(range_lists):

    """
    Reduce several lists of ranges to the sorted list of ranges that
    all of them have in common.

    (pScheduler uses this to find the times that every participant in
    a task has available.)

    Given this input...

        [ [ Range(10, 50), Range(60, 77), Range(80, 90) ],
          [ Range(10, 12), Range(15,20), Range(20, 25), Range(51, 54),
            Range(65, 74), Range(81, 100) ],
          [ Range(22, 27), Range(65, 77), Range(82, 89) ],
          [ Range(1, 1000) ] ]

    ...the result is [Range(22..25), Range(65..74), Range(82..89)].
    """

    count = len(range_lists)

    if count == 0:
        # No lists, no overlaps.
        return []

    if count == 1:
        # A lone list is its own set of common ranges.
        return sorted(range_lists[0])

    if count == 2:
        # Each range in the first list yields a (possibly empty) list
        # of its overlaps with the second; gather them all into one.
        first, second = range_lists
        common = []
        for candidate in first:
            common.extend(candidate.overlaps(second))
        return sorted(common)

    # Three or more:  Collapse the last two lists into one and go
    # around again until only two remain.
    merged_tail = find_overlaps(range_lists[-2:])
    return sorted(find_overlaps(range_lists[0:-2] + [ merged_tail ]))



#
# Time Proposal Fetcher
#
def fetch_proposals(
        number,
        log_debug,
        task_urls,
        range_params,
        bind_addr,
        timeout,
        url,
        key_params,
        task_duration,
        proposed_priority
):
    """
    Ask each participant's task URL for the time ranges it can offer
    and return the ranges common to all of them that are at least
    task_duration long.

    If proposed_priority is not None, it is added to the query along
    with an 'api' parameter of '4'.

    Returns an empty list if any participant offers no time, or if a
    priority was proposed and a participant answers 501.

    Raises Exception if a participant reports the task gone (404 or
    410; in that case deletion of 'url' is attempted first) or
    returns any other unexpected status.
    """
    # TODO: This would be good parallelized.
    # See http://stackoverflow.com/questions/5236364 for some ideas.

    with_priority = proposed_priority is not None
    per_participant = []

    for participant_task in task_urls:

        # TODO: It would be nice if the task had a list of the
        # runtimes URLs so we don't have to build it.
        runtimes = participant_task + '/runtimes'

        log_debug("%d: Fetching proposals from %s", number, runtimes)

        # If we're running priorities, add that to the parameters.
        if with_priority:
            query_params = copy.copy(range_params)
            query_params['priority'] = proposed_priority
            query_params['api'] = '4'
            log_debug("%d: Priority will be %d", number, proposed_priority)
        else:
            query_params = range_params

        status, proposed = pscheduler.url_get(runtimes,
                                              params=query_params,
                                              bind=bind_addr,
                                              timeout=timeout,
                                              throw=False)

        if status in (404, 410):
            log_debug("%d: Task is no longer there.  Canceling %s.",
                              number, url)
            del_status, del_result = pscheduler.url_delete(
                url, timeout=timeout, throw=False, params=key_params,
                bind=bind_addr)
            if del_status not in (200, 404, 410):
                if log:
                    log.warning("%d: Error while canceling %s: %s",
                                number, url, del_result)

            raise Exception("Task %s is gone" % (url))

        # A "not implemented" in response to a request with a
        # priority probably means an older server.  Behave as if it
        # proposed nothing.
        if with_priority and status == 501:
            log_debug("%d: Server says not implemented, may be old",
                              number)
            return []

        if status != 200:
            log_debug("%d: Got back %d: %s", number, status,
                              proposed)
            raise Exception("Error trying to schedule for %s: %s %d"
                    % (url, runtimes, status))

        if len(proposed) == 0:
            log_debug("%d: No time available.", number)
            return []

        offered = []
        for item in proposed:
            lower = pscheduler.iso8601_as_datetime(item['lower'])
            upper = pscheduler.iso8601_as_datetime(item['upper'])
            offered.append(Range(lower, upper))

        log_debug("%d: Ranges: %s", number, offered)

        per_participant.append(offered)

    log_debug("%d: Done fetching time ranges", number)

    # Keep only the common ranges with enough room for the task.
    usable = []
    for candidate in find_overlaps(per_participant):
        if candidate.length() >= task_duration:
            usable.append(candidate)
    return usable




#
# Run Poster
#

def run_post(
        number,        # Logging tag
        log_debug,     # Method to call for debugging
        url,           # URL for task
        start_time,    # Desired start time
        anytime,       # Whether or not there are time restrictions
        bind_addr,     # Bind address for HTTP or None
        key=None,      # Access key
        log=None       # Logger used to record exceptions, or None
):

    """
    Schedule a run of a task on all participating nodes.

    The work is done in stages:  fetch the task, make sure every
    participant's API answers, gather time proposals and find a range
    they all share, pick a start time, post the run to the lead and
    put it to the other participants, then merge the per-participant
    data and distribute it to everyone.

    Besides its arguments, this uses the module-level 'timeout',
    'refresh' and 'dbpool'.

    Returns a tuple containing:
        Run URI
        Scheduled start time
        Scheduled end time
        True if the run should be skipped and tried again later
        True if the above was because a scheduling conflict cropped up
        Error message

    When scheduling fails, the first three items are None and the
    error message says why.  On success, the error message is None.

    Failures while fetching proposals are caught and returned as an
    error message.  Other exceptions (e.g., a KeyError caused by
    unexpected task JSON) propagate to the caller.
    """

    assert isinstance(start_time, datetime.datetime)

    log_debug("%d: Posting %s at %s", number, url, start_time)
    log_debug("%d: Bind is %s", number, bind_addr)

    key_params = {} if key is None else {"key": key}

    try:
        status, task = pscheduler.url_get(url, params={'detail': 1},
                                          bind=bind_addr, timeout=timeout)
    except pscheduler.URLException as ex:
        log_debug("%d: Failed to fetch %s: %s", number, url, str(ex))
        return (None, None, None, False, False,
                "Error fetching task: %s" % (str(ex)))

    # Generate a list of the task URLs

    participants = task['detail']['participants']
    log_debug("%d: Participant list is %s", number, participants)
    assert len(participants) >= 1

    # Make sure all of the participants' APIs answer before trying to
    # do anything with them.

    # Pylint infers task to be a string when it's actually a dictionary.
    lead_bind = task.get("lead-bind", None)  # pylint: disable=maybe-no-member

    log_debug("%d: Binding from %s", number, lead_bind)
    if not pscheduler.api_ping_all_up(participants, bind=lead_bind):

        # If the start time is after the next time we'd loop around,
        # we can try it again later.  Otherwise, tell the caller to
        # post a non-starter.

        skip = pscheduler.time_now() + refresh < start_time

        log_debug("%d: Some participants down or slow. %s",
                          number,
                          "Skipping." if skip else "Non-starter.")
        return (None, None, None, skip, False,
                "One or more participants down or slow to respond.")
    else:
        log_debug("%d: All participants are up.", number)

    task_urls = [ pscheduler.api_replace_host(url, participant)
                  for participant in participants ]
    log_debug("%d: Task URLs are %s", number, task_urls)


    #
    # Figure out the range of times in which the task can be run.
    #

    task_duration = pscheduler.iso8601_as_timedelta(task['detail']['duration'])
    try:
        task_slip = pscheduler.iso8601_as_timedelta(task['schedule']['slip'])
    except KeyError:
        task_slip = TIMEDELTA_ZERO


    # If the task is a repeater, the run can't be slipped so far that
    # it would overlap with the next interval.  Adjust it accordingly.
    #
    # TODO: This should probably be enforced by the database when the
    # task is inserted.
    try:
        repeat_interval = pscheduler.iso8601_as_timedelta(task['schedule']['repeat'])
        if task_slip + task_duration >= repeat_interval:
            task_slip = repeat_interval - task_duration
            if task_slip < TIMEDELTA_ZERO:
                task_slip = TIMEDELTA_ZERO
            log_debug("%d: Chopped slip to %s", number, task_slip)
    except KeyError:
        pass

    run_range_end = start_time + task_duration + task_slip

    range_params = {
        'start': pscheduler.datetime_as_iso8601(start_time),
        'end': pscheduler.datetime_as_iso8601(run_range_end)
        }

    log_debug("%d: Possible run range %s", number, str(range_params))


    #
    # Figure out what the local priority should be for this run.
    #

    priority = None if anytime \
               else task.get("priority", None) # pylint: disable=maybe-no-member

    #
    # Get a list of the time ranges each participant has available to
    # run the task that overlap with the range we want.
    #
    # Try _twice_ to find common times: Once with no proposed priority
    # to see if the run can be scheduled without having to preempt
    # anything and, if it can't, again at the proposed priority to see
    # if preemption helps.
    #

    priority_attempts = [ None ]
    if priority is not None:
        priority_attempts.append(priority)


    for priority_attempt in priority_attempts:

        log_debug("%d: Trying to schedule with priority %s",
                          number, priority_attempt)

        try:
            common_ranges = fetch_proposals(number, log_debug,
                                            task_urls,
                                            range_params, bind_addr,
                                            timeout, url, key_params,
                                            task_duration,
                                            priority_attempt)
        except Exception as ex:
            # The logger is optional, so don't assume there is one.
            if log is not None:
                log.exception()
            return (None, None, None, False, False, str(ex))

        log_debug("%d: Ranges in common: %s", number, common_ranges)
        if common_ranges:
            break

    if not common_ranges:
        return (None, None, None, False, False,
                "No times available for this run.")


    # If we're randomizing the start time, pick a range at random and
    # pick a random time within it.  Otherwise, take the earliest time
    # we can get.

    if ('sliprand' in task['schedule'] and task['schedule']['sliprand']):

        selected_range = random.choice(common_ranges)
        max_slip_offset = selected_range.length() - task_duration
        log_debug("%d: Slipping randomly up to %s", number,
                          max_slip_offset)

        # Slip in whole seconds.  When there's less than a full second
        # of room there are no increments to choose from, and asking
        # randrange() to pick from an empty range raises ValueError,
        # so don't slip at all in that case.
        increments = max_slip_offset // datetime.timedelta(seconds=1)
        if increments > 0:
            picked_increment = random.randrange(0, increments)
            log_debug("%d: %d increments, picked %d", number,
                              increments, picked_increment)
        else:
            picked_increment = 0

        slip_offset = picked_increment * datetime.timedelta(seconds=1)

    else:

        log_debug("%d: Taking earliest available time", number)
        selected_range = common_ranges[0]
        slip_offset = TIMEDELTA_ZERO

    log_debug("%d: Selected range %s", number, selected_range)
    log_debug("%d: Using a slip offset of %s", number, slip_offset)
    schedule_lower = selected_range.lower + slip_offset
    schedule_upper = schedule_lower + task_duration

    now = pscheduler.time_now()
    log_debug("%d: Horizon range: %s - %s", number,
              pscheduler.timedelta_as_iso8601(schedule_lower - now),
              pscheduler.timedelta_as_iso8601(schedule_upper - now))



    #
    # Post the runs to the participants
    #

    run_params = { 'start-time': schedule_lower.isoformat() }

    runs_posted = []

    # First one is the lead.  Post it and get the UUID.

    log_debug("%d: Posting lead run to %s", number, task_urls[0])
    log_debug("%d: Data %s", number, run_params)
    status, run_lead_url \
        = pscheduler.url_post(task_urls[0] + '/runs',
                              data=run_params,
                              params=key_params,
                              bind=bind_addr,
                              timeout=30,
                              throw=False,
                              json=True)

    if status == 409:
        log_debug("%d: Lead developed a schedule conflict.",
                          number)
        return (None, None, None, True, True,
                "Lead developed a schedule conflict.")

    if status != 200:
        log_debug("%d: Failed: %d %s", number, status,
                          run_lead_url)
        return (None, None, None, False, False,
                "Failed to post lead run: %s" % run_lead_url)

    log_debug("%d: Lead URL is %s", number, run_lead_url)
    assert isinstance(run_lead_url, str)
    runs_posted.append(run_lead_url)

    # TODO: This should parse the URL and change the netloc instead of
    # assembling URLs.

    # What to add to a task URL to make the run URL
    run_suffix = run_lead_url[len(task_urls[0]):]

    # Cover the rest of the participants if there are any.

    errors = []
    conflict = False

    for task_url in task_urls[1:]:

        put_url = task_url + run_suffix

        log_debug("%d: Putting run to participant %s", number, put_url)
        log_debug("%d: Parameters: %s", number, run_params)

        status, output = pscheduler.url_put(put_url,
                                            data=run_params,
                                            params=key_params,
                                            bind=bind_addr,
                                            timeout=30,
                                            throw=False,
                                            json=False  # No output.
                                            )

        log_debug("%d: PUT %d: %s", number, status, output)

        if status == 409:
            message = "%s developed a schedule conflict." % (put_url)
            log_debug("%d: %s", number, message)
            errors.append(message)
            conflict = True
            # No sense in continuing.
            break

        if status != 200:
            log_debug("%d: Failed: %s", number, output)
            errors.append(output)
            continue

        runs_posted.append(put_url)
        log_debug("%d: Succeeded.", number)

    # Anything short of a run on every participant is a failure, so
    # take back whatever did get posted.
    if len(runs_posted) != len(task_urls):
        log_debug("%d: Removing runs: %s", number, runs_posted)
        pscheduler.url_delete_list(runs_posted,
                                   params=key_params,
                                   bind=bind_addr,
                                   timeout=timeout)
        return (None, None, None, False, conflict,
                "Failed to post/put runs to all participants: %s"
                %  ("; ".join(errors))
        )

    #
    # Fetch all per-participant data, merge it and distribute the
    # result to all participants.
    #

    log_debug("%d: Fetching per-participant data", number)

    part_data = []

    # The lead's participant data has to come from the local database
    # because it's invisible to the API

    # NOTE(review): Unlike the failure paths further down, this one
    # returns without deleting the runs already posted.  Confirm
    # whether that is intentional.
    try:
        with dbpool(number) as db:
            cursor = db.cursor()
            cursor.execute("SELECT part_data FROM run WHERE uuid = %s",
                           [pscheduler.api_run_uuid(run_lead_url)])
            if cursor.rowcount != 1:
                raise RuntimeError('Got %d rows of lead participant data instead of 1.' % (cursor.rowcount))
            lead_part_data = cursor.fetchone()[0]
    except Exception as ex:
        log_debug('%d: Failed to get lead participant data: %s', number, str(ex))
        return (None, None, None, False, False,
                'Failed to get lead run data: %s' % (str(ex)))

    log_debug('%s: Lead part data: %s', number, str(lead_part_data))
    part_data.append(lead_part_data)


    for run in runs_posted[1:]:

        # TODO: Should this be multiple attempts to avoid a race condition?
        log_debug("%d: Getting part data from %s", number, run)
        status, result = pscheduler.url_get(run,
                                            params=key_params,
                                            bind=bind_addr,
                                            throw=False,
                                            timeout=timeout)
        if status != 200 or not 'participant-data' in result:
            log_debug("%d: Deleting runs: %s", number, runs_posted)
            pscheduler.url_delete_list(runs_posted, params=key_params,
                                       bind=bind_addr, timeout=timeout)
            return (None, None, None, False, False,
                    "Failed to get run data from %s: %s: %s" % (run, status, result) )

        part_data.append(result['participant-data'])
        log_debug("%d: Got %s", number,
                          result['participant-data'])

    full_data = { 'part-data-full': part_data }

    log_debug("%d: Full part data: %s", number, full_data)

    for run in runs_posted:
        log_debug("%d: Putting full part data to %s", number, run)
        status, result = pscheduler.url_put(run,
                                            data=full_data,
                                            params=key_params,
                                            bind=bind_addr,
                                            json=False,
                                            throw=False,
                                            timeout=30)
        if status != 200:
            pscheduler.url_delete_list(runs_posted, params=key_params,
                                       bind=bind_addr, timeout=timeout)
            # TODO: Better error?
            log_debug("%d: Failed: %s", number, result)
            return (None, None, None, False, False,
                    "Failed to post run data to all participants")


    # TODO: Probably also want to return the start and end times?
    log_debug("%d: Run posting finished", number)

    return (runs_posted[0], schedule_lower, schedule_upper, False, False, None)


# ------------------------------------------------------------------------------


class NonStarterException(Exception):
    """
    Raised when an attempt to schedule a run ends in a non-starter.
    The reason is carried as the exception's message.
    """


def schedule_task(number, task, key, runs, trynext, anytime, json,
                  participants, log_debug):
    """
    Do the actual work of scheduling a run.

    Arguments:
        number - Tag used to identify this attempt in logs and when
            borrowing a database connection
        task - Identifier of the task, used to build its URL
        key - Access key for the task, or None
        runs - Number of runs the task has had so far
        trynext - Earliest time to try scheduling the run
        anytime - Whether or not there are time restrictions
        json - The task's JSON, as a dictionary
        participants - List of the task's participants, lead first
        log_debug - Method to call for debugging

    Returns None, both when the run was scheduled and when it was
    skipped to be tried again later.

    Raises NonStarterException, with the reason as its message, if
    the run could not be scheduled.
    """
    # TODO: This could probably be part of the SchedulerWorker class.

    scheduling_error = None
    skip = False

    log_debug("%d: %sTASK %s, %d runs, try %s", number,
              "ANYTIME " if anytime else "",
              task, runs, trynext)

    # Talk to the lead at its lead-bind address if the task specifies
    # one, otherwise at the first participant.  This is largely to
    # avoid the situation where the hostname points at an interface
    # that isn't up and is reasonably safe because we're only talking
    # to the lead, which is local.

    url = pscheduler.api_url(
        host=json.get("lead-bind", participants[0]),
        path="/tasks/%s" % (task))

    # For the first run only, push the start time out.  See comment
    # above near the declaration of first_run_offset_single.

    if runs == 0:
        later_start = pscheduler.time_now() \
                      + (first_run_offset_single \
                        if len(participants) == 1 \
                        else first_run_offset_multi)
        if trynext < later_start:
            trynext = later_start

    log_debug("%d: Trying to schedule %s for %s", number, task, trynext)
    log_debug("%d: URL is %s", number, url)

    lead_bind = json.get("lead-bind", None)

    # Try a few times to schedule the run.  Sometimes,
    # something else may schedule a time we're working on, so
    # when that happens, try again.
    #
    # Always make at least one attempt, even when configured for zero
    # retries.  The else clause runs only when the loop ends without a
    # break, i.e., when every attempt ended in a conflict.  A success
    # or an error on the very last attempt must not be mistaken for
    # having given up.

    attempts = max(conflict_retries, 1)
    for attempt in range(1, attempts + 1):
        try:
            run_uri, start_time, end_time, skip, conflict, scheduling_error \
                = run_post(number, log_debug, url, trynext, anytime, lead_bind,
                           log=log, key=key)
        except Exception as ex:
            scheduling_error = str(ex)
            skip = False
            break

        if not conflict:
            break

        # Back off for a random time, but only if there's another
        # attempt to come.
        if attempt < attempts:
            sleep_time = conflict_retry_backoff * random.random()
            log_debug("%d: Developed a scheduling conflict.  Sleeping %fs.", number, sleep_time)
            time.sleep(sleep_time)

    else:
        skip = False
        scheduling_error = "Gave up after too many scheduling conflicts."
        # TODO: Delete lead run

    if skip:
        log_debug("%d: Skipping: %s", number, scheduling_error)
        return

    if scheduling_error is not None:
        log_debug("%d: Unable: %s", number, scheduling_error)
        raise NonStarterException(scheduling_error)

    log_debug("%d: Scheduled for %s - %s at %s", number, start_time,
              end_time, run_uri)

    # Move the lead's run to the pending state.  This is best-effort;
    # the maintainer will clean up anything that gets left.

    try:
        with dbpool(number) as db:
            cursor = db.cursor()
            cursor.execute("""UPDATE run SET state = run_state_pending() WHERE uuid = %s""",
                           [pscheduler.api_run_uuid(run_uri)])
        log_debug(f'{number}: Run state set to pending')
    except Exception as ex:
        log_debug(f'{number}: Failed to set pending: {str(ex)}')




# ------------------------------------------------------------------------------

class SchedulerWorker(pscheduler.GenericWorker):

    """Callable that schedules one run of one task."""

    def __init__(self, number, task, key, runs, trynext, anytime, json, participants, task_debug, forced_debug):
        """Hold on to everything needed to do the scheduling later."""

        # Tag used in log messages and when borrowing a database connection
        self.number = number

        # What to schedule and when
        self.task = task
        self.key = key
        self.runs = runs
        self.trynext = trynext
        self.anytime = anytime
        self.json = json
        self.participants = participants

        # Either of these being true turns on debug logging
        self.task_debug = task_debug
        self.forced_debug = forced_debug


    def __call__(self):
        """
        Try to schedule the run and, if that turns out to be a
        non-starter, record a non-starting run in the local database.
        Always returns None.
        """

        def discard(*args):
            """Stand-in for the debug logger when debug is off."""
            return None

        if self.task_debug or self.forced_debug:
            log_debug = log.debug_always
            if self.task_debug:
                log_debug("%d: Task requested debug", self.number)
        else:
            log_debug = discard

        log_debug("%d: Scheduling task %s at %s", self.number, self.task, self.trynext)

        try:

            schedule_task(self.number, self.task, self.key,
                          self.runs, self.trynext, self.anytime, self.json,
                          self.participants, log_debug)

        except NonStarterException as ex:

            # The run won't happen, so post a non-starting run on the
            # local (lead) system.

            reason = str(ex)

            log.info("%d: Posting non-starting run at %s for task %s: %s",
                     self.number,
                     pscheduler.datetime_as_iso8601(self.trynext),
                     self.task, reason)

            try:
                with dbpool(self.number) as db:
                    cursor = db.cursor()
                    cursor.execute("""SELECT api_run_post(%s, %s, NULL, %s)""",
                              [self.task, self.trynext, reason])
            except Exception as nsex:
                log.error("%d: Insertion of non-starter failed: %s",
                          self.number, str(nsex))

        finally:

            # Success or failure, work on this task is over.
            log_debug("%d: Thread finished", self.number)

        # No news is good news.
        return None





#
# Main Program
#

def main_program():
    """
    Run the scheduler's main loop, which never returns.

    Each pass queries schedule_runs_to_schedule for tasks that need a
    run scheduled and hands each one to a SchedulerWorker in the
    worker process pool.  Between passes, it waits for a task_change
    notification or for the refresh interval to elapse, unless the
    last pass started workers or notifications arrived in the
    meantime.
    """

    # TODO: All DB transactions need to be error checked

    worker_pool = pscheduler.WorkerProcessPool(
        name="scheduler-pool",
        load_limit=options.worker_threads,
        setup=worker_process_setup,
        setup_args=(dsn, options.worker_threads,),
        teardown=worker_process_teardown,
        teardown_args=(),
        debug_callback=lambda m: log.debug(m),
        # Don't limit this; the OS will punish too many processes.
        pool_size_limit=None,
        idle_time=worker_idle
    )


    db = pscheduler.PgConnection(dsn, name="scheduler")
    db.listen("task_change")
    db.query("SELECT heartbeat_boot('scheduler')")

    # Most tasks that can be in the process of being scheduled at once
    max_workers = options.max_parallel

    # Identifiers of the tasks that currently have a worker
    workers = pscheduler.ThreadSafeSet()

    def scheduling_completion(identifier, value, diags):
        """Note that a worker finished and report how it went."""
        workers.remove(identifier)
        if value is None:
            log.debug("%d: Worker reported completion", identifier)
        else:
            log.error("%d: Worker failed; returned %s: %s", identifier, value, diags)

    wait = False  # First iteration is always no-wait.

    while True:

        if wait:

            worker_pool.groom()

            db.query("SELECT heartbeat('scheduler', %s)", [refresh])

            log.debug("Waiting.")
            if db.wait(pscheduler.timedelta_as_seconds(refresh)):
                log.debug("Task change.")

                # Discard pending notifications.  This is safe to do
                # because all we ever get is task changes and the
                # query below will find everything that needs
                # scheduling.
                _ = db.notifications()
            else:
                log.debug("Timed out.")

        else:

            log.debug("Not waiting.")


        wait = True

        query = """SELECT task, uuid, key, runs, trynext, anytime, json, participants, debug
            FROM schedule_runs_to_schedule"""
        args = []

        # Weed out any tasks that have a worker scheduling a run.

        working = workers.items()

        if len(working) > 0:
            query += " WHERE task NOT IN %s"
            args.append(tuple(working))
            log.debug("Already have workers for %s", working)

        # Don't ask for more rows than we're going to be able to
        # consume.  This applies whether or not anything is already
        # being worked on; otherwise a pass made with no workers
        # running could start more of them than the configured
        # maximum.

        limit = max_workers - len(working)
        if limit < 1:
            log.debug("Already have a full slate of workers, skipping query.")
            continue

        if limit < max_workers:
            log.debug("Limiting query to %d tasks", limit)
        query += " LIMIT %d" % (limit)

        for row in db.query(query, args):

            # The first column (the task) is what identifies the
            # worker; the second is the task's UUID.
            number, task, key, runs, trynext, anytime, json, participants, debug = row

            if number in workers:
                log.debug(f'Already have a worker for {number}')
                continue

            try:
                # Make a worker and throw it into the pool.
                worker = SchedulerWorker(number, task, key, runs, trynext, anytime, json,
                                         participants, debug, log.is_forced_debugging())
                worker_pool(number, worker, scheduling_completion)
                workers.add(number)
                log.debug("%d: Created worker", number)
                # Something was started, so look for more right away.
                wait = False
            except Exception as ex:
                log.exception(message="%d: Exception while starting worker" % (number))

        # Anything that changed while we were busy merits another
        # pass without waiting.
        if db.notifications():
            wait = False


# Run the scheduler.  main_program() loops forever, so this does not
# return normally.
main_program()
