#include <string>
#include <fcntl.h> 
#ifndef _WIN32
#include <unistd.h> 
#endif
#include <fstream>
#include <iomanip>

#include <hv/hv.h>
#include <hv/hmain.h>
#include <hv/iniparser.h>
#include <hv/hloop.h>
#include <hv/hsocket.h>
#include <hv/hssl.h>

#include "kdefine.h"
#include "opmysql.h"
#include "eventhandler.h"

#ifndef TEST_UNPACK
#define TEST_UNPACK 1
#endif

#if TEST_UNPACK
static unpack_setting_t unpack_setting;
#endif

//连接关闭回调
static void on_close(hio_t* io);
//接入回调
static void on_accept(hio_t* io);
//接收到数据
static void on_recv(hio_t* io, void* buf, int readbytes);
//重载参数回调
static void on_reload(void* userdata);

/*
 * @build: make
 * @usage: datahubs -h
 *         datahubs -v
 *
 *         datahubs -c datahubs.conf -d
 *         ps aux | grep datahubs
 *
 *         datahubs -s stop
 *         ps aux | grep datahubs
 *
 */

typedef struct conf_ctx_s
{
    IniParser* parser;
    int loglevel;
    int worker_processes;
    int worker_threads;
    std::string host;
    int port;
    std::string dbserver;
    std::string dbuser;
    std::string dbname;
} conf_ctx_t;

conf_ctx_t g_conf_ctx;

inline void conf_ctx_init(conf_ctx_t* ctx)
{
    ctx->parser = new IniParser;
    ctx->loglevel = LOG_LEVEL_DEBUG;
    ctx->worker_processes = 0;
    ctx->worker_threads = 0;
    ctx->port = 0;
    ctx->dbname = "hjems";
    ctx->dbuser = "root";
    ctx->dbserver = "tcp://127.0.0.1:3306";
}

static void print_version();
static void print_help();

static int  parse_confile(const char* confile);
static void worker_fn(void* userdata);

// short options
static const char options[] = "hvc:ts:dp:";
// long options
static const option_t long_options[] =
{
    {'h', "help",       NO_ARGUMENT},
    {'v', "version",    NO_ARGUMENT},
    {'c', "confile",    REQUIRED_ARGUMENT},
    {'t', "test",       NO_ARGUMENT},
    {'s', "signal",     REQUIRED_ARGUMENT},
    {'d', "daemon",     NO_ARGUMENT},
    {'p', "port",       REQUIRED_ARGUMENT}
};

static const char detail_options[] = R"(
  -h|--help                 Print this information
  -v|--version              Print version
  -c|--confile <confile>    Set configure file, default etc/{program}.conf
  -t|--test                 Test configure file and exit
  -s|--signal <signal>      Send <signal> to process,
                            <signal>=[start,stop,restart,status,reload]
  -d|--daemon               Daemonize
  -p|--port <port>          Set listen port
)";

void print_version()
{
    printf("%s version %s\n", g_main_ctx.program_name, K22_VERSION);
}

void print_help()
{
    printf("Usage: %s [%s]\n", g_main_ctx.program_name, options);
    printf("Options:\n%s\n", detail_options);
}

int parse_confile(const char* confile)
{
#ifdef __linux__
    int ret = g_conf_ctx.parser->LoadFromFile(confile);
    if (ret != 0)
    {
        printf("Load confile [%s] failed: %d\n", confile, ret);
        exit(-40);
    }

    // logfile
    std::string str = g_conf_ctx.parser->GetValue("logfile");
    if (!str.empty())
    {
        strncpy(g_main_ctx.logfile, str.c_str(), sizeof(g_main_ctx.logfile));
    }
    hlog_set_file(g_main_ctx.logfile);
#if 1
    // loglevel
    str = g_conf_ctx.parser->GetValue("loglevel");
    if (!str.empty())
    {
        hlog_set_level_by_str(str.c_str());
    }
    // log_filesize
    str = g_conf_ctx.parser->GetValue("log_filesize");
    if (!str.empty())
    {
        hlog_set_max_filesize_by_str(str.c_str());
    }
    // log_remain_days
    str = g_conf_ctx.parser->GetValue("log_remain_days");
    if (!str.empty())
    {
        hlog_set_remain_days(atoi(str.c_str()));
    }
    // log_fsync
    str = g_conf_ctx.parser->GetValue("log_fsync");
    if (!str.empty())
    {
        logger_enable_fsync(hlog, hv_getboolean(str.c_str()));
    }
#endif
#endif
    // first log here
    hlogi("=========--- Welcome to the Earth ---=========");
    hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION);
    hlog_fsync();

#if 1
    g_conf_ctx.worker_processes = 1;
    g_conf_ctx.worker_threads = 1;
    g_conf_ctx.host = "0.0.0.0";
    g_conf_ctx.port = 44242;
    g_conf_ctx.dbname = "hjems";
    g_conf_ctx.dbuser = "root";
    g_conf_ctx.dbserver = "127.0.0.1";
#endif   
    // worker_processes
    int worker_processes = 0;


