#ifndef HV_CHANNEL_HPP_
#define HV_CHANNEL_HPP_

#include <string>
#include <functional>
#include <memory>
#include <atomic>

#include "hloop.h"
#include "hsocket.h"

#include "Buffer.h"

namespace hv {

class Channel {
public:
    Channel(hio_t* io = NULL) {
        io_ = io;
        fd_ = -1;
        id_ = 0;
        ctx_ = NULL;
        status = CLOSED;
        if (io) {
            fd_ = hio_fd(io);
            id_ = hio_id(io);
            ctx_ = hio_context(io);
            hio_set_context(io, this);
            if (hio_is_opened(io)) {
                status = OPENED;
            }
            if (hio_getcb_read(io) == NULL) {
                hio_setcb_read(io_, on_read);
            }
            if (hio_getcb_write(io) == NULL) {
                hio_setcb_write(io_, on_write);
            }
            if (hio_getcb_close(io) == NULL) {
                hio_setcb_close(io_, on_close);
            }
        }
    }

    virtual ~Channel() {
        if (isOpened()) {
            close();
            // NOTE: Detach after destructor to avoid triggering onclose
            if (io_ && id_ == hio_id(io_)) {
                hio_set_context(io_, NULL);
            }
        }
    }

    hio_t*      io() { return io_; }
    int         fd() { return fd_; }
    uint32_t    id() { return id_; }
    int error() { return hio_error(io_); }

    // context
    void* context() {
        return ctx_;
    }
    void setContext(void* ctx) {
        ctx_ = ctx;
    }
    template<class T>
    T* newContext() {
        ctx_ = new T;
        return (T*)ctx_;
    }
    template<class T>
    T* getContext() {
        return (T*)ctx_;
    }
    template<class T>
    void deleteContext() {
        if (ctx_) {
            delete (T*)ctx_;
            ctx_ = NULL;
        }
    }

    // contextPtr
    std::shared_ptr<void> contextPtr() {
        return contextPtr_;
    }
    void setContextPtr(const std::shared_ptr<void>& ctx) {
        contextPtr_ = ctx;
    }
    void setContextPtr(std::shared_ptr<void>&& ctx) {
        contextPtr_ = std::move(ctx);
    }
    template<class T>
    std::shared_ptr<T> newContextPtr() {
        contextPtr_ = std::make_shared<T>();
        return std::static_pointer_cast<T>(contextPtr_);
    }
    template<class T>
    std::shared_ptr<T> getContextPtr() {
        return std::static_pointer_cast<T>(contextPtr_);
    }
    void deleteContextPtr() {
        contextPtr_.reset();
    }

    bool isOpened() {
        if (io_ == NULL || status >= DISCONNECTED) return false;
        return id_ == hio_id(io_) && hio_is_opened(io_);
    }
    bool isClosed() {
        return !isOpened();
    }

    int startRead() {
        if (!isOpened()) return -1;
        return hio_read_start(io_);
    }

    int stopRead() {
        if (!isOpened()) return -1;
        return hio_read_stop(io_);
    }

    int readOnce() {
        if (!isOpened()) return -1;
        return hio_read_once(io_);
    }

    int readString() {
        if (!isOpened()) return -1;
        return hio_readstring(io_);
    }

    int readLine() {
        if (!isOpened()) return -1;
        return hio_readline(io_);
    }

    int readBytes(int len) {
        if (!isOpened() || len <= 0) return -1;
        return hio_readbytes(io_, len);
    }

    // write thread-safe
    int write(const void* data, int size) {
        if (!isOpened()) return -1;
        return hio_write(io_, data, size);
    }

    int write(Buffer* buf) {
        return write(buf->data(), buf->size());
    }

    int write(const std::string& str) {
        return write(str.data(), str.size());
    }

    // iobuf setting
    void setReadBuf(void* buf, size_t len) {
        if (io_ == NULL) return;
        hio_set_readbuf(io_, buf, len);
    }
    void setMaxReadBufsize(uint32_t size) {
        if (io_ == NULL) return;
        hio_set_max_read_bufsize(io_, size);
    }
    void setMaxWriteBufsize(uint32_t size) {
        if (io_ == NULL) return;
        hio_set_max_write_bufsize(io_, size);
    }
    size_t writeBufsize() {
        if (io_ == NULL) return 0;
        return hio_write_bufsize(io_);
    }
    bool isWriteComplete() {
        return writeBufsize() == 0;
    }

    // close thread-safe
    int close(bool async = false) {
        if (isClosed()) return -1;
        status = CLOSED;
        return async ? hio_close_async(io_) : hio_close(io_);
    }

public:
    hio_t*      io_;
    int         fd_;
    uint32_t    id_;
    void*       ctx_;
    enum Status {
        OPENED,
        CONNECTING,
        CONNECTED,
        DISCONNECTED,
        CLOSED,
    };
    std::atomic<Status>          status;
    std::function<void(Buffer*)> onread;
    // NOTE: Use Channel::isWriteComplete in onwrite callback to determine whether all data has been written.
    std::function<void(Buffer*)> onwrite;
    std::function<void()>        onclose;
    std::shared_ptr<void>        contextPtr_;

private:
    static void on_read(hio_t* io, void* data, int readbytes) {
        Channel* channel = (Channel*)hio_context(io);
        if (channel && channel->onread) {
            Buffer buf(data, readbytes);
            channel->onread(&buf);
        }
    }

