#!/usr/bin/python3
#
# Execute runs of tasks and put the results into the database.
#

import datetime
import errno
import multiprocessing
import optparse
import os
import pscheduler
import psutil
import select
import signal
import socket
import tempfile
import time
import traceback


pscheduler.set_graceful_exit()


# Gargle the arguments

opt_parser = optparse.OptionParser()

# Program options

opt_parser.add_option("-d", "--dsn",
                      help="Database connection string",
                      action="store", type="string", dest="dsn",
                      default="")
opt_parser.add_option("-r", "--refresh",
                      help="Forced refresh interval (ISO8601)",
                      action="store", type="string", dest="refresh",
                      default="PT15S")
opt_parser.add_option("--terse-logging",
                      help="Don't log run details",
                      action="store_true",
                      dest="terse",
                      default=False)

opt_parser.add_option("--worker-idle",
                      help="Idle time before worker processes exit",
                      action="store", type="string", dest="worker_idle",
                      default="PT1M")
opt_parser.add_option("--worker-threads",
                      help="Maximum threads per worker process",
                      action="store", type="int", dest="worker_threads",
                      default=20)


opt_parser.add_option("--verbose", action="store_true", dest="verbose", default=False)
opt_parser.add_option("--debug", action="store_true", dest="debug", default=False)

(options, args) = opt_parser.parse_args()

refresh = pscheduler.iso8601_as_timedelta(options.refresh)
if refresh is None:
    opt_parser.error('Invalid refresh interval "' + options.refresh + '"')
if pscheduler.timedelta_as_seconds(refresh) == 0:
    opt_parser.error("Refresh interval must be calculable as seconds.")

worker_idle_td = pscheduler.iso8601_as_timedelta(options.worker_idle)
if worker_idle_td is None:
    opt_parser.error('Invalid worker idle time "' + options.worker_idle + '"')
worker_idle = pscheduler.timedelta_as_seconds(worker_idle_td)

worker_threads = options.worker_threads
if worker_threads < 1:
    opt_parser.error("Worker threads must be positive.")



log = pscheduler.Log(verbose=options.verbose, debug=options.debug, propagate=True)

dsn = pscheduler.string_from_file(options.dsn)

log_details = not options.terse


# ------------------------------------------------------------------------------

# Globals for use by worker pool processes

dbpool = None

def worker_process_setup(args):
    """Set up everything globally for a worker process"""
    dsn, max_size = args
    global dbpool
    dbpool = pscheduler.DBConnectionPool(dsn,
                                         options.worker_threads,
                                         name='runner-%s' % (os.getpid()),
                                         log_callback=log.warning)


def worker_process_teardown(*args):
    """Tear down up everything globally for a worker process"""
    # Nothing to do here, the pool will go away with the process.
    pass

 
# ------------------------------------------------------------------------------



# Current value of run start margin according to the database

__run_start_margin = None

def run_start_margin_set(db):
    global __run_start_margin

    try:
        rows = list(db.query("SELECT run_start_margin FROM configurables"))
    except Exception as ex:
        raise RuntimeError("Unable to get run start margin: %s" % (str(ex)))

    if (len(rows) != 1) or (len(rows[0]) != 1):
        raise RuntimeError("Unexpected return from start margin query: %s" % (str(rows)))

    __run_start_margin = rows[0][0]
    log.debug("Run start margin set to %s", __run_start_margin)


def run_start_margin():
    global __run_start_margin
    assert __run_start_margin is not None, "Run start margin was never set"
    return __run_start_margin


#
# A record of what we believe to be running and what we don't.
#

ids_running = pscheduler.ThreadSafeSet()

# This is run when a task completes
def run_completion(identifier, value, diags):
    ids_running.remove(identifier)
    if value is None:
        log.debug("%d: Worker reported completion", identifier)
    else:
        log.error("%d: Worker failed; returned %s: %s", identifier, value, diags)


#
# Clock Survey
#

def get_clock(arg):
    slot, url, bind = arg
    status, result = pscheduler.url_get(url, throw=False, bind=bind)
    if status != 200:
        result = { "error": status }
    return (slot, result)


