#!/usr/bin/python3
#
# pScheduler Archiver Daemon
#


import collections
import datetime
import optparse
import os
import psycopg2
import queue
import signal
import threading
import time

import pscheduler

from dateutil.tz import tzlocal


# NOTE(review): presumably arranges for the daemon to exit cleanly when
# asked to stop; the actual behavior is defined in the pscheduler module.
pscheduler.set_graceful_exit()


# Gargle the arguments
#
# All options are declared on a single optparse parser.  Durations are
# accepted as ISO 8601 strings and are validated after parsing, not here.

opt_parser = optparse.OptionParser()

# Program options

opt_parser.add_option("-a", "--archive-defaults",
                      help="Directory containing default archivers",
                      action="store", type="string", dest="archive_defaults",
                      default="/etc/pscheduler/default-archives")
opt_parser.add_option("-d", "--dsn",
                      help="Database connection string",
                      action="store", type="string", dest="dsn",
                      default="dbname=pscheduler")
opt_parser.add_option("-m", "--max-parallel",
                      help="Maximum concurrent archivings",
                      action="store", type="int", dest="max_parallel",
                      default=100)
opt_parser.add_option("-p", "--pool-size",
                      help="Size of pool per archive type",
                      action="store", type="int", dest="pool_size",
                      default=25)
opt_parser.add_option("-r", "--refresh",
                      help="Forced refresh interval (ISO8601)",
                      action="store", type="string", dest="refresh",
                      default="PT1M")
opt_parser.add_option("--timeout",
                      help="Timeout for archiver I/O (ISO8601)",
                      action="store", type="string", dest="timeout",
                      default="PT2M")

# Worker process options.  Note that --worker-idle is parsed as an
# ISO 8601 duration even though its help text does not say so.

opt_parser.add_option("--worker-idle",
                      help="Idle time before worker processes exit",
                      action="store", type="string", dest="worker_idle",
                      default="PT1M")
opt_parser.add_option("--worker-threads",
                      help="Maximum threads per worker process",
                      action="store", type="int", dest="worker_threads",
                      default=25)

# Archiver process pool options

opt_parser.add_option("--skim-interval",
                      help="How often to skim archiver pools (ISO8601)",
                      action="store", type="string", dest="skim_interval",
                      default="PT30S")
opt_parser.add_option("--archiver-iterations",
                      help="How many iterations before an archiver is stopped",
                      action="store", type="int", dest="archiver_iterations",
                      default=10000)
opt_parser.add_option("--archiver-idle",
                      help="How long idle archivers are kept",
                      action="store", type="string", dest="archiver_idle",
                      default="PT1M")


# Diagnostic switches.  --verbose and --debug are handed to the logger;
# --debug_noio is stored but not referenced in the visible part of
# this file.

opt_parser.add_option("--verbose", action="store_true", dest="verbose")
opt_parser.add_option("--debug", action="store_true", dest="debug")
opt_parser.add_option("--debug_noio", action="store_true", dest="debug_noio")


# Positional arguments are collected but ignored.
(options, _args) = opt_parser.parse_args()

# Validate the parsed options and derive the module-level settings used
# by the rest of the program.  opt_parser.error() prints a usage message
# and exits, so nothing after a failed check runs.

if options.max_parallel < 1:
    opt_parser.error("Number of concurrent archivings must be positive.")

if options.pool_size < 1:
    opt_parser.error("Pool size must be positive.")

# iso8601_as_timedelta() returning None is treated as "could not parse".
# The refresh interval ends up as a number of seconds.
refresh = pscheduler.iso8601_as_timedelta(options.refresh)
if refresh is None:
    opt_parser.error('Invalid refresh interval "' + options.refresh + '"')
refresh = pscheduler.timedelta_as_seconds(refresh)


if options.worker_threads < 1:
    opt_parser.error("Number of worker threads must be positive.")

# Kept as a timedelta; ArchiverPoolCollection converts it to seconds.
skim_interval = pscheduler.iso8601_as_timedelta(options.skim_interval)
if skim_interval is None:
    opt_parser.error('Invalid skim interval "' + options.skim_interval + '"')

if options.archiver_iterations < 1:
    opt_parser.error("Number of archiver iterations must be positive.")
archiver_iterations = options.archiver_iterations

# Kept as a timedelta; ArchiveProcess adds it to the current time.
archiver_idle = pscheduler.iso8601_as_timedelta(options.archiver_idle)
if archiver_idle is None:
    opt_parser.error('Invalid archiver idle time "' + options.archiver_idle + '"')

# Archiver I/O timeout in seconds; read as a global by ArchiveProcess.
timeout = pscheduler.iso8601_as_timedelta(options.timeout)
if timeout is None:
    opt_parser.error('Invalid timeout "' + options.timeout + '"')
