diff --git a/applications/ems_datahubs/WinDataHubs.vcxproj b/applications/ems_datahubs/WinDataHubs.vcxproj index a8a305c..c2fb411 100644 --- a/applications/ems_datahubs/WinDataHubs.vcxproj +++ b/applications/ems_datahubs/WinDataHubs.vcxproj @@ -120,11 +120,15 @@ true _DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) true + D:\Workspace\EmsProjects\emsApplication\sdk\mysql\include;%(AdditionalIncludeDirectories) Console true - hv.lib;zlibd.lib;mysqlcppconn.lib;libiconvD.lib;%(AdditionalDependencies) + hv.lib;zlibd.lib;libmysql.lib;mysqlclient.lib;libiconvD.lib;%(AdditionalDependencies) + + + D:\Workspace\EmsProjects\emsApplication\sdk\mysql\lib\debug;%(AdditionalLibraryDirectories) diff --git a/applications/ems_datahubs/eventhandler.cpp b/applications/ems_datahubs/eventhandler.cpp index 92882c3..8cb6ee7 100644 --- a/applications/ems_datahubs/eventhandler.cpp +++ b/applications/ems_datahubs/eventhandler.cpp @@ -1,5 +1,7 @@ #include "eventhandler.h" +#include + #include #include #include @@ -37,7 +39,8 @@ void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes) SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); MessageFrame respFrame; - if( readbytes > 0xFFFF - 1 ) + + if( readbytes > FRAME_MAX_LENGTH - 1 ) //限制1Mb 字节 { hloge("too large data buffer to process: %d", readbytes); respFrame.setErrorFrame(ERR_INVALID_BUF_LEN); @@ -45,12 +48,12 @@ void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes) return; } - hlogi("<=== decode OK [\n%s\n]", printHex(buf, readbytes).c_str()); + hlogi("<=== decode OK [\n%s\n]", printHex(buf, MIN(readbytes,512)).c_str()); MessageFrame* pReadFrame = (MessageFrame*)buf; hlogi("on_recv fd=%d frame_len=%d [0x%x] ", hio_fd(io), pReadFrame->frame_len, pReadFrame->frame_len); - if( pReadFrame->frame_len > 0xFFFF - 1 ) + if( pReadFrame->frame_len > FRAME_MAX_LENGTH - 1 ) { hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF); respFrame.setErrorFrame(ERR_INVALID_LEN); @@ -58,13 +61,36 @@ void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes) return; } - if( Frame_DeviceData_Request == pReadFrame->frame_type ) + switch (pReadFrame->frame_type) + { + case Frame_DeviceData_Request: { handleGatherData(io, buf, readbytes); + break; } - else + case Frame_Echo_Request: + { + hlogd("<== recieve test echo request and response signal back!"); + hio_write(io, (void*)&respFrame, sizeof respFrame); + break; + } + case Frame_Configure_DB_Request: + { + hlogd("<== recieve configuration file upload request!"); + int ret = handleConfigureData(io, buf, readbytes); + + if(ret!=0) + respFrame.setErrorFrame((ErrorCode)ret); + + hio_write(io, (void*)&respFrame, sizeof respFrame); + + break; + } + default: { assert(false); + break; + } } } @@ -246,4 +272,37 @@ void EventHandler::handleGatherData(hio_t* io, void* buf, int readbytes) hio_write(io, (void*)&respFrame, sizeof respFrame); return; } +} + +int EventHandler::handleConfigureData(hio_t* io, void* buf, int readbytes) +{ + assert(buf); + + __USING_NAMESPACE_HJ__; + + MessageFrame respFrame; + MessageFrame* pReadFrame = (MessageFrame*)buf; + + //保存文件 + std::string filename("d://test_1.zip"); + std::ofstream file(filename, std::ios::binary); + + if (!file) + { + hloge("failed to create file: %s",filename.c_str()); + return ERR_PERMISSION_DENIED; + } + // 将 pdata 中的内容写入文件 + file.write((const char*)(pReadFrame->frame_content), pReadFrame->frame_len); + + if (!file) + { + hloge("failed to write to file: %s", filename.c_str()); + return ERR_RETRANS_CONTENT; + } + else + { + hlogd("successfuly to write to file: %s", filename.c_str()); + return ErrorCode::ERR_OK; + } } \ No newline at end of file diff --git a/applications/ems_datahubs/eventhandler.h b/applications/ems_datahubs/eventhandler.h index 0f8be18..4ef694a 100644 --- a/applications/ems_datahubs/eventhandler.h +++ b/applications/ems_datahubs/eventhandler.h @@ -14,5 +14,6 @@ public: protected: //处理采集程序上传的数据,以JSON数据上报 static void handleGatherData(hio_t* io, void* buf, int readbytes); + static int handleConfigureData(hio_t* io, void* buf, int readbytes); }; diff --git a/applications/ems_datahubs/frame_define.h b/applications/ems_datahubs/frame_define.h index c0cbe62..2635915 100644 --- a/applications/ems_datahubs/frame_define.h +++ b/applications/ems_datahubs/frame_define.h @@ -7,7 +7,7 @@ __NAMESPACE_BEGIN__(HJ) #define FRAME_HEADER_LENGTH (5) #define FRAME_TAILE_LENGTH (4) - +#define FRAME_MAX_LENGTH (0x100000) //限制1Mb 字节 typedef enum tagErrorCode : unsigned char { ERR_OK = 0X00, @@ -16,6 +16,9 @@ typedef enum tagErrorCode : unsigned char ERR_INVALID_BUF_LEN = 0X03, ERR_INVALID_JSON_FMT = 0X04, ERR_INVALID_FSUCODE = 0X05, + ERR_INVALID_CFG_CONTENT = 0X06, // 文件内容错误 + ERR_RETRANS_CONTENT = 0X07, // 重传文件内容 + ERR_PERMISSION_DENIED = 0X08, //写文件权限不够 ERR_UNKOWN = 0XFF }ErrorCode; @@ -23,7 +26,9 @@ typedef enum tagFrameType : unsigned char { Frame_Response = 0x00, //返回帧 Frame_Request = 0x01, //请求帧 - Frame_DeviceData_Request = 0x02, //来自采集程序的数据请求包,将数据保存到数据库中 + Frame_Echo_Request = 0x02, //测试请求帧 + Frame_DeviceData_Request = 0x03, //来自采集程序的数据请求包,将数据保存到数据库中 + Frame_Configure_DB_Request = 0x04, //上传配置文件 }FrameType; typedef struct tagFrameTail @@ -31,6 +36,8 @@ typedef struct tagFrameTail unsigned char frame_delimiter[4] = { 0xEE,0xFF,0xEE,0xFF }; }FrameTail; +#define FRAME_HEADER_LENGTH (5) // frame_type(1) + frame_len(4) + typedef struct tagFrame { FrameType frame_type; //帧类型 diff --git a/applications/ems_datahubs/iconv-utils.cpp b/applications/ems_datahubs/iconv-utils.cpp index fc07941..34b9b23 100644 --- a/applications/ems_datahubs/iconv-utils.cpp +++ b/applications/ems_datahubs/iconv-utils.cpp @@ -155,7 +155,7 @@ int g2u(char* inbuf, size_t inlen, char* outbuf, size_t outlen) bool GBKToUTF8(const std::string& strGBK,std::string& str_result) { - int length = strGBK.size() * 2 + 1; + size_t length = strGBK.size() * 2 + 1; char* temp = (char*)malloc(sizeof(char) * length); @@ -175,7 +175,7 @@ bool GBKToUTF8(const std::string& strGBK,std::string& str_result) bool UTFtoGBK(const char* utf8, std::string& str_result) { - int length = strlen(utf8); + size_t length = strlen(utf8); char* temp = (char*)malloc(sizeof(char) * length); diff --git a/applications/ems_datahubs/main.cpp b/applications/ems_datahubs/main.cpp index fcbf455..ccc66cd 100644 --- a/applications/ems_datahubs/main.cpp +++ b/applications/ems_datahubs/main.cpp @@ -118,6 +118,7 @@ void print_help() int parse_confile(const char* confile) { +#ifdef __linux__ int ret = g_conf_ctx.parser->LoadFromFile(confile); if (ret != 0) { @@ -158,25 +159,26 @@ int parse_confile(const char* confile) logger_enable_fsync(hlog, hv_getboolean(str.c_str())); } #endif - +#endif // first log here hlogi("=========--- Welcome to the Earth ---========="); hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION); hlog_fsync(); -#if 0 +#if 1 g_conf_ctx.worker_processes = 1; g_conf_ctx.worker_threads = 1; g_conf_ctx.host = "0.0.0.0"; g_conf_ctx.port = 44242; g_conf_ctx.dbname = "hjems"; g_conf_ctx.dbuser = "root"; - g_conf_ctx.dbserver = "tcp://127.0.0.1:3306"; + g_conf_ctx.dbserver = "127.0.0.1"; #endif // worker_processes int worker_processes = 0; -#if 1 + +#if __linux__ #ifdef DEBUG // Disable multi-processes mode for debugging worker_processes = 0; @@ -338,7 +340,48 @@ int already_running(void) #endif /// end of checking only one instance /// //////////////////////////////////////////////// +#ifdef _WIN32 +int main(int argc, char** argv) +{ + // g_main_ctx + main_ctx_init(argc, argv); + conf_ctx_init(&g_conf_ctx); + // first log here + hlog_set_level_by_str("DEBUG"); + hlogi("=========--- Welcome to the Earth ---========="); + hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION); + hlog_fsync(); + +#if 1 + g_conf_ctx.worker_processes = 1; + g_conf_ctx.worker_threads = 1; + g_conf_ctx.host = "0.0.0.0"; + g_conf_ctx.port = 44242; + g_conf_ctx.dbname = "hjems"; + g_conf_ctx.dbuser = "root"; + g_conf_ctx.dbserver = "127.0.0.1"; +#endif + +#if TEST_UNPACK + memset(&unpack_setting, 0, sizeof(unpack_setting_t)); + unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH; + unpack_setting.mode = UNPACK_BY_DELIMITER; + unpack_setting.delimiter[0] = 0xEE; + unpack_setting.delimiter[1] = 0xFF; + unpack_setting.delimiter[2] = 0xEE; + unpack_setting.delimiter[3] = 0xFF; + unpack_setting.delimiter_bytes = 4; +#endif + + master_workers_run(worker_fn, (void*)(intptr_t)&g_conf_ctx, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads); + + hlogi("=========--- I'll be back! ---========="); + + return 0; +} + +#else int main(int argc, char** argv) { #if 0 @@ -453,6 +496,7 @@ int main(int argc, char** argv) return 0; } +#endif void worker_fn(void* userdata) { @@ -460,7 +504,7 @@ void worker_fn(void* userdata) long port = ptrCtx->port; //initialize database connection - bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname); + bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,3306,ptrCtx->dbuser,"Hj57471000",ptrCtx->dbname); if (!dbok) { hloge("failed to open database, exit now..."); diff --git a/applications/ems_datahubs/opmysql.cpp b/applications/ems_datahubs/opmysql.cpp index a0c249f..be7170f 100644 --- a/applications/ems_datahubs/opmysql.cpp +++ b/applications/ems_datahubs/opmysql.cpp @@ -1,10 +1,14 @@ #include "opmysql.h" + +#ifndef _USING_MYSQL_51_LIB_ #include #include #include #include #include #include +#endif + #include #include #include "openjson.h" @@ -26,8 +30,9 @@ OpDatabase* OpDatabase::getInstance() return &m_instance; } -bool OpDatabase::OpenDatabase(const std::string& server, const std::string& dbuser, const std::string& database) +bool OpDatabase::OpenDatabase(const std::string& server /*= "127.0.0.1"*/, int dbport /*= 3306*/, const std::string& dbuser/* = "root"*/, const std::string& dbpasswd /*= "Hj57471000"*/, const std::string& database /*= "hjems"*/) { +#ifndef _USING_MYSQL_51_LIB_ // 打开数据库 try { @@ -64,18 +69,46 @@ bool OpDatabase::OpenDatabase(const std::string& server, const std::string& dbus hloge("Failed to connect to database: %s",ss.str().c_str()); return false; } +#else + bool ok = false; + m_pDbConnection = mysql_init(NULL); + //连接到MySQL服务器 + if (!mysql_real_connect(m_pDbConnection, server.c_str(), dbuser.c_str(), dbpasswd.c_str(), database.c_str(), dbport, NULL, 0)) + { + hloge("MySQL connection error: %s", mysql_error(m_pDbConnection)); + } + else + { + ok = true; + mysql_set_character_set(m_pDbConnection, "utf8"); + } + if (!ok) + { + hloge("Failed to connect to database."); + } + else + { + hlogd("new database connection created successfully!"); + } + return ok; +#endif } // 关闭数据库连接 void OpDatabase::CloseDatabase() { +#ifdef _USING_MYSQL_51_LIB_ + mysql_close(m_pDbConnection); +#else m_pDbConnection->close(); //delete m_pDbConnection; +#endif } void OpDatabase::InsertMessage(const std::string& ts, const std::string& msg_type, const std::string& fsu, const std::string& content, int topic, int dev_id) { +#ifndef _USING_MYSQL_51_LIB_ const char* insertSql = "INSERT INTO tbl_data (data_timestamp, data_type, FsuCode, data_content, topic, device_id) VALUES (?,?,?,?,?,?);"; // 创建预编译的prepared statement std::unique_ptr insertStmt(m_pDbConnection->prepareStatement(insertSql)); @@ -100,4 +133,6 @@ void OpDatabase::InsertMessage(const std::string& ts, const std::string& msg_typ { hloge( "No rows affected during insert operation." ); } +#else +#endif } diff --git a/applications/ems_datahubs/opmysql.h b/applications/ems_datahubs/opmysql.h index c258500..adbf765 100644 --- a/applications/ems_datahubs/opmysql.h +++ b/applications/ems_datahubs/opmysql.h @@ -2,9 +2,18 @@ #define __MY_OPERATE_MYSQL__ #include -#include +#ifdef _WIN32 +# define _USING_MYSQL_51_LIB_ +# include +# include //MySQL C API include file +#else +# ifdef __linux__ +# include +# endif +#endif + class OpDatabase { protected: @@ -14,12 +23,18 @@ public: static OpDatabase* getInstance(); void CloseDatabase(); - bool OpenDatabase(const std::string& server = "tcp://127.0.0.1:3306", const std::string& dbuser = "root", const std::string& database="hjems"); + bool OpenDatabase(const std::string& server = "127.0.0.1", int dbport = 3306, const std::string& dbuser = "root", const std::string& dbpasswd = "Hj57471000", const std::string& database = "hjems"); + void InsertMessage(const std::string& ts, const std::string& msg_type, const std::string& fsu, const std::string& content, int topic, int dev_id); protected: - //std::unique_ptr m_pDbConnection; - sql::Connection* m_pDbConnection; +#ifdef _WIN32 + MYSQL* m_pDbConnection; +#else +# ifdef __linux__ + sql::Connection* m_pDbConnection; +# endif +#endif private: static OpDatabase m_instance; };