#!/usr/bin/env python3
"""
DESCRIPTION
"""

import sys
import pscheduler
from kafka import KafkaProducer
from kafka.errors import KafkaError
import tempfile
import os

# NOTE(review): not referenced anywhere in the visible code; presumably the
# highest "schema" version of the archiver data this script accepts - confirm.
MAX_SCHEMA = 1

# Value of the "security-protocol" setting that selects the TLS code path.
SECURITY_PROTOCOL_SSL = "SSL"

# Prefix handed to the pScheduler logger below.
log_prefix = "archiver-kafka"

log = pscheduler.Log(prefix=log_prefix, quiet=False)

# Module-wide psKafkaArchiver instance; created by create_archiver() and
# reused or replaced by get_archiver().
archiver = None


class psKafkaArchiver(object):
    """
    Wrapper around a kafka-python ``KafkaProducer`` bound to a single topic.

    When ``security_protocol`` is ``SECURITY_PROTOCOL_SSL`` the CA
    certificate, client certificate and client key are received as strings,
    written to temporary files (``KafkaProducer`` is given file paths) and
    removed again by :meth:`close`.
    """

    def __init__(
        self,
        bootstrap_servers,
        topic,
        archiver_id='ps-kafka-archiver',
        retries=3,
        security_protocol="",
        ssl_cacert=None,
        ssl_cert=None,
        ssl_key=None,
        ssl_password="",
        ssl_checkhostname=False,
    ):
        """
        Create the producer.

        Parameters:
            bootstrap_servers: passed to ``KafkaProducer(bootstrap_servers=...)``.
            topic: topic every message is published to.
            archiver_id: passed to ``KafkaProducer(client_id=...)``.
            retries: passed to ``KafkaProducer(retries=...)``.
            security_protocol: the TLS options below are only used when this
                equals ``SECURITY_PROTOCOL_SSL``; any other value yields a
                producer built without security options.
            ssl_cacert, ssl_cert, ssl_key: contents (text) of the CA
                certificate, client certificate and client key.
            ssl_password: optional password for the key; an empty value is
                passed to the producer as ``None``.
            ssl_checkhostname: passed as ``ssl_check_hostname``.

        Raises:
            Whatever ``KafkaProducer`` or the temporary-file handling raises.
            Temporary files created so far are removed before re-raising.
        """
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.ssl_checkhostname = False
        self.client_id = archiver_id
        self.retries = retries
        # Defined unconditionally so close()/is_connected() never hit a
        # missing attribute, whichever branch below is taken.
        self.security_protocol = security_protocol
        self.producer = None
        self._temp_files = []
        log.debug(security_protocol)

        if security_protocol != SECURITY_PROTOCOL_SSL:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                client_id=self.client_id,
                retries=self.retries,
            )
            return

        self.ssl_cacert = ssl_cacert
        self.ssl_cert = ssl_cert
        self.ssl_key = ssl_key
        # Always set; previously an empty password left the attribute
        # undefined and the producer construction failed on it.
        self.ssl_password = ssl_password if ssl_password else None
        self.ssl_checkhostname = ssl_checkhostname

        try:
            self.ssl_cafile = self._stash_pem(ssl_cacert)
            self.ssl_certfile = self._stash_pem(ssl_cert)
            self.ssl_keyfile = self._stash_pem(ssl_key)
            log.debug("Cert Files: "+self.ssl_cafile+","+self.ssl_certfile+","+self.ssl_keyfile)
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                client_id=self.client_id,
                retries=self.retries,
                security_protocol=self.security_protocol,
                ssl_cafile=self.ssl_cafile,
                ssl_certfile=self.ssl_certfile,
                ssl_keyfile=self.ssl_keyfile,
                ssl_password=self.ssl_password,
                ssl_check_hostname=self.ssl_checkhostname,
            )
        except Exception:
            # Do not leave key/certificate material on disk when the
            # archiver could not be constructed.
            self._remove_temp_files()
            raise

    def is_connected(self):
        """
        Determine if producer is connected

        Returns True while a producer object is held, i.e. until close().
        """
        return self.producer is not None

    def close(self):
        """
        Close producer connection and remove any temporary cert files.

        Safe to call more than once and on archivers created without SSL.
        """
        producer, self.producer = self.producer, None
        try:
            if producer:
                producer.close()
        finally:
            # Clean up even if closing the producer failed.
            if self._temp_files:
                log.debug("Cleaning up cert files")
                self._remove_temp_files()

    def publish(self, message):
        """
        Send message to producer

        The message is handed to ``producer.send()`` for ``self.topic``; the
        value returned by ``send()`` is not inspected here.  If the producer
        has already been closed the message is not sent and a warning is
        logged.
        """
        if self.is_connected():
            self.producer.send(self.topic, message)
        else:
            log.warning("Kafka producer is closed; message was not sent")

    def _write_to_tempfile(self, data):
        """
        Write the string ``data`` (UTF-8 encoded) to a new temporary file.

        Returns the closed file object; its ``name`` attribute is the path.
        The file is not deleted automatically.
        """
        # Encode first so invalid input fails before a file is created.
        payload = bytes(data, 'utf-8')
        with tempfile.NamedTemporaryFile(delete=False) as tfile:
            tfile.write(payload)
        return tfile

    def _stash_pem(self, data):
        """Write ``data`` to a temp file, record it for cleanup, return its path."""
        path = self._write_to_tempfile(data).name
        self._temp_files.append(path)
        return path

    def _remove_temp_files(self):
        """Delete every recorded temp file, ignoring ones already gone."""
        pending, self._temp_files = self._temp_files, []
        for path in pending:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass


