#include #include #include #include #include #include "kdefine.h" #include "frame_define.h" #include "kutilities.h" #include "mqtt_msg.h" #include "opmysql.h" #include "iconv-utils.h" #include "openjson.h" #include #include #include #include #include #include #ifndef TEST_UNPACK #define TEST_UNPACK 1 #endif #if TEST_UNPACK static unpack_setting_t unpack_setting; #endif //连接关闭回调 static void on_close(hio_t* io); //接入回调 static void on_accept(hio_t* io); //接收到数据 static void on_recv(hio_t* io, void* buf, int readbytes); //重载参数回调 static void on_reload(void* userdata); /* * @build: make * @usage: datahubs -h * datahubs -v * * datahubs -c datahubs.conf -d * ps aux | grep datahubs * * datahubs -s stop * ps aux | grep datahubs * */ typedef struct conf_ctx_s { IniParser* parser; int loglevel; int worker_processes; int worker_threads; std::string host; int port; std::string dbserver; std::string dbuser; std::string dbname; } conf_ctx_t; conf_ctx_t g_conf_ctx; inline void conf_ctx_init(conf_ctx_t* ctx) { ctx->parser = new IniParser; ctx->loglevel = LOG_LEVEL_DEBUG; ctx->worker_processes = 0; ctx->worker_threads = 0; ctx->port = 0; ctx->dbname = "hjems"; ctx->dbuser = "root"; ctx->dbserver = "tcp://127.0.0.1:3306"; } static void print_version(); static void print_help(); static int parse_confile(const char* confile); static void worker_fn(void* userdata); // short options static const char options[] = "hvc:ts:dp:"; // long options static const option_t long_options[] = { {'h', "help", NO_ARGUMENT}, {'v', "version", NO_ARGUMENT}, {'c', "confile", REQUIRED_ARGUMENT}, {'t', "test", NO_ARGUMENT}, {'s', "signal", REQUIRED_ARGUMENT}, {'d', "daemon", NO_ARGUMENT}, {'p', "port", REQUIRED_ARGUMENT} }; static const char detail_options[] = R"( -h|--help Print this information -v|--version Print version -c|--confile Set configure file, default etc/{program}.conf -t|--test Test configure file and exit -s|--signal Send to process, =[start,stop,restart,status,reload] -d|--daemon Daemonize -p|--port Set listen port )"; void print_version() { printf("%s version %s\n", g_main_ctx.program_name, K22_VERSION); } void print_help() { printf("Usage: %s [%s]\n", g_main_ctx.program_name, options); printf("Options:\n%s\n", detail_options); } int parse_confile(const char* confile) { int ret = g_conf_ctx.parser->LoadFromFile(confile); if (ret != 0) { printf("Load confile [%s] failed: %d\n", confile, ret); exit(-40); } // logfile std::string str = g_conf_ctx.parser->GetValue("logfile"); if (!str.empty()) { strncpy(g_main_ctx.logfile, str.c_str(), sizeof(g_main_ctx.logfile)); } hlog_set_file(g_main_ctx.logfile); // loglevel str = g_conf_ctx.parser->GetValue("loglevel"); if (!str.empty()) { hlog_set_level_by_str(str.c_str()); } // log_filesize str = g_conf_ctx.parser->GetValue("log_filesize"); if (!str.empty()) { hlog_set_max_filesize_by_str(str.c_str()); } // log_remain_days str = g_conf_ctx.parser->GetValue("log_remain_days"); if (!str.empty()) { hlog_set_remain_days(atoi(str.c_str())); } // log_fsync str = g_conf_ctx.parser->GetValue("log_fsync"); if (!str.empty()) { logger_enable_fsync(hlog, hv_getboolean(str.c_str())); } // first log here // first log here hlogi("=========--- Welcome to the Earth ---========="); hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION); hlog_fsync(); // worker_processes int worker_processes = 0; #ifdef DEBUG // Disable multi-processes mode for debugging worker_processes = 0; #else str = g_conf_ctx.parser->GetValue("worker_processes"); if (str.size() != 0) { if (strcmp(str.c_str(), "auto") == 0) { worker_processes = get_ncpu(); hlogd("worker_processes=ncpu=%d", worker_processes); } else { worker_processes = atoi(str.c_str()); } } #endif g_conf_ctx.worker_processes = LIMIT(0, worker_processes, MAXNUM_WORKER_PROCESSES); // worker_threads int worker_threads = 0; str = g_conf_ctx.parser->GetValue("worker_threads"); if (str.size() != 0) { if (strcmp(str.c_str(), "auto") == 0) { worker_threads = get_ncpu(); hlogd("worker_threads=ncpu=%d", worker_threads); } else { worker_threads = atoi(str.c_str()); } } g_conf_ctx.worker_threads = LIMIT(0, worker_threads, 64); //host str = g_conf_ctx.parser->GetValue("host"); if (str.size() != 0) { g_conf_ctx.host = str; } else { g_conf_ctx.host = "0.0.0.0"; } // port int port = 0; const char* szPort = get_arg("p"); if (szPort) { port = atoi(szPort); } if (port == 0) { port = g_conf_ctx.parser->Get("port"); } if (port == 0) { printf("Please config listen port!\n"); exit(-10); } g_conf_ctx.port = port; str = g_conf_ctx.parser->GetValue("database"); if (str.size() != 0) { g_conf_ctx.dbname = str; } else { g_conf_ctx.dbname = "hjems"; } hlogi("database = ('%s')", g_conf_ctx.dbname.c_str()); str = g_conf_ctx.parser->GetValue("dbuser"); if (str.size() != 0) { g_conf_ctx.dbuser = str; } else { g_conf_ctx.dbuser = "root"; } hlogi("dbuser = ('%s')", g_conf_ctx.dbuser.c_str()); str = g_conf_ctx.parser->GetValue("dbserver"); if (str.size() != 0) { g_conf_ctx.dbserver = str; } else { g_conf_ctx.dbserver = "tcp://127.0.0.1:3306"; } hlogi("dbserver = ('%s')", g_conf_ctx.dbserver.c_str()); hlogi("parse_confile('%s') OK", confile); return 0; } //////1/////////////////////////////// #if 0 #define LOCKFILE "/var/lock/datahub.lock" int lockfile(int fd) { struct flock fl; fl.l_type = F_WRLCK; fl.l_start = 0; fl.l_whence = SEEK_SET; fl.l_len = 0; return fcntl(fd, F_SETLK, &fl); } int already_running(void) { int fd; char buf[16]; fd = open(LOCKFILE, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (fd < 0) { fprintf(stderr, "FAILED TO OPEN FILE" LOCKFILE); exit(1); } if (lockfile(fd) < 0) { if (EACCES == errno || EAGAIN == errno) { close(fd); return 1; } fprintf(stderr, "FAILED TO LOCK FILE" LOCKFILE); exit(1); } ftruncate(fd, 0); sprintf(buf, "%ld", (long)getpid()); write(fd, buf, strlen(buf) + 1); return 0; } #endif /// end of checking only one instance /// //////////////////////////////////////////////// int main(int argc, char** argv) { #if 0 if (already_running()) { printf("PROGRAM ALREADY RUNNING...\n"); exit(0); } #endif // g_main_ctx main_ctx_init(argc, argv); if (argc == 1) { print_help(); exit(10); } // int ret = parse_opt(argc, argv, options); int ret = parse_opt_long(argc, argv, long_options, ARRAY_SIZE(long_options)); if (ret != 0) { print_help(); exit(ret); } /* printf("---------------arg------------------------------\n"); printf("%s\n", g_main_ctx.cmdline); for (int i = 0; i < g_main_ctx.arg_kv_size; ++i) { printf("%s\n", g_main_ctx.arg_kv[i]); } for (int i = 0; i < g_main_ctx.arg_list_size; ++i) { printf("%s\n", g_main_ctx.arg_list[i]); } printf("================================================\n"); printf("---------------env------------------------------\n"); for (int i = 0; i < g_main_ctx.envc; ++i) { printf("%s\n", g_main_ctx.save_envp[i]); } printf("================================================\n"); */ // help if (get_arg("h")) { print_help(); exit(0); } // version if (get_arg("v")) { print_version(); exit(0); } // signal signal_init(on_reload); const char* signal = get_arg("s"); if( signal ) { signal_handle(signal); } // g_conf_ctx conf_ctx_init(&g_conf_ctx); const char* confile = get_arg("c"); if (confile) { strncpy(g_main_ctx.confile, confile, sizeof(g_main_ctx.confile)); } parse_confile(g_main_ctx.confile); // test if (get_arg("t")) { printf("Test confile [%s] OK!\n", g_main_ctx.confile); exit(0); } #ifdef OS_UNIX // daemon if (get_arg("d")) { // nochdir, noclose int ret = daemon(1, 1); if (ret != 0) { printf("daemon error: %d\n", ret); exit(-10); } } #endif // pidfile create_pidfile(); #if TEST_UNPACK memset(&unpack_setting, 0, sizeof(unpack_setting_t)); unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH; unpack_setting.mode = UNPACK_BY_DELIMITER; unpack_setting.delimiter[0] = 0xEE; unpack_setting.delimiter[1] = 0xFF; unpack_setting.delimiter[2] = 0xEE; unpack_setting.delimiter[3] = 0xFF; unpack_setting.delimiter_bytes = 4; #endif master_workers_run(worker_fn, (void*)(intptr_t)&g_conf_ctx, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads); hlogi("=========--- I'll be back! ---========="); return 0; } void worker_fn(void* userdata) { conf_ctx_t* ptrCtx = (conf_ctx_t*)(intptr_t)(userdata); long port = ptrCtx->port; //initialize database connection bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname); if (!dbok) { hloge("failed to open database, exit now..."); return; } hlogi("database connection created!"); hloop_t* loop = hloop_new(0); const char* host = ptrCtx->host.c_str(); hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept); if (listenio == NULL) { hloge("worker process finished"); return; } hlogi("port=%ld pid=%ld tid=%ld listenfd=%d", port, hv_getpid(), hv_gettid(), hio_fd(listenio)); hloop_run(loop); hlogi("database connection close!"); OpDatabase::getInstance()->CloseDatabase(); hloop_free(&loop); } /// 各回调函数定义 static void on_reload(void* userdata) { hlogi("reload confile [%s]", g_main_ctx.confile); parse_confile(g_main_ctx.confile); } static void on_close(hio_t* io) { hlogi("on_close fd=%d error=%d", hio_fd(io), hio_error(io)); } //接入回调 static void on_accept(hio_t* io) { hlogi("on_accept connfd=%d", hio_fd(io)); char localaddrstr[SOCKADDR_STRLEN] = { 0 }; char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; hlogi("accept connfd=%d [%s] <=== [%s]", hio_fd(io), SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); hio_setcb_close(io, on_close); hio_setcb_read(io, on_recv); #if TEST_UNPACK hio_set_unpack(io, &unpack_setting); #endif hio_read_start(io); } //接收到数据 static void on_recv(hio_t* io, void* buf, int readbytes) { USING_NAMESPACE_HJ; char localaddrstr[SOCKADDR_STRLEN] = { 0 }; char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes, SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); MessageFrame respFrame; if( readbytes > 0xFFFF - 1 ) { hloge("too large data buffer to process: %d", readbytes); respFrame.setErrorFrame(ERR_INVALID_BUF_LEN); hio_write(io, (void*)&respFrame, sizeof respFrame); return; } MessageFrame *pReadFrame = (MessageFrame*)buf; if( pReadFrame->frame_len > 0xFFFF - 1 ) { hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF); respFrame.setErrorFrame(ERR_INVALID_LEN); hio_write(io, (void*)&respFrame, sizeof respFrame); return; } OpenJson json; CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len); //判断是否是utf-8 hlogi("<=== recieve buffer code is [%d]", buf_code); std::string msg((char*)pReadFrame->frame_content, pReadFrame->frame_len); if( buf_code == CODING::GBK || buf_code == CODING::UNKOWN ) { std::string str_result; //转换为UTF8 if( !GBKToUTF8(msg, str_result) ) { hloge("Failed to transfer code from GBK to UTF-8"); respFrame.setErrorFrame(ERR_INVALID_UTF8); hio_write(io, (void*)&respFrame, sizeof respFrame); return; } hlogi("Successfuly transfer code from GBK to UTF-8!"); msg = str_result; } #ifdef _DEBUG hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", msg.length(), msg.c_str()); //这里还是好的 #endif unsigned int len = msg.length(); char* pTmp = new char[len]; memcpy(pTmp, msg.c_str(), len); std::shared_ptr ptr; //放个智能指针省得忘记删除 ptr.reset(pTmp); //hlogi("<=== decode OK, msg=[%s]", msg.c_str()); #if 0 hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 17).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 18).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 19).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 20).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 21).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 22).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 23).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 24).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 25).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 26).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 27).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 28).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 29).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 30).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 31).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 32).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 33).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 34).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 61).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 62).c_str()); hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 63).c_str()); #endif //hlogd("<=== decode OK [\n%s\n]", printHex(pTmp, len).c_str()); if( !json.decode(msg) ) { //delete[] pTmp; hloge("Failed to decode json string pack , length=%d", readbytes); hio_write(io, (void*)&respFrame, sizeof respFrame); return; } hio_write(io, (void*)&respFrame, sizeof respFrame); std::string fsucode = json["FsuCode"].s(); std::string msg_type = json["type"].s(); std::string timestamp = get_current_timestamp(); // json["TimeStamp"].s(); if( fsucode.length() == 0 ) { //delete[] pTmp; hlogw("!!empty fsucode recieved!"); return; } hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str()); #ifdef _DEBUG hlogd("<<<>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str()); #endif std::string out_compress; int zip_ret = 0; if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK ) { hloge("Failed to compress source data, zip return value %d", zip_ret); return; } hlogd("<<<>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str()); #endif //std::string msg2(pTmp, len); if( msg_type == "gateway-data" || msg_type == "gateway-alarmdata" || msg_type == "gateway-writedata" || msg_type == "gateway-readdata" || msg_type == "web-write" || msg_type == "web-alarm" ) { auto& IdCodeContent = json["IdCodeContent"]; if( IdCodeContent.size() <= 0 ) { //delete[] pTmp; hloge("invalid IdCodeContent's size: %d", IdCodeContent.size()); return; } auto& pNode = IdCodeContent[0]; //这是只解析第一个节点 std::string oid = pNode["OID"].s(); //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); } if( msg_type == "web-read" ) { auto& IdCodeContent = json["IdCodes"]; if( IdCodeContent.size() <= 0 ) { hloge("invalid IdCodes's size: %d", IdCodeContent.size()); //delete[] pTmp; return; } auto& pNode = IdCodeContent[0]; //这是只解析第一个节点 std::string oid = pNode.s(); //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); } //delete[] pTmp; }