emsApplication/applications/ems_datahubs/main.cpp

690 lines
18 KiB
C++

#include <string>
#include <fcntl.h>
#include <unistd.h>
#include <fstream>
#include <iomanip>
#include "kdefine.h"
#include "frame_define.h"
#include "kutilities.h"
#include "mqtt_msg.h"
#include "opmysql.h"
#include "iconv-utils.h"
#include "openjson.h"
#include <hv/hv.h>
#include <hv/hmain.h>
#include <hv/iniparser.h>
#include <hv/hloop.h>
#include <hv/hsocket.h>
#include <hv/hssl.h>
#ifndef TEST_UNPACK
#define TEST_UNPACK 1
#endif
#if TEST_UNPACK
static unpack_setting_t unpack_setting;
#endif
//连接关闭回调
static void on_close(hio_t* io);
//接入回调
static void on_accept(hio_t* io);
//接收到数据
static void on_recv(hio_t* io, void* buf, int readbytes);
//重载参数回调
static void on_reload(void* userdata);
/*
* @build: make
* @usage: datahubs -h
* datahubs -v
*
* datahubs -c datahubs.conf -d
* ps aux | grep datahubs
*
* datahubs -s stop
* ps aux | grep datahubs
*
*/
typedef struct conf_ctx_s
{
IniParser* parser;
int loglevel;
int worker_processes;
int worker_threads;
std::string host;
int port;
std::string dbserver;
std::string dbuser;
std::string dbname;
} conf_ctx_t;
conf_ctx_t g_conf_ctx;
inline void conf_ctx_init(conf_ctx_t* ctx)
{
ctx->parser = new IniParser;
ctx->loglevel = LOG_LEVEL_DEBUG;
ctx->worker_processes = 0;
ctx->worker_threads = 0;
ctx->port = 0;
ctx->dbname = "hjems";
ctx->dbuser = "root";
ctx->dbserver = "tcp://127.0.0.1:3306";
}
static void print_version();
static void print_help();
static int parse_confile(const char* confile);
static void worker_fn(void* userdata);
// short options
static const char options[] = "hvc:ts:dp:";
// long options
static const option_t long_options[] =
{
{'h', "help", NO_ARGUMENT},
{'v', "version", NO_ARGUMENT},
{'c', "confile", REQUIRED_ARGUMENT},
{'t', "test", NO_ARGUMENT},
{'s', "signal", REQUIRED_ARGUMENT},
{'d', "daemon", NO_ARGUMENT},
{'p', "port", REQUIRED_ARGUMENT}
};
static const char detail_options[] = R"(
-h|--help Print this information
-v|--version Print version
-c|--confile <confile> Set configure file, default etc/{program}.conf
-t|--test Test configure file and exit
-s|--signal <signal> Send <signal> to process,
<signal>=[start,stop,restart,status,reload]
-d|--daemon Daemonize
-p|--port <port> Set listen port
)";
void print_version()
{
printf("%s version %s\n", g_main_ctx.program_name, K22_VERSION);
}
void print_help()
{
printf("Usage: %s [%s]\n", g_main_ctx.program_name, options);
printf("Options:\n%s\n", detail_options);
}
int parse_confile(const char* confile)
{
int ret = g_conf_ctx.parser->LoadFromFile(confile);
if (ret != 0)
{
printf("Load confile [%s] failed: %d\n", confile, ret);
exit(-40);
}
// logfile
std::string str = g_conf_ctx.parser->GetValue("logfile");
if (!str.empty())
{
strncpy(g_main_ctx.logfile, str.c_str(), sizeof(g_main_ctx.logfile));
}
hlog_set_file(g_main_ctx.logfile);
// loglevel
str = g_conf_ctx.parser->GetValue("loglevel");
if (!str.empty())
{
hlog_set_level_by_str(str.c_str());
}
// log_filesize
str = g_conf_ctx.parser->GetValue("log_filesize");
if (!str.empty())
{
hlog_set_max_filesize_by_str(str.c_str());
}
// log_remain_days
str = g_conf_ctx.parser->GetValue("log_remain_days");
if (!str.empty())
{
hlog_set_remain_days(atoi(str.c_str()));
}
// log_fsync
str = g_conf_ctx.parser->GetValue("log_fsync");
if (!str.empty())
{
logger_enable_fsync(hlog, hv_getboolean(str.c_str()));
}
// first log here
// first log here
hlogi("=========--- Welcome to the Earth ---=========");
hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION);
hlog_fsync();
// worker_processes
int worker_processes = 0;
#ifdef DEBUG
// Disable multi-processes mode for debugging
worker_processes = 0;
#else
str = g_conf_ctx.parser->GetValue("worker_processes");
if (str.size() != 0)
{
if (strcmp(str.c_str(), "auto") == 0)
{
worker_processes = get_ncpu();
hlogd("worker_processes=ncpu=%d", worker_processes);
}
else
{
worker_processes = atoi(str.c_str());
}
}
#endif
g_conf_ctx.worker_processes = LIMIT(0, worker_processes, MAXNUM_WORKER_PROCESSES);
// worker_threads
int worker_threads = 0;
str = g_conf_ctx.parser->GetValue("worker_threads");
if (str.size() != 0)
{
if (strcmp(str.c_str(), "auto") == 0)
{
worker_threads = get_ncpu();
hlogd("worker_threads=ncpu=%d", worker_threads);
}
else {
worker_threads = atoi(str.c_str());
}
}
g_conf_ctx.worker_threads = LIMIT(0, worker_threads, 64);
//host
str = g_conf_ctx.parser->GetValue("host");
if (str.size() != 0)
{
g_conf_ctx.host = str;
}
else
{
g_conf_ctx.host = "0.0.0.0";
}
// port
int port = 0;
const char* szPort = get_arg("p");
if (szPort)
{
port = atoi(szPort);
}
if (port == 0)
{
port = g_conf_ctx.parser->Get<int>("port");
}
if (port == 0)
{
printf("Please config listen port!\n");
exit(-10);
}
g_conf_ctx.port = port;
str = g_conf_ctx.parser->GetValue("database");
if (str.size() != 0)
{
g_conf_ctx.dbname = str;
}
else
{
g_conf_ctx.dbname = "hjems";
}
hlogi("database = ('%s')", g_conf_ctx.dbname.c_str());
str = g_conf_ctx.parser->GetValue("dbuser");
if (str.size() != 0)
{
g_conf_ctx.dbuser = str;
}
else
{
g_conf_ctx.dbuser = "root";
}
hlogi("dbuser = ('%s')", g_conf_ctx.dbuser.c_str());
str = g_conf_ctx.parser->GetValue("dbserver");
if (str.size() != 0)
{
g_conf_ctx.dbserver = str;
}
else
{
g_conf_ctx.dbserver = "tcp://127.0.0.1:3306";
}
hlogi("dbserver = ('%s')", g_conf_ctx.dbserver.c_str());
hlogi("parse_confile('%s') OK", confile);
return 0;
}
//////1///////////////////////////////
#if 0
#define LOCKFILE "/var/lock/datahub.lock"
int lockfile(int fd)
{
struct flock fl;
fl.l_type = F_WRLCK;
fl.l_start = 0;
fl.l_whence = SEEK_SET;
fl.l_len = 0;
return fcntl(fd, F_SETLK, &fl);
}
int already_running(void)
{
int fd;
char buf[16];
fd = open(LOCKFILE, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fd < 0)
{
fprintf(stderr, "FAILED TO OPEN FILE" LOCKFILE);
exit(1);
}
if (lockfile(fd) < 0)
{
if (EACCES == errno || EAGAIN == errno)
{
close(fd);
return 1;
}
fprintf(stderr, "FAILED TO LOCK FILE" LOCKFILE);
exit(1);
}
ftruncate(fd, 0);
sprintf(buf, "%ld", (long)getpid());
write(fd, buf, strlen(buf) + 1);
return 0;
}
#endif
/// end of checking only one instance
/// ////////////////////////////////////////////////
int main(int argc, char** argv)
{
#if 0
if (already_running())
{
printf("PROGRAM ALREADY RUNNING...\n");
exit(0);
}
#endif
// g_main_ctx
main_ctx_init(argc, argv);
if (argc == 1)
{
print_help();
exit(10);
}
// int ret = parse_opt(argc, argv, options);
int ret = parse_opt_long(argc, argv, long_options, ARRAY_SIZE(long_options));
if (ret != 0) {
print_help();
exit(ret);
}
/*
printf("---------------arg------------------------------\n");
printf("%s\n", g_main_ctx.cmdline);
for (int i = 0; i < g_main_ctx.arg_kv_size; ++i) {
printf("%s\n", g_main_ctx.arg_kv[i]);
}
for (int i = 0; i < g_main_ctx.arg_list_size; ++i) {
printf("%s\n", g_main_ctx.arg_list[i]);
}
printf("================================================\n");
printf("---------------env------------------------------\n");
for (int i = 0; i < g_main_ctx.envc; ++i) {
printf("%s\n", g_main_ctx.save_envp[i]);
}
printf("================================================\n");
*/
// help
if (get_arg("h"))
{
print_help();
exit(0);
}
// version
if (get_arg("v"))
{
print_version();
exit(0);
}
// signal
signal_init(on_reload);
const char* signal = get_arg("s");
if( signal )
{
signal_handle(signal);
}
// g_conf_ctx
conf_ctx_init(&g_conf_ctx);
const char* confile = get_arg("c");
if (confile)
{
strncpy(g_main_ctx.confile, confile, sizeof(g_main_ctx.confile));
}
parse_confile(g_main_ctx.confile);
// test
if (get_arg("t"))
{
printf("Test confile [%s] OK!\n", g_main_ctx.confile);
exit(0);
}
#ifdef OS_UNIX
// daemon
if (get_arg("d"))
{
// nochdir, noclose
int ret = daemon(1, 1);
if (ret != 0)
{
printf("daemon error: %d\n", ret);
exit(-10);
}
}
#endif
// pidfile
create_pidfile();
#if TEST_UNPACK
memset(&unpack_setting, 0, sizeof(unpack_setting_t));
unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
unpack_setting.mode = UNPACK_BY_DELIMITER;
unpack_setting.delimiter[0] = 0xEE;
unpack_setting.delimiter[1] = 0xFF;
unpack_setting.delimiter[2] = 0xEE;
unpack_setting.delimiter[3] = 0xFF;
unpack_setting.delimiter_bytes = 4;
#endif
master_workers_run(worker_fn, (void*)(intptr_t)&g_conf_ctx, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads);
hlogi("=========--- I'll be back! ---=========");
return 0;
}
void worker_fn(void* userdata)
{
conf_ctx_t* ptrCtx = (conf_ctx_t*)(intptr_t)(userdata);
long port = ptrCtx->port;
//initialize database connection
bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname);
if (!dbok)
{
hloge("failed to open database, exit now...");
return;
}
hlogi("database connection created!");
hloop_t* loop = hloop_new(0);
const char* host = ptrCtx->host.c_str();
hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept);
if (listenio == NULL)
{
hloge("worker process finished");
return;
}
hlogi("port=%ld pid=%ld tid=%ld listenfd=%d", port, hv_getpid(), hv_gettid(), hio_fd(listenio));
hloop_run(loop);
hlogi("database connection close!");
OpDatabase::getInstance()->CloseDatabase();
hloop_free(&loop);
}
/// 各回调函数定义
static void on_reload(void* userdata)
{
hlogi("reload confile [%s]", g_main_ctx.confile);
parse_confile(g_main_ctx.confile);
}
static void on_close(hio_t* io)
{
hlogi("on_close fd=%d error=%d", hio_fd(io), hio_error(io));
}
//接入回调
static void on_accept(hio_t* io)
{
hlogi("on_accept connfd=%d", hio_fd(io));
char localaddrstr[SOCKADDR_STRLEN] = { 0 };
char peeraddrstr[SOCKADDR_STRLEN] = { 0 };
hlogi("accept connfd=%d [%s] <=== [%s]", hio_fd(io),
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
#if TEST_UNPACK
hio_set_unpack(io, &unpack_setting);
#endif
hio_read_start(io);
}
//接收到数据
static void on_recv(hio_t* io, void* buf, int readbytes)
{
USING_NAMESPACE_HJ;
char localaddrstr[SOCKADDR_STRLEN] = { 0 };
char peeraddrstr[SOCKADDR_STRLEN] = { 0 };
hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes,
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
MessageFrame respFrame;
if( readbytes > 0xFFFF - 1 )
{
hloge("too large data buffer to process: %d", readbytes);
respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
hio_write(io, (void*)&respFrame, sizeof respFrame);
return;
}
MessageFrame *pReadFrame = (MessageFrame*)buf;
if( pReadFrame->frame_len > 0xFFFF - 1 )
{
hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF);
respFrame.setErrorFrame(ERR_INVALID_LEN);
hio_write(io, (void*)&respFrame, sizeof respFrame);
return;
}
OpenJson json;
CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len); //判断是否是utf-8
hlogi("<=== recieve buffer code is [%d]", buf_code);
std::string msg((char*)pReadFrame->frame_content, pReadFrame->frame_len);
if( buf_code == CODING::GBK
|| buf_code == CODING::UNKOWN )
{
std::string str_result;
//转换为UTF8
if( !GBKToUTF8(msg, str_result) )
{
hloge("Failed to transfer code from GBK to UTF-8");
respFrame.setErrorFrame(ERR_INVALID_UTF8);
hio_write(io, (void*)&respFrame, sizeof respFrame);
return;
}
hlogi("Successfuly transfer code from GBK to UTF-8!");
msg = str_result;
}
#ifdef _DEBUG
hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", msg.length(), msg.c_str()); //这里还是好的
#endif
unsigned int len = msg.length();
char* pTmp = new char[len];
memcpy(pTmp, msg.c_str(), len);
std::shared_ptr<char> ptr; //放个智能指针省得忘记删除
ptr.reset(pTmp);
//hlogi("<=== decode OK, msg=[%s]", msg.c_str());
#if 0
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 17).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 18).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 19).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 20).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 21).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 22).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 23).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 24).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 25).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 26).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 27).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 28).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 29).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 30).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 31).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 32).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 33).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 34).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 61).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 62).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 63).c_str());
#endif
//hlogd("<=== decode OK [\n%s\n]", printHex(pTmp, len).c_str());
if( !json.decode(msg) )
{
//delete[] pTmp;
hloge("Failed to decode json string pack , length=%d", readbytes);
hio_write(io, (void*)&respFrame, sizeof respFrame);
return;
}
hio_write(io, (void*)&respFrame, sizeof respFrame);
std::string fsucode = json["FsuCode"].s();
std::string msg_type = json["type"].s();
std::string timestamp = get_current_timestamp(); // json["TimeStamp"].s();
if( fsucode.length() == 0 )
{
//delete[] pTmp;
hlogw("!!empty fsucode recieved!");
return;
}
hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str());
#ifdef _DEBUG
hlogd("<<<<Check Mem after decode here>>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str());
#endif
std::string out_compress;
int zip_ret = 0;
if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK )
{
hloge("Failed to compress source data, zip return value %d", zip_ret);
return;
}
hlogd("<<<<Compress result: string size from original [%d] to [%d]", len, out_compress.size());
#ifdef _DEBUG
hlogd("<<<<Compress string here>>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str());
#endif
//std::string msg2(pTmp, len);
if( msg_type == "gateway-data"
|| msg_type == "gateway-alarmdata"
|| msg_type == "gateway-writedata"
|| msg_type == "gateway-readdata"
|| msg_type == "web-write"
|| msg_type == "web-alarm" )
{
auto& IdCodeContent = json["IdCodeContent"];
if( IdCodeContent.size() <= 0 )
{
//delete[] pTmp;
hloge("invalid IdCodeContent's size: %d", IdCodeContent.size());
return;
}
auto& pNode = IdCodeContent[0]; //这是只解析第一个节点
std::string oid = pNode["OID"].s();
//OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id);
}
if( msg_type == "web-read" )
{
auto& IdCodeContent = json["IdCodes"];
if( IdCodeContent.size() <= 0 )
{
hloge("invalid IdCodes's size: %d", IdCodeContent.size());
//delete[] pTmp;
return;
}
auto& pNode = IdCodeContent[0]; //这是只解析第一个节点
std::string oid = pNode.s();
//OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id);
}
//delete[] pTmp;
}