py-sim-cabinets/gwsimulator.py

292 lines
10 KiB
Python
Raw Permalink Normal View History

2025-03-11 14:54:07 +08:00
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# __author__ = 'HKC'
import io
import logging
from logging.handlers import RotatingFileHandler
import json
import datetime
import time
import sys
import os
from paho.mqtt import client as mqtt_client
import random
import getopt
# Log file path (used by log_setup for both the size check and the file handler)
LOG_FILE_PATH = "./gwsimulator.log"
# Default configuration file path; the -c/--config command-line option rebinds it
CFG_FILE_PATH = "./config.json"
# create logger
logger = logging.getLogger(__name__)
# When 0, log_setup also attaches a console (stream) handler to the logger
DAEMON_MODE = 0
def log_setup():
    """Attach a rotating file handler (and optionally a console handler) to ``logger``.

    Nothing happens when the logger already has handlers, so calling this
    more than once does not duplicate output.  Before the file handler is
    opened, an existing log file larger than 15,000,000 bytes is renamed to
    ``<LOG_FILE_PATH>.<YYYYmmdd_HHMM>``.
    """
    if logger.handlers:
        return

    # Move an oversized log file out of the way before opening a fresh one.
    if os.path.exists(LOG_FILE_PATH) and os.path.getsize(LOG_FILE_PATH) > 15000000:
        now_ts = time.mktime(datetime.datetime.now().timetuple())
        stamp = datetime.datetime.fromtimestamp(now_ts).strftime("%Y%m%d_%H%M")
        os.rename(LOG_FILE_PATH, LOG_FILE_PATH + "." + stamp)
        time.sleep(1)

    file_handler = RotatingFileHandler(
        LOG_FILE_PATH,
        mode='a',
        maxBytes=30 * 1024 * 1024,
        backupCount=10,
        encoding=None,
        delay=False,
    )
    file_handler.setFormatter(logging.Formatter(
        "[%(asctime)s] [%(module)s.%(funcName)s][%(lineno)s] [%(levelname)s] : %(message)s"))

    # The console handler is registered first, then the file handler.
    if DAEMON_MODE == 0:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter(
            "[%(asctime)s] [%(levelname)s] : %(message)s", "%Y-%m-%d %H:%M:%S"))
        logger.addHandler(console_handler)

    logger.addHandler(file_handler)
    logger.setLevel(logging.DEBUG)
    logger.info("===== log start, Welcome to the Earth ====")
# Record exception details in the log
def dump_exception_msg(e, ip="127.0.0.1"):
    """Log the exception currently being handled, tagged with the caller's name.

    Two records are written: an error line of the form
    ``[ip] <calling function> <exception type> <exception value>`` and then
    ``logger.exception(e)``, which appends the traceback.

    :param e: the exception object that was caught.
    :param ip: label placed in square brackets at the start of the error line.
    """
    exc_type, exc_value = sys.exc_info()[:2]
    # Frame 1 is the function that called dump_exception_msg.
    caller_name = sys._getframe(1).f_code.co_name
    logger.error("[%s] %s %s %s" % (ip, caller_name, exc_type, exc_value))
    logger.exception(e)
def rand(lower_limit=None, upper_limit=None):
    """Return a random integer N with ``lower_limit <= N <= upper_limit``.

    Both bounds must be supplied as integers; the ``None`` defaults are not
    usable values and make ``random.randint`` raise.
    """
    picked = random.randint(lower_limit, upper_limit)
    return picked