#if __linux__
#ifdef DEBUG
    // Disable multi-processes mode for debugging
    worker_processes = 0;
#else
    str = g_conf_ctx.parser->GetValue("worker_processes");
    if (str.size() != 0)
    {
        if (strcmp(str.c_str(), "auto") == 0)
        {
            worker_processes = get_ncpu();
            hlogd("worker_processes=ncpu=%d", worker_processes);
        }
        else
        {
            worker_processes = atoi(str.c_str());
        }
    }
#endif

    g_conf_ctx.worker_processes = LIMIT(0, worker_processes, MAXNUM_WORKER_PROCESSES);

    // worker_threads
    int worker_threads = 0;
    str = g_conf_ctx.parser->GetValue("worker_threads");
    if (str.size() != 0)
    {
        if (strcmp(str.c_str(), "auto") == 0)
        {
            worker_threads = get_ncpu();
            hlogd("worker_threads=ncpu=%d", worker_threads);
        }
        else {
            worker_threads = atoi(str.c_str());
        }
    }

    g_conf_ctx.worker_threads = LIMIT(0, worker_threads, 64);

    //host
    str = g_conf_ctx.parser->GetValue("host");
    if (str.size() != 0)
    {
        g_conf_ctx.host = str;
    }
    else
    {
        g_conf_ctx.host = "0.0.0.0";
    }

    // port
    int port = 0;
    const char* szPort = get_arg("p");
    if (szPort)
    {
        port = atoi(szPort);
    }

    if (port == 0)
    {
        port = g_conf_ctx.parser->Get<int>("port");
    }

    if (port == 0)
    {
        printf("Please config listen port!\n");
        exit(-10);
    }

    g_conf_ctx.port = port;

    str = g_conf_ctx.parser->GetValue("database");
    if (str.size() != 0)
    {
        g_conf_ctx.dbname = str;
    }
    else
    {
        g_conf_ctx.dbname = "hjems";
    }

    hlogi("database = ('%s')", g_conf_ctx.dbname.c_str());

	str = g_conf_ctx.parser->GetValue("dbuser");
	if (str.size() != 0)
	{
		g_conf_ctx.dbuser = str;
	}
	else
	{
		g_conf_ctx.dbuser = "root";
	}

	hlogi("dbuser = ('%s')", g_conf_ctx.dbuser.c_str());

	str = g_conf_ctx.parser->GetValue("dbserver");
	if (str.size() != 0)
	{
		g_conf_ctx.dbserver = str;
	}
	else
	{
		g_conf_ctx.dbserver = "tcp://127.0.0.1:3306";
	}

	hlogi("dbserver = ('%s')", g_conf_ctx.dbserver.c_str());

    hlogi("parse_confile('%s') OK", confile);
#endif
    return 0;
}

//////1///////////////////////////////
#if 0
#define LOCKFILE "/var/lock/datahub.lock"

int lockfile(int fd)
{
    struct flock fl;

    fl.l_type = F_WRLCK;
    fl.l_start = 0;
    fl.l_whence = SEEK_SET;
    fl.l_len = 0;

    return fcntl(fd, F_SETLK, &fl);
}

int already_running(void)
{
    int fd;
    char buf[16];

    fd = open(LOCKFILE, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);

    if (fd < 0)
    {
        fprintf(stderr, "FAILED TO OPEN FILE" LOCKFILE);
        exit(1);
    }

    if (lockfile(fd) < 0)
    {
        if (EACCES == errno || EAGAIN == errno)
        {
            close(fd);
            return 1;
        }

        fprintf(stderr, "FAILED TO LOCK FILE" LOCKFILE);
        exit(1);
    }

    ftruncate(fd, 0);

    sprintf(buf, "%ld", (long)getpid());
    write(fd, buf, strlen(buf) + 1);
    return 0;
}
#endif
/// end of checking only one instance
/// ////////////////////////////////////////////////
#ifdef _WIN32
int main(int argc, char** argv)
{
	// g_main_ctx
	main_ctx_init(argc, argv);
    conf_ctx_init(&g_conf_ctx);

	// first log here
    hlog_set_level_by_str("DEBUG");
	hlogi("=========--- Welcome to the Earth ---=========");
	hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION);
	hlog_fsync();

#if 1
	g_conf_ctx.worker_processes = 1;
	g_conf_ctx.worker_threads = 1;
	g_conf_ctx.host = "0.0.0.0";
	g_conf_ctx.port = 44242;
	g_conf_ctx.dbname = "hjems";
	g_conf_ctx.dbuser = "root";
	g_conf_ctx.dbserver = "127.0.0.1";
#endif   

#if TEST_UNPACK
	memset(&unpack_setting, 0, sizeof(unpack_setting_t));
	unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
	unpack_setting.mode = UNPACK_BY_DELIMITER;
	unpack_setting.delimiter[0] = 0xEE;
	unpack_setting.delimiter[1] = 0xFF;
	unpack_setting.delimiter[2] = 0xEE;
	unpack_setting.delimiter[3] = 0xFF;
	unpack_setting.delimiter_bytes = 4;