timeout = pscheduler.timedelta_as_seconds(timeout)

# Worker idle time, kept both as a timedelta and as seconds.
worker_idle_td = pscheduler.iso8601_as_timedelta(options.worker_idle)
if worker_idle_td is None:
    opt_parser.error('Invalid worker idle time "' + options.worker_idle + '"')
worker_idle = pscheduler.timedelta_as_seconds(worker_idle_td)

# Program-wide logger, shared by every class and function below.
log = pscheduler.Log(verbose=options.verbose, debug=options.debug, propagate=True)

# NOTE(review): presumably lets the DSN be supplied indirectly through a
# file; confirm against pscheduler.string_from_file().
dsn = pscheduler.string_from_file(options.dsn)




#
# Archive Processes, Process Pools and Pool Collections
#


class ArchiveProcess(object):
    """
    Wrapper around one running archiver plugin that trades JSON with
    it over a stream.

    The process is considered expired once it has been used
    max_iterations times or has gone unused for longer than idle; an
    expired process is stopped and replaced the next time it is used.
    """

    def __init__(self,
                 archiver,
                 logger,
                 max_iterations=None,  # Uses allowed before expiring
                 idle=None             # timedelta of disuse before expiring
                ):
        self.archiver = archiver
        self.log = logger
        self.max_iterations = max_iterations
        self.idle = idle
        self.program = None
        self._used(initialize=True)
        self._establish()

    def __repr__(self):
        pieces = ['<%s %s' % (self.__class__.__name__, self.archiver)]
        if self.max_iterations is not None:
            pieces.append(' Iters:%d/%d' % (self.iterations, self.max_iterations))
        if self.expires is not None:
            pieces.append(' Expires:%s' % (self.expires - datetime.datetime.now()))
        if self.is_expired():
            pieces.append(' Expired>')
        else:
            pieces.append('>')
        return ''.join(pieces)


    def _used(self, initialize=False):
        """
        Account for one use of the process (or start the accounting
        over when initialize is true) and push back the idle deadline.
        """
        if not initialize:
            self.iterations += 1
        else:
            self.iterations = 0
            self.expires = None

        if self.idle is None:
            return
        self.expires = datetime.datetime.now() + self.idle


    def _establish(self):
        """
        Make sure a usable archiver program is running, starting a
        fresh one if there is none, it died or it has expired.
        """

        current = self.program
        if current is not None and not current.running():
            self.log.warning("%s: Program stopped running unexpectedly", self.archiver)
            current.done()
            self.program = None

        if self.is_expired():
            self.log.debug("%s: Expired.  Restarting.", self.archiver)
            self.done()

        if self.program is not None:
            return

        # TODO: This is pulling timeout from a global and shouldn't be.
        path = pscheduler.plugin_method_path("archiver", self.archiver, "archive")
        self.log.debug("%s: Starting %s", self.archiver, path)
        self.program = pscheduler.StreamingJSONProgram(
            [path],
            timeout=timeout,
            debug=self.log.debug,
            label=self.archiver)
        self._used(initialize=True)


    def __call__(self, json):
        """
        Send json to the program and return what it sends back.

        An I/O or program failure stops the process and is reported as
        a failed, non-retried result instead of being raised.
        """
        self._establish()
        self._used()
        # TODO:  Should assert or raise if expired.
        try:
            reply = self.program(json)
        except (IOError, pscheduler.StreamingJSONProgramFailure) as ex:
            self.log.debug("Plugin failed: %s", ex)
            self.done()
            # No retry is requested here; the StreamingJSONProgram
            # will already have retried.
            return {
                "succeeded": False,
                "error": "Possibly-permanent archiver error: %s" % (str(ex))
                }
        return reply


    def done(self):
        """Stop the archiver program if one is running."""
        if self.program is None:
            return
        self.log.debug("%s: Process done", self.archiver)
        self.program.done()
        self.program = None


    def is_expired(self):
        """Return True if the iteration limit or idle deadline has been reached."""
        if self.max_iterations is not None \
           and self.iterations >= self.max_iterations:
            return True
        if self.idle is None:
            return False
        return datetime.datetime.now() > self.expires


    def __del__(self):
        self.done()



