py-sim-cabinets/gwsimulator.py

242 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# __author__ = 'HKC'
import io
import logging
from logging.handlers import RotatingFileHandler
import json
import datetime
import time
import sys
import os
from paho.mqtt import client as mqtt_client
import random
import getopt
# Path of the log file that log_setup() writes to (and archives when oversized).
LOG_FILE_PATH = "./gwsimulator.log"
# Configuration file path; read_configure() looks for it under this script's
# directory, and the -c/--config command-line option overrides it.
CFG_FILE_PATH = "./config.json"
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# When 0, log_setup() also attaches a console handler next to the file handler.
DAEMON_MODE = 0
def log_setup():
    """Attach a rotating file handler (and optionally a console handler).

    Does nothing when the module logger already has handlers, so repeated
    calls do not duplicate output.  Before the file handler is created, an
    existing log file larger than 15,000,000 bytes is renamed to
    ``<LOG_FILE_PATH>.<YYYYmmdd_HHMM>`` so the new run starts a fresh file.

    The file handler writes UTF-8 explicitly: published payloads are logged
    after being serialised with ``ensure_ascii=False``, so relying on the
    platform's default encoding could garble or fail on non-ASCII text.
    """
    if logger.handlers:
        return

    level = logging.DEBUG

    # Archive an oversized log left behind by a previous run.
    if os.path.exists(LOG_FILE_PATH) and os.path.getsize(LOG_FILE_PATH) > 15000000:
        time_fmt = datetime.datetime.now().strftime("%Y%m%d_%H%M")
        os.rename(LOG_FILE_PATH, LOG_FILE_PATH + "." + time_fmt)
        # Short pause kept from the original implementation after the rename.
        time.sleep(1)

    handler = RotatingFileHandler(
        LOG_FILE_PATH,
        mode='a',
        maxBytes=30 * 1024 * 1024,
        backupCount=10,
        encoding="utf-8",
        delay=False,
    )
    handler.setFormatter(logging.Formatter(
        "[%(asctime)s] [%(module)s.%(funcName)s][%(lineno)s] [%(levelname)s] : %(message)s"))

    # Outside daemon mode, mirror the log to the console in a shorter format.
    if DAEMON_MODE == 0:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter("[%(asctime)s] [%(levelname)s] : %(message)s", "%Y-%m-%d %H:%M:%S"))
        logger.addHandler(console_handler)

    logger.addHandler(handler)
    logger.setLevel(level)
    logger.info("===== log start, Welcome to the Earth ====")
def dump_exception_msg(e, ip="127.0.0.1"):
    """Record information about an exception in the log.

    Intended to be called while an exception is being handled: the type and
    value are taken from ``sys.exc_info()`` and the name of the calling
    function from the caller's stack frame.

    Args:
        e: The exception object; passed to ``logger.exception``.
        ip: Label placed at the start of the summary line.
    """
    caller_name = sys._getframe(1).f_code.co_name
    exc_type, exc_value = sys.exc_info()[:2]
    summary = "[%s] %s %s %s" % (ip, caller_name, exc_type, exc_value)
    # One-line summary first, then the exception itself with its traceback.
    logger.error(summary)
    logger.exception(e)
def rand(lower_limit=None, upper_limit=None):
    """Return a random integer N such that lower_limit <= N <= upper_limit.

    Both bounds are inclusive.  The ``None`` defaults are placeholders only:
    callers must supply both bounds, otherwise a ``TypeError`` is raised.
    """
    # randrange excludes its stop value, hence the + 1 for an inclusive bound.
    return random.randrange(lower_limit, upper_limit + 1)
class MqttConfigure:
    """Holds the MQTT connection and publishing settings.

    Every attribute starts with a built-in default; read_configure()
    overwrites them with values from the JSON configuration file.
    """

    def __init__(self):
        # Broker connection.
        self.broker_ip = '127.0.0.1'
        self.port = 8764
        self.username = 'username'
        self.passwd = 'passwd'
        self.client_id = f'python-mqtt-{random.randint(0, 1000)}'

        # Publishing.
        self.publish_topic = 'public'
        self.publish_qos = 0
        self.publish_interval = 5000  # divided by 1000 before time.sleep() in publish()
        self.max_publish_count = 0    # 0 disables the message limit in publish()

        # Subscribing.
        self.subscriber_topic = ''
        self.subscribe_qos = 0

        # Device codes; publish() sends one message per code.
        self.fsuCodes = []
class PublishData:
    """Container for the ``publish_data`` section of the configuration file."""

    def __init__(self):
        # Starts empty; read_configure() replaces it with the configured template.
        self.publish_data = dict()