#endif

	master_workers_run(worker_fn, (void*)(intptr_t)&g_conf_ctx, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads);

	hlogi("=========--- I'll be back! ---=========");

	return 0;
}

#else
int main(int argc, char** argv)
{
#if 0
    if (already_running())
    {
        printf("PROGRAM ALREADY RUNNING...\n");
        exit(0);
    }
#endif
    // g_main_ctx
    main_ctx_init(argc, argv);
    if (argc == 1)
    {
        print_help();
        exit(10);
    }
    // int ret = parse_opt(argc, argv, options);
    int ret = parse_opt_long(argc, argv, long_options, ARRAY_SIZE(long_options));
    if (ret != 0) {
        print_help();
        exit(ret);
    }

    /*
    printf("---------------arg------------------------------\n");
    printf("%s\n", g_main_ctx.cmdline);
    for (int i = 0; i < g_main_ctx.arg_kv_size; ++i) {
        printf("%s\n", g_main_ctx.arg_kv[i]);
    }
    for (int i = 0; i < g_main_ctx.arg_list_size; ++i) {
        printf("%s\n", g_main_ctx.arg_list[i]);
    }
    printf("================================================\n");

    printf("---------------env------------------------------\n");
    for (int i = 0; i < g_main_ctx.envc; ++i) {
        printf("%s\n", g_main_ctx.save_envp[i]);
    }
    printf("================================================\n");
    */

    // help
    if (get_arg("h"))
    {
        print_help();
        exit(0);
    }

    // version
    if (get_arg("v"))
    {
        print_version();
        exit(0);
    }

    // signal
    signal_init(on_reload);
    const char* signal = get_arg("s");
    if( signal )
    {
        signal_handle(signal);
    }

    // g_conf_ctx
    conf_ctx_init(&g_conf_ctx);
    const char* confile = get_arg("c");
    if (confile)
    {
        strncpy(g_main_ctx.confile, confile, sizeof(g_main_ctx.confile));
    }
    parse_confile(g_main_ctx.confile);

    // test
    if (get_arg("t"))
    {
        printf("Test confile [%s] OK!\n", g_main_ctx.confile);
        exit(0);
    }


#ifdef OS_UNIX
    // daemon
    if (get_arg("d"))
    {
        // nochdir, noclose
        int ret = daemon(1, 1);
        if (ret != 0)
        {
            printf("daemon error: %d\n", ret);
            exit(-10);
        }
    }
#endif

    // pidfile
    create_pidfile();

#if TEST_UNPACK
    memset(&unpack_setting, 0, sizeof(unpack_setting_t));
    unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
    unpack_setting.mode = UNPACK_BY_DELIMITER;
    unpack_setting.delimiter[0] = 0xEE;
    unpack_setting.delimiter[1] = 0xFF;
    unpack_setting.delimiter[2] = 0xEE;
    unpack_setting.delimiter[3] = 0xFF;
    unpack_setting.delimiter_bytes = 4;
#endif

    master_workers_run(worker_fn, (void*)(intptr_t)&g_conf_ctx, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads);

    hlogi("=========--- I'll be back! ---=========");

    return 0;
}
#endif

void worker_fn(void* userdata)
{
    conf_ctx_t* ptrCtx = (conf_ctx_t*)(intptr_t)(userdata);
    long port = ptrCtx->port;

    //initialize database connection
    bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,3306,ptrCtx->dbuser,"Hj57471000",ptrCtx->dbname);
    if (!dbok)
    {
        hloge("failed to open database, exit now...");
        return;
    }

    hlogi("database connection created!");

    hloop_t* loop = hloop_new(0);
    const char* host = ptrCtx->host.c_str();
    hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept);

    if (listenio == NULL)
    {
        hloge("worker process finished");
        return;
    }

    hlogi("port=%ld pid=%ld tid=%ld listenfd=%d", port, hv_getpid(), hv_gettid(), hio_fd(listenio));

    hloop_run(loop);

    hlogi("database connection close!");
    OpDatabase::getInstance()->CloseDatabase();

    hloop_free(&loop);
}


/// 各回调函数定义

static void on_reload(void* userdata)
{
    hlogi("reload confile [%s]", g_main_ctx.confile);
    parse_confile(g_main_ctx.confile);
}

static void on_close(hio_t* io)
{
    hlogi("on_close fd=%d error=%d", hio_fd(io), hio_error(io));
}

//接入回调
static void on_accept(hio_t* io)
{
    hlogi("on_accept connfd=%d", hio_fd(io));

    char localaddrstr[SOCKADDR_STRLEN] = { 0 };
    char peeraddrstr[SOCKADDR_STRLEN] = { 0 };
    hlogi("accept connfd=%d [%s] <=== [%s]", hio_fd(io),
        SOCKADDR_STR(hio_localaddr(io), localaddrstr),
        SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);

#if TEST_UNPACK
    hio_set_unpack(io, &unpack_setting);
#endif

    hio_read_start(io);
}

//接收到数据
static void on_recv(hio_t* io, void* buf, int readbytes)
{
    EventHandler::onRecvHandler(io, buf, readbytes);
}