class ArchiverProcessPool(object):
    """
    A pool of archivers of a single type

    Idle processes are kept in a deque and reused most-recently-returned
    first.  self.size counts every process the pool owns, both idle
    ones in the deque and ones currently checked out by callers.
    """

    def __init__(self, archiver, logger, max_size, max_iterations=None, idle=None, max_process_tries=20):
        """
        archiver - Name of the archiver plugin the processes run
        logger - Logger used for all diagnostics
        max_size - Maximum number of processes the pool may own
        max_iterations - Passed to each ArchiveProcess
        idle - Passed to each ArchiveProcess
        max_process_tries - Half-second polls to wait for a free
            process before giving up
        """
        self.archiver = archiver
        self.log = logger
        self.max_size = max_size
        self.max_iterations = max_iterations
        self.idle = idle
        self.max_process_tries = max_process_tries

        self.pool = collections.deque()
        self.lock = threading.Lock()
        self.size = 0
        # Incremented by drain(); processes checked out under an older
        # cycle are dropped instead of being returned to the pool.
        self.reset_cycle = 0
        self.skim_count = 0


    def __len__(self):
        return self.size


    def __call__(self, json, label=None):
        """
        Pull a process from the pool (or create one) and run a result
        through it.

        Returns whatever the process returns, or a failed result if no
        process became available after max_process_tries attempts.
        Exceptions raised by the process are logged and re-raised.
        """

        label = '' if label is None else ' ' + label

        process = None
        tries = 0

        self.log.debug("Pool %s%s: Getting a process (%d available, %d max)",
                       self.archiver, label, len(self.pool), self.max_size)

        while process is None:
            with self.lock:

                # Take idle processes off the pool until a usable one
                # turns up, stopping the expired ones explicitly
                # instead of leaving that to garbage collection.
                while self.pool:
                    candidate = self.pool.pop()
                    if not candidate.is_expired():
                        process = candidate
                        break
                    self.log.debug("Pool %s%s: Purging an expired process", self.archiver, label)
                    candidate.done()
                    self.size -= 1

                if process is not None:
                    self.log.debug("Pool %s%s: Got an existing process (%d in pool)",
                                   self.archiver, label, len(self.pool))
                else:
                    # Pool is empty.
                    self.log.debug("Pool %s%s: No processes available.", self.archiver, label)
                    if self.size < self.max_size:
                        # Room for new processes; create one.
                        self.log.debug("Pool %s%s: Creating a new process",
                                       self.archiver, label)
                        process = ArchiveProcess(self.archiver, self.log,
                                                 max_iterations=self.max_iterations,
                                                 idle=self.idle)
                        self.size += 1
                        self.log.debug("Pool %s%s: New process %d",
                                       self.archiver, label, self.size)
                    else:
                        # Pool is full and all processes are busy.
                        self.log.debug("Pool %s%s: Pool is full and busy.", self.archiver, label)

                if process is not None:
                    cycle_at_start = self.reset_cycle

            # TODO: Need to do something better than this, but it has
            # to be deadlock-proof.
            if process is None:
                tries += 1
                if tries >= self.max_process_tries:
                    self.log.debug("Pool %s%s: Gave up waiting for a process", self.archiver, label)
                    return {
                        "succeeded": False,
                        "error": "Unable to get an archiver process after %s attempts" % self.max_process_tries
                        # Don't do a retry here.  This is a problem.
                    }
                self.log.debug("Pool %s%s: Waiting for a process (%d)", self.archiver, label, tries)
                time.sleep(0.5)

        try:
            # TODO: This gets lengthy, so use a shorter version for now.
            ### self.log.debug("Pool %s%s: Sending JSON %s", self.archiver, label, json)
            self.log.debug("Pool %s%s: Sending JSON", self.archiver, label)
            result = process(json)
            self.log.debug("Pool %s%s: Process returned %s", self.archiver, label, result)
        except Exception as ex:
            self.log.error("Pool %s%s: Exception: %s", self.archiver, label, str(ex))
            raise
        finally:
            # Dispose of the process no matter what happened.
            with self.lock:
                same_cycle = self.reset_cycle == cycle_at_start
                if same_cycle \
                   and self.skim_count == 0 \
                   and self.size <= self.max_size:
                    self.pool.append(process)
                    self.log.debug("Pool %s%s: Returned a process (%d in pool)",
                                   self.archiver, label, len(self.pool))
                else:
                    process.done()
                    self.size -= 1
                    if same_cycle and self.skim_count > 0:
                        self.skim_count -= 1
                    self.log.debug("Pool %s%s: Dropped a process", self.archiver, label)

        return result


    def fill(self):
        """
        Artificially add processes up to the maximum.  (Intended for test
        and debug only.)

        The processes are created without iteration or idle limits.
        """
        with self.lock:
            while self.size < self.max_size:
                self.pool.append(ArchiveProcess(self.archiver, self.log))
                self.size += 1
            self.log.debug("Pool %s: Filled to %d", self.archiver, self.size)


    def drain(self):
        """
        Force all running processes to stop.

        Idle processes are stopped immediately; processes currently
        checked out are stopped when their callers finish with them.
        """
        with self.lock:
            # Anything in the pool is idle
            self.size -= len(self.pool)
            for proc in self.pool:
                proc.done()
            self.pool.clear()

            # Changing reset cycles makes runners not re-pool themselves
            self.reset_cycle += 1

        self.log.debug("Pool %s: Drained", self.archiver)



    def skim(self):
        """
        Clean out anything that's expired

        Every idle process is examined exactly once; the ones that
        survive stay in the pool in their original order.
        """

        self.log.debug("Pool %s: Skimming", self.archiver)
        with self.lock:
            # Cycle each process through the front of the deque once,
            # putting back only those that are still usable.
            for _ in range(len(self.pool)):
                proc = self.pool.popleft()
                if proc.is_expired():
                    self.log.debug("Pool %s: Skimmed %s", self.archiver, proc)
                    proc.done()
                    self.size -= 1
                else:
                    self.pool.append(proc)


    def done(self):
        """Stop everything in the pool."""
        self.drain()


    def __del__(self):
        self.done()