    static void on_write(hio_t* io, const void* data, int writebytes) {
        Channel* channel = (Channel*)hio_context(io);
        if (channel && channel->onwrite) {
            Buffer buf((void*)data, writebytes);
            channel->onwrite(&buf);
        }
    }

    static void on_close(hio_t* io) {
        Channel* channel = (Channel*)hio_context(io);
        if (channel) {
            channel->status = CLOSED;
            if (channel->onclose) {
                channel->onclose();
            }
        }
    }
};

class SocketChannel : public Channel {
public:
    std::function<void()>   onconnect; // only for TcpClient
    std::function<void()>   heartbeat;

    SocketChannel(hio_t* io) : Channel(io) {
    }
    virtual ~SocketChannel() {}

    // SSL/TLS
    int enableSSL() {
        if (io_ == NULL) return -1;
        return hio_enable_ssl(io_);
    }
    bool isSSL() {
        if (io_ == NULL) return false;
        return hio_is_ssl(io_);
    }
    int setSSL(hssl_t ssl) {
        if (io_ == NULL) return -1;
        return hio_set_ssl(io_, ssl);
    }
    int setSslCtx(hssl_ctx_t ssl_ctx) {
        if (io_ == NULL) return -1;
        return hio_set_ssl_ctx(io_, ssl_ctx);
    }
    int newSslCtx(hssl_ctx_opt_t* opt) {
        if (io_ == NULL) return -1;
        return hio_new_ssl_ctx(io_, opt);
    }
    // for hssl_set_sni_hostname
    int setHostname(const std::string& hostname) {
        if (io_ == NULL) return -1;
        return hio_set_hostname(io_, hostname.c_str());
    }

    // timeout
    void setConnectTimeout(int timeout_ms) {
        if (io_ == NULL) return;
        hio_set_connect_timeout(io_, timeout_ms);
    }
    void setCloseTimeout(int timeout_ms) {
        if (io_ == NULL) return;
        hio_set_close_timeout(io_, timeout_ms);
    }
    void setReadTimeout(int timeout_ms) {
        if (io_ == NULL) return;
        hio_set_read_timeout(io_, timeout_ms);
    }
    void setWriteTimeout(int timeout_ms) {
        if (io_ == NULL) return;
        hio_set_write_timeout(io_, timeout_ms);
    }
    void setKeepaliveTimeout(int timeout_ms) {
        if (io_ == NULL) return;
        hio_set_keepalive_timeout(io_, timeout_ms);
    }

    // heartbeat
    // NOTE: Beware of circular reference problems caused by passing SocketChannelPtr by value.
    void setHeartbeat(int interval_ms, std::function<void()> fn) {
        if (io_ == NULL) return;
        heartbeat = std::move(fn);
        hio_set_heartbeat(io_, interval_ms, send_heartbeat);
    }

    /*
     * unpack
     *
     * NOTE: unpack_setting_t of multiple IOs of the same function also are same,
     *       so only the pointer of unpack_setting_t is stored in hio_t,
     *       the life time of unpack_setting_t shoud be guaranteed by caller.
     */
    void setUnpack(unpack_setting_t* setting) {
        if (io_ == NULL) return;
        hio_set_unpack(io_, setting);
    }

    int startConnect(int port, const char* host = "127.0.0.1") {
        sockaddr_u peeraddr;
        memset(&peeraddr, 0, sizeof(peeraddr));
        int ret = sockaddr_set_ipport(&peeraddr, host, port);
        if (ret != 0) {
            // hloge("unknown host %s", host);
            return ret;
        }
        return startConnect(&peeraddr.sa);
    }

    int startConnect(struct sockaddr* peeraddr) {
        if (io_ == NULL) return -1;
        hio_set_peeraddr(io_, peeraddr, SOCKADDR_LEN(peeraddr));
        return startConnect();
    }

    int startConnect() {
        if (io_ == NULL) return -1;
        status = CONNECTING;
        hio_setcb_connect(io_, on_connect);
        return hio_connect(io_);
    }

    bool isConnected() {
        return status == CONNECTED && isOpened();
    }

    std::string localaddr() {
        if (io_ == NULL) return "";
        struct sockaddr* addr = hio_localaddr(io_);
        char buf[SOCKADDR_STRLEN] = {0};
        return SOCKADDR_STR(addr, buf);
    }

    std::string peeraddr() {
        if (io_ == NULL) return "";
        struct sockaddr* addr = hio_peeraddr(io_);
        char buf[SOCKADDR_STRLEN] = {0};
        return SOCKADDR_STR(addr, buf);
    }

private:
    static void on_connect(hio_t* io) {
        SocketChannel* channel = (SocketChannel*)hio_context(io);
        if (channel) {
            channel->status = CONNECTED;
            if (channel->onconnect) {
                channel->onconnect();
            }
        }
    }

    static void send_heartbeat(hio_t* io) {
        SocketChannel* channel = (SocketChannel*)hio_context(io);
        if (channel && channel->heartbeat) {
            channel->heartbeat();
        }
    }
};

typedef std::shared_ptr<Channel>        ChannelPtr;
typedef std::shared_ptr<SocketChannel>  SocketChannelPtr;

}

#endif // HV_CHANNEL_HPP_