From 3d1d223c03e0c64f45dd2c13e13d81e4c724d73f Mon Sep 17 00:00:00 2001 From: HwangKC Date: Wed, 29 May 2024 14:46:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9=E4=B8=BA=E6=94=AF=E6=8C=81my?= =?UTF-8?q?sql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- applications/ems_datahubs/Makefile | 2 +- applications/ems_datahubs/Makefile.arm | 2 +- applications/ems_datahubs/datahubs.conf | 4 +- applications/ems_datahubs/main.cpp | 58 +- applications/ems_datahubs/openjson.cpp | 1431 +++++++++++++++++++++++ applications/ems_datahubs/openjson.h | 276 +++++ applications/ems_datahubs/opmysql.cpp | 102 ++ applications/ems_datahubs/opmysql.h | 27 + 8 files changed, 1886 insertions(+), 16 deletions(-) create mode 100644 applications/ems_datahubs/openjson.cpp create mode 100644 applications/ems_datahubs/openjson.h create mode 100644 applications/ems_datahubs/opmysql.cpp create mode 100644 applications/ems_datahubs/opmysql.h diff --git a/applications/ems_datahubs/Makefile b/applications/ems_datahubs/Makefile index 67318f8..33b5e4a 100644 --- a/applications/ems_datahubs/Makefile +++ b/applications/ems_datahubs/Makefile @@ -13,7 +13,7 @@ TARGET := datahubs #compile and lib parameter # CC := g++ -LIBS := -L/usr/local/lib -lhv -lsqlite3 -lopdb -lpthread -liconv -ldl -lz +LIBS := -L/usr/local/lib -lhv -lmysqlcppconn -lpthread -liconv -ldl -lz LDFLAGS := DEFINES := INCLUDE := -I. -I/usr/local/include diff --git a/applications/ems_datahubs/Makefile.arm b/applications/ems_datahubs/Makefile.arm index e486ab0..81bee17 100644 --- a/applications/ems_datahubs/Makefile.arm +++ b/applications/ems_datahubs/Makefile.arm @@ -14,7 +14,7 @@ TARGET := datahubs.arm # CC := arm-g++ CXX := arm-g++ -LIBS := -L/usr/local/arm/lib -lhv -lsqlite3 -lopdb -lpthread -liconv -ldl -lz +LIBS := -L/usr/local/arm/lib -lhv -lmysqlcppconn -lpthread -liconv -ldl -lz LDFLAGS := DEFINES := INCLUDE := -I. -I/usr/local/arm/include diff --git a/applications/ems_datahubs/datahubs.conf b/applications/ems_datahubs/datahubs.conf index 7859441..25df79c 100644 --- a/applications/ems_datahubs/datahubs.conf +++ b/applications/ems_datahubs/datahubs.conf @@ -13,4 +13,6 @@ worker_threads = auto host = 0.0.0.0 port = 44242 -dbfile = /home/hkc/workspace/projects/datahubs/ems.db +database = hjems +dbuser = root +dbserver = tcp://127.0.0.1:3306 diff --git a/applications/ems_datahubs/main.cpp b/applications/ems_datahubs/main.cpp index bbc621c..028385e 100644 --- a/applications/ems_datahubs/main.cpp +++ b/applications/ems_datahubs/main.cpp @@ -16,8 +16,8 @@ #include #include "mqtt_msg.h" -#include -#include +#include "openjson.h" +#include "opmysql.h" #include "iconv-utils.h" @@ -181,7 +181,7 @@ int DecompressString(const char* in_str, size_t in_len, std::string& out_str) } -static OpDatabase gOpDatabase; +//static OpDatabase gOpDatabase; #if TEST_UNPACK static unpack_setting_t unpack_setting; @@ -208,7 +208,9 @@ typedef struct conf_ctx_s int worker_threads; std::string host; int port; - std::string db_file; + std::string dbserver; + std::string dbuser; + std::string dbname; } conf_ctx_t; conf_ctx_t g_conf_ctx; @@ -219,6 +221,9 @@ inline void conf_ctx_init(conf_ctx_t* ctx) ctx->worker_processes = 0; ctx->worker_threads = 0; ctx->port = 0; + ctx->dbname = "hjems"; + ctx->dbuser = "root"; + ctx->dbserver = "tcp://127.0.0.1:3306"; } static void print_version(); @@ -380,17 +385,41 @@ int parse_confile(const char* confile) g_conf_ctx.port = port; - str = g_conf_ctx.parser->GetValue("dbfile"); + str = g_conf_ctx.parser->GetValue("database"); if (str.size() != 0) { - g_conf_ctx.db_file = str; + g_conf_ctx.dbname = str; } else { - g_conf_ctx.db_file = "./ems.db"; + g_conf_ctx.dbname = "hjems"; } - hlogi("database = ('%s')", g_conf_ctx.db_file.c_str()); + hlogi("database = ('%s')", g_conf_ctx.dbname.c_str()); + + str = g_conf_ctx.parser->GetValue("dbuser"); + if (str.size() != 0) + { + g_conf_ctx.dbuser = str; + } + else + { + g_conf_ctx.dbuser = "root"; + } + + hlogi("dbuser = ('%s')", g_conf_ctx.dbuser.c_str()); + + str = g_conf_ctx.parser->GetValue("dbserver"); + if (str.size() != 0) + { + g_conf_ctx.dbserver = str; + } + else + { + g_conf_ctx.dbserver = "tcp://127.0.0.1:3306"; + } + + hlogi("dbserver = ('%s')", g_conf_ctx.dbserver.c_str()); hlogi("parse_confile('%s') OK", confile); return 0; @@ -809,7 +838,7 @@ static void on_recv(hio_t* io, void* buf, int readbytes) return; } - hlogd("<<<>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str()); @@ -834,7 +863,7 @@ static void on_recv(hio_t* io, void* buf, int readbytes) auto& pNode = IdCodeContent[0]; //这是只解析第一个节点 std::string oid = pNode["OID"].s(); - gOpDatabase.InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); + OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); } if (msg_type == "web-read") @@ -850,7 +879,7 @@ static void on_recv(hio_t* io, void* buf, int readbytes) auto& pNode = IdCodeContent[0]; //这是只解析第一个节点 std::string oid = pNode.s(); - gOpDatabase.InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); + OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id); } //delete[] pTmp; } @@ -879,11 +908,10 @@ static void on_accept(hio_t* io) void worker_fn(void* userdata) { conf_ctx_t* ptrCtx = (conf_ctx_t*)(intptr_t)(userdata); - //long port = (long)(intptr_t)(userdata); long port = ptrCtx->port; //initialize database connection - bool dbok = gOpDatabase.OpenDatabase(ptrCtx->db_file); + bool dbok = OpDatabase::getInstance()->OpenDatabase(ptrCtx->dbserver,ptrCtx->dbuser,ptrCtx->dbname); if (!dbok) { hloge("failed to open database, exit now..."); @@ -905,5 +933,9 @@ void worker_fn(void* userdata) hlogi("port=%ld pid=%ld tid=%ld listenfd=%d", port, hv_getpid(), hv_gettid(), hio_fd(listenio)); hloop_run(loop); + + hlogi("database connection close!"); + OpDatabase::getInstance()->CloseDatabase(); + hloop_free(&loop); } diff --git a/applications/ems_datahubs/openjson.cpp b/applications/ems_datahubs/openjson.cpp new file mode 100644 index 0000000..1e3241b --- /dev/null +++ b/applications/ems_datahubs/openjson.cpp @@ -0,0 +1,1431 @@ +/*************************************************************************** + * Copyright (C) 2023-, openlinyou, + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + ***************************************************************************/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "openjson.h" + + +#define PRINTF printf +#if (defined(_MSC_VER) && (_MSC_VER >= 1400 )) +inline int SNPRINTF(char* buffer, size_t size, const char* format, ...) +{ + va_list va; + va_start(va, format); + int result = vsnprintf_s(buffer, size, _TRUNCATE, format, va); + va_end(va); + return result; +} +#define SSCANF sscanf_s +#else +#define SNPRINTF snprintf +#define SSCANF sscanf +#endif + +static inline void doubleToStr(double v, char* buffer, int size) +{ + double tmp = floor(v); + if (tmp == v) + SNPRINTF(buffer, size, "%ld", (long)v); + else + SNPRINTF(buffer, size, "%g", v); +} + +static inline bool strToDouble(const char* str, double* value) +{ + return SSCANF(str, "%lf", value) == 1 ? true : false; +} + +static void int32ToStr(int32_t n, char* str, size_t size) +{ + if (str == 0 || size < 1) return; + str[size - 1] = 0; + if (size < 2) return; + if (n == 0) + { + str[0] = '0'; + return; + } + size_t i = 0; + char buf[128] = { 0 }; + int32_t tmp = n < 0 ? -n : n; + while (tmp && i < 128) + { + buf[i++] = (tmp % 10) + '0'; + tmp = tmp / 10; + } + size_t len = n < 0 ? ++i : i; + if (len > size) + { + len = size; + i = len - 1; + } + str[i] = 0; + while (1) + { + --i; + if (i < 0 || buf[len - i - 1] == 0) break; + str[i] = buf[len - i - 1]; + } + if (i == 0) str[i] = '-'; +} + +static void int64ToStr(int64_t n, char* str, size_t size) +{ + if (str == 0 || size < 1) return; + str[size - 1] = 0; + if (size < 2) return; + if (n == 0) + { + str[0] = '0'; + return; + } + size_t i = 0; + char buf[128] = { 0 }; + int64_t tmp = n < 0 ? -n : n; + while (tmp && i < 128) + { + buf[i++] = (tmp % 10) + '0'; + tmp = tmp / 10; + } + size_t len = n < 0 ? ++i : i; + if (len > size) + { + len = size; + i = len - 1; + } + str[i] = 0; + while (1) + { + --i; + if (i < 0 || buf[len - i - 1] == 0) break; + str[i] = buf[len - i - 1]; + } + if (i == 0) str[i] = '-'; +} + +static int32_t strToInt32(const char* str) +{ + const char* ptr = str; + if (*ptr == '-' || *ptr == '+') ptr++; + int32_t tmp = 0; + while (*ptr != 0) + { + if ((*ptr < '0') || (*ptr > '9')) break; + tmp = tmp * 10 + (*ptr - '0'); + ptr++; + } + if (*str == '-') tmp = -tmp; + return tmp; +} + +static int64_t strToInt64(const char* str) +{ + const char* ptr = str; + if (*ptr == '-' || *ptr == '+') ptr++; + int64_t temp = 0; + while (*ptr != 0) + { + if ((*ptr < '0') || (*ptr > '9')) break; + temp = temp * 10 + (*ptr - '0'); + ptr++; + } + if (*str == '-') temp = -temp; + return temp; +} + + +//JsonBox +OpenJson::Box::Box() +{ +} + +OpenJson::Box::~Box() +{ + for (size_t i = 0; i < childs_.size(); i++) + { + if (childs_[i]) + { + delete childs_[i]; + } + } + childs_.clear(); +} + +bool OpenJson::Box::remove(OpenJson* node) +{ + if (!node) return false; + std::vector::iterator iter; + for (iter = childs_.begin(); iter != childs_.end(); iter++) + { + if (*iter == node) + { + childs_.erase(iter); + delete node; + return true; + } + } + return false; +} + +//JsonContext +OpenJson::Context::Context() + : + offset_(0), + data_(0), + root_(0), + size_(0) +{ +} + +OpenJson::Context::~Context() +{ +} + +void OpenJson::Context::startRead() +{ + size_ = rbuffer_.size(); + data_ = (char*)rbuffer_.data(); + offset_ = 0; +} + +void OpenJson::Context::startWrite() +{ + wbuffer_.clear(); +} + +//Segment +OpenJson::Segment::Segment(SegmentType type) +{ + setType(type); +} +OpenJson::Segment::~Segment() +{ +} +void OpenJson::Segment::setType(SegmentType type) +{ + type_ = type; + value_.int64_ = 0; +} +void OpenJson::Segment::clear() +{ + value_.int64_ = 0; +} + +void OpenJson::Segment::toString() +{ + switch (type_) + { + case NIL: + content_ = "null"; + break; + case BOOL: + content_ = value_.bool_ ? "true" : "false"; + break; + case INT32: + { + char buffer[64] = { 0 }; + int32ToStr(value_.int32_, buffer, sizeof(buffer)); + content_ = buffer; + } + break; + case INT64: + { + char buffer[64] = { 0 }; + int64ToStr(value_.int64_, buffer, sizeof(buffer)); + content_ = buffer; + } + break; + case DOUBLE: + { + char buffer[64] = { 0 }; + doubleToStr(value_.double_, buffer, sizeof(buffer)); + content_ = buffer; + } + break; + case STRING: + break; + default: + content_.clear(); + break; + } +} + +//OpenJson +bool OpenJson::EnableLog_ = true; + +OpenJson OpenJson::NodeNull; +std::string OpenJson::StringNull; + +OpenJson::OpenJson(JsonType type) + :type_(type), + context_(0), + wcontext_(0), + box_(0), + idx_(0), + key_(0), + len_(0), + segment_(0) +{ +} + +OpenJson::~OpenJson() +{ + clear(); +} + +OpenJson* OpenJson::createNode(unsigned char code) +{ + JsonType ctype = UNKNOWN; + switch (code) + { + case '"': + case '\'': + ctype = STRING; + break; + case '{': + ctype = OBJECT; + break; + case '[': + ctype = ARRAY; + break; + default: + ctype = NUMBER; + break; + } + OpenJson* node = new OpenJson(ctype); + return node; +} + +OpenJson::JsonType OpenJson::codeToType(unsigned char code) +{ + JsonType ctype = UNKNOWN; + switch (code) + { + case '"': + case '\'': + ctype = STRING; + break; + case '{': + ctype = OBJECT; + break; + case '[': + ctype = ARRAY; + break; + default: + ctype = NUMBER; + break; + } + return ctype; +} + +const std::string& OpenJson::emptyString() +{ + if (context_) + { + context_->stringNull_.clear(); + return context_->stringNull_; + } + if (wcontext_) + { + wcontext_->stringNull_.clear(); + return wcontext_->stringNull_; + } + return OpenJson::StringNull; +} + +const std::string& OpenJson::key() +{ + if (key_) return key_->s(); + return emptyString(); +} + +const char* OpenJson::data() +{ + if (context_ && context_->data_) + { + if (idx_ < context_->size_) + { + return context_->data_ + idx_; + } + } + Log("JsonNode is Empty"); + return emptyString().c_str(); +} + +double OpenJson::stringToDouble() +{ + const char* str = data(); + double dval = 0; + if (!str || strlen(str) == 0) + dval = (float)(1e+300 * 1e+300) * 0.0F; + else if (strcmp(str, "true") == 0) + dval = 1.0; + else if (strcmp(str, "false") == 0) + dval = 0.0; + else + dval = atof(str); + return dval; +} + +int32_t OpenJson::stringToInt32() +{ + int32_t ret = atoi(data()); + return ret; +} + +int64_t OpenJson::stringToInt64() +{ + int64_t ret = atoll(data()); + return ret; +} + +const std::string& OpenJson::s() +{ + if (type_ == STRING) + { + if (!segment_) + { + segment_ = new Segment(Segment::STRING); + segment_->content_ = data(); + } + if (segment_->type_ == Segment::STRING) + { + return segment_->content_; + } + segment_->toString(); + return segment_->content_; + } + else if (type_ == NUMBER) + { + Log("JsonNode is no STRING"); + if (!segment_) + { + if (!context_ || !context_->data_ || len_ < 1) + { + return emptyString(); + } + segment_ = new Segment(Segment::NIL); + segment_->content_ = data(); + return segment_->content_; + } + if (segment_) + { + if (segment_->type_ != Segment::NIL) + { + segment_->toString(); + } + return segment_->content_; + } + } + else + { + Log("JsonNode is no STRING"); + } + return emptyString(); +} + +double OpenJson::d(double def) +{ + if (type_ != NUMBER) + { + Log("JsonNode is no NUMBER"); + return def; + } + if (segment_ == 0) + { + if (!context_ || !context_->data_ || len_ < 1) + { + return def; + } + segment_ = new Segment(Segment::DOUBLE); + segment_->value_.double_ = stringToDouble(); + } + if (segment_->type_ != Segment::DOUBLE) + { + if (!context_ || !context_->data_ || len_ < 1) + { + Log("JsonNode is no DOUBLE NUMBER"); + } + else + { + segment_->setType(Segment::DOUBLE); + segment_->value_.double_ = stringToDouble(); + } + } + switch (segment_->type_) + { + case OpenJson::Segment::BOOL: + return segment_->value_.bool_; + case OpenJson::Segment::INT32: + return (double)segment_->value_.int32_; + case OpenJson::Segment::INT64: + return (double)segment_->value_.int64_; + case OpenJson::Segment::DOUBLE: + return segment_->value_.double_; + case OpenJson::Segment::STRING: + return atof(segment_->content_.c_str()); + default: + break; + } + return def; +} + +bool OpenJson::b(bool def) +{ + if (type_ != NUMBER) + { + Log("JsonNode is no NUMBER"); + return def; + } + if (segment_ == 0) + { + if (!context_ || !context_->data_ || len_ < 1) + { + return def; + } + segment_ = new Segment(Segment::BOOL); + segment_->value_.bool_ = stringToDouble() != 0 ? true : false; + } + if (segment_->type_ != Segment::BOOL) + { + if (!context_ || !context_->data_ || len_ < 1) + { + Log("JsonNode is no BOOL NUMBER"); + } + else + { + segment_->setType(Segment::BOOL); + segment_->value_.bool_ = stringToDouble() != 0 ? true : false; + } + } + switch (segment_->type_) + { + case OpenJson::Segment::BOOL: + return segment_->value_.bool_; + case OpenJson::Segment::INT32: + return (bool)segment_->value_.int32_; + case OpenJson::Segment::INT64: + return (bool)segment_->value_.int64_; + case OpenJson::Segment::DOUBLE: + return (bool)segment_->value_.double_; + case OpenJson::Segment::STRING: + return segment_->content_.size() > 0; + default: + break; + } + return def; +} + +int32_t OpenJson::i32(int32_t def) +{ + if (type_ != NUMBER) + { + Log("JsonNode is no NUMBER"); + return def; + } + if (segment_ == 0) + { + if (!context_ || !context_->data_ || len_ < 1) + { + return def; + } + segment_ = new Segment(Segment::INT32); + segment_->value_.int32_ = stringToInt32(); + } + if (segment_->type_ != Segment::INT32) + { + if (!context_ || !context_->data_ || len_ < 1) + { + Log("JsonNode is no INT32 NUMBER"); + } + else + { + segment_->setType(Segment::INT32); + segment_->value_.int32_ = stringToInt32(); + } + } + switch (segment_->type_) + { + case OpenJson::Segment::BOOL: + return segment_->value_.bool_; + case OpenJson::Segment::INT32: + return segment_->value_.int32_; + case OpenJson::Segment::INT64: + return (int32_t)segment_->value_.int64_; + case OpenJson::Segment::DOUBLE: + return (int32_t)segment_->value_.double_; + case OpenJson::Segment::STRING: + return atoi(segment_->content_.c_str()); + default: + break; + } + return def; +} + +int64_t OpenJson::i64(int64_t def) +{ + if (type_ != NUMBER) + { + Log("JsonNode is no NUMBER"); + return def; + } + if (segment_ && segment_->type_ == Segment::NIL) + { + delete segment_; + segment_ = 0; + } + if (segment_ == 0) + { + if (!context_ || !context_->data_ || len_ < 1) + { + return def; + } + segment_ = new Segment(Segment::INT64); + segment_->value_.int64_ = stringToInt64(); + } + if (segment_->type_ != Segment::INT64) + { + Log("JsonNode is no INT64 NUMBER"); + } + switch (segment_->type_) + { + case OpenJson::Segment::BOOL: + return segment_->value_.bool_; + case OpenJson::Segment::INT32: + return segment_->value_.int32_; + case OpenJson::Segment::INT64: + return segment_->value_.int64_; + case OpenJson::Segment::DOUBLE: + return (int64_t)segment_->value_.double_; + case OpenJson::Segment::STRING: + return atoll(segment_->content_.c_str()); + default: + break; + } + return def; +} + +void OpenJson::operator=(const std::string& val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != STRING) type_ = STRING; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::STRING); + //const char* ptr = 0; + for (size_t i = 0; i < val.size(); ++i) + { + if (val[i] == '"' || val[i] == '\'') + { + segment_->content_.push_back('\\'); + } + segment_->content_.push_back(val[i]); + } +} + +void OpenJson::operator=(const char* val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != STRING) type_ = STRING; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::STRING); + segment_->content_.clear(); + const char* ptr = 0; + for (size_t i = 0; i < strlen(val); ++i) + { + ptr = val + i; + if (*ptr == '"' || *ptr == '\'') + { + segment_->content_.push_back('\\'); + } + segment_->content_.push_back(*ptr); + } +} + +void OpenJson::operator=(bool val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != NUMBER) type_ = NUMBER; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::BOOL); + segment_->value_.bool_ = val; +} + +void OpenJson::operator=(int32_t val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != NUMBER) type_ = NUMBER; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::INT32); + segment_->value_.int32_ = val; +} + +void OpenJson::operator=(uint32_t val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != NUMBER) type_ = NUMBER; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::INT32); + segment_->value_.int32_ = val; +} + +void OpenJson::operator=(int64_t val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != NUMBER) type_ = NUMBER; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::INT64); + segment_->value_.int64_ = val; +} + +void OpenJson::operator=(uint64_t val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container, not element"); + return; + } + if (type_ != NUMBER) type_ = NUMBER; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::INT64); + segment_->value_.int64_ = val; +} + +void OpenJson::operator=(double val) +{ + if (type_ == OBJECT || type_ == ARRAY) + { + Log("JsonNode is a container"); + return; + } + if (type_ != NUMBER) type_ = NUMBER; + if (segment_ == 0) segment_ = new Segment; + segment_->setType(Segment::DOUBLE); + segment_->value_.double_ = val; +} + +OpenJson& OpenJson::array(size_t idx) +{ + if (type_ != ARRAY) + { + if (type_ == OBJECT) + { + Log("JsonNode must be ARRAY, not OBJECT"); + } + type_ = ARRAY; + } + else + { + assert(box_); + } + if (!box_) box_ = new Box; + if (idx >= box_->childs_.size()) + { + box_->childs_.resize(idx + 1, 0); + } + OpenJson* child = box_->childs_[idx]; + if (!child) + { + child = new OpenJson(); + box_->childs_[idx] = child; + } + return *child; +} + +OpenJson& OpenJson::object(const char* key) +{ + if (!key) + { + return NodeNull; + } + if (type_ != OBJECT) + { + if (type_ == ARRAY) + { + Log("JsonNode must be OBJECT, not ARRAY"); + } + type_ = OBJECT; + } + else + { + assert(box_); + } + if (!box_) box_ = new Box; + + OpenJson* child = 0; + for (size_t i = 0; i < box_->childs_.size(); ++i) + { + child = box_->childs_[i]; + if (child == 0) continue; + if (strcmp(child->key().c_str(), key) == 0) + { + return *child; + } + } + OpenJson* keyNode = new OpenJson(STRING); + *keyNode = key; + child = new OpenJson(); + child->key_ = keyNode; + size_t i = 0; + for (; i < box_->childs_.size(); ++i) + { + if (!box_->childs_[i]) + { + box_->childs_[i] = child; + break; + } + } + if (i >= box_->childs_.size()) + { + box_->childs_.push_back(child); + } + return *child; +} + +void OpenJson::addNode(OpenJson* node) +{ + if (!node) return; + if (type_ != OBJECT && type_ != ARRAY) + { + Log("JsonNode must be OBJECT or ARRAY"); + type_ = node->key_ ? OBJECT : ARRAY; + } + if (box_ == 0) box_ = new Box; + box_->add(node); +} + +void OpenJson::removeNode(size_t idx) +{ + if (box_ == 0) return; + if (idx >= box_->childs_.size()) return; + box_->remove(box_->childs_[idx]); +} + +void OpenJson::removeNode(const char* key) +{ + if (box_ == 0) return; + OpenJson* child = 0; + for (size_t i = 0; i < box_->childs_.size(); ++i) + { + child = box_->childs_[i]; + if (child == 0) continue; + if (strcmp(child->key().c_str(), key) == 0) + { + box_->remove(child); + break; + } + } +} + +void OpenJson::clear() +{ + if (segment_) + { + delete segment_; + segment_ = 0; + } + if (key_) + { + delete key_; + key_ = 0; + } + if (box_) + { + assert(type_ == OBJECT || type_ == ARRAY); + delete box_; + box_ = 0; + } + if (context_ != 0 && context_->root_ == this) + { + context_->root_ = 0; + delete context_; + } + context_ = 0; + if (wcontext_ != 0 && wcontext_->root_ == this) + { + wcontext_->root_ = 0; + delete wcontext_; + } + wcontext_ = 0; + type_ = EMPTY; + idx_ = 0; + len_ = 0; +} + +void OpenJson::trimSpace() +{ + if (!context_) return; + char code = 0; + for (size_t i = idx_; i < context_->size_; ++i) + { + code = context_->data_[i]; + if (code > ' ') + { + idx_ = i; + break; + } + } +} + +unsigned char OpenJson::getCharCode() +{ + if (!context_) return 0; + if (idx_ < context_->size_) + { + unsigned char tmp = (unsigned char)context_->data_[idx_]; + return tmp; + } + return 0; +} + +unsigned char OpenJson::getChar() +{ + unsigned char code = getCharCode(); + if (code <= ' ') + { + trimSpace(); + code = getCharCode(); + } + return code; +} + +unsigned char OpenJson::checkCode(unsigned char charCode) +{ + unsigned char code = getCharCode(); + if (code != charCode) + { + trimSpace(); + code = getCharCode(); + if (code != charCode) return 0; + } + ++idx_; + return code; +} + +size_t OpenJson::searchCode(unsigned char code) +{ + char* data = context_->data_; + for (size_t i = idx_; i < context_->size_; i++) + { + if (data[i] == code) + { + if (i > 0 && data[i - 1] != '\\') return i; + } + } + return -1; +} + +bool OpenJson::makeRContext() +{ + if (type_ != EMPTY) + { + if (context_ && context_->root_ != this) + { + PRINTF("OpenJson warn:JsonNode is no root or empty!"); + return false; + } + } + else + { + if (context_ && context_->root_ != this) + { + PRINTF("OpenJson warn:JsonNode is no root or empty!"); + return false; + }; + } + clear(); + context_ = new Context(); + context_->root_ = this; + context_->offset_ = 0; + context_->rbuffer_.clear(); + return true; +} + +bool OpenJson::decode(const std::string& buffer) +{ + if (!makeRContext()) return false; + context_->rbuffer_ = buffer; + context_->startRead(); + type_ = codeToType(getChar()); + try + { + read(context_, true); + } + catch (const char* error) + { + PRINTF("OpenJson warn:decode catch exception %s", error); + } + return true; +} + +bool OpenJson::decodeFile(const std::string& filePath) +{ + if (!makeRContext()) return false; + FILE* fp = 0; +#ifdef _MSC_VER + fopen_s(&fp, filePath.c_str(), "rb"); +#else + fp = fopen(filePath.c_str(), "rb"); +#endif + if (fp == 0) + { +#ifdef _MSC_VER + char buffer[1024] = { 0 }; + strerror_s(buffer, sizeof(buffer), errno); + PRINTF("OpenJson warn:decodeFile error:%s,%s\n", buffer, filePath.c_str()); +#else + PRINTF("OpenJson warn:decodeFile error:%s,%s\n", strerror(errno), filePath.c_str()); +#endif + return false; + } + fseek(fp, 0, SEEK_END); + /*size_t size = */ + ftell(fp); + fseek(fp, 0, SEEK_SET); + + size_t ret = 0; + char buff[1024 * 8] = { 0 }; + while (true) + { + ret = fread(buff, 1, sizeof(buff), fp); + if (ret < 0) + { +#ifdef _MSC_VER + char buffer[1024] = { 0 }; + strerror_s(buffer, sizeof(buffer), errno); + PRINTF("OpenJson warn:decodeFile error:%s,%s\n", buffer, filePath.c_str()); +#else + PRINTF("OpenJson warn:decodeFile error:%s,%s\n", strerror(errno), filePath.c_str()); +#endif + fclose(fp); + return false; + } + else if(ret == 0) break; + context_->rbuffer_.append(buff, ret); + } + fclose(fp); + + context_->startRead(); + type_ = codeToType(getChar()); + try + { + read(context_, true); + } + catch (const char* error) + { + PRINTF("OpenJson warn:decodeFile catch exception %s", error); + } + return true; +} + +const std::string& OpenJson::encode() +{ + if (wcontext_ == 0) + { + wcontext_ = new Context(); + wcontext_->root_ = this; + } + wcontext_->startWrite(); + write(wcontext_, true); + return wcontext_->wbuffer_; +} + +void OpenJson::encodeFile(const std::string& filePath) +{ + FILE* fp = 0; +#ifdef _MSC_VER + fopen_s(&fp, filePath.c_str(), "wb"); +#else + fp = fopen(filePath.c_str(), "wb"); +#endif + if (fp == 0) + { +#ifdef _MSC_VER + char buffer[1024] = { 0 }; + strerror_s(buffer, sizeof(buffer), errno); + PRINTF("OpenJson warn:encodeFile error:%s,%s\n", buffer, filePath.c_str()); +#else + PRINTF("OpenJson warn:encodeFile error:%s,%s\n", strerror(errno), filePath.c_str()); +#endif + return; + } + fseek(fp, 0, SEEK_SET); + const std::string& buffer = encode(); + fwrite(buffer.data(), buffer.size(), 1, fp); + fclose(fp); +} + +void OpenJson::read(Context* context, bool isRoot) +{ + if (context_) + { + if (isRoot) + { + assert(context_ == context); + assert(context_->root_ == this); + } + else + { + assert(context_->root_ != this); + if (context_->root_ == this) return; + } + } + len_ = 0; + context_ = context; + idx_ = context->offset_; + switch (type_) + { + case EMPTY: + break; + case STRING: + readString(); + break; + case NUMBER: + readNumber(); + break; + case OBJECT: + readObject(); + break; + case ARRAY: + readArray(); + break; + case UNKNOWN: + break; + default: + break; + } +} +void OpenJson::readNumber() +{ + assert(type_ == NUMBER); + unsigned char code = 0; + size_t sidx = idx_; + size_t len = context_->size_; + char* data = context_->data_; + for (; idx_ < len; idx_++) + { + code = data[idx_]; + if (code == ',' || code == '}' || code == ']') + { + idx_--; + break; + } + } + if (idx_ < sidx) + { + throwError("lost number value"); + return; + } + len_ = idx_ - sidx + 1; + idx_ = sidx; +} +void OpenJson::readString() +{ + assert(type_ == STRING); + unsigned char code = '"'; + if (!checkCode(code)) + { + code = '\''; + if (!checkCode(code)) + { + throwError("lost '\"' or \"'\""); + return; + } + } + size_t sidx = idx_; + size_t eidx = searchCode(code); + if (eidx < 0) + { + throwError("lost '\"' or \"'\""); + return; + } + idx_ = sidx; + len_ = eidx - sidx + 1; + context_->data_[eidx] = 0; +} +void OpenJson::readObject() +{ + assert(type_ == OBJECT); + if (!checkCode('{')) + { + throwError("lost '{'"); + return; + } + unsigned char code = 0; + OpenJson* keyNode = 0; + OpenJson* valNode = 0; + size_t oidx = idx_; + while (idx_ < context_->size_) + { + code = getChar(); + if (code == 0) + { + throwError("lost '}'"); + return; + } + if (checkCode('}')) break; + keyNode = createNode(code); + if (keyNode->type_ != STRING) + { + throwError("lost key"); + return; + } + context_->offset_ = idx_; + keyNode->read(context_); + idx_ = keyNode->idx_ + keyNode->len_; + if (!checkCode(':')) + { + throwError("lost ':'"); + return; + } + code = getChar(); + valNode = createNode(code); + valNode->key_ = keyNode; + context_->offset_ = idx_; + valNode->read(context_); + idx_ = valNode->idx_ + valNode->len_; + addNode(valNode); + + if (checkCode('}')) + { + context_->data_[idx_ - 1] = 0; + break; + } + if (!checkCode(',')) + { + throwError("lost ','"); + return; + } + context_->data_[idx_ - 1] = 0; + } + len_ = idx_ - oidx; + idx_ = oidx; +} +void OpenJson::readArray() +{ + assert(type_ == ARRAY); + if (!checkCode('[')) + { + throwError("lost '['"); + return; + } + unsigned char code = 0; + OpenJson* valNode = 0; + size_t oidx = idx_; + while (idx_ < context_->size_) + { + code = getChar(); + if (code == 0) + { + throwError("lost ']'"); + return; + } + if (checkCode(']')) break; + valNode = createNode(code); + context_->offset_ = idx_; + valNode->read(context_); + idx_ = valNode->idx_ + valNode->len_; + addNode(valNode); + + if (checkCode(']')) + { + context_->data_[idx_ - 1] = 0; + break; + } + if (!checkCode(',')) + { + throwError("lost ','"); + return; + } + context_->data_[idx_ - 1] = 0; + } + len_ = idx_ - oidx; + idx_ = oidx; +} + +void OpenJson::write(Context* context, bool isRoot) +{ + if (wcontext_) + { + if (isRoot) + { + assert(wcontext_ == context); + assert(wcontext_->root_ == this); + } + else + { + assert(wcontext_->root_ != this); + if (wcontext_->root_ == this) return; + } + } + wcontext_ = context; + switch (type_) + { + case EMPTY: + break; + case STRING: + writeString(); + break; + case NUMBER: + writeNumber(); + break; + case OBJECT: + writeObject(); + break; + case ARRAY: + writeArray(); + break; + case UNKNOWN: + break; + default: + break; + } +} +void OpenJson::writeNumber() +{ + assert(type_ == NUMBER); + if (key_) + { + wcontext_->wbuffer_.append("\"" + key() + "\":"); + } + if (segment_) + { + segment_->toString(); + wcontext_->wbuffer_.append(segment_->content_); + } + else + { + wcontext_->wbuffer_.append(data()); + } +} +void OpenJson::writeString() +{ + assert(type_ == STRING); + if (key_) + { + wcontext_->wbuffer_.append("\"" + key() + "\":"); + } + wcontext_->wbuffer_.append("\"" + s() + "\""); +} +void OpenJson::writeObject() +{ + assert(type_ == OBJECT); + if (key_) + wcontext_->wbuffer_.append("\"" + key() + "\":{"); + else + wcontext_->wbuffer_.append("{"); + + if (box_ != 0) + { + size_t idx = 0; + size_t size = box_->size(); + for (size_t i = 0; i < size; ++i) + { + if (!(*box_)[i]) continue; + if (idx > 0) + { + wcontext_->wbuffer_.append(","); + } + (*box_)[i]->write(wcontext_); + ++idx; + } + } + wcontext_->wbuffer_.append("}"); +} +void OpenJson::writeArray() +{ + assert(type_ == ARRAY); + if (key_) + wcontext_->wbuffer_.append("\"" + key() + "\":["); + else + wcontext_->wbuffer_.append("["); + + if (box_ != 0) + { + size_t idx = 0; + size_t size = box_->size(); + for (size_t i = 0; i < size; ++i) + { + if (!(*box_)[i]) continue; + if (idx > 0) + { + wcontext_->wbuffer_.append(","); + } + (*box_)[i]->write(wcontext_); + ++idx; + } + } + wcontext_->wbuffer_.append("]"); +} + +void OpenJson::EnableLog(bool enable) +{ + EnableLog_ = enable; +} + +void OpenJson::Log(const char* format, ...) +{ + if (!EnableLog_) return; + va_list ap; + va_start(ap, format); + char tmp[1024] = { 0 }; + vsnprintf(tmp, sizeof(tmp), format, ap); + va_end(ap); + PRINTF("OpenJson WARN:%s\n", tmp); +} +void OpenJson::throwError(const char* errMsg) +{ + static const char* InfoTags[6] = { "EMPTY", "STRING", "NUMBER", "OBJECT", "ARRAY", "UNKNOWN" }; + size_t len = sizeof(InfoTags) / sizeof(InfoTags[0]); + const char* tab = type_ < len ? InfoTags[type_] : InfoTags[5]; + PRINTF("OpenJson:throwError [%s] Error: %s\n", tab, errMsg); + + char tmp[126] = { 0 }; + len = context_->size_ - context_->offset_; + len = len > 64 ? 64 : len; + memcpy(tmp, context_->data_ + idx_, len); + PRINTF("OpenJson:throwError content:%s\n", tmp); + throw errMsg; +} + diff --git a/applications/ems_datahubs/openjson.h b/applications/ems_datahubs/openjson.h new file mode 100644 index 0000000..ff27449 --- /dev/null +++ b/applications/ems_datahubs/openjson.h @@ -0,0 +1,276 @@ +/*************************************************************************** + * Copyright (C) 2023-, openlinyou, + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + ***************************************************************************/ + +#ifndef HEADER_OPEN_JSON_H +#define HEADER_OPEN_JSON_H + +#include +#include +#include +#include + + +#ifdef _MSC_VER +#pragma warning( disable: 4251 ) +# if _MSC_VER >= 1600 || defined(__MINGW32__) +# else +# if (_MSC_VER < 1300) +typedef signed char int8_t; +typedef signed short int16_t; +typedef signed int int32_t; +typedef unsigned char uint8_t; +typedef unsigned short uint16_t; +typedef unsigned int uint32_t; +# else +typedef signed __int8 int8_t; +typedef signed __int16 int16_t; +typedef signed __int32 int32_t; +typedef unsigned __int8 uint8_t; +typedef unsigned __int16 uint16_t; +typedef unsigned __int32 uint32_t; +# endif +typedef signed __int64 int64_t; +typedef unsigned __int64 uint64_t; +# endif +#endif // _MSC_VER + +//#pragma warning(disable:4100) + +class OpenJson +{ + enum JsonType + { + EMPTY = 0, + STRING, + NUMBER, + OBJECT, + ARRAY, + UNKNOWN + }; + class Box + { + std::vector childs_; + public: + Box(); + ~Box(); + inline void clear() + { + childs_.clear(); + } + inline bool empty() + { + return childs_.empty(); + } + inline size_t size() + { + return childs_.size(); + } + inline void add(OpenJson* node) + { + childs_.push_back(node); + } + inline OpenJson* operator[](size_t idx) + { + return childs_[idx]; + } + bool remove(OpenJson* node); + friend class OpenJson; + }; + class Context + { + char* data_; + size_t size_; + size_t offset_; + + OpenJson* root_; + std::string rbuffer_; + std::string wbuffer_; + + std::string stringNull_; + public: + Context(); + ~Context(); + void startRead(); + void startWrite(); + friend class OpenJson; + }; + class Segment + { + public: + enum SegmentType + { + NIL = 0, + BOOL, + INT32, + INT64, + DOUBLE, + STRING + }; + SegmentType type_; + std::string content_; + union + { + bool bool_; + int32_t int32_; + int64_t int64_; + double double_; + } value_; + + Segment(SegmentType type = NIL); + ~Segment(); + + void clear(); + void toString(); + void setType(SegmentType type); + }; + + JsonType type_; + Context* context_; + Context* wcontext_; + size_t idx_; + size_t len_; + Box* box_; + OpenJson* key_; + Segment* segment_; + + void trimSpace(); + bool makeRContext(); + OpenJson* createNode(unsigned char code); + JsonType codeToType(unsigned char code); + unsigned char getCharCode(); + unsigned char getChar(); + unsigned char checkCode(unsigned char charCode); + size_t searchCode(unsigned char charCode); + void throwError(const char* errMsg); + + const char* data(); + int32_t stringToInt32(); + int64_t stringToInt64(); + double stringToDouble(); + + OpenJson& array(size_t idx); + OpenJson& object(const char* key); + void addNode(OpenJson* node); + void removeNode(size_t idx); + void removeNode(const char* key); + OpenJson(OpenJson& /*json*/) {} + OpenJson(const OpenJson& /*json*/) {} + void operator=(OpenJson& /*json*/) {} + void operator=(const OpenJson& /*json*/) {} + + const std::string& emptyString(); + static OpenJson NodeNull; + static std::string StringNull; +public: + OpenJson(JsonType type = EMPTY); + ~OpenJson(); + + inline size_t size() + { + return box_ ? box_->size() : 0; + } + inline bool empty() + { + return box_ ? box_->empty() : true; + } + inline OpenJson& operator[] (int idx) + { + return array(idx); + } + inline OpenJson& operator[] (size_t idx) + { + return array(idx); + } + inline OpenJson& operator[] (const char* key) + { + return object(key); + } + inline OpenJson& operator[] (const std::string& key) + { + return object(key.c_str()); + } + + inline void remove(int idx) + { + removeNode(idx); + } + inline void remove(size_t idx) + { + removeNode(idx); + } + inline void remove(const char* key) + { + removeNode(key); + } + inline void remove(const std::string& key) + { + removeNode(key.c_str()); + } + void clear(); + + inline bool isNull() + { + return type_ == EMPTY; + } + inline bool isNumber() + { + return type_ == NUMBER; + } + inline bool isString() + { + return type_ == STRING; + } + inline bool isObject() + { + return type_ == OBJECT; + } + inline bool isArray() + { + return type_ == ARRAY; + } + + bool b(bool def = false); + int32_t i32(int32_t def = 0); + int64_t i64(int64_t def = 0); + double d(double def = 0); + const std::string& s(); + const std::string& key(); + + void operator=(bool val); + void operator=(int32_t val); + void operator=(uint32_t val); + void operator=(int64_t val); + void operator=(uint64_t val); + void operator=(double val); + void operator=(const char* val); + void operator=(const std::string& val); + + bool decode(const std::string& buffer); + bool decodeFile(const std::string& filePath); + const std::string& encode(); + void encodeFile(const std::string& filePath); + + static void EnableLog(bool enable); +private: + static bool EnableLog_; + static void Log(const char* format, ...); + void read(Context* context, bool isRoot = false); + void readNumber(); + void readString(); + void readObject(); + void readArray(); + + void write(Context* context, bool isRoot = false); + void writeNumber(); + void writeString(); + void writeObject(); + void writeArray(); +}; + + +#endif /* HEADER_OPEN_JSON_H */ diff --git a/applications/ems_datahubs/opmysql.cpp b/applications/ems_datahubs/opmysql.cpp new file mode 100644 index 0000000..6160bc0 --- /dev/null +++ b/applications/ems_datahubs/opmysql.cpp @@ -0,0 +1,102 @@ +#include "opmysql.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "openjson.h" + +OpDatabase OpDatabase::m_instance; + +OpDatabase::OpDatabase() + :m_pDbConnection(nullptr) +{ +} + +OpDatabase::~OpDatabase() +{ + CloseDatabase(); +} + +OpDatabase* OpDatabase::getInstance() +{ + return &m_instance; +} + +bool OpDatabase::OpenDatabase(const std::string& server, const std::string& dbuser, const std::string& database) +{ + // ݿ + try + { + // ݿ + //std::string server = "tcp://127.0.0.1:3306"; + //std::string dbuser = "root"; + std::string password = "Hj57471000"; + //std::string database = "hjems"; + + // + sql::mysql::MySQL_Driver* driver; + driver = sql::mysql::get_mysql_driver_instance(); + //m_pDbConnection.reset(driver->connect(server, dbuser, password)); + m_pDbConnection = driver->connect(server, dbuser, password); + + if (!m_pDbConnection) + { + hloge("Failed to connect to database."); + return false; + } + + // Ϊʹָݿ + m_pDbConnection->setSchema(database); + + return true; + } + catch (sql::SQLException& e) + { + std::ostringstream ss; + ss << "SQLException: " << e.what(); + ss<< " (MySQL error code: " << e.getErrorCode(); + ss << ", SQLState: " << e.getSQLState() << " )"; + hloge("Failed to connect to database: %s",ss.str().c_str()); + return false; + } +} + +// رݿ +void OpDatabase::CloseDatabase() +{ + //m_pDbConnection->close(); + //delete m_pDbConnection; +} + + +void OpDatabase::InsertMessage(const std::string& ts, const std::string& msg_type, const std::string& fsu, const std::string& content, int topic, int dev_id) +{ + const char* insertSql = "INSERT INTO tbl_data (data_timestamp, data_type, FsuCode, data_content, topic, device_id) VALUES (?,?,?,?,?,?);"; + // Ԥprepared statement + std::unique_ptr insertStmt(m_pDbConnection->prepareStatement(insertSql)); + + // 󶨲ݵIJ + insertStmt->setString(1, ts); + insertStmt->setString(2, msg_type); + insertStmt->setString(3, fsu); + insertStmt->setString(4, content); + insertStmt->setInt(5, topic); + insertStmt->setInt(6, dev_id); + + // ִв + int rowsAffected = insertStmt->executeUpdate(); + + // Ƿɹ + if (rowsAffected > 0) + { + hlogi( "New message inserted successfully." ); + } + else + { + hloge( "No rows affected during insert operation." ); + } +} diff --git a/applications/ems_datahubs/opmysql.h b/applications/ems_datahubs/opmysql.h new file mode 100644 index 0000000..c258500 --- /dev/null +++ b/applications/ems_datahubs/opmysql.h @@ -0,0 +1,27 @@ +#ifndef __MY_OPERATE_MYSQL__ +#define __MY_OPERATE_MYSQL__ + +#include +#include + + +class OpDatabase +{ +protected: + OpDatabase(); +public: + ~OpDatabase(); + static OpDatabase* getInstance(); + + void CloseDatabase(); + bool OpenDatabase(const std::string& server = "tcp://127.0.0.1:3306", const std::string& dbuser = "root", const std::string& database="hjems"); + void InsertMessage(const std::string& ts, const std::string& msg_type, const std::string& fsu, const std::string& content, int topic, int dev_id); + +protected: + //std::unique_ptr m_pDbConnection; + sql::Connection* m_pDbConnection; +private: + static OpDatabase m_instance; +}; + +#endif