From e406217ad02fc0572532a9d5e001021a860d3552 Mon Sep 17 00:00:00 2001 From: hkc320 Date: Wed, 4 Sep 2024 16:24:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=B8=A7=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- applications/ems_datahubs/datahubs.vcxproj | 2 + applications/ems_datahubs/frame_define.h | 34 +- applications/ems_datahubs/kdefine.h | 2 +- applications/ems_datahubs/main.cpp | 473 +++++---------------- applications/ems_datahubs/openjson.cpp | 10 +- 5 files changed, 150 insertions(+), 371 deletions(-) diff --git a/applications/ems_datahubs/datahubs.vcxproj b/applications/ems_datahubs/datahubs.vcxproj index 715d003..551986e 100644 --- a/applications/ems_datahubs/datahubs.vcxproj +++ b/applications/ems_datahubs/datahubs.vcxproj @@ -84,6 +84,7 @@ + @@ -92,6 +93,7 @@ + diff --git a/applications/ems_datahubs/frame_define.h b/applications/ems_datahubs/frame_define.h index c0fa112..7188763 100644 --- a/applications/ems_datahubs/frame_define.h +++ b/applications/ems_datahubs/frame_define.h @@ -5,17 +5,43 @@ NAMESPACE_BEGIN(HJ) -typedef enum tagFrameType : char +typedef enum tagErrorCode : unsigned char { - Frame_Request = 0, //请求帧 - Frame_Response = 1, //返回帧 + ERR_OK = 0X00, + ERR_INVALID_LEN = 0X01, + ERR_INVALID_UTF8 = 0X02, + ERR_INVALID_BUF_LEN = 0X03, + ERR_UNKOWN = 0XFF +}ErrorCode; + +typedef enum tagFrameType : unsigned char +{ + Frame_Request = 0x00, //请求帧 + Frame_Response = 0x01, //返回帧 + Frame_DeviceData_Request = 0x02, //来自采集程序的数据请求包,将数据保存到数据库中 }FrameType; +typedef struct tagFrameTail +{ + unsigned char frame_delimiter[4] = { 0xEE,0xFF,0xEE,0xFF }; +}FrameTail; + typedef struct tagFrame { FrameType frame_type; //帧类型 unsigned int frame_len; //帧数据长度 - char frame_content[1]; //帧的内容,实际应为json字符串,由json内容自解释 + unsigned char frame_content[1]; //帧的内容,实际应为json字符串,由json内容自解释 + tagFrame() + { + frame_type = Frame_Response; + frame_len = 1; + frame_content[0] = ERR_OK; + } + void setErrorFrame(ErrorCode err = ERR_OK) + { + frame_len = 1; + frame_content[0] = err; + } }MessageFrame; NAMESPACE_END(HJ) diff --git a/applications/ems_datahubs/kdefine.h b/applications/ems_datahubs/kdefine.h index 55d84a9..1f253da 100644 --- a/applications/ems_datahubs/kdefine.h +++ b/applications/ems_datahubs/kdefine.h @@ -36,6 +36,6 @@ #define NAMESPACE_BEGIN(X) namespace X { #define NAMESPACE_END(X) } #define USING_NAMESPACE(X) using namespace X - +#define USING_NAMESPACE_HJ using namespace HJ #endif diff --git a/applications/ems_datahubs/main.cpp b/applications/ems_datahubs/main.cpp index 1563c97..86eab26 100644 --- a/applications/ems_datahubs/main.cpp +++ b/applications/ems_datahubs/main.cpp @@ -1,11 +1,16 @@ 锘#include -#include #include #include #include -#include #include -#include + +#include "kdefine.h" +#include "frame_define.h" +#include "kutilities.h" +#include "mqtt_msg.h" +#include "opmysql.h" +#include "iconv-utils.h" +#include "openjson.h" #include #include @@ -13,176 +18,24 @@ #include #include #include -#include - -#include "kdefine.h" - -#include "mqtt_msg.h" -#include "openjson.h" -#include "opmysql.h" - -#include "iconv-utils.h" +#ifndef TEST_UNPACK #define TEST_UNPACK 1 - - -#define CHUNK 16384 - -/* Compress from file source to file dest until EOF on source. -def() returns Z_OK on success, Z_MEM_ERROR if memory could not be -allocated for processing, Z_STREAM_ERROR if an invalid compression -level is supplied, Z_VERSION_ERROR if the version of zlib.h and the -version of the library linked do not match, or Z_ERRNO if there is -an error reading or writing the files. */ -int CompressString(const char* in_str, size_t in_len, std::string& out_str, int level) -{ - if (!in_str) - return Z_DATA_ERROR; - - int ret, flush; - unsigned have; - z_stream strm; - - unsigned char out[CHUNK]; - - /* allocate deflate state */ - strm.zalloc = Z_NULL; - strm.zfree = Z_NULL; - strm.opaque = Z_NULL; - ret = deflateInit(&strm, level); - if (ret != Z_OK) - return ret; - - std::shared_ptr sp_strm(&strm, [](z_stream* strm) - { - (void)deflateEnd(strm); - }); - - const char* end = in_str + in_len; - - //size_t pos_index = 0; - size_t distance = 0; - /* compress until end of file */ - do - { - distance = end - in_str; - strm.avail_in = (distance >= CHUNK) ? CHUNK : distance; - strm.next_in = (Bytef*)in_str; - - // next pos - in_str += strm.avail_in; - flush = (in_str == end) ? Z_FINISH : Z_NO_FLUSH; - - /* run deflate() on input until output buffer not full, finish - compression if all of source has been read in */ - do - { - strm.avail_out = CHUNK; - strm.next_out = out; - ret = deflate(&strm, flush); /* no bad return value */ - if (ret == Z_STREAM_ERROR) - break; - have = CHUNK - strm.avail_out; - out_str.append((const char*)out, have); - } while (strm.avail_out == 0); - - if (strm.avail_in != 0); /* all input will be used */ - break; - - /* done when last data in file processed */ - } while (flush != Z_FINISH); - - if (ret != Z_STREAM_END) /* stream will be complete */ - return Z_STREAM_ERROR; - - /* clean up and return */ - return Z_OK; -} - -/* Decompress from file source to file dest until stream ends or EOF. -inf() returns Z_OK on success, Z_MEM_ERROR if memory could not be -allocated for processing, Z_DATA_ERROR if the deflate data is -invalid or incomplete, Z_VERSION_ERROR if the version of zlib.h and -the version of the library linked do not match, or Z_ERRNO if there -is an error reading or writing the files. */ -int DecompressString(const char* in_str, size_t in_len, std::string& out_str) -{ - if (!in_str) - return Z_DATA_ERROR; - - int ret; - unsigned have; - z_stream strm; - unsigned char out[CHUNK]; - - /* allocate inflate state */ - strm.zalloc = Z_NULL; - strm.zfree = Z_NULL; - strm.opaque = Z_NULL; - strm.avail_in = 0; - strm.next_in = Z_NULL; - ret = inflateInit(&strm); - if (ret != Z_OK) - return ret; - - std::shared_ptr sp_strm(&strm, [](z_stream* strm) - { - (void)inflateEnd(strm); - }); - - const char* end = in_str + in_len; - - //size_t pos_index = 0; - size_t distance = 0; - - int flush = 0; - /* decompress until deflate stream ends or end of file */ - do - { - distance = end - in_str; - strm.avail_in = (distance >= CHUNK) ? CHUNK : distance; - strm.next_in = (Bytef*)in_str; - - // next pos - in_str += strm.avail_in; - flush = (in_str == end) ? Z_FINISH : Z_NO_FLUSH; - - /* run inflate() on input until output buffer not full */ - do - { - strm.avail_out = CHUNK; - strm.next_out = out; - ret = inflate(&strm, Z_NO_FLUSH); - - if (ret == Z_STREAM_ERROR) /* state not clobbered */ - break; - - switch (ret) - { - case Z_NEED_DICT: - ret = Z_DATA_ERROR; /* and fall through */ - case Z_DATA_ERROR: - case Z_MEM_ERROR: - return ret; - } - have = CHUNK - strm.avail_out; - out_str.append((const char*)out, have); - } while (strm.avail_out == 0); - - /* done when inflate() says it's done */ - } while (flush != Z_FINISH); - - /* clean up and return */ - return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR; -} - - -//static OpDatabase gOpDatabase; +#endif #if TEST_UNPACK static unpack_setting_t unpack_setting; #endif +//杩炴帴鍏抽棴鍥炶皟 +static void on_close(hio_t* io); +//鎺ュ叆鍥炶皟 +static void on_accept(hio_t* io); +//鎺ユ敹鍒版暟鎹 +static void on_recv(hio_t* io, void* buf, int readbytes); +//閲嶈浇鍙傛暟鍥炶皟 +static void on_reload(void* userdata); + /* * @build: make * @usage: datahubs -h @@ -208,6 +61,7 @@ typedef struct conf_ctx_s std::string dbuser; std::string dbname; } conf_ctx_t; + conf_ctx_t g_conf_ctx; inline void conf_ctx_init(conf_ctx_t* ctx) @@ -421,12 +275,6 @@ int parse_confile(const char* confile) return 0; } -static void on_reload(void* userdata) -{ - hlogi("reload confile [%s]", g_main_ctx.confile); - parse_confile(g_main_ctx.confile); -} - //////1/////////////////////////////// #if 0 #define LOCKFILE "/var/lock/datahub.lock" @@ -593,165 +441,124 @@ int main(int argc, char** argv) return 0; } +void worker_fn(void* userdata) +{ + conf_ctx_t* ptrCtx = (conf_ctx_t*)(intptr_t)(userdata); + long port = ptrCtx->port; + + //initialize database connection + bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname); + if (!dbok) + { + hloge("failed to open database, exit now..."); + return; + } + + hlogi("database connection created!"); + + hloop_t* loop = hloop_new(0); + const char* host = ptrCtx->host.c_str(); + hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept); + + if (listenio == NULL) + { + hloge("worker process finished"); + return; + } + + hlogi("port=%ld pid=%ld tid=%ld listenfd=%d", port, hv_getpid(), hv_gettid(), hio_fd(listenio)); + + hloop_run(loop); + + hlogi("database connection close!"); + OpDatabase::getInstance()->CloseDatabase(); + + hloop_free(&loop); +} + + +/// 鍚勫洖璋冨嚱鏁板畾涔 + +static void on_reload(void* userdata) +{ + hlogi("reload confile [%s]", g_main_ctx.confile); + parse_confile(g_main_ctx.confile); +} + static void on_close(hio_t* io) { hlogi("on_close fd=%d error=%d", hio_fd(io), hio_error(io)); } -std::string get_current_timestamp() +//鎺ュ叆鍥炶皟 +static void on_accept(hio_t* io) { - auto now = std::chrono::system_clock::now(); - //閫氳繃涓嶅悓绮惧害鑾峰彇鐩稿樊鐨勬绉掓暟 - uint64_t dis_millseconds = std::chrono::duration_cast(now.time_since_epoch()).count() - - std::chrono::duration_cast(now.time_since_epoch()).count() * 1000; - time_t tt = std::chrono::system_clock::to_time_t(now); - auto time_tm = localtime(&tt); - char strTime[25] = { 0 }; - sprintf(strTime, "%d-%02d-%02d %02d:%02d:%02d.%03d", time_tm->tm_year + 1900, - time_tm->tm_mon + 1, time_tm->tm_mday, time_tm->tm_hour, - time_tm->tm_min, time_tm->tm_sec, (int)dis_millseconds); - return std::string(strTime); -} + hlogi("on_accept connfd=%d", hio_fd(io)); -// 鍑芥暟鐢ㄤ簬灏嗗唴瀛樺潡杞崲涓哄崄鍏繘鍒跺瓧绗︿覆 -std::string printHex(const void* data, size_t size) -{ - std::ostringstream oss; - std::ostringstream oss2; - std::ostringstream ossrow; + char localaddrstr[SOCKADDR_STRLEN] = { 0 }; + char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; + hlogi("accept connfd=%d [%s] <=== [%s]", hio_fd(io), + SOCKADDR_STR(hio_localaddr(io), localaddrstr), + SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - const size_t lineSize = 16; // 姣忚杈撳嚭鐨勫瓧鑺傛暟 - const unsigned char* p = static_cast(data); + hio_setcb_close(io, on_close); + hio_setcb_read(io, on_recv); - int ic = 0; - int row = 0; - - ossrow << std::setw(8) << std::setfill('0') << std::hex << row++ << "h : "; - oss << ossrow.str().c_str(); - - for (size_t i = 0; i < size; ++i) - { - ic++; - - // 姣忎釜瀛楄妭涔嬮棿鐢ㄧ┖鏍煎垎闅 - oss << std::setw(2) << std::setfill('0') << std::hex << std::uppercase << static_cast(p[i]); - - char ch = (isprint(p[i]) != 0) ? p[i] : '.'; - - oss2 << ch; - - // 姣弆ineSize涓瓧鑺傛崲琛 - if ((i + 1) % lineSize == 0) - { - ossrow.clear(); - ossrow.str(""); - - oss << " [" << oss2.str().c_str() << "]" << std::endl; - oss2.clear(); - oss2.str(""); - - ossrow << std::setw(8) << std::setfill('0') << std::hex << row++ << "h : "; - oss << ossrow.str().c_str(); - - ic = 0; - } - else if (i == size - 1) - { - if ((i + 1) % lineSize != 0) - { - if (i % 2 != 0) - { - for (size_t j = 0; j < (lineSize - ic); j++) - { - oss << " --"; - } - } - else - { - for (size_t j = 0; j < (lineSize - ic); j++) - { - oss << " --"; - } - } - } - oss << " [" << oss2.str().c_str(); - - if ((i + 1) % lineSize != 0) - { - for (size_t j = 0; j < (lineSize - ic); j++) - { - oss << " "; - } - } - - oss << "]" << std::endl; - oss2.clear(); - oss2.str(""); - - ic = 0; - } -#if 0 - else if ((i + 1) % 8 == 0) - { - oss << " "; - oss2 << " "; - } +#if TEST_UNPACK + hio_set_unpack(io, &unpack_setting); #endif - else - { - oss << " "; - } - } - return oss.str(); -} + hio_read_start(io); +} //鎺ユ敹鍒版暟鎹 static void on_recv(hio_t* io, void* buf, int readbytes) { + USING_NAMESPACE_HJ; + char localaddrstr[SOCKADDR_STRLEN] = { 0 }; char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes, SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - char ret[1] = { 0 }; - if (readbytes > 0xFFFF - 1) + MessageFrame respFrame; + if( readbytes > 0xFFFF - 1 ) { hloge("too large data buffer to process: %d", readbytes); - ret[0] = 1; - hio_write(io, (void*)ret, 1); + respFrame.setErrorFrame(ERR_INVALID_BUF_LEN); + hio_write(io, (void*)&respFrame, sizeof respFrame); return; } + + MessageFrame *pReadFrame = (MessageFrame*)buf; - MessageData* pData = (MessageData*)buf; + if( pReadFrame->frame_len > 0xFFFF - 1 ) + { + hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF); + respFrame.setErrorFrame(ERR_INVALID_LEN); + hio_write(io, (void*)&respFrame, sizeof respFrame); + return; + } OpenJson json; - if (pData->content_len > 0xFFFF - 1) - { - hloge("too big string buffer to process: %d, it should be less than %d", pData->content_len, 0xFFFF); - ret[0] = 1; - hio_write(io, (void*)ret, 1); - return; - } - - CODING buf_code = GetCoding((unsigned char*)pData->content_data, pData->content_len); //鍒ゆ柇鏄惁鏄痷tf-8 + CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len); //鍒ゆ柇鏄惁鏄痷tf-8 hlogi("<=== recieve buffer code is [%d]", buf_code); - std::string msg(pData->content_data, pData->content_len); + std::string msg((char*)pReadFrame->frame_content, pReadFrame->frame_len); - if (buf_code == CODING::GBK - || buf_code == CODING::UNKOWN) + if( buf_code == CODING::GBK + || buf_code == CODING::UNKOWN ) { std::string str_result; //杞崲涓篣TF8 - if (!GBKToUTF8(msg, str_result)) + if( !GBKToUTF8(msg, str_result) ) { hloge("Failed to transfer code from GBK to UTF-8"); - ret[0] = 1; - hio_write(io, (void*)ret, 1); + respFrame.setErrorFrame(ERR_INVALID_UTF8); + hio_write(io, (void*)&respFrame, sizeof respFrame); return; } @@ -798,22 +605,21 @@ static void on_recv(hio_t* io, void* buf, int readbytes) //hlogd("<=== decode OK [\n%s\n]", printHex(pTmp, len).c_str()); - if (!json.decode(msg)) + if( !json.decode(msg) ) { //delete[] pTmp; hloge("Failed to decode json string pack , length=%d", readbytes); - ret[0] = 1; - hio_write(io, (void*)ret, 1); + hio_write(io, (void*)&respFrame, sizeof respFrame); return; } - hio_write(io, (void*)ret, 1); + hio_write(io, (void*)&respFrame, sizeof respFrame); std::string fsucode = json["FsuCode"].s(); std::string msg_type = json["type"].s(); std::string timestamp = get_current_timestamp(); // json["TimeStamp"].s(); - if (fsucode.length() == 0) + if( fsucode.length() == 0 ) { //delete[] pTmp; hlogw("!!empty fsucode recieved!"); @@ -829,7 +635,7 @@ static void on_recv(hio_t* io, void* buf, int readbytes) std::string out_compress; int zip_ret = 0; - if ((zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK) + if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK ) { hloge("Failed to compress source data, zip return value %d", zip_ret); return; @@ -842,15 +648,15 @@ static void on_recv(hio_t* io, void* buf, int readbytes) #endif //std::string msg2(pTmp, len); - if (msg_type == "gateway-data" + if( msg_type == "gateway-data" || msg_type == "gateway-alarmdata" || msg_type == "gateway-writedata" || msg_type == "gateway-readdata" || msg_type == "web-write" - || msg_type == "web-alarm") + || msg_type == "web-alarm" ) { auto& IdCodeContent = json["IdCodeContent"]; - if (IdCodeContent.size() <= 0) + if( IdCodeContent.size() <= 0 ) { //delete[] pTmp; hloge("invalid IdCodeContent's size: %d", IdCodeContent.size()); @@ -860,13 +666,13 @@ static void on_recv(hio_t* io, void* buf, int readbytes) auto& pNode = IdCodeContent[0]; //杩欐槸鍙В鏋愮涓涓妭鐐 std::string oid = pNode["OID"].s(); - OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); + //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); } - if (msg_type == "web-read") + if( msg_type == "web-read" ) { auto& IdCodeContent = json["IdCodes"]; - if (IdCodeContent.size() <= 0) + if( IdCodeContent.size() <= 0 ) { hloge("invalid IdCodes's size: %d", IdCodeContent.size()); //delete[] pTmp; @@ -876,63 +682,8 @@ static void on_recv(hio_t* io, void* buf, int readbytes) auto& pNode = IdCodeContent[0]; //杩欐槸鍙В鏋愮涓涓妭鐐 std::string oid = pNode.s(); - OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); + //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); } //delete[] pTmp; } -//鎺ュ叆鍥炶皟 -static void on_accept(hio_t* io) -{ - hlogi("on_accept connfd=%d", hio_fd(io)); - - char localaddrstr[SOCKADDR_STRLEN] = { 0 }; - char peeraddrstr[SOCKADDR_STRLEN] = { 0 }; - hlogi("accept connfd=%d [%s] <=== [%s]", hio_fd(io), - SOCKADDR_STR(hio_localaddr(io), localaddrstr), - SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - - hio_setcb_close(io, on_close); - hio_setcb_read(io, on_recv); - -#if TEST_UNPACK - hio_set_unpack(io, &unpack_setting); -#endif - - hio_read_start(io); -} - -void worker_fn(void* userdata) -{ - conf_ctx_t* ptrCtx = (conf_ctx_t*)(intptr_t)(userdata); - long port = ptrCtx->port; - - //initialize database connection - bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname); - if (!dbok) - { - hloge("failed to open database, exit now..."); - return; - } - - hlogi("database connection created!"); - - hloop_t* loop = hloop_new(0); - const char* host = ptrCtx->host.c_str(); - hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept); - - if (listenio == NULL) - { - hloge("worker process finished"); - return; - } - - hlogi("port=%ld pid=%ld tid=%ld listenfd=%d", port, hv_getpid(), hv_gettid(), hio_fd(listenio)); - - hloop_run(loop); - - hlogi("database connection close!"); - OpDatabase::getInstance()->CloseDatabase(); - - hloop_free(&loop); -} diff --git a/applications/ems_datahubs/openjson.cpp b/applications/ems_datahubs/openjson.cpp index 1e3241b..2b8002d 100644 --- a/applications/ems_datahubs/openjson.cpp +++ b/applications/ems_datahubs/openjson.cpp @@ -188,10 +188,10 @@ bool OpenJson::Box::remove(OpenJson* node) //JsonContext OpenJson::Context::Context() : - offset_(0), data_(0), - root_(0), - size_(0) + size_(0), + offset_(0), + root_(0) { } @@ -278,10 +278,10 @@ OpenJson::OpenJson(JsonType type) :type_(type), context_(0), wcontext_(0), - box_(0), idx_(0), - key_(0), len_(0), + box_(0), + key_(0), segment_(0) { }