完成上传文件消息处理

main
HwangKC 2024-11-15 18:22:43 +08:00
parent 1d8c55ea70
commit 440137cac2
8 changed files with 185 additions and 20 deletions

View File

@ -120,11 +120,15 @@
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<AdditionalIncludeDirectories>D:\Workspace\EmsProjects\emsApplication\sdk\mysql\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<AdditionalDependencies>hv.lib;zlibd.lib;mysqlcppconn.lib;libiconvD.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalDependencies>hv.lib;zlibd.lib;libmysql.lib;mysqlclient.lib;libiconvD.lib;%(AdditionalDependencies)</AdditionalDependencies>
<Version>
</Version>
<AdditionalLibraryDirectories>D:\Workspace\EmsProjects\emsApplication\sdk\mysql\lib\debug;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">

View File

@ -1,5 +1,7 @@
#include "eventhandler.h"
#include <fstream>
#include <hv/hv.h>
#include <hv/hmain.h>
#include <hv/iniparser.h>
@ -37,7 +39,8 @@ void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes)
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
MessageFrame respFrame;
if( readbytes > 0xFFFF - 1 )
if( readbytes > FRAME_MAX_LENGTH - 1 ) //限制1Mb 字节
{
hloge("too large data buffer to process: %d", readbytes);
respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
@ -45,12 +48,12 @@ void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes)
return;
}
hlogi("<=== decode OK [\n%s\n]", printHex(buf, readbytes).c_str());
hlogi("<=== decode OK [\n%s\n]", printHex(buf, MIN(readbytes,512)).c_str());
MessageFrame* pReadFrame = (MessageFrame*)buf;
hlogi("on_recv fd=%d frame_len=%d [0x%x] ", hio_fd(io), pReadFrame->frame_len, pReadFrame->frame_len);
if( pReadFrame->frame_len > 0xFFFF - 1 )
if( pReadFrame->frame_len > FRAME_MAX_LENGTH - 1 )
{
hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF);
respFrame.setErrorFrame(ERR_INVALID_LEN);
@ -58,13 +61,36 @@ void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes)
return;
}
if( Frame_DeviceData_Request == pReadFrame->frame_type )
switch (pReadFrame->frame_type)
{
case Frame_DeviceData_Request:
{
handleGatherData(io, buf, readbytes);
break;
}
else
case Frame_Echo_Request:
{
hlogd("<== recieve test echo request and response signal back!");
hio_write(io, (void*)&respFrame, sizeof respFrame);
break;
}
case Frame_Configure_DB_Request:
{
hlogd("<== recieve configuration file upload request!");
int ret = handleConfigureData(io, buf, readbytes);
if(ret!=0)
respFrame.setErrorFrame((ErrorCode)ret);
hio_write(io, (void*)&respFrame, sizeof respFrame);
break;
}
default:
{
assert(false);
break;
}
}
}
@ -247,3 +273,36 @@ void EventHandler::handleGatherData(hio_t* io, void* buf, int readbytes)
return;
}
}
int EventHandler::handleConfigureData(hio_t* io, void* buf, int readbytes)
{
assert(buf);
__USING_NAMESPACE_HJ__;
MessageFrame respFrame;
MessageFrame* pReadFrame = (MessageFrame*)buf;
//保存文件
std::string filename("d://test_1.zip");
std::ofstream file(filename, std::ios::binary);
if (!file)
{
hloge("failed to create file: %s",filename.c_str());
return ERR_PERMISSION_DENIED;
}
// 将 pdata 中的内容写入文件
file.write((const char*)(pReadFrame->frame_content), pReadFrame->frame_len);
if (!file)
{
hloge("failed to write to file: %s", filename.c_str());
return ERR_RETRANS_CONTENT;
}
else
{
hlogd("successfuly to write to file: %s", filename.c_str());
return ErrorCode::ERR_OK;
}
}

View File

@ -14,5 +14,6 @@ public:
protected:
//处理采集程序上传的数据以JSON数据上报
static void handleGatherData(hio_t* io, void* buf, int readbytes);
static int handleConfigureData(hio_t* io, void* buf, int readbytes);
};

View File