class MqttConfigure:
    """Holder for MQTT connection and publishing settings.

    The constructor only fills in defaults; ``read_configure`` overwrites
    most of them from the JSON configuration file.
    """

    def __init__(self):
        # Broker endpoint and credentials
        self.broker_ip = '127.0.0.1'
        self.port = 8764
        self.username = 'username'
        self.passwd = 'passwd'

        # Publishing: topic, pause between rounds in milliseconds, QoS
        self.publish_topic = 'public'
        self.publish_interval = 5000
        self.publish_qos = 0

        # Subscribing: QoS and topic
        self.subscribe_qos = 0
        self.subscriber_topic = ''

        # Client id with a random numeric suffix in [0, 1000]
        suffix = random.randint(0, 1000)
        self.client_id = f'python-mqtt-{suffix}'

        # 0 means "no limit" in publish()
        self.max_publish_count = 0

        # Explicit list of FSU codes loaded from the configuration file
        self.fsuCodes = []

        # FSU codes are generated as prefix + zero-padded number in [start, end]
        self.FsuCodePrefix = 1111111000
        self.FsuCodeStart = 1
        self.FsuCodeEnd = 999
2025-03-11 14:54:07 +08:00
class PublishData:
    """Container for the ``publish_data`` section of the configuration file."""

    def __init__(self):
        # Starts empty; read_configure replaces it with the loaded template.
        self.publish_data = dict()
