diff --git a/applications/ems_datahubs/eventhandler.cpp b/applications/ems_datahubs/eventhandler.cpp new file mode 100644 index 0000000..0233ef6 --- /dev/null +++ b/applications/ems_datahubs/eventhandler.cpp @@ -0,0 +1,203 @@ +#include "eventhandler.h" + +#include +#include +#include +#include +#include +#include + +#include "kdefine.h" +#include "frame_define.h" +#include "openjson.h" +#include "iconv-utils.h" +#include "kutilities.h" + +EventHandler::EventHandler() +{ + +} + +EventHandler::~EventHandler() +{ + +} + + +void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes) +{ + __USING_NAMESPACE_HJ__; + + char localaddrstr[SOCKADDR_STRLEN] = { 0 }; + char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; + hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes, + SOCKADDR_STR(hio_localaddr(io), localaddrstr), + SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); + + MessageFrame respFrame; + if( readbytes > 0xFFFF - 1 ) + { + hloge("too large data buffer to process: %d", readbytes); + respFrame.setErrorFrame(ERR_INVALID_BUF_LEN); + hio_write(io, (void*)&respFrame, sizeof respFrame); + return; + } + + hlogi("<=== decode OK [\n%s\n]", printHex(buf, readbytes).c_str()); + + MessageFrame* pReadFrame = (MessageFrame*)buf; + hlogi("on_recv fd=%d frame_len=%d [0x%x] ", hio_fd(io), pReadFrame->frame_len, pReadFrame->frame_len); + + if( pReadFrame->frame_len > 0xFFFF - 1 ) + { + hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF); + respFrame.setErrorFrame(ERR_INVALID_LEN); + hio_write(io, (void*)&respFrame, sizeof respFrame); + return; + } + + OpenJson json; + + CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len); //判断是否是utf-8 + + hlogi("<=== recieve buffer code is [%d]", buf_code); + + std::string msg((char*)pReadFrame->frame_content, pReadFrame->frame_len); + + if( buf_code == CODING::GBK + || buf_code == CODING::UNKOWN ) + { + std::string str_result; + //转换为UTF8 + if( !GBKToUTF8(msg, str_result) ) + { + hloge("Failed to transfer code from GBK to UTF-8"); + respFrame.setErrorFrame(ERR_INVALID_UTF8); + hio_write(io, (void*)&respFrame, sizeof respFrame); + return; + } + + hlogi("Successfuly transfer code from GBK to UTF-8!"); + msg = str_result; + } + +#ifdef _DEBUG + hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", msg.length(), msg.c_str()); //这里还是好的 +#endif + + unsigned int len = msg.length(); + char* pTmp = new char[len]; + memcpy(pTmp, msg.c_str(), len); + + std::shared_ptr ptr; //放个智能指针省得忘记删除 + ptr.reset(pTmp); + + //hlogi("<=== decode OK, msg=[%s]", msg.c_str()); + +#if 0 + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 17).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 18).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 19).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 20).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 21).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 22).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 23).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 24).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 25).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 26).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 27).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 28).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 29).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 30).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 31).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 32).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 33).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 34).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 61).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 62).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 63).c_str()); +#endif + + //hlogd("<=== decode OK [\n%s\n]", printHex(pTmp, len).c_str()); + + if( !json.decode(msg) ) + { + //delete[] pTmp; + hloge("Failed to decode json string pack , length=%d", readbytes); + hio_write(io, (void*)&respFrame, sizeof respFrame); + return; + } + + hio_write(io, (void*)&respFrame, sizeof respFrame); + + std::string fsucode = json["FsuCode"].s(); + std::string msg_type = json["type"].s(); + std::string timestamp = get_current_timestamp(); // json["TimeStamp"].s(); + + if( fsucode.length() == 0 ) + { + //delete[] pTmp; + hlogw("!!empty fsucode recieved!"); + return; + } + + hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str()); + +#ifdef _DEBUG + hlogd("<<<>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str()); +#endif + + std::string out_compress; + + int zip_ret = 0; + if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK ) + { + hloge("Failed to compress source data, zip return value %d", zip_ret); + return; + } + + hlogd("<<<>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str()); +#endif + //std::string msg2(pTmp, len); + + if( msg_type == "gateway-data" + || msg_type == "gateway-alarmdata" + || msg_type == "gateway-writedata" + || msg_type == "gateway-readdata" + || msg_type == "web-write" + || msg_type == "web-alarm" ) + { + auto& IdCodeContent = json["IdCodeContent"]; + if( IdCodeContent.size() <= 0 ) + { + //delete[] pTmp; + hloge("invalid IdCodeContent's size: %d", IdCodeContent.size()); + return; + } + + auto& pNode = IdCodeContent[0]; //这是只解析第一个节点 + std::string oid = pNode["OID"].s(); + + //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); + } + + if( msg_type == "web-read" ) + { + auto& IdCodeContent = json["IdCodes"]; + if( IdCodeContent.size() <= 0 ) + { + hloge("invalid IdCodes's size: %d", IdCodeContent.size()); + //delete[] pTmp; + return; + } + + auto& pNode = IdCodeContent[0]; //这是只解析第一个节点 + std::string oid = pNode.s(); + + //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); + } + //delete[] pTmp; +} \ No newline at end of file diff --git a/applications/ems_datahubs/eventhandler.h b/applications/ems_datahubs/eventhandler.h new file mode 100644 index 0000000..2c30d2a --- /dev/null +++ b/applications/ems_datahubs/eventhandler.h @@ -0,0 +1,14 @@ +#pragma once + +#include + +class EventHandler +{ +public: + EventHandler(); + virtual ~EventHandler(); + +public: + static void onRecvHandler(hio_t* io, void* buf, int readbytes); +}; + diff --git a/applications/ems_datahubs/frame_define.h b/applications/ems_datahubs/frame_define.h index 7188763..400553b 100644 --- a/applications/ems_datahubs/frame_define.h +++ b/applications/ems_datahubs/frame_define.h @@ -3,7 +3,7 @@ #include "kdefine.h" -NAMESPACE_BEGIN(HJ) +__NAMESPACE_BEGIN__(HJ) typedef enum tagErrorCode : unsigned char { @@ -16,8 +16,8 @@ typedef enum tagErrorCode : unsigned char typedef enum tagFrameType : unsigned char { - Frame_Request = 0x00, //请求帧 - Frame_Response = 0x01, //返回帧 + Frame_Response = 0x00, //返回帧 + Frame_Request = 0x01, //请求帧 Frame_DeviceData_Request = 0x02, //来自采集程序的数据请求包,将数据保存到数据库中 }FrameType; @@ -44,6 +44,6 @@ typedef struct tagFrame } }MessageFrame; -NAMESPACE_END(HJ) +__NAMESPACE_END__(HJ) #pragma pack() \ No newline at end of file diff --git a/applications/ems_datahubs/iconv-utils.cpp b/applications/ems_datahubs/iconv-utils.cpp index 8d8f0ab..fc07941 100644 --- a/applications/ems_datahubs/iconv-utils.cpp +++ b/applications/ems_datahubs/iconv-utils.cpp @@ -135,8 +135,11 @@ int code_convert(const char* from_charset, const char* to_charset, char* inbuf, return -1; } iconv_close(cd); - *pout = '\0'; - +#ifndef _WIN32 + * pout = '\0'; +#else + *pout = (char*)'\0'; +#endif return 0; } diff --git a/applications/ems_datahubs/kdefine.h b/applications/ems_datahubs/kdefine.h index 1f253da..5ab3434 100644 --- a/applications/ems_datahubs/kdefine.h +++ b/applications/ems_datahubs/kdefine.h @@ -33,9 +33,9 @@ #endif // __cplusplus -#define NAMESPACE_BEGIN(X) namespace X { -#define NAMESPACE_END(X) } -#define USING_NAMESPACE(X) using namespace X -#define USING_NAMESPACE_HJ using namespace HJ +#define __NAMESPACE_BEGIN__(X) namespace X { +#define __NAMESPACE_END__(X) } +#define __USING_NAMESPACE__(X) using namespace X +#define __USING_NAMESPACE_HJ__ using namespace HJ -#endif +#endif //__KDEFINE_INCLUDE__ diff --git a/applications/ems_datahubs/main.cpp b/applications/ems_datahubs/main.cpp index 86eab26..952797b 100644 --- a/applications/ems_datahubs/main.cpp +++ b/applications/ems_datahubs/main.cpp @@ -1,6 +1,8 @@ 锘#include #include +#ifndef _WIN32 #include +#endif #include #include @@ -19,6 +21,8 @@ #include #include +#include "eventhandler.h" + #ifndef TEST_UNPACK #define TEST_UNPACK 1 #endif @@ -134,6 +138,7 @@ int parse_confile(const char* confile) strncpy(g_main_ctx.logfile, str.c_str(), sizeof(g_main_ctx.logfile)); } hlog_set_file(g_main_ctx.logfile); +#if 1 // loglevel str = g_conf_ctx.parser->GetValue("loglevel"); if (!str.empty()) @@ -158,14 +163,26 @@ int parse_confile(const char* confile) { logger_enable_fsync(hlog, hv_getboolean(str.c_str())); } - // first log here +#endif + // first log here hlogi("=========--- Welcome to the Earth ---========="); hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION); hlog_fsync(); +#if 0 + g_conf_ctx.worker_processes = 1; + g_conf_ctx.worker_threads = 1; + g_conf_ctx.host = "0.0.0.0"; + g_conf_ctx.port = 44242; + g_conf_ctx.dbname = "hjems"; + g_conf_ctx.dbuser = "root"; + g_conf_ctx.dbserver = "tcp://127.0.0.1:3306"; +#endif // worker_processes int worker_processes = 0; + +#if 1 #ifdef DEBUG // Disable multi-processes mode for debugging worker_processes = 0; @@ -201,6 +218,7 @@ int parse_confile(const char* confile) worker_threads = atoi(str.c_str()); } } + g_conf_ctx.worker_threads = LIMIT(0, worker_threads, 64); //host @@ -272,6 +290,7 @@ int parse_confile(const char* confile) hlogi("dbserver = ('%s')", g_conf_ctx.dbserver.c_str()); hlogi("parse_confile('%s') OK", confile); +#endif return 0; } @@ -514,176 +533,6 @@ static void on_accept(hio_t* io) //鎺ユ敹鍒版暟鎹 static void on_recv(hio_t* io, void* buf, int readbytes) { - USING_NAMESPACE_HJ; - - char localaddrstr[SOCKADDR_STRLEN] = { 0 }; - char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; - hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes, - SOCKADDR_STR(hio_localaddr(io), localaddrstr), - SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - - MessageFrame respFrame; - if( readbytes > 0xFFFF - 1 ) - { - hloge("too large data buffer to process: %d", readbytes); - respFrame.setErrorFrame(ERR_INVALID_BUF_LEN); - hio_write(io, (void*)&respFrame, sizeof respFrame); - return; - } - - MessageFrame *pReadFrame = (MessageFrame*)buf; - - if( pReadFrame->frame_len > 0xFFFF - 1 ) - { - hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF); - respFrame.setErrorFrame(ERR_INVALID_LEN); - hio_write(io, (void*)&respFrame, sizeof respFrame); - return; - } - - OpenJson json; - - CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len); //鍒ゆ柇鏄惁鏄痷tf-8 - - hlogi("<=== recieve buffer code is [%d]", buf_code); - - std::string msg((char*)pReadFrame->frame_content, pReadFrame->frame_len); - - if( buf_code == CODING::GBK - || buf_code == CODING::UNKOWN ) - { - std::string str_result; - //杞崲涓篣TF8 - if( !GBKToUTF8(msg, str_result) ) - { - hloge("Failed to transfer code from GBK to UTF-8"); - respFrame.setErrorFrame(ERR_INVALID_UTF8); - hio_write(io, (void*)&respFrame, sizeof respFrame); - return; - } - - hlogi("Successfuly transfer code from GBK to UTF-8!"); - msg = str_result; - } - -#ifdef _DEBUG - hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", msg.length(), msg.c_str()); //杩欓噷杩樻槸濂界殑 -#endif - - unsigned int len = msg.length(); - char* pTmp = new char[len]; - memcpy(pTmp, msg.c_str(), len); - - std::shared_ptr ptr; //鏀句釜鏅鸿兘鎸囬拡鐪佸緱蹇樿鍒犻櫎 - ptr.reset(pTmp); - - //hlogi("<=== decode OK, msg=[%s]", msg.c_str()); - -#if 0 - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 17).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 18).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 19).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 20).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 21).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 22).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 23).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 24).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 25).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 26).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 27).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 28).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 29).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 30).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 31).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 32).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 33).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 34).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 61).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 62).c_str()); - hlogi("<=== decode OK [\n%s\n]", printHex(pTmp, 63).c_str()); -#endif - - //hlogd("<=== decode OK [\n%s\n]", printHex(pTmp, len).c_str()); - - if( !json.decode(msg) ) - { - //delete[] pTmp; - hloge("Failed to decode json string pack , length=%d", readbytes); - hio_write(io, (void*)&respFrame, sizeof respFrame); - return; - } - - hio_write(io, (void*)&respFrame, sizeof respFrame); - - std::string fsucode = json["FsuCode"].s(); - std::string msg_type = json["type"].s(); - std::string timestamp = get_current_timestamp(); // json["TimeStamp"].s(); - - if( fsucode.length() == 0 ) - { - //delete[] pTmp; - hlogw("!!empty fsucode recieved!"); - return; - } - - hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str()); - -#ifdef _DEBUG - hlogd("<<<>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str()); -#endif - - std::string out_compress; - - int zip_ret = 0; - if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK ) - { - hloge("Failed to compress source data, zip return value %d", zip_ret); - return; - } - - hlogd("<<<>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str()); -#endif - //std::string msg2(pTmp, len); - - if( msg_type == "gateway-data" - || msg_type == "gateway-alarmdata" - || msg_type == "gateway-writedata" - || msg_type == "gateway-readdata" - || msg_type == "web-write" - || msg_type == "web-alarm" ) - { - auto& IdCodeContent = json["IdCodeContent"]; - if( IdCodeContent.size() <= 0 ) - { - //delete[] pTmp; - hloge("invalid IdCodeContent's size: %d", IdCodeContent.size()); - return; - } - - auto& pNode = IdCodeContent[0]; //杩欐槸鍙В鏋愮涓涓妭鐐 - std::string oid = pNode["OID"].s(); - - //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); - } - - if( msg_type == "web-read" ) - { - auto& IdCodeContent = json["IdCodes"]; - if( IdCodeContent.size() <= 0 ) - { - hloge("invalid IdCodes's size: %d", IdCodeContent.size()); - //delete[] pTmp; - return; - } - - auto& pNode = IdCodeContent[0]; //杩欐槸鍙В鏋愮涓涓妭鐐 - std::string oid = pNode.s(); - - //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); - } - //delete[] pTmp; + EventHandler::onRecvHandler(io, buf, readbytes); }