def clock_survey(hosts, bind=None):

    if len(hosts) == 0:
        return []

    # Any null hosts become the local host
    hosts = [ pscheduler.api_local_host() if host is None else host
              for host in hosts ]

    host_args = []
    for slot in range(0,len(hosts)):
        host = hosts[slot]
        if host is None:
            continue
        host_args.append((slot, pscheduler.api_url(host, "/clock"), bind))

    # Prime the result with empties for anything that didn't get
    # tested.
    result = [None] * len(hosts)

    # Run the lot of tests in parallel
    pool = multiprocessing.pool.ThreadPool(processes=len(host_args))
    for slot, clock in pool.imap(get_clock, host_args, chunksize=1):
        result[slot] = clock
    pool.close()

    return pscheduler.json_dump(result)



#
# Class that does the test runs
#

class RunWorker(pscheduler.GenericWorker):

    def __init__(self, id, start_at, debug):

        self.id = id
        self.start_at = start_at
        self.finished = False
        self.output = []
        self.debug = debug

        # This will be set definitively when the worker runs because
        # the log class can't be passed across processes.
        self.log_debug = None


    def __post_new_result(self, result):
        """
        Post a finished run for this task using the result provided.
        """
        self.log_debug("%d: Got result: %s", self.id, result)
        try:
            json = pscheduler.json_load(result, max_schema=1)
        except ValueError:
            log.warning("%d: Discarding bogus result %s", self.id, result)
            return
        
        try:
            with self.dbpool(self.id) as db:
                cursor = db.cursor()
                cursor.execute("""
                WITH inserted_row AS (
                    INSERT INTO run (task, uuid, times, state, status, result_merged)
                    VALUES (%s,
                            NULL,
                            tstzrange(normalized_now(), normalized_now(), '[]'),
                            run_state_finished(),
                            0,
                            %s)
                    RETURNING *
                ) SELECT uuid FROM inserted_row
                """, [ self.task_id, result ])

                # Should get exactly one row back with the UUID of
                # the new result.

                if cursor.rowcount != 1:
                    self.log.error("%d: Failed to get UUID of posted run.",
                                   self.id)
                    return

                if log_details:
                    self.log_debug("%d: Posted result to %s/runs/%s",
                                   self.id, self.task_url,
                                   cursor.fetchone()[0])


        except Exception as ex:
            self.log.exception()
            self.log.error("%d: Failed to post run for result: %s", self.id, str(ex))


    def __accumulate_output(self, line):
        """
        Accumulate lines of output from the tool.  Treat any input
        lines that look like RFC7464 as a full result.
        """

        if len(line) > 2 and line[0] == "\x1e":
            # RFC7464.  The JSON parser won't care about the newline at the end.
            line = line[1:]
            self.log_debug("%d: RFC7464 line %s", self.id, line)
            self.__post_new_result(line)
            self.output = []
        else:
            # Ordinary text
            self.log_debug("%d: Plain text line %s", self.id, line)
            self.output.append(line)

        
    def __call__(self):
        """
        Run the tool in an exception-safe way
        """

        # These are initialized here because they can't be passed
        # through to another process.

        global dbpool
        self.dbpool = dbpool

        global log
        self.log = log

        self.log_debug = log.debug_always if self.debug else lambda *args: None

        try:
            self.log_debug("%d: Running", self.id)
            with tempfile.TemporaryDirectory() as self.temp:
                self.log_debug("%d: Temp space in %s", self.id, self.temp)
                self.__run()
            self.temp = None
        except Exception as ex:
            # Don't worry about the result here.  If __run() failed to
            # post anything, that will be the end of it.  If it did,
            # it might be salvageable.
            self.log_debug("%d: Exception: %s", self.id, ex)
            log.exception()
            return ex
        finally:
            self.log_debug("%d: Thread finished", self.id)

        # No news is good news.
        return None


    def __run(self):
        """
        Run the tool and, if the lead participant, gather, aggregate
        and post the results.
        """

        # Do as much preparation as possible before going to sleep.

        try:
            with self.dbpool(self.id) as db:
                cursor = db.cursor()
                cursor.execute("""
                           SELECT            
                               test.name,
                               tool.name,
                               task.uuid,
                               task.id,
                               task.participant,
                               task.participants,
                               lower(run.times),
                               upper(run.times),
                               task.json #> '{test}',
                               task.cli,
                               run.uuid,
                               run.part_data_full,
                               scheduling_class.enum,
                               task.json ->> 'lead-bind',
                               task.json #> '{contexts,contexts}',
                               task.json ->> '_key',
                               task.json -> 'debug'
                           FROM
                               run
                               JOIN task ON task.id = run.task
                               JOIN test ON test.id = task.test
                               JOIN scheduling_class
                                    ON scheduling_class.id = test.scheduling_class
                               JOIN tool ON tool.id = task.tool
                           WHERE run.id = %s
                           """, [self.id])

                # Should get exactly one row back.  If not, the run probably
                # vanished.

                if cursor.rowcount != 1:

                    if log_details:
                        self.log_debug("%d: Run is gone.  Stopping.", self.id)
                    return

                row = cursor.fetchone()

                test, tool, task_uuid, task_id, participant, participants, \
                    start, end, test_spec, cli, run_uuid, \
                    partdata, scheduling_class, lead_bind, contexts, key, debug = row

        except Exception as ex:
            self.log.exception()
            self.log.error("%d: Failed to fetch run info: %s",
                           self.id, str(ex))
            return

        # Debug if object creatior or the task said to.
        if self.debug or debug:
            self.log_debug = self.log.debug_always
            if debug:
                self.log_debug("%d: Task requested debug", self.id)

        if lead_bind is not None:
            self.log_debug("%d: Lead bind for this run is %s", self.id, lead_bind)

        # This gets used by __post_result, above.
        self.task_url = pscheduler.api_url_hostport(
            hostport=participants[0],
            path="/tasks/%s" % (task_uuid) )

        # This is only used for debut messages.
        run_url = "%s/runs/%s" % (self.task_url, run_uuid)

        # This will be used when a background-multi run produces a result.
        self.task_id = task_id

        if partdata is None:
            # TODO: Should simply bail out here.
            self.log.error("%d: Got NULL part_data_full for %s",
                           self.id, run_url)

        tool_input = pscheduler.json_dump({
            'schema': 1,
            'task-uuid': task_uuid,
            'schedule': {
                'start': pscheduler.datetime_as_iso8601(start),
                'duration': pscheduler.timedelta_as_iso8601(end - start)
                },
            'test': test_spec,
            'participant': participant,
            'participant-data': partdata,
            })


        # If there are contexts, do the advance work for that.

        # TODO: Might be good to see if we can just do all of te
        # running with the ChainedExecRunner.

        if contexts is not None and len(contexts[participant]):
            context_args = [
                {
                    "program": [
                        pscheduler.plugin_method_path("context", context["context"], "change")
                    ],
                    "input": context.get("data", {})
                }
                for context in contexts[participant]
                ]

            context_runner = pscheduler.ChainedExecRunner(
                context_args,
                argv=[ pscheduler.plugin_method_path("tool", tool, "run") ],
                stdin=tool_input)

        else:

            context_runner = None

        #
        # Wait for the start time to roll around
        #

        self.log_debug("%d: Start at %s", self.id, self.start_at)
        start_time = self.start_at - run_start_margin()
        self.log_debug("%d: Sleeping until test start at %s", self.id, start_time)
        pscheduler.sleep_until(start_time)

        how_late = pscheduler.time_now() - start_time
        self.log_debug("%d: Start time difference is %s", self.id, how_late)

        if how_late > run_start_margin():
            self.log.warning("%d: Starting %s later than scheduled.",
                             self.id, how_late)


        #
        # Get cracking
        #

        if log_details:
            self.log_debug("%d: Running %s", self.id, run_url)
            self.log_debug("%d: With %s: %s %s", self.id, tool, test, " ".join(cli))

        # Tell the database we're proceeding

        try:
            with self.dbpool(self.id) as db:
                cursor = db.cursor()
                cursor.execute("SELECT run_start(%s)", [self.id])
                assert cursor.rowcount == 1
                result = cursor.fetchone()[0]
                if result is not None:
                    self.log.info("%d: Aborting run: %s", self.id, result)
                    return
        except AssertionError as ae:
            self.log.error("%d: run_start did not return exactly one row.", self.id)
            raise ae
        except Exception as ex:
            self.log.exception()
            self.log.error("%d: Failed to start run: %s", self.id, str(ex))
            raise ex


        #
        # Do the local tool run
        #

        self.log_debug("%d: Tool input: %s", self.id, tool_input)

        timeout = pscheduler.timedelta_as_seconds(end - start + run_start_margin()) + 1

        # Environment that will be passed into the child process

        temp_env = { "TMPDIR": self.temp }
        log_env = self.log.environment(debug=debug)
        child_env = {**temp_env, **log_env}


        # Run it.

        if context_runner is None:

            returncode, stdout, stderr = pscheduler.plugin_invoke(
                "tool", tool, "run",
                stdin=tool_input,
                env_add=child_env,
                timeout=timeout,
                line_call=lambda l: self.__accumulate_output(l))

        else:

            returncode, stdout, stderr = context_runner.run(
                env_add=child_env,
                timeout=timeout,
                line_call=lambda l: self.__accumulate_output(l)
            )


        self.log_debug("%d: Program has finished.", self.id)

        stdout = "\n".join(self.output)

        self.log_debug("%d: Tool returned '%s'", self.id, stdout)

        if len(stdout) == 0:
            stdout = None
        else:
            # See if the test claimed failure
            try:
                result_json = pscheduler.json_load(stdout, max_schema=1)
            except ValueError:
                self.log.error("%d: Tool returned invalid JSON: %s", self.id, stdout)


        if len(stderr) == 0:
            stderr = None

        if returncode == 0:
            if log_details:
                self.log_debug("%d: Run returned %s", self.id, stdout)
        else:
            if log_details:
                self.log.error("%d: Run failed %d: %s", self.id,
                               returncode, stderr)
            if stderr is None:
                stderr = "Tool exited with status %d" % (returncode)

        try:
            state_selector = 2 if  (
                (participant > 0) or (scheduling_class == "background-multi")
                ) else returncode
            with self.dbpool(self.id) as db:
                db.cursor().execute("""
                UPDATE run
                SET
                status = %s,
                result = %s,
                errors = %s,
                state = CASE
                            WHEN %s = 0 THEN run_state_cleanup()
                            WHEN %s = 1 THEN run_state_failed()
                            ELSE run_state_finished()
                        END
                WHERE id = %s
                """,
                [returncode,
                 stdout,
                 stderr,
                 state_selector,
                 state_selector,
                 self.id])
        except Exception as ex:
            self.log.exception()
            self.log.error("%d: Failed to store local result: %s",
                           self.id, str(ex))
            return
        
        self.log_debug("%d: Stored local result", self.id)

        # The lead participant in non-background-multi tasks takes care of
        # gathering and merging the finished results.  Background-multi
        # tasks take care of inserting their own results.

        if participant == 0 and scheduling_class != "background-multi":

            self.log_debug("%d: Doing lead participant duties", self.id)

            # Wait until the scheduled time has passed, which is the
            # only time we can be sure results might be available.

            if len(participants) > 1:
                wait_time = pscheduler.time_until_seconds(end)
                self.log_debug("%d: Waiting for task end time to pass (%s)", self.id, wait_time)
                time.sleep(wait_time)
                self.log_debug("%d: Task end time has passed", self.id)
            else:
                self.log_debug("%d: Only one participant; not waiting.", self.id)

            # Fetch and combine the results.

            runs = [ pscheduler.api_url_hostport(
                hostport = host,
                path = '/tasks/%s/runs/%s'
                % (task_uuid, run_uuid) )
                     for host in participants ]

            self.log_debug("%d: Runs are %s", self.id, runs)
            self.log_debug("%d: Local run returned %d", self.id, returncode)

            if returncode == 0:
                try:
                    local_result = pscheduler.json_load(stdout)
                except ValueError:
                    error = "Tool {} returned invalid JSON".format(tool)
                    self.log.error("%d: %s '%s'", self.id, error, stdout)
                    local_result = {
                        "succeeded": False,
                        "error": error,
                        "diags": "Returned text:\n\n{}\n".format(stdout)
                    }
            else:
                self.log_debug("%d: Tool returned failure: %s", self.id, stderr)
                local_result = {
                    "succeeded": False,
                    "error": "Tool failed: {}".format(stderr),
                    "diags": "Tool standard output:\n\n{}\n\nTool standard error:\n\n{}\n".format(stdout, stderr)
                }

            # Assemble the results from each participant into an
            # array.

            result_full = [ None for _ in runs ]

            # We have this on hand.
            result_full[0] = local_result
            self.log_debug("%d: Accumulated local result", self.id)

            # Wait up to 15 seconds for all of the participants to
            # produce results.

            to_get = dict([(runs[index], index)
                           for index in range(1, len(runs))])

            deadline = pscheduler.time_now() + datetime.timedelta(seconds=15)

            for get in list(to_get):
                self.log_debug("%d: Fetching run %s", self.id, get)

                timeout = int(pscheduler.timedelta_as_seconds(
                    deadline - pscheduler.time_now()))

                status, run_result = pscheduler.url_get(
                    get,
                    params={ 'wait-local': True, 'wait': timeout },
                    bind=lead_bind,
                    throw=False,
                    timeout=timeout + 0.5
                )

                if status == 200:
                    self.log_debug("%d: Retrieved %s", self.id, run_result)
                    index = to_get[get]
                    got = run_result["result"]
                    if got is not None:
                        result_full[index] = got
                else:
                    self.log.warning("%d: Unable to retrieve run %s: %d: %s",
                                         self.id, get, status, run_result)
                    got = { "succeeded": False,
                            "error": run_result,
                            "diags": run_result
                        }

                del to_get[get]


            # Merge the results.  The merged-results method in the
            # plugin will determine whether or not the full results
            # constitute success or failure.

            self.log_debug("%d: Merging results: %s", self.id, result_full)
            merge_input = {
                "test": test_spec,
                "results": result_full
            }
            merge_input_text = pscheduler.json_dump(merge_input)
            self.log_debug("%d: Merging %s", self.id, merge_input_text)

            returncode, stdout, stderr = pscheduler.plugin_invoke(
                "tool", tool, "merged-results",
                stdin=merge_input_text, timeout=5)

            self.log_debug("%d: Merged results: %d %s %s", self.id, returncode, stdout, stderr)

            if returncode == 0:
                try:
                    result_merged = pscheduler.json_load(stdout)
                except ValueError as ex:
                    result_merged = {
                        "succeeded": False,
                        "error": "Tool {} merged-results returned invalid JSON '{}'".format(tool),
                        "diags": "Exited {}\n\nStandard out:\n\n{}\n\nStandard error:\n\n".format(returncode, stdout, stderr)
                    }
            else:
                result_merged = {
                    "succeeded": False,
                    "error": "Tool {} failed to merge results: {}".format(tool, stderr),
                    "diags": "Exited {}\n\nStandard out:\n\n{}\n\nStandard error:\n\n{}\n".format(returncode, stdout, stderr)
                }

            result_full_text = pscheduler.json_dump(result_full)
            self.log_debug("%d: Full result: %s ", self.id, result_full_text)

            result_merged_text = pscheduler.json_dump(result_merged)
            self.log_debug("%d: Merged result: %s ", self.id, result_merged_text)

            succeeded = result_merged.get("succeeded", False) if result_merged is not None else False

            # If the run was a failure, survey all of the particpants'
            # clocks.

            survey = None if succeeded else clock_survey(participants, lead_bind) 

            # Store full and merged results and the clock survey in
            # the local database.

            try:
                self.log_debug("%d: Merged result: Setting final state.  Succeeded=%s", self.id, succeeded)
                with self.dbpool(self.id) as db:
                    # TODO: Need to figure out succeeded.
                    db.cursor().execute("""
                                    UPDATE run
                                    SET
                                        state = CASE
                                                    WHEN %s THEN run_state_finished()
                                                    ELSE run_state_failed()
                                                END,
                                        result_full = %s,
                                        result_merged = %s,
                                        clock_survey = %s
                                    WHERE id = %s
                                    """,
                                    [succeeded,
                                     pscheduler.json_dump(result_full),
                                     result_merged_text,
                                     survey,
                                     self.id])
            except Exception as ex:
                self.log.exception()
                self.log.error("%d: Failed to store run: %s", self.id, str(ex))

        self.log_debug("%d: Run complete", self.id)
        self.finished = True

        # No news is good news.
        return None