def get_archiver(archiverconfig):
    """
    Make sure the module-level ``archiver`` is usable.

    An existing archiver that reports itself connected is kept as-is.
    Otherwise any existing archiver is closed, the global is reset and
    create_archiver() is called with ``archiverconfig`` to build a new one.
    Nothing is returned; callers use the ``archiver`` global.
    """
    global archiver

    reusable = archiver is not None and archiver.is_connected()
    if reusable:
        log.debug("Kafka archiver exists. So reusing")
        return

    # Stale (disconnected) instance: release it before replacing it.
    if archiver:
        archiver.close()
    archiver = None
    create_archiver(archiverconfig)


def create_archiver(archiverconfig):
    """
    Build a psKafkaArchiver from ``archiverconfig`` and store it in the
    module-level ``archiver``.

    Required keys: "server" and "topic"; when "security-protocol" equals
    SECURITY_PROTOCOL_SSL also "_ssl-cert", "_ssl-cacert" and "_ssl-key".
    Optional keys: "archiver-id", "kafka-retries", "security-protocol" and,
    for SSL, "_ssl-password" and "ssl-checkhostname".  An optional key that
    is absent is simply not passed, so the psKafkaArchiver constructor
    default applies.

    Raises:
        KeyError: a required key is missing.
        Anything raised by the psKafkaArchiver constructor.
    """
    global archiver

    bootstrap_servers = archiverconfig["server"]
    log.debug("Bootstrap server: " + bootstrap_servers)
    topic = archiverconfig["topic"]
    log.debug("Topic: " + topic)
    security_protocol = archiverconfig.get("security-protocol", "")
    # str() keeps the log line from failing on a non-string value.
    log.debug("Security protocol is" + str(security_protocol))

    kwargs = {
        "bootstrap_servers": bootstrap_servers,
        "topic": topic,
    }
    # config key -> constructor keyword, forwarded only when present
    optional = {
        "archiver-id": "archiver_id",
        "kafka-retries": "retries",
    }

    if security_protocol == SECURITY_PROTOCOL_SSL:
        log.debug("Found security protocol in config")
        kwargs["security_protocol"] = security_protocol
        kwargs["ssl_cert"] = archiverconfig["_ssl-cert"]
        kwargs["ssl_cacert"] = archiverconfig["_ssl-cacert"]
        kwargs["ssl_key"] = archiverconfig["_ssl-key"]
        optional["_ssl-password"] = "ssl_password"
        optional["ssl-checkhostname"] = "ssl_checkhostname"
    else:
        log.debug("Creating plain text archiver")

    for config_key, keyword in optional.items():
        if config_key in archiverconfig:
            kwargs[keyword] = archiverconfig[config_key]

    archiver = psKafkaArchiver(**kwargs)


def archive(json):
    """
    Publish one result to Kafka and report the outcome.

    Parameters:
        json: dict with "data" (archiver configuration passed to
            get_archiver()), "result" (the value to publish) and, when
            "data" contains "retry-policy", "attempts".

    Returns:
        {"succeeded": True} on success, otherwise
        {"succeeded": False, "error": <text>} plus a "retry" entry when the
        retry policy yields a retry time.
    """
    global archiver

    data = json["data"]
    message = pscheduler.json_dump(json["result"]).encode("utf-8")
    # Log only the destination: the full dict can carry "_ssl-key" and
    # "_ssl-password", which must not end up in the log.
    log.debug("Kafka server and topic %s %s" % (data.get("server"), data.get("topic")))

    try:
        get_archiver(data)
        archiver.publish(message)
        archiver.close()
    except Exception as ex:
        # Drop the failed archiver so the next call starts from a fresh
        # connection.  Cleanup is best-effort: an error here must not
        # escape, otherwise no failure/retry record would be produced.
        stale, archiver = archiver, None
        if stale is not None:
            try:
                stale.close()
            except Exception as close_ex:
                log.debug("Failed to clean up Kafka archiver: %s" % (str(close_ex)))

        result = {"succeeded": False, "error": "Failed to send message: %s" % (str(ex))}
        if "retry-policy" in data:
            log.debug("Retry exists")
            policy = pscheduler.RetryPolicy(data["retry-policy"], iso8601=True)
            retry_time = policy.retry(json["attempts"])
            if retry_time is not None:
                result["retry"] = retry_time
        return result

    return {"succeeded": True}


# Main loop: one RFC 7464 record in on stdin, one outcome record out on
# stdout, until the input is exhausted.
PARSER = pscheduler.RFC7464Parser(sys.stdin)
EMITTER = pscheduler.RFC7464Emitter(sys.stdout)

for parsed in PARSER:
    try:
        # archive() returns the outcome dict that is emitted for this record.
        EMITTER(archive(parsed))
    except BrokenPipeError as ex:
        # Nobody is reading our output any more; stop instead of failing.
        log.warning("Broken pipe during archiving; parent must have exited.")
        pscheduler.succeed()

# Input exhausted.
pscheduler.succeed()