def read_configure(mqtt_conf=None, publish_data=None):
    """Load the JSON configuration file into the given settings objects.

    The file named by ``CFG_FILE_PATH`` is resolved against the directory
    containing this script; an absolute ``CFG_FILE_PATH`` is used as-is.
    The file is read as UTF-8.

    Any failure (missing file, invalid JSON, missing key, ...) is logged via
    ``dump_exception_msg`` and not re-raised; fields assigned before the
    failure keep their new values and the remaining ones keep their defaults.

    Args:
        mqtt_conf: Object receiving the connection/publishing settings.
        publish_data: Object whose ``publish_data`` attribute receives the
            message template.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # os.path.join leaves an absolute CFG_FILE_PATH untouched, whereas plain
    # string concatenation would prepend the script directory to it.
    json_path = os.path.join(script_dir, CFG_FILE_PATH)
    try:
        with io.open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
        mqtt_conf.broker_ip = json_data['broker_ip']
        mqtt_conf.port = json_data['port']
        mqtt_conf.username = json_data['username']
        mqtt_conf.passwd = json_data['passwd']
        mqtt_conf.publish_topic = json_data['publish_topic']
        mqtt_conf.publish_interval = json_data['publish_interval']
        mqtt_conf.publish_qos = json_data['publish_qos']
        mqtt_conf.subscribe_qos = json_data['subscribe_qos']
        # Existing config files use the misspelled key "subcriber_topic";
        # it is still honoured, and the correct spelling is accepted as well.
        if 'subscriber_topic' in json_data:
            mqtt_conf.subscriber_topic = json_data['subscriber_topic']
        else:
            mqtt_conf.subscriber_topic = json_data['subcriber_topic']
        mqtt_conf.client_id = json_data['client_id']
        # Optional key: without it the current value (default 0) is kept.
        mqtt_conf.max_publish_count = json_data.get(
            'max_publish_count', mqtt_conf.max_publish_count)
        # 2025-03-11: read several FsuCodes so that multiple different
        # devices can be simulated.
        mqtt_conf.fsuCodes = json_data['FsuCodes']
        publish_data.publish_data = json_data['publish_data']
    except Exception as e:
        dump_exception_msg(e)
def pub_connect_mqtt():
    """Create the publishing MQTT client and connect it to the broker.

    Connection parameters come from the module-level ``mqtt_conf``.  The
    network loop is not started here; that is left to the caller.

    Returns:
        The connected client object.
    """
    # The callback keeps the name on_connect because the file log format
    # prints the emitting function's name.
    def on_connect(client, userdata, flags, rc):
        if rc != 0:
            logger.error("Failed to connect, return code %d", rc)
            return
        logger.info("Connected to MQTT Broker!")

    conf = mqtt_conf
    publisher = mqtt_client.Client(protocol=3, client_id=conf.client_id)
    publisher.on_connect = on_connect
    publisher.username_pw_set(username=conf.username, password=conf.passwd)
    publisher.connect(host=conf.broker_ip, port=conf.port, keepalive=60)
    time.sleep(1)
    return publisher
def sub_connect_mqtt():
    """Create the subscribing MQTT client and connect it to the broker.

    Connection parameters come from the module-level ``mqtt_conf``.  Every
    received message is logged as ``"<topic> <payload>"``.  Subscribing and
    running the network loop are left to the caller.

    Returns:
        The connected client object.
    """
    # Callback names are kept as-is because the file log format prints the
    # emitting function's name.
    def on_connect(client, userdata, flags, rc):
        if rc != 0:
            logger.error("Failed to connect, return code %d", rc)
            return
        logger.info("Connected to MQTT Broker!")

    def on_message(client, userdata, msg):
        logger.info(" ".join((msg.topic, str(msg.payload))))

    conf = mqtt_conf
    subscriber = mqtt_client.Client(protocol=3, client_id=conf.client_id)
    subscriber.on_connect = on_connect
    subscriber.on_message = on_message
    subscriber.username_pw_set(username=conf.username, password=conf.passwd)
    subscriber.connect(host=conf.broker_ip, port=conf.port, keepalive=60)
    time.sleep(1)
    return subscriber
# Build the message to publish from the configured template.
# 2025-03-11: added the fsu_code parameter so a message can be built for a
# specific FsuCode.
def build_publish_msg(fsu_code=None):
    """Return the JSON text of one message for ``fsu_code``.

    The template is the module-level ``publish_data.publish_data``.  Each
    template entry contributes its ``OID``, ``Factor`` and ``Value``; for
    entries whose ``type`` is ``"random_number"`` the value is replaced by a
    random integer between ``RND[0]`` and ``RND[1]`` (inclusive).  The
    message is stamped with the current local time.

    Args:
        fsu_code: Value written to the message's ``FsuCode`` field.

    Returns:
        The message serialised with ``ensure_ascii=False``.
    """
    template = publish_data.publish_data
    message = {
        "FsuCode": fsu_code,
        "type": template["type"],
        "IdCodeContent": [],
        "TimeStamp": time.strftime("%Y-%m-%d %H:%M:%S"),
    }
    points = message["IdCodeContent"]
    for entry in template["IdCodeContent"]:
        oid = entry["OID"]
        factor = entry["Factor"]
        kind = entry["type"]
        value = entry["Value"]
        if kind == "random_number":
            low, high = entry["RND"][0], entry["RND"][1]
            value = rand(low, high)
        points.append({"OID": oid, "Factor": factor, "Value": value})
    return json.dumps(message, ensure_ascii=False)
def publish(client):
    """Publish simulated messages for every configured FsuCode in rounds.

    Each round builds and publishes one message per entry of
    ``mqtt_conf.fsuCodes`` and then sleeps ``mqtt_conf.publish_interval``
    milliseconds, so each simulated device publishes once per interval.

    When ``mqtt_conf.max_publish_count`` is non-zero the function returns as
    soon as that many publish attempts have been made (failed attempts count
    too); the limit is checked before every message so it is never exceeded.
    With a limit of 0 the loop runs until interrupted.

    Args:
        client: Connected MQTT client exposing ``publish(topic, payload, qos)``.
    """
    def limit_reached(sent):
        limit = mqtt_conf.max_publish_count
        return limit != 0 and sent >= limit

    if not mqtt_conf.fsuCodes:
        # Nothing to simulate; looping would never send anything.
        logger.error("No FsuCodes configured, nothing to publish")
        return

    msg_count = 0
    while True:
        # 2025-03-11: loop over several FsuCodes to simulate multiple
        # different devices.
        for fsu_code in mqtt_conf.fsuCodes:
            if limit_reached(msg_count):
                return
            # %s formatting also copes with codes that are not strings.
            logger.info("Processing fsuCode: %s", fsu_code)
            msg = build_publish_msg(fsu_code)
            result = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos)
            # The first element of the result is the status code; 0 is treated as success.
            status = result[0]
            msg_count += 1
            if status == 0:
                logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}`: `{msg}` ")
            else:
                logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}")
        if limit_reached(msg_count):
            return
        time.sleep(mqtt_conf.publish_interval / 1000)
if __name__ == '__main__':
    # Script entry point: parse the command line, load the configuration and
    # run either as a publisher ("pub") or, for any other mode, a subscriber.

    def _print_usage():
        # Shared by the "no arguments" and "bad option" paths.
        print('Usage: gwsimulator.py -m <pub|sub> -c <config_file.json>')
        print(' or: gwsimulator.py --mode=pub|sub -c <config_file.json>')

    log_setup()
    # Defined up front so the interrupt handler can tell whether a client
    # was ever created (Ctrl-C may arrive before the connection is made).
    client = None
    try:
        running_mode = "pub"
        argv = sys.argv
        if len(argv) == 1:
            _print_usage()
            sys.exit(2)
        try:
            opts, args = getopt.getopt(argv[1:], "m:c:", ["mode=", "config="])
        except getopt.GetoptError as err:
            # Unknown option or missing option argument.
            logger.error("Invalid command line: %s", err)
            _print_usage()
            sys.exit(2)
        for opt, arg in opts:
            if opt in ("-m", "--mode"):
                running_mode = arg
            elif opt in ("-c", "--config"):
                # Module-level assignment: read_configure() picks this up.
                CFG_FILE_PATH = arg
        logger.info(f"Running in {running_mode} mode")
        # Module-level objects used by the connect/publish functions.
        mqtt_conf = MqttConfigure()
        publish_data = PublishData()
        if sys.version_info >= (3, 0):
            read_configure(mqtt_conf, publish_data)
        else:
            logger.error("I need python version > 3.0.0")
            sys.exit(2)
        if running_mode.lower() == "pub":
            client = pub_connect_mqtt()
            client.loop_start()
            publish(client)
        else:
            client = sub_connect_mqtt()
            client.subscribe(topic=mqtt_conf.subscriber_topic, qos=mqtt_conf.subscribe_qos)
            client.loop_forever()
    except KeyboardInterrupt:
        logger.error("Disconnected from broker and EXIT")
        if client is not None:
            client.disconnect()
    logger.info("===== log end, I will be back ====")