py-sim-cabinets/gwsimulator.py

292 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# __author__ = 'HKC'
import io
import logging
from logging.handlers import RotatingFileHandler
import json
import datetime
import time
import sys
import os
from paho.mqtt import client as mqtt_client
import random
import getopt
# 日志文件路径
LOG_FILE_PATH = "./gwsimulator.log"
CFG_FILE_PATH = "./config.json"
# create logger
logger = logging.getLogger(__name__)
DAEMON_MODE = 0
def log_setup():
if len(logger.handlers) == 0:
level = logging.DEBUG
if os.path.exists(LOG_FILE_PATH):
file_size = os.path.getsize(LOG_FILE_PATH)
if file_size > 15000000:
timestamp = time.mktime(datetime.datetime.now().timetuple())
time_fmt = datetime.datetime.fromtimestamp(timestamp).strftime("%Y%m%d_%H%M")
new_name = LOG_FILE_PATH + "." + time_fmt
os.rename(LOG_FILE_PATH, new_name)
time.sleep(1)
handler = RotatingFileHandler(LOG_FILE_PATH, mode='a', maxBytes=30 * 1024 * 1024, backupCount=10, encoding=None,
delay=False)
formatter = logging.Formatter(
"[%(asctime)s] [%(module)s.%(funcName)s][%(lineno)s] [%(levelname)s] : %(message)s")
handler.setFormatter(formatter)
if DAEMON_MODE == 0:
console_handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] : %(message)s", "%Y-%m-%d %H:%M:%S")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(handler)
logger.setLevel(level)
logger.info("===== log start, Welcome to the Earth ====")
# 保存异常信息
def dump_exception_msg(e, ip="127.0.0.1"):
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = "[%s] %s %s %s" % (ip, sys._getframe(1).f_code.co_name, exc_type, exc_value)
logger.error( msg )
logger.exception(e)
def rand(lower_limit=None, upper_limit=None):
return random.randint(lower_limit, upper_limit)
class MqttConfigure:
def __init__(self):
self.broker_ip = '127.0.0.1'
self.port = 8764
self.username = 'username'
self.passwd = 'passwd'
self.publish_topic = 'public'
self.publish_interval = 5000
self.publish_qos = 0
self.subscribe_qos = 0
self.subscriber_topic = ''
self.client_id = f'python-mqtt-{random.randint(0, 1000)}'
self.max_publish_count = 0
self.fsuCodes = []
self.FsuCodePrefix = 1111111000
self.FsuCodeStart = 1
self.FsuCodeEnd = 999
class PublishData:
def __init__(self):
self.publish_data = {}
def read_configure(mqtt_conf=None, publish_data=None):
abspath = os.path.abspath(__file__)
json_path = os.path.dirname(abspath) + "/" + CFG_FILE_PATH # '/config.json'
try:
with io.open(json_path, 'r') as f:
json_data = json.load(f)
mqtt_conf.broker_ip = json_data['broker_ip']
mqtt_conf.port = json_data['port']
mqtt_conf.username = json_data['username']
mqtt_conf.passwd = json_data['passwd']
mqtt_conf.publish_topic = json_data['publish_topic']
mqtt_conf.publish_interval = json_data['publish_interval']
mqtt_conf.publish_qos = json_data['publish_qos']
mqtt_conf.subscribe_qos = json_data['subscribe_qos']
mqtt_conf.subscriber_topic = json_data['subcriber_topic']
mqtt_conf.client_id = json_data['client_id']
# 2025.03.27 增加读取多个FsuCode用于模拟多个不同的设备
mqtt_conf.FsuCodePrefix = json_data['FsuCodePrefix']
mqtt_conf.FsuCodeStart = json_data['FsuCodeStart']
mqtt_conf.FsuCodeEnd = json_data['FsuCodeEnd']
mqtt_conf.fsuCodes = json_data['FsuCodes']
publish_data.publish_data = json_data['publish_data']
except Exception as e:
dump_exception_msg(e)
def pub_connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info("Connected to MQTT Broker!")
else:
logger.error("Failed to connect, return code %d", rc)
def on_disconnect(client, userdata, rc):
"""断开连接回调"""
logger.info(f"[{time.ctime()}] 连接断开")
if rc != 0:
logger.info("尝试重新连接...")
reconnect()
def on_publish(client, userdata, mid):
"""消息发布回调"""
logger.info(f"[{time.ctime()}] 消息已确认送达 (MID: {mid})")
def reconnect():
"""自动重连机制"""
reconnect_delay = 5
try:
logger.info(f"等待{reconnect_delay}秒后尝试重连...")
time.sleep(reconnect_delay)
client.reconnect()
except Exception as e:
logger.info(f"重连失败:{str(e)}")
client = mqtt_client.Client(callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1, protocol=4,
client_id=mqtt_conf.client_id, reconnect_on_failure=True)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
time.sleep(1)
return client
def sub_connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info("Connected to MQTT Broker!")
else:
logger.error("Failed to connect, return code %d", rc)
def on_message(client, userdata, msg):
logger.info(msg.topic + " " + str(msg.payload))
client = mqtt_client.Client(protocol=3,client_id=mqtt_conf.client_id)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
time.sleep(1)
return client
# 根据配置信息构建发布的报文消息
# 2025.03.11 增加函数参数处理制定的fsucode
def build_publish_msg(fsu_code=None):
pd = publish_data.publish_data
msg = {"FsuCode": fsu_code, "type": pd["type"], "IdCodeContent": [],
"TimeStamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}
for one in pd["IdCodeContent"]:
idcode = one["OID"]
factor = one["Factor"]
t = one["type"]
value = one["Value"]
if t == "random_number":
lo = one["RND"][0]
up = one["RND"][1]
value = rand(lo, up)
msg['IdCodeContent'].append({
"OID": idcode,
"Factor": factor,
"Value": value
})
json_data = json.dumps(msg, ensure_ascii=False)
return json_data
def publish(client):
msg_count = 0
# 2025.03.11 增加循环处理多个FsuCode用于模拟多个不同的设备
fsu_codes_result = []
for num in range(mqtt_conf.FsuCodeStart, mqtt_conf.FsuCodeEnd+1):
suffix = f"{num:04d}"
fsu_codes_result.append(mqtt_conf.FsuCodePrefix + suffix)
while True:
if mqtt_conf.max_publish_count != 0 and msg_count > mqtt_conf.max_publish_count:
break
# for fsu_code in mqtt_conf.fsuCodes:
for fsu_code in fsu_codes_result:
logger.info("Processing fsuCode: " + fsu_code)
msg = build_publish_msg(fsu_code)
msg_info = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos)
# msg_info : [0, 1]
status = msg_info[0]
mid = msg_info[1]
msg_info.wait_for_publish()
try:
b = msg_info.is_published()
if b:
logger.info(f">> `#{msg_count}` Message #{mid} is published!")
else:
logger.error(f">> `#{msg_count}` Message failed to be published!")
except Exception as e:
dump_exception_msg(e)
msg_count += 1
if status == mqtt_client.MQTT_ERR_SUCCESS:
# logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}`: `{msg}` ")
logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}` successfully")
else:
logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}")
time.sleep(mqtt_conf.publish_interval/1000)
if __name__ == '__main__':
log_setup()
try:
running_mode = "pub"
argv = sys.argv
if len(argv) == 1:
print('Usage: gwsimulator.py -m <pub|sub> -c <config_file.json>')
print(' or: gwsimulator.py --mode=pub|sub -c <config_file.json>')
sys.exit(2)
opts, args = getopt.getopt(argv[1:], "m:c:", ["mode=", "config="])
for opt, arg in opts:
if opt in ("-m", "--mode"):
running_mode = arg
elif opt in("-c", "--config"):
CFG_FILE_PATH = arg
else:
print('Usage: gwsimulator.py -m <pub|sub> -c <config_file.json>')
print(' or: gwsimulator.py --mode=pub|sub -c <config_file.json>')
sys.exit(2)
logger.info(f"Running in {running_mode} mode")
mqtt_conf = MqttConfigure()
publish_data = PublishData()
if sys.version_info >= (3, 0):
read_configure(mqtt_conf, publish_data)
else:
logger.error("I need python version > 3.0.0")
sys.exit(2)
if running_mode.lower() == "pub":
client = pub_connect_mqtt()
client.loop_start()
publish(client)
else:
client = sub_connect_mqtt()
client.subscribe(topic=mqtt_conf.subscriber_topic, qos=mqtt_conf.subscribe_qos)
client.loop_forever()
except KeyboardInterrupt:
logger.error("Disconnected from broker and EXIT")
client.disconnect()
logger.info("===== log end, I will be back ====")