#
# Main Program
#


# This is rough treatment, but child processes really have to go when
# the program exits.
def kill_subprocesses():
    for child in psutil.Process().children(recursive=True):
        try:
            os.kill(child.pid, signal.SIGKILL)
        except Exception:
            pass  # This is best-effort.

pscheduler.on_graceful_exit(kill_subprocesses)


def main_program():

    log.debug("Begin main")

    worker_pool = pscheduler.WorkerProcessPool(
        name="runner-pool",
        load_limit=options.worker_threads,
        setup=worker_process_setup,
        setup_args=(dsn, options.worker_threads,),
        teardown=worker_process_teardown,
        teardown_args=(),
        debug_callback=lambda m: log.debug(m),
        # Don't limit this; the OS will punish too many processes.
        pool_size_limit=None,
        idle_time=worker_idle
    )


    # This is for local use.
    db = pscheduler.PgConnection(dsn, name="runner")
    log.debug("Connected to DB")

    # Listen for notifications.
    for listen in ["run_ready", "configurables_changed" ]:
        log.debug("Listening for notification %s" % (listen))
        db.listen(listen)

    db.query("SELECT heartbeat_boot('runner')")

    # Prime this for the first run
    wait_time = datetime.timedelta()
    run_start_margin_set(db)

    worker_threads = {}

    while True:

        # Purge any workers that are no longer running
        for remove in [worker_id for worker_id in worker_threads
                       if worker_threads[worker_id].is_complete()]:
            log.debug("Purging completed thread %d", remove)
            del worker_threads[remove]

        if not pscheduler.timedelta_is_zero(wait_time):

            # Wait for a notification or the wait time to elapse.

            db.query("SELECT heartbeat('runner', %s)", [wait_time])

            worker_pool.groom()

            log.debug("Next run or check in %s", wait_time)

            if not db.wait(pscheduler.timedelta_as_seconds(refresh)):
                log.debug("Timed out.")

        else:

            log.debug("Not waiting.")

        db.query("SELECT heartbeat('runner')")

        runs_ready = []

        for (channel, payload, count) in db.notifications():
            log.debug(f'Notified: {channel} {payload} {count}')
            if channel == 'run_ready':
                log.debug(f'Run {payload} became ready')
                runs_ready.append(int(payload))
            elif channel == 'configurables_changed':
                log.debug('Configurables changed.')
                run_start_margin_set(db)

        # Operate only on runs that are scheduled to start before the next
        # forced refresh.
        # TODO: This should be moved into a stored procedure

        RUNS_QUERY = """
            SELECT * FROM (

            -- Runs we were explicity notified are ready and start
            -- and start before the next time we run this query.
            SELECT
                run.id AS run,
                lower(times) - normalized_wall_clock() AS start_in,
                lower(times) AS start_at,
                FALSE as background_multi
            FROM
                run
                JOIN task ON task.id = run.task
                JOIN test ON test.id = task.test
            WHERE
                lower(run.times) < (normalized_wall_clock() + %s + %s)
                AND run.id in %s
                AND test.scheduling_class <> scheduling_class_background_multi()

            UNION

            -- Non-background runs that haven't started
            SELECT
                run.id AS run,
                lower(times) - normalized_wall_clock() AS start_in,
                lower(times) AS start_at,
                FALSE as background_multi
            FROM
                run
                JOIN task ON task.id = run.task
                JOIN test ON test.id = task.test
            WHERE
                lower(run.times) < (normalized_wall_clock() + %s + %s)
                AND state = run_state_pending()
                AND test.scheduling_class <> scheduling_class_background_multi()

            UNION

            -- Background tasks that should be running.
            SELECT
                run.id AS run,
                'PT1S'::INTERVAL AS start_in,
                normalized_wall_clock() + 'PT1S'::INTERVAL AS start_at,
                TRUE as background_multi
            FROM
                run
                JOIN task ON task.id = run.task
                JOIN test ON test.id = task.test
            WHERE
                times @> normalized_now()
                AND task.enabled
                AND test.scheduling_class = scheduling_class_background_multi()
                AND run.state IN (run_state_pending(), run_state_running())
            ) t
            WHERE start_in >= '0'::INTERVAL
            ORDER BY start_in
            """

        wait_time = refresh

        run_ids = []
        runs_started = False

        for row in db.query(RUNS_QUERY, [
                refresh,
                run_start_margin(),
                # IN (x) has to have something, so use an array that will
                # never match anything in the table.
                tuple(runs_ready or [None]),
                refresh,
                run_start_margin()
        ]):

            run_id, start_in, start_at, background_multi = row

            if run_id in ids_running:
                log.debug("%d is already running" % (run_id))
                continue

            log.debug("Run %d, starts at %s", run_id, start_at)

            run_ids.append(run_id)

            try:

                # Make a worker and throw it into the pool.
                worker_pool(run_id, RunWorker(run_id, start_at, log.is_forced_debugging()), run_completion)
                log.debug("%d: Created worker", run_id)
                ids_running.add(run_id)

                log.debug("%d processors in pool: %s", len(worker_pool), worker_pool.status())

            except Exception as ex:

                # Any failure here means failure of the run.

                diags = "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
                log.error("%d: Unable to start worker: %s" % (run_id, diags))

                with db.cursor() as failed:
                    failed.execute("""
                        UPDATE RUN
                        SET
                          state = run_state_failed(),
                          status = 1,
                          errors = %s
                        WHERE id = %s
                    """, ["Failed to start worker: %s" % (diags),
                          run_id])


                log.error("Unable to set on-deck status: %s" % (str(ex)))


    # Not that this will ever be reached...
    db.close()


main_program()
