diff --git a/config4.json b/config4.json index 2617eb4..fac1330 100644 --- a/config4.json +++ b/config4.json @@ -1,8 +1,8 @@ { - "client_id": "gw_simulator_888", - "broker_ip": "127.0.0.1", - "port": 1883, - "username": "hjemq-c2", + "client_id": "gw_simulator_999", + "broker_ip": "116.236.50.106", + "port": 8764, + "username": "hjemq_sim_icm", "passwd": "Hj57471000", "publish_topic": "GateWayPublicTopic_Server", "publish_interval": 5000, @@ -10,7 +10,7 @@ "publish_qos": 0, "subscribe_qos": 0, "subcriber_topic": "ServerPublicTopic_GateWay", - "FsuCodes": ["11010110100111","11010110100112","11010110100113","11010110100114"], + "FsuCodes": ["11111110000001","11111110000002","11111110000003","11111110000004"], "publish_data": { "FsuCode": "11010110100999", "type": "gateway-data", @@ -163,8 +163,8 @@ "Factor": "100", "type": "random_number", "RND": [ - 4980, - 5210 + 5001, + 5040 ] }, { @@ -174,7 +174,7 @@ "type": "random_number", "RND": [ 3000, - 4600 + 3600 ] }, { @@ -183,8 +183,8 @@ "Factor": "100", "type": "random_number", "RND": [ - 3100, - 6600 + 5100, + 7600 ] }, { @@ -193,8 +193,8 @@ "Factor": "100", "type": "random_number", "RND": [ - 80, - 85 + 88, + 89 ] }, { @@ -284,7 +284,7 @@ "type": "random_number", "RND": [ 20, - 56 + 36 ] }, { diff --git a/gwsimulator.py b/gwsimulator.py index 6344291..26ce7b0 100644 --- a/gwsimulator.py +++ b/gwsimulator.py @@ -117,8 +117,34 @@ def pub_connect_mqtt(): else: logger.error("Failed to connect, return code %d", rc) - client = mqtt_client.Client(protocol=3, client_id=mqtt_conf.client_id) + def on_disconnect(client, userdata, rc): + """断开连接回调""" + logger.info(f"[{time.ctime()}] 连接断开,原因:{client.error_string(rc)}") + if rc != 0: + logger.info("尝试重新连接...") + reconnect() + + def on_publish(client, userdata, mid): + """消息发布回调""" + logger.info(f"[{time.ctime()}] 消息已确认送达 (MID: {mid})") + + def reconnect(): + """自动重连机制""" + reconnect_delay = 5 + try: + logger.info(f"等待{reconnect_delay}秒后尝试重连...") + time.sleep(reconnect_delay) + client.reconnect() + except Exception as e: + logger.info(f"重连失败:{str(e)}") + + client = mqtt_client.Client(callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1, protocol=4, + client_id=mqtt_conf.client_id, reconnect_on_failure=True) client.on_connect = on_connect + + client.on_disconnect = on_disconnect + client.on_publish = on_publish + client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd) client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60) time.sleep(1) @@ -181,11 +207,24 @@ def publish(client): for fsu_code in mqtt_conf.fsuCodes: logger.info("Processing fsuCode: " + fsu_code) msg = build_publish_msg(fsu_code) - result = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos) - # result: [0, 1] - status = result[0] + msg_info = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos) + # msg_info : [0, 1] + status = msg_info[0] + mid = msg_info[1] + + msg_info.wait_for_publish() + + try: + b = msg_info.is_published() + if b: + logger.info(f">> `#{msg_count}` Message #{mid} is published!") + else: + logger.error(f">> `#{msg_count}` Message failed to be published!") + except Exception as e: + dump_exception_msg(e) + msg_count += 1 - if status == 0: + if status == mqtt_client.MQTT_ERR_SUCCESS: logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}`: `{msg}` ") else: logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9828d28 Binary files /dev/null and b/requirements.txt differ