class ArchiverPoolCollection(object):
    """
    A collection of ArchiverProcessPools for each archiver type.

    Pools are created on first use.  If a skim time is provided, a
    daemon thread periodically removes expired processes from every
    pool.
    """

    def __init__(self, logger, max_size=10, max_iterations=None, idle=None, skim_time=None):
        """
        logger - Logger used for all diagnostics
        max_size - Maximum size of each per-archiver pool
        max_iterations - Passed to each pool
        idle - Passed to each pool
        skim_time - timedelta between skims, or None to never skim
        """
        self.log = logger
        self.max_size = max_size
        self.max_iterations = max_iterations
        self.idle = idle

        self.pool = {}
        self.lock = threading.Lock()

        # These stay None when no skimming was requested.
        self.skim_interval = None
        self.skimmer = None

        if skim_time is not None:
            self.log.debug("Starting archiver pool skimmer.")
            self.skim_interval = skim_time.total_seconds()
            # A daemon thread doesn't hold up exit of the process.
            self.skimmer = threading.Thread(target=self._skimmer, daemon=True)
            self.skimmer.start()


    def _skimmer(self):
        """
        Worker thread to periodically skim the pools.
        """
        self.log.debug("Archiver pool skimmer running.")
        while True:
            time.sleep(self.skim_interval)
            self.log.debug("Skimming the pools.")
            self.skim()


    def __call__(self, archiver, json, label=None):
        """
        Archive a result to the correct pool.

        Returns whatever the pool for that archiver returns.
        """

        with self.lock:
            # Compare against None explicitly; an empty pool has a
            # length of zero and would test false.
            pool = self.pool.get(archiver)
            if pool is None:
                self.log.debug("No archiver pool for %s.  Creating one.", archiver)
                pool = ArchiverProcessPool(
                    archiver, self.log,
                    max_size=self.max_size,
                    max_iterations=self.max_iterations,
                    idle=self.idle
                    )
                self.pool[archiver] = pool

        # The archiving itself is done without holding the collection
        # lock so other archivers aren't held up.
        return pool(json, label=label)


    def skim(self):
        """
        Skim all pools
        """
        with self.lock:
            for archiver, pool in self.pool.items():
                self.log.debug("Skimming archiver pool %s", archiver)
                pool.skim()


    def drain(self):
        """
        Drain all pools
        """
        with self.lock:
            for archiver, pool in self.pool.items():
                self.log.debug("Draining archiver pool %s", archiver)
                pool.drain()


    def done(self):
        """Stop everything in every pool."""
        self.drain()



# ------------------------------------------------------------------------------

# Globals for use by worker pool processes

# Database connection pool; assigned by worker_process_setup() and
# used by ArchiveWorker to record the outcome of each attempt.
dbpool = None

# ArchiverPoolCollection for this process; created by
# worker_process_setup() and cleared by worker_process_teardown().
archiver_collection = None

def worker_process_setup(args):
    """
    Create the per-process database pool and archiver collection.

    args is a two-element sequence; the first element is the DSN for
    the database pool and the second is accepted but not used.
    """
    global dbpool, archiver_collection

    (pool_dsn, _unused_size) = args

    pool_name = 'archiver-%s' % (os.getpid())
    dbpool = pscheduler.DBConnectionPool(
        pool_dsn,
        options.worker_threads,
        name=pool_name,
        log_callback=log.warning)

    archiver_collection = ArchiverPoolCollection(
        log,
        max_size=options.pool_size,
        max_iterations=archiver_iterations,
        idle=archiver_idle,
        skim_time=skim_interval)




def worker_process_teardown(*args):
    """
    Shut down and forget the archiver collection of a worker process.

    Any arguments are ignored.  Nothing happens if there is no
    collection.
    """
    global archiver_collection
    collection = archiver_collection
    if collection is None:
        return
    collection.done()
    archiver_collection = None