@ -7,7 +7,7 @@ __NAMESPACE_BEGIN__(HJ)
#define FRAME_HEADER_LENGTH (5)
#define FRAME_TAILE_LENGTH (4)
#define FRAME_MAX_LENGTH (0x100000) //限制1Mb 字节
typedef enum tagErrorCode : unsigned char
{
ERR_OK = 0X00,
@ -16,6 +16,9 @@ typedef enum tagErrorCode : unsigned char
ERR_INVALID_BUF_LEN = 0X03,
ERR_INVALID_JSON_FMT = 0X04,
ERR_INVALID_FSUCODE = 0X05,
ERR_INVALID_CFG_CONTENT = 0X06, // 文件内容错误
ERR_RETRANS_CONTENT = 0X07, // 重传文件内容
ERR_PERMISSION_DENIED = 0X08, //写文件权限不够
ERR_UNKOWN = 0XFF
}ErrorCode;
@ -23,7 +26,9 @@ typedef enum tagFrameType : unsigned char
{
Frame_Response = 0x00, //·µ»ØÖ¡
Frame_Request = 0x01, //ÇëÇóÖ¡
Frame_DeviceData_Request = 0x02, //来自采集程序的数据请求包,将数据保存到数据库中
Frame_Echo_Request = 0x02, //测试请求帧
Frame_DeviceData_Request = 0x03, //来自采集程序的数据请求包,将数据保存到数据库中
Frame_Configure_DB_Request = 0x04, //上传配置文件
}FrameType;
typedef struct tagFrameTail
@ -31,6 +36,8 @@ typedef struct tagFrameTail
unsigned char frame_delimiter[4] = { 0xEE,0xFF,0xEE,0xFF };
}FrameTail;
#define FRAME_HEADER_LENGTH (5) // frame_type(1) + frame_len(4)
typedef struct tagFrame
{
FrameType frame_type; //Ö¡ÀàÐÍ

View File

@ -155,7 +155,7 @@ int g2u(char* inbuf, size_t inlen, char* outbuf, size_t outlen)
bool GBKToUTF8(const std::string& strGBK,std::string& str_result)
{
int length = strGBK.size() * 2 + 1;
size_t length = strGBK.size() * 2 + 1;
char* temp = (char*)malloc(sizeof(char) * length);
@ -175,7 +175,7 @@ bool GBKToUTF8(const std::string& strGBK,std::string& str_result)
bool UTFtoGBK(const char* utf8, std::string& str_result)
{
int length = strlen(utf8);
size_t length = strlen(utf8);
char* temp = (char*)malloc(sizeof(char) * length);

View File

@ -118,6 +118,7 @@ void print_help()
int parse_confile(const char* confile)
{
#ifdef __linux__
int ret = g_conf_ctx.parser->LoadFromFile(confile);
if (ret != 0)
{
@ -158,25 +159,26 @@ int parse_confile(const char* confile)
logger_enable_fsync(hlog, hv_getboolean(str.c_str()));
}
#endif
#endif
// first log here
hlogi("=========--- Welcome to the Earth ---=========");
hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION);
hlog_fsync();
#if 0
#if 1
g_conf_ctx.worker_processes = 1;
g_conf_ctx.worker_threads = 1;
g_conf_ctx.host = "0.0.0.0";
g_conf_ctx.port = 44242;
g_conf_ctx.dbname = "hjems";
g_conf_ctx.dbuser = "root";
g_conf_ctx.dbserver = "tcp://127.0.0.1:3306";
g_conf_ctx.dbserver = "127.0.0.1";
#endif
// worker_processes
int worker_processes = 0;
#if 1
#if __linux__
#ifdef DEBUG
// Disable multi-processes mode for debugging
worker_processes = 0;
@ -338,7 +340,48 @@ int already_running(void)
#endif
/// end of checking only one instance
/// ////////////////////////////////////////////////
#ifdef _WIN32
int main(int argc, char** argv)
{
// g_main_ctx
main_ctx_init(argc, argv);
conf_ctx_init(&g_conf_ctx);
// first log here
hlog_set_level_by_str("DEBUG");
hlogi("=========--- Welcome to the Earth ---=========");
hlogi("%s version: %s", g_main_ctx.program_name, K22_VERSION);
hlog_fsync();
#if 1
g_conf_ctx.worker_processes = 1;
g_conf_ctx.worker_threads = 1;
g_conf_ctx.host = "0.0.0.0";
g_conf_ctx.port = 44242;
g_conf_ctx.dbname = "hjems";
g_conf_ctx.dbuser = "root";
g_conf_ctx.dbserver = "127.0.0.1";
#endif
#if TEST_UNPACK
memset(&unpack_setting, 0, sizeof(unpack_setting_t));
unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
unpack_setting.mode = UNPACK_BY_DELIMITER;
unpack_setting.delimiter[0] = 0xEE;
unpack_setting.delimiter[1] = 0xFF;
unpack_setting.delimiter[2] = 0xEE;
unpack_setting.delimiter[3] = 0xFF;
unpack_setting.delimiter_bytes = 4;
#endif
master_workers_run(worker_fn, (void*)(intptr_t)&g_conf_ctx, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads);
hlogi("=========--- I'll be back! ---=========");
return 0;
}
#else
int main(int argc, char** argv)
{
#if 0
@ -453,6 +496,7 @@ int main(int argc, char** argv)
return 0;
}
#endif
void worker_fn(void* userdata)
{
@ -460,7 +504,7 @@ void worker_fn(void* userdata)
long port = ptrCtx->port;
//initialize database connection
bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname);
bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,3306,ptrCtx->dbuser,"Hj57471000",ptrCtx->dbname);
if (!dbok)
{
hloge("failed to open database, exit now...");

View File

@ -1,10 +1,14 @@
#include "opmysql.h"
#ifndef _USING_MYSQL_51_LIB_
#include <mysql_driver.h>
#include <mysql_connection.h>
#include <cppconn/prepared_statement.h>
#include <cppconn/resultset.h>
#include <cppconn/statement.h>
#include <cppconn/exception.h>
#endif
#include <hv/hlog.h>
#include <sstream>
#include "openjson.h"
@ -26,8 +30,9 @@ OpDatabase* OpDatabase::getInstance()
return &m_instance;
}
bool OpDatabase::OpenDatabase(const std::string& server, const std::string& dbuser, const std::string& database)
bool OpDatabase::OpenDatabase(const std::string& server /*= "127.0.0.1"*/, int dbport /*= 3306*/, const std::string& dbuser/* = "root"*/, const std::string& dbpasswd /*= "Hj57471000"*/, const std::string& database /*= "hjems"*/)
{
#ifndef _USING_MYSQL_51_LIB_
// 打开数据库
try
{
@ -64,18 +69,46 @@ bool OpDatabase::OpenDatabase(const std::string& server, const std::string& dbus
hloge("Failed to connect to database: %s",ss.str().c_str());
return false;
}
#else
bool ok = false;
m_pDbConnection = mysql_init(NULL);
//Á¬½Óµ½MySQL·þÎñÆ÷
if (!mysql_real_connect(m_pDbConnection, server.c_str(), dbuser.c_str(), dbpasswd.c_str(), database.c_str(), dbport, NULL, 0))
{
hloge("MySQL connection error: %s", mysql_error(m_pDbConnection));
}
else
{
ok = true;
mysql_set_character_set(m_pDbConnection, "utf8");
}
if (!ok)
{
hloge("Failed to connect to database.");
}
else
{
hlogd("new database connection created successfully!");
}
return ok;
#endif
}
// 关闭数据库连接
void OpDatabase::CloseDatabase()
{
#ifdef _USING_MYSQL_51_LIB_
mysql_close(m_pDbConnection);
#else
m_pDbConnection->close();
//delete m_pDbConnection;
#endif
}
void OpDatabase::InsertMessage(const std::string& ts, const std::string& msg_type, const std::string& fsu, const std::string& content, int topic, int dev_id)
{
#ifndef _USING_MYSQL_51_LIB_
const char* insertSql = "INSERT INTO tbl_data (data_timestamp, data_type, FsuCode, data_content, topic, device_id) VALUES (?,?,?,?,?,?);";
// 创建预编译的prepared statement
std::unique_ptr<sql::PreparedStatement> insertStmt(m_pDbConnection->prepareStatement(insertSql));
@ -100,4 +133,6 @@ void OpDatabase::InsertMessage(const std::string& ts, const std::string& msg_typ
{
hloge( "No rows affected during insert operation." );
}
#else
#endif
}

View File

@ -2,9 +2,18 @@
#define __MY_OPERATE_MYSQL__
#include <string>
#include <mysql_connection.h>
#ifdef _WIN32
# define _USING_MYSQL_51_LIB_
# include <WinSock2.h>
# include <mysql.h> //MySQL C API include file
#else
# ifdef __linux__
# include <mysql_connection.h>
# endif
#endif
class OpDatabase
{
protected:
@ -14,12 +23,18 @@ public:
static OpDatabase* getInstance();
void CloseDatabase();
bool OpenDatabase(const std::string& server = "tcp://127.0.0.1:3306", const std::string& dbuser = "root", const std::string& database="hjems");
bool OpenDatabase(const std::string& server = "127.0.0.1", int dbport = 3306, const std::string& dbuser = "root", const std::string& dbpasswd = "Hj57471000", const std::string& database = "hjems");
void InsertMessage(const std::string& ts, const std::string& msg_type, const std::string& fsu, const std::string& content, int topic, int dev_id);
protected:
//std::unique_ptr<sql::Connection> m_pDbConnection;
sql::Connection* m_pDbConnection;
#ifdef _WIN32
MYSQL* m_pDbConnection;
#else
# ifdef __linux__
sql::Connection* m_pDbConnection;
# endif
#endif
private:
static OpDatabase m_instance;
};