def read_configure(mqtt_conf=None, publish_data=None):
    """Load the JSON configuration file into ``mqtt_conf`` and ``publish_data``.

    The file is ``CFG_FILE_PATH`` resolved against the directory containing
    this script.  An absolute ``CFG_FILE_PATH`` (for example one given with
    ``-c``) is used unchanged.  The file is decoded as UTF-8.

    :param mqtt_conf: object whose connection/publish attributes are overwritten.
    :param publish_data: object whose ``publish_data`` attribute receives the
        ``publish_data`` section of the file.

    Any failure (missing file, invalid JSON, missing key) is logged through
    ``dump_exception_msg`` and not re-raised.  Attributes assigned before the
    failure keep their new values; the remaining ones are left untouched.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # os.path.join drops script_dir when CFG_FILE_PATH is absolute, whereas
    # plain string concatenation produced a non-existent path in that case.
    json_path = os.path.join(script_dir, CFG_FILE_PATH)

    # (attribute on mqtt_conf, key in the JSON file), applied in this order.
    field_map = (
        ("broker_ip", "broker_ip"),
        ("port", "port"),
        ("username", "username"),
        ("passwd", "passwd"),
        ("publish_topic", "publish_topic"),
        ("publish_interval", "publish_interval"),
        ("publish_qos", "publish_qos"),
        ("subscribe_qos", "subscribe_qos"),
        # The key really is spelled 'subcriber_topic' in the configuration
        # schema; do not "fix" it here without changing the config files.
        ("subscriber_topic", "subcriber_topic"),
        ("client_id", "client_id"),
        # 2025.03.27: FsuCode settings used to simulate several devices
        ("FsuCodePrefix", "FsuCodePrefix"),
        ("FsuCodeStart", "FsuCodeStart"),
        ("FsuCodeEnd", "FsuCodeEnd"),
        ("fsuCodes", "FsuCodes"),
    )

    try:
        # Explicit encoding: the locale default is not UTF-8 on every platform.
        with io.open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
        for attr_name, json_key in field_map:
            setattr(mqtt_conf, attr_name, json_data[json_key])
        publish_data.publish_data = json_data['publish_data']
    except Exception as e:
        dump_exception_msg(e)
def pub_connect_mqtt():
    """Create the publisher MQTT client, connect it to the broker and return it.

    Connection settings are read from the module-level ``mqtt_conf``.  The
    network loop is not started here; the caller is expected to do that.
    """

    def reconnect():
        """Wait a fixed delay, then try to reconnect; a failure is only logged."""
        delay_seconds = 5
        try:
            logger.info(f"等待{delay_seconds}秒后尝试重连...")
            time.sleep(delay_seconds)
            publisher.reconnect()
        except Exception as e:
            logger.info(f"重连失败:{str(e)}")

    def on_connect(client, userdata, flags, rc):
        """Connect callback: log success for rc == 0, otherwise the return code."""
        if rc != 0:
            logger.error("Failed to connect, return code %d", rc)
            return
        logger.info("Connected to MQTT Broker!")

    def on_disconnect(client, userdata, rc):
        """Disconnect callback: a non-zero rc triggers a reconnect attempt."""
        logger.info(f"[{time.ctime()}] 连接断开")
        if rc == 0:
            return
        logger.info("尝试重新连接...")
        reconnect()

    def on_publish(client, userdata, mid):
        """Publish callback: log the id of the message reported as delivered."""
        logger.info(f"[{time.ctime()}] 消息已确认送达 (MID: {mid})")

    publisher = mqtt_client.Client(
        callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1,
        protocol=4,
        client_id=mqtt_conf.client_id,
        reconnect_on_failure=True,
    )
    publisher.on_connect = on_connect
    publisher.on_disconnect = on_disconnect
    publisher.on_publish = on_publish
    publisher.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
    publisher.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
    # Pause one second before handing the client back to the caller.
    time.sleep(1)
    return publisher
def sub_connect_mqtt():
    """Create the subscriber MQTT client, connect it to the broker and return it.

    Connection settings are read from the module-level ``mqtt_conf``.
    Subscribing and running the network loop are left to the caller.
    """

    def on_connect(client, userdata, flags, rc):
        """Connect callback: log success for rc == 0, otherwise the return code."""
        if rc == 0:
            logger.info("Connected to MQTT Broker!")
        else:
            logger.error("Failed to connect, return code %d", rc)

    def on_message(client, userdata, msg):
        """Message callback: log the topic followed by the raw payload."""
        logger.info(msg.topic + " " + str(msg.payload))

    # paho-mqtt 2.0 introduced callback_api_version (mandatory in 2.0.0).
    # Pass it explicitly, exactly as pub_connect_mqtt does, so subscriber
    # mode works with the same library version and the VERSION1 callback
    # signatures used above are the ones the library calls.
    client = mqtt_client.Client(
        callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1,
        protocol=3,
        client_id=mqtt_conf.client_id,
    )
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
    client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
    # Pause one second before handing the client back to the caller.
    time.sleep(1)
    return client
# Build the message to publish from the configured template
# 2025.03.11: added a parameter so the message carries the given FsuCode
def build_publish_msg(fsu_code=None):
    """Return the JSON text of one message for ``fsu_code``.

    The template is the module-level ``publish_data.publish_data``.  Each
    entry of its ``IdCodeContent`` list contributes ``OID``, ``Factor`` and
    ``Value``; when the entry's ``type`` is ``"random_number"`` the value is
    replaced by a random integer between ``RND[0]`` and ``RND[1]`` inclusive.
    ``TimeStamp`` is the current local time.

    :param fsu_code: value placed in the ``FsuCode`` field.
    :return: the message serialised with ``json.dumps(..., ensure_ascii=False)``.
    """
    template = publish_data.publish_data
    items = []
    message = {
        "FsuCode": fsu_code,
        "type": template["type"],
        "IdCodeContent": items,
        "TimeStamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    }

    for entry in template["IdCodeContent"]:
        # All four fields are read for every entry, random or not.
        oid = entry["OID"]
        factor = entry["Factor"]
        kind = entry["type"]
        current = entry["Value"]
        if kind == "random_number":
            bounds = entry["RND"]
            current = rand(bounds[0], bounds[1])
        items.append({"OID": oid, "Factor": factor, "Value": current})

    return json.dumps(message, ensure_ascii=False)
def publish(client):
    """Publish simulated messages for every generated FSU code, round after round.

    FSU codes are ``FsuCodePrefix`` followed by each number in
    ``[FsuCodeStart, FsuCodeEnd]`` zero-padded to four digits.  The explicit
    ``mqtt_conf.fsuCodes`` list is not used here.  One round sends one
    message per code, then sleeps ``publish_interval`` milliseconds.

    The function returns once ``max_publish_count`` messages have been sent
    (0 means no limit), or immediately when the code range is empty.

    :param client: a connected paho-mqtt client whose network loop is running.
    """
    # 2025.03.11: loop over several FsuCodes to simulate several devices.
    # Formatting through an f-string also accepts a numeric prefix (the class
    # default is an int), which plain "+" concatenation rejected.
    fsu_codes = [
        f"{mqtt_conf.FsuCodePrefix}{num:04d}"
        for num in range(mqtt_conf.FsuCodeStart, mqtt_conf.FsuCodeEnd + 1)
    ]
    if not fsu_codes:
        logger.error("No FsuCode to publish: FsuCodeStart is greater than FsuCodeEnd")
        return

    limit = mqtt_conf.max_publish_count
    msg_count = 0
    while True:
        for fsu_code in fsu_codes:
            # Checked before every message so exactly `limit` messages go out.
            if limit != 0 and msg_count >= limit:
                return

            logger.info("Processing fsuCode: " + fsu_code)
            msg = build_publish_msg(fsu_code)
            msg_info = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos)
            status = msg_info.rc
            mid = msg_info.mid
            # Counted once here so both log lines below show the same number.
            msg_count += 1

            try:
                # wait_for_publish()/is_published() raise when the message was
                # not accepted (e.g. no connection); keep that inside the try
                # so a dropped connection does not terminate the simulator.
                msg_info.wait_for_publish()
                if msg_info.is_published():
                    logger.info(f">> `#{msg_count}` Message #{mid} is published!")
                else:
                    logger.error(f">> `#{msg_count}` Message failed to be published!")
            except (ValueError, RuntimeError) as e:
                dump_exception_msg(e)

            if status == mqtt_client.MQTT_ERR_SUCCESS:
                logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}` successfully")
            else:
                logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}")

        time.sleep(mqtt_conf.publish_interval / 1000)
if __name__ == '__main__':
    # Script entry point: parse -m/--mode and -c/--config, load the
    # configuration, then run as publisher ("pub") or subscriber (any other
    # mode value) until interrupted.
    log_setup()
    usage_text = ("Usage: gwsimulator.py -m <pub|sub> -c <config_file.json>\n"
                  " or: gwsimulator.py --mode=pub|sub -c <config_file.json>")
    # Bound before the try block so the KeyboardInterrupt handler can tell
    # whether a client was ever created.
    client = None
    try:
        running_mode = "pub"
        argv = sys.argv
        if len(argv) == 1:
            print(usage_text)
            sys.exit(2)

        try:
            opts, args = getopt.getopt(argv[1:], "m:c:", ["mode=", "config="])
        except getopt.GetoptError as err:
            # getopt raises for unknown options or missing arguments; show
            # the usage text instead of a traceback.
            print(err)
            print(usage_text)
            sys.exit(2)

        for opt, arg in opts:
            if opt in ("-m", "--mode"):
                running_mode = arg
            elif opt in ("-c", "--config"):
                CFG_FILE_PATH = arg

        logger.info(f"Running in {running_mode} mode")

        # mqtt_conf and publish_data are module-level names read by the
        # connect/build/publish functions.
        mqtt_conf = MqttConfigure()
        publish_data = PublishData()
        # No runtime Python-version check: this file already uses f-strings,
        # so an older interpreter fails at parse time before reaching here.
        read_configure(mqtt_conf, publish_data)

        if running_mode.lower() == "pub":
            client = pub_connect_mqtt()
            client.loop_start()
            publish(client)
        else:
            client = sub_connect_mqtt()
            client.subscribe(topic=mqtt_conf.subscriber_topic, qos=mqtt_conf.subscribe_qos)
            client.loop_forever()
    except KeyboardInterrupt:
        logger.error("Disconnected from broker and EXIT")
        # Ctrl-C may arrive before the client exists.
        if client is not None:
            client.disconnect()
    logger.info("===== log end, I will be back ====")