def worker_process_reset_collection():
    """
    Stop everything in the worker process's archiver collection.

    The collection itself is kept so it can be used again.  This is a
    no-op when no collection exists (before setup or after teardown),
    matching the behavior of worker_process_teardown().
    """
    collection = archiver_collection
    if collection is not None:
        collection.done()



# ------------------------------------------------------------------------------


class DefaultArchiveMaintainer(object):
    """Maintainer for default archives.  This runs in the main process."""

    def __init__(self, path, db_dsn, logger):
        """
        path - Directory holding one archive specification per file
        db_dsn - DSN used to open a private database connection
        logger - Logger used for all diagnostics
        """

        self.path = path
        self.dsn = db_dsn
        self.log = logger

        # This needs its own database connection without autocommit.
        self.db = pscheduler.PgConnection(db_dsn, name="archiver-default",
                                          autocommit=False)

        # Most recent file or directory change we saw
        self.most_recent = 0


    def refresh(self):
        """
        Replace the contents of the archive_default table with the
        specifications found in the directory, but only if the
        directory or any file in it changed since the last successful
        refresh.

        Files that can't be read or parsed, and rows the database
        rejects, are skipped with a warning.  If the table can't be
        cleared, the transaction is rolled back and the refresh will
        be tried again on the next call.
        """

        self.log.debug("Refreshing default archivers from %s", self.path)

        if not os.path.isdir(self.path):
            self.log.debug("%s is not a directory", self.path)
            return

        # Examine everything before tinkering with the database

        try:
            timestamps = [os.path.getmtime(self.path)]
            paths = [os.path.join(self.path, f)
                     for f in os.listdir(self.path)]
        except OSError as ex:
            self.log.debug("Unable to read %s: %s", self.path, ex)
            return

        # Hold warnings until after we see something's changed so we
        # don't spew them repeatedly.
        warnings = []

        # Gather the timestamps and the list of files worth parsing

        files = []

        for path in paths:

            self.log.debug("Examining file %s", path)

            if not os.path.isfile(path):
                self.log.debug("Not a file")
                continue

            try:
                # Append the timestamp whether the file is valid or
                # not, because we're looking for any kind of change.
                # Note that we check mtime *and* ctime to catch
                # permission changes and the like.
                timestamps.append(max(os.path.getctime(path),
                                      os.path.getmtime(path)))
            except OSError as ex:
                warnings.append("Ignoring %s: %s" % (path, str(ex)))
                continue

            files.append(path)

        # If nothing's changed, we're done.

        newest = max(timestamps)
        if newest <= self.most_recent:
            self.log.debug("Nothing has changed; not updating.")
            return

        # Remember the old value so a failed update can be retried.
        previous_most_recent = self.most_recent
        self.most_recent = newest

        # Parse the files and make a list to be inserted into the database

        archives = []

        for path in files:

            self.log.debug("Reading file %s", path)

            try:
                with open(path, 'r') as content:
                    spec = pscheduler.json_load(content, max_schema=1)
            except Exception as ex:
                warnings.append("Ignoring %s: %s" % (path, str(ex)))
                continue

            archives.append({
                "path": path,
                "spec": spec
            })

        # Spit out any warnings that were accumulated

        for warning in warnings:
            self.log.warning(warning)

        # Write to the database.

        self.log.debug("Writing the database")

        try:
            self.db.query("DELETE FROM archive_default")
        except Exception as ex:
            self.log.error("Failed to delete defaults: %s", str(ex))
            # The failed statement leaves the transaction unusable, so
            # abandon it instead of trying to insert into it.
            try:
                self.db.query("ROLLBACK")
            except Exception as rollback_ex:
                self.log.error("Failed to roll back: %s", str(rollback_ex))
            # Forget that we saw the change so the next refresh retries.
            self.most_recent = previous_most_recent
            return

        for archive in archives:

            # Each insert gets a savepoint so one bad specification
            # doesn't take the rest of the transaction down with it.
            self.db.query("SAVEPOINT archiver_insert")
            failed = True
            try:
                self.db.query("INSERT INTO archive_default (archive) VALUES (%s)",
                              [pscheduler.json_dump(archive["spec"])])
                self.log.debug("Inserted data from %s", archive["path"])
                failed = False
            except psycopg2.Error as ex:
                self.log.warning("Ignoring %s: %s", archive["path"], str(ex))
            except Exception as ex:
                self.log.error("%s: %s", archive["path"], str(ex))

            if failed:
                self.db.query("ROLLBACK TO SAVEPOINT archiver_insert")
            else:
                self.db.query("RELEASE SAVEPOINT archiver_insert")

        # Finish up and get out

        self.db.commit()



#
# Archive Worker
#

class ArchiveWorker(pscheduler.GenericWorker):
    """
    Worker that archives a single result.

    Each instance carries one row of archiving work.  Calling the
    instance builds the JSON handed to the archiver, applies any
    transform, runs it through the process-global archiver_collection
    and records the outcome in the archiving table using the
    process-global dbpool.
    """

    def __init__(self, number, row, full_debug):
        """
        number - Number identifying this worker in logs and to dbpool
        row - Sequence of the 18 values unpacked at the top of __run()
        full_debug - Log debug messages regardless of the log level
        """

        self.number = number
        self.row = row
        self.full_debug = full_debug

        # This is set in a later method.
        self.log_debug = None


    def __call__(self):
        """
        Archive the result in a thread-safe way
        """

        self.log_debug = log.debug_always if self.full_debug else lambda *args: None

        self.log_debug("%d: Thread running", self.number)
        try:
            self.__run()
        except Exception:
            # Don't worry about the result here.  If __run() failed to
            # post anything, that will be the end of it.  If it did,
            # it might be salvageable.
            log.exception()
        finally:
            self.log_debug("%d: Thread finished", self.number)


    def _record_attempt(self, archiving_id, attempt,
                        completed, archived=False, next_attempt=None):
        """
        Record one archiving attempt in the archiving table.

        archiving_id - ID of the row to update
        attempt - Diagnostics appended (as JSON) to the row's diags
        completed - Mark the archiving as completed
        archived - Mark the archiving as successfully archived
        next_attempt - Time of the next attempt, or None for no retry
        """
        assignments = []
        if completed:
            assignments.append("completed = TRUE")
        if archived:
            assignments.append("archived = TRUE")
        assignments.extend([
            "attempts = attempts + 1",
            "last_attempt = now()",
            # A value of None is sent to the database as NULL.
            "next_attempt = %s",
            "diags = diags || (%s::JSONB)",
        ])

        # Only the fixed strings above go into the statement text; all
        # values are passed as query parameters.
        statement = "UPDATE archiving SET " \
                    + ", ".join(assignments) \
                    + " WHERE id = %s"

        with dbpool(self.number) as db:
            cursor = db.cursor()
            cursor.execute(statement,
                           [next_attempt,
                            pscheduler.json_dump(attempt),
                            archiving_id])


    def __run(self):
        """
        Do the deed
        """

        failed = False

        archiving_id, task_uuid, run_uuid, archiver, archiver_data, start, \
            duration, test, tool, participants, result_merged, attempts, \
            last_attempt, transform, task_detail, run_detail, spec, debug = self.row

        if debug:
            log_debug = log.debug_always
            log_debug("%d: Task requested debug", archiving_id)
        else:
            # Take whatever was set up when we were instantiated
            log_debug = self.log_debug

        # Participants listed as None are replaced with the local host.
        participants_merged = [
            pscheduler.api_local_host() if participant is None else participant
            for participant in participants
        ]

        uri_host = spec.get("uri-host", pscheduler.api_local_host())
        task_href = pscheduler.api_url(host=uri_host, path="tasks/%s" % (task_uuid))
        run_href = pscheduler.api_url(host=uri_host, path="tasks/%s/runs/%s" \
                                      % (task_uuid, run_uuid))

        if 'label' in spec:
            label = "%d: %s" % (archiving_id, spec['label'])
        else:
            label = str(archiving_id)

        # Strip the internal-use-only parts from the task details.
        for remove in [
                'archives'
        ]:
            task_detail.pop(remove, None)

        # Strip the internal-use-only parts from the run details.
        for remove in [
                'archivings',
                'participant-data',
                'participant-data-full',
                'result'
        ]:
            run_detail.pop(remove, None)

        log_debug("%s: Task is %s", label, task_href)
        log_debug("%s: Run is %s", label, run_href)

        json = {
            # This may contain private data that the archiver needs to see.
            'data': archiver_data,
            'task-href': task_href,
            'run-href': run_href,
            'result': {
                'id': run_uuid,
                'schedule': {
                    'start': pscheduler.datetime_as_iso8601(start),
                    'duration': pscheduler.timedelta_as_iso8601(duration)
                },
                # This may contain data that isn't to be leaked.
                'test': pscheduler.json_decomment(test, prefix="_", null=True),
                'tool': {
                    'name': tool['name'],
                    'version': tool['version'],
                },
                'run': run_detail,
                'task': task_detail,
                'participants': participants_merged,
                'result': result_merged,
                'reference': task_detail.get('reference', None)
            },
            'attempts': attempts,
            'last-attempt': None if last_attempt is None \
            else pscheduler.datetime_as_iso8601(last_attempt)
        }

        json_result = json["result"]

        json_result["task"]["href"] = task_href
        json_result["run"]["href"] = run_href
        json_result["run"]["task-href"] = task_href

        # If there's a transform (already validated), do it.

        if transform is not None:
            log_debug("%d Transforming input %s", self.number, json)
            log_debug("%d Script is %s", self.number, transform["script"])
            try:
                raw = transform.get("output-raw", False)
                transformer = pscheduler.JQFilter(
                    filter_spec=transform.get("script", "."),
                    args=transform.get("args", {}),
                    output_raw=raw
                )
                if raw:
                    json["result"] = transformer(json["result"])
                else:
                    # Result is always a single object when doing JSON.
                    json["result"] = transformer(json["result"])[0]
                log_debug("%s: Transformed to %s", label, json)
            except Exception as ex:
                log.error("%s: Error during transformation: %s", label, str(ex))
                # A failed transform is recorded as a permanent failure.
                failed = True
                returncode = 1
                result = {
                    "succeeded": False,
                    "error": "Error during transformation: %s" % (str(ex))
                }

        # A transform that produces null means "don't archive this,"
        # which is recorded as a success.
        if json["result"] is None:
            log_debug("%s: Null transform result; not archiving", label)
            returncode = 0
            skip = True
            result = {
                "succeeded": True,
                "error": "Archiving stopped by transform."
            }
        else:
            skip = False

        if (not failed) and (not skip):

            # TODO: This gets long, so use a shorter version for now.
            # log_debug("%s: Archiving to %s: %s", label, archiver, json)
            log_debug("%s: Archiving to %s", label, archiver)

            try:
                result = archiver_collection(archiver, json, label=label)
                log_debug("%s: Returned JSON from archiver: %s", label, result)
                returncode = 0
            except Exception as ex:
                log_debug("%s: Archiver threw an exception: %s", label, ex)
                result = {
                    "succeeded": False,
                    "error": "Exception during archiving: %s" % (str(ex))
                }
                log.exception("%s: Exception" % (label))
                returncode = 1

            assert result is not None


        attempt = {
            # TODO: Figure out to format this with -xx:xx for the timezone offset.
            "time": pscheduler.datetime_as_iso8601(datetime.datetime.now(tzlocal())),
            "return-code": returncode,
            "stdout": result,
            "stderr": "No result present." if result is None else result.get("error", "")
        }

        if returncode != 0:

            log_debug("%s: Permanent Failure: %s", label, result.get("error", "Unspecified error"))
            self._record_attempt(archiving_id, attempt, completed=True)
            return

        if result is not None and result.get('succeeded', False):

            log_debug("%s: Succeeded: %s to %s", label, run_uuid, archiver)
            self._record_attempt(archiving_id, attempt,
                                 completed=True, archived=True)
            return

        log.warning("%s: Failed to archive %s to %s: %s",
                    label, run_href, archiver,
                    result.get("error", "Unspecified problem"))

        # If there's a usable retry, schedule the next one.

        next_try = None

        if "retry" in result:
            next_delta = pscheduler.iso8601_as_timedelta(result['retry'])
            if next_delta is None:
                # An unparseable interval can't be scheduled.  Treat it
                # as no retry so the attempt still gets recorded
                # instead of failing before the database is updated.
                log.warning("%s: Ignoring invalid retry interval '%s'",
                            label, result['retry'])
            else:
                next_try = datetime.datetime.now(tzlocal()) + next_delta

        if next_try is not None:

            log_debug("%s: Rescheduling for %s", label, next_try)
            self._record_attempt(archiving_id, attempt,
                                 completed=False, next_attempt=next_try)

        else:

            log_debug("%s: No retry requested.  Giving up.", label)

            log.warning("%s: Gave up archiving %s to %s",
                        label, run_href, archiver)

            self._record_attempt(archiving_id, attempt, completed=True)



#
# Main Program
#

def main_program():
    """Run the archiver's dispatch loop.

    Opens a database connection listening on the 'archiving_change'
    and 'warmboot' channels, then loops: wait for a notification or a
    timeout, fetch pending archivings with archiving_next(), and hand
    each row to a worker process pool.  No more than
    options.max_parallel archivings are kept in flight at once.

    Relies on module-level names defined elsewhere in this file
    (dsn, options, log, refresh, worker_idle and the worker classes
    and setup/teardown functions).

    Returns True when a 'warmboot' notification is received;
    otherwise it loops indefinitely.
    """

    db = pscheduler.PgConnection(dsn)
    for channel in ("archiving_change", "warmboot"):
        db.listen(channel)

    # Keeps the list of default archivers current.
    default_maintainer = DefaultArchiveMaintainer(options.archive_defaults, dsn, log)
    default_maintainer.refresh()

    pool = pscheduler.WorkerProcessPool(
        name="archiver-pool",
        load_limit=options.worker_threads,
        setup=worker_process_setup,
        setup_args=(dsn, options.worker_threads,),
        teardown=worker_process_teardown,
        teardown_args=(),
        debug_callback=log.debug,
        # Don't limit this; the OS will punish too many processes.
        pool_size_limit=None,
        idle_time=worker_idle
    )

    # Seconds to wait before the next fetch; None means "fetch now".
    wait_secs = None

    db.query("SELECT heartbeat_boot('archiver')")

    # IDs of the archivings that currently have a worker.
    in_flight = pscheduler.ThreadSafeSet()

    def archive_completion(identifier, value, diags):
        """Completion callback handed to the worker pool with each task."""
        in_flight.remove(identifier)
        if value is not None:
            log.error("%d: Worker failed; returned %s: %s", identifier, value, diags)
            return
        log.debug("%d: Worker reported completion", identifier)


    while True:

        db.query("SELECT heartbeat('archiver')")

        warmboot = False

        # Either go straight to the database or sit until something
        # happens or the wait runs out.

        if wait_secs is not None:

            # Never sleep longer than the forced refresh interval.
            wait_secs = min(wait_secs, refresh)

            log.debug("Waiting %s for change or notification", wait_secs)

            db.query("SELECT heartbeat('archiver', %s)",
                     [datetime.timedelta(seconds=wait_secs)])

            pool.groom()

            if db.wait(wait_secs):
                channels = [note[0] for note in db.notifications()]
                log.debug("Notifications: %s" % " ".join(channels))
                warmboot = "warmboot" in channels
            else:
                log.debug("Not waiting or nohing happened.")

        else:

            log.debug("Retrieving immediately.")

        if warmboot:
            log.info("Warm boot; restarting.")
            # The pool will hang around until its idle children disappear.
            del pool
            return True

        # Default to a full refresh interval unless changed below.
        wait_secs = refresh

        query_limit = options.max_parallel - len(in_flight)

        # No free slots means there's no point in asking the database.
        if query_limit < 1:
            log.debug("Already have a full slate of workers.")
            continue

        log.debug("Limiting query to %d", query_limit)

        # Archivings that already have a worker are passed to the query.
        exclusions = list(in_flight.items())
        log.debug("Excluding %s", exclusions)

        queued = db.query("""SELECT id, task_uuid, run_uuid, archiver,
                             archiver_data, start,
                             duration, test, tool, participants, result,
                             attempts, last_attempt, transform, task_detail, run_detail,
                             spec, debug
                             FROM archiving_next(%s, %s)""",
                          [query_limit, exclusions])

        if len(queued) == 0:
            log.debug("Nothing to archive; finding time until next archiving.")
            future = db.query("""SELECT next_attempt - now() FROM archiving
                                 WHERE NOT completed AND next_attempt > now()
                                 ORDER BY next_attempt LIMIT 1""")

            # The query is LIMIT 1, so more than one row is a logic error.
            assert len(future) < 2
            if len(future) != 0:
                # Wake up when the earliest pending attempt comes due.
                wait_secs = pscheduler.timedelta_as_seconds(next(future)[0])
                log.debug("Next archiving in %s", wait_secs)
            else:
                log.debug("Nothing in the future.")

            continue

        # Only refresh the default archivers after a wait that got rows.
        # NOTE(review): wait_secs was just set to refresh above, so this
        # test looks like it is always true -- confirm the intent.
        if wait_secs is not None:
            default_maintainer.refresh()

        log.debug("Got %d rows", len(queued))

        for row in queued:

            # Stop handing out work once the in-flight limit is reached.
            # TODO: Why do we do this when we checked for it above?
            if len(in_flight) >= options.max_parallel:
                log.debug("Already running %d archivers.", len(in_flight))
                break

            archiving_id = row[0]

            if archiving_id in in_flight:
                log.debug("%d: Already running a worker", archiving_id)
                continue

            log.debug("%d: Starting worker", archiving_id)
            try:
                pool(archiving_id,
                     ArchiveWorker(archiving_id, row, log.is_forced_debugging()),
                     archive_completion)
                # NOTE(review): the ID is recorded only after the task is
                # submitted; presumably the callback cannot fire before
                # this line -- verify against WorkerProcessPool.
                in_flight.add(archiving_id)
                # Go around again immediately; there may be more to fetch.
                wait_secs = None
                log.debug("%d: Worker started", archiving_id)
            except Exception:
                log.exception(message="%d: Exception while starting worker" % (archiving_id))



# Run the dispatch loop.  main_program() only returns (with True) after a
# warm boot notification; the return value is not used here.
main_program()
