README.TXT
2023/6/15
V1.0 實現(xiàn)了單個點位數(shù)據(jù)通信、數(shù)據(jù)解析、數(shù)據(jù)存儲
2023/6/17
V2.0 實現(xiàn)了多個點位數(shù)據(jù)通信、數(shù)據(jù)解析、數(shù)據(jù)存儲
2023/6/19
V2.1 完善log存儲,僅保留近3天的log記錄,避免不必要的存儲;限制log大小,2MB。
架構(gòu)介紹
使用Python開發(fā)服務(wù)器程序,實現(xiàn)以下功能:
- 采用問詢的方式讀取各類傳感器數(shù)據(jù)
- 正確高速解析各類傳感器的數(shù)據(jù)
- 存儲解析后的各類傳感器數(shù)據(jù)
- 存儲程序運行過程中的log
- 管理log,超過一定量、一定時間自動刪除log
-
打包發(fā)布 或者在后臺運行py服務(wù)器程序
硬件介紹 傳感器 串口服務(wù)器 采集模塊 5G通信模塊
主要傳感器
485電子水尺傳感器
該傳感器支持485通信
其他硬件:
485漏液偵測傳感器
485采集模塊
串口服務(wù)器
5G通信模塊
調(diào)試工具
采集模塊調(diào)試工具
485電子水尺調(diào)試工具
串口調(diào)試助手,主要用于 漏液傳感器
其他工具
CRC計算器,不過我一般都是自己寫CRC校驗程序,CRC校驗程序放在后面完整代碼了
WIN10自帶的計算器
文章來源:http://www.zghlxwxcb.cn/news/detail-562724.html
調(diào)試過程
文章來源地址http://www.zghlxwxcb.cn/news/detail-562724.html
完整代碼 LOG存儲功能、數(shù)據(jù)讀取解析存儲功能
#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File : main0620.py
# @Author:Romulushe
# @Time : 2023/6/20 13:53
# @Software: PyCharm
# @Use: PyCharm
import os
import sys
import logging
import time
from logging.handlers import RotatingFileHandler
import shutil
import socket
import threading
import pymysql
import binascii
import time
import datetime
base_path = os.path.dirname(os.path.realpath(sys.argv[0]))
def get_log_path():
return os.path.join(base_path, 'logs')
def cleanup_logs():
log_path = get_log_path()
current_time = time.time()
for file_name in os.listdir(log_path):
file_path = os.path.join(log_path, file_name)
if os.path.isfile(file_path):
creation_time = os.path.getctime(file_path)
if current_time - creation_time > (3 * 24 * 60 * 60):
os.remove(file_path)
def configure_logging():
log_path = get_log_path()
os.makedirs(log_path, exist_ok=True)
log_filename = get_log_filename()
log_file = os.path.join(log_path, log_filename)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', filename=log_file)
def get_log_filename():
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d_%H-%M.log")
def create_new_log():
log_path = get_log_path()
log_files = os.listdir(log_path)
if len(log_files) >= 20:
oldest_file = min(log_files)
os.remove(os.path.join(log_path, oldest_file))
log_filename = get_log_filename()
log_filepath = os.path.join(log_path, log_filename)
return log_filepath
def check_log_size(log_filepath):
log_size = os.path.getsize(log_filepath)
if log_size > 2 * 1024 * 1024:
# 創(chuàng)建新的日志文件
new_log_filepath = create_new_log()
try:
shutil.move(log_filepath, new_log_filepath)
return new_log_filepath
except PermissionError:
insert_log(logger, f'{log_filepath} {PermissionError}', log_filepath)
time.sleep(0.1)
return log_filepath
return log_filepath
def insert_log(logger, log_message, log_filepath):
log_filepath = check_log_size(log_filepath)
# 創(chuàng)建文件處理器
file_handler = RotatingFileHandler(log_filepath, maxBytes=2 * 1024 * 1024, backupCount=1)
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# 添加文件處理器到日志記錄器
logger.addHandler(file_handler)
try:
logger.debug(log_message)
except PermissionError:
insert_log(logger, f'{log_message} {PermissionError}', log_filepath)
time.sleep(0.1) # 延遲0.1秒
# 移除文件處理器
logger.removeHandler(file_handler)
# 定義個函數(shù),使其專門重復(fù)處理客戶端的請求數(shù)據(jù)(也就是重復(fù)接受一個用戶的消息并且重復(fù)回答,直到用戶選擇下線)
def client_request_1(tcp_client_1, tcp_client_address):
#print(tcp_client_1)
# 循環(huán)接收和發(fā)送數(shù)據(jù)
while True:
# #print(datetime.datetime.today())
# msg=["010300000005859C","02020000000879FF"]
# msg=['010300010001D5CA',"02020000000879FF"]
# msg=["010300000001840A","02020000000879FF"]#8點位
msg=msg_address(tcp_client_address)
# interval =60
interval = int(60 / len(msg))
# print(interval,type(interval))
for i in msg:
try:
tcp_client_1.send(binascii.unhexlify(i))
recv_data = tcp_client_1.recv(4096)
# 有消息就回復(fù)數(shù)據(jù),消息長度為0就是說明客戶端下線了
if recv_data:
# #print(f"recv_data:{recv_data}")
bistr = binascii.hexlify(recv_data).decode('utf8')
# print("==================================================")
# print(f"讀數(shù):【{bistr}】,{len(bistr)}")
try:
if (bistr.startswith('010302') or bistr.startswith('030302') ) and len(bistr) >=14:
bistr=bistr[:15]
# # 輸入數(shù)據(jù)
input_data = bistr[:-4]
# # 計算帶有校驗碼的結(jié)果
result = calculate_checksum(input_data)
if bistr == result:
waterLevel = int(bistr[6:10], 16)
print("水深", waterLevel, "cm", bistr, bistr[6:10], bistr[:-4], bistr[-4:], "input_data:",
{input_data}, " result:", {result})
# #print(bistr==result)
ip_address(tcp_client_address[0], tcp_client_address[1], waterLevel, 'water')
if bistr.startswith("020201") and len(bistr) >= 12:
bistr = bistr[:13]
# 輸入數(shù)據(jù)
input_data = bistr[:-4]
# 計算帶有校驗碼的結(jié)果
result = calculate_checksum(input_data)
# #print(result==bistr)
if result == bistr:
data = data_address(bistr)
#print(f"水泵采集模塊狀態(tài):{data}")
ip_address(tcp_client_address[0], tcp_client_address[1], data, 'status')
ip_address(tcp_client_address[0], tcp_client_address[1], data, 'fault')
if bistr.startswith("0104") and len(bistr) >= 14:
bistr = bistr[:15]
# 輸入數(shù)據(jù)
input_data = bistr[:-4]
# # 計算帶有校驗碼的結(jié)果
result = calculate_checksum(input_data)
if result == bistr:
# #print(f"bistr[6:10]:{bistr[6:10]}")
data =int(bistr[6:10], 16)
#print(f"漏液檢測狀態(tài):{data}")
ip_address(tcp_client_address[0], tcp_client_address[1], data, 'liquid')
except Exception as e:
insert_log(logger, f'{tcp_client_address} {e}', log_filepath)
#print(e)
else:
#print("%s 客戶端下線了..." % tcp_client_address[1])
insert_log(logger, f'{tcp_client_address[1]} OFFLINE', log_filepath)
tcp_client_1.close()
break
# time.sleep(interval)
time.sleep(2)
except Exception as e:
insert_log(logger, f'{tcp_client_address} {i} {e}', log_filepath)
#print(f"{tcp_client_address} {i} {e}")
# 連接數(shù)據(jù)庫
def connect_database(host, port, user, password, db_name):
try:
conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db_name)
# print("成功連接到數(shù)據(jù)庫")
return conn
except pymysql.Error as e:
insert_log(logger, f'DATABASE CONNECT FAILED', log_filepath)
# print(f"數(shù)據(jù)庫連接失敗: {e}")
# 插入數(shù)據(jù)
def insert_data(conn, types,table_name, create_time, location_no, parameter_desc, location, param_value):
try:
cursor = conn.cursor()
sql = f"INSERT INTO {table_name} (TYPE, CREATE_TIME, LOCATION_NO, PARAMETER_DESC, LOCATION, PARAMVALUE) " \
f"VALUES (%s, %s, %s, %s, %s, %s)"
cursor.execute(sql, (types,create_time, location_no, parameter_desc, location, param_value))
conn.commit()
# print(f"數(shù)據(jù)插入成功: {create_time}, {parameter_desc}: {param_value}")
cursor.close()
except pymysql.Error as e:
insert_log(logger, f'DATABASE INSERT DATA FAILED', log_filepath)
# print(f"數(shù)據(jù)插入失敗: {e}")
def msg_address(tcp_client_address):
if tcp_client_address[0] == '根據(jù)實際IP修改' or tcp_client_address[0] == '根據(jù)實際IP修改' or tcp_client_address[0] == '根據(jù)實際IP修改' :
msg = ["010300000001840A", "02020000000879FF"] # 8點位無其他,水位01,狀態(tài)02
return msg
if (tcp_client_address[0] == '根據(jù)實際IP修改' and tcp_client_address[1] == 10010 )or tcp_client_address[0] == '根據(jù)實際IP修改' or (tcp_client_address[0] == '根據(jù)實際IP修改' and tcp_client_address[1] == 10000 ) or tcp_client_address[0] == '根據(jù)實際IP修改' or tcp_client_address[0] == '根據(jù)實際IP修改' :
msg =["010300000001840A", "02020000000879FF"] # 4點位非漏液,水位01,狀態(tài)02
return msg
if (tcp_client_address[0] =='根據(jù)實際IP修改' and tcp_client_address[1] == 10000 )or (tcp_client_address[0] == '根據(jù)實際IP修改' and tcp_client_address[1] == 10010):
msg = ["01040000000131CA"] # 4點位漏液
return msg
if tcp_client_address[0] == '根據(jù)實際IP修改' :
msg = ["010300010001D5CA","01040000000131CA"] # 2點位漏液,漏液01,水位02
return msg
if tcp_client_address[0] == '根據(jù)實際IP修改' :
msg = ["03030000000185E8","02020000000879FF"] # 展廳漏液01,水位03,狀態(tài)02
# msg = ["03030000000185E8", "02020000000879FF", "01040000000131CA"] # 展廳漏液01,水位03,狀態(tài)02
return msg
#ip判斷
def ip_address(ip,port,param_value,notes):
host = ''#根據(jù)實際情況自定義
port = 3306
user =''#根據(jù)實際情況自定義
password = ''#根據(jù)實際情況自定義
db_name = ''#根據(jù)實際情況自定義
table_name =''#根據(jù)實際情況自定義
# 連接數(shù)據(jù)庫
conn = connect_database(host, port, user, password, db_name)
#8點位采集+水尺
if str(ip)==''#根據(jù)實際情況自定義:
#只有1號端口
#水位尺數(shù)據(jù)
#采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes=='water':
location_no =''#根據(jù)實際情況自定義
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes=='status':
data=param_value
# print("data:",data)
status1=data[0]#1
status2=data[1]#2
status3=data[4]#5
status4=data[6]#7
# print("status1,status2,status3,status4:",status1,status2,status3,status4)
# # 插入1號數(shù)據(jù)
location_no = 'D04-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D04-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
# 插入3號數(shù)據(jù)
location_no = 'D04-03'
parameter_desc = '3號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status3)
# 插入4號數(shù)據(jù)
location_no = 'D04-04'
parameter_desc = '4號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status4)
if notes=='fault':
data = param_value
fault1 = data[2]#3
fault2 = data[3]#4
fault3 = data[5]#6
fault4 = data[7]#8
# print("fault1,fault2,fault3,fault4:",fault1,fault2,fault3,fault4)
# 插入1號數(shù)據(jù)
location_no = 'D04-05'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D04-06'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 插入3號數(shù)據(jù)
location_no = 'D04-07'
parameter_desc = '3號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault3)
# 插入4號數(shù)據(jù)
location_no = 'D04-08'
parameter_desc = '4號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault4)
# 8點位采集+水尺
if str(ip)==''#根據(jù)實際情況自定義:
#只有1號端口
#水位尺數(shù)據(jù)
#采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes=='water':
location_no = 'D07'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes=='status':
data=param_value
status1=data[0]
status2=data[1]
status3=data[2]
status4=data[3]
#print("status1,status2,status3,status4:",status1,status2,status3,status4)
# 插入1號數(shù)據(jù)
location_no = 'D07-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D07-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
# 插入3號數(shù)據(jù)
location_no = 'D07-03'
parameter_desc = '3號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status3)
# 插入4號數(shù)據(jù)
location_no = 'D07-04'
parameter_desc = '4號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status4)
if notes=='fault':
data=param_value
fault1 = data[4]
fault2 = data[5]
fault3 = data[6]
fault4 = data[7]
#print("fault1,fault2,fault3,fault4:",fault1,fault2,fault3,fault4)
# 插入1號數(shù)據(jù)
location_no = 'D07-05'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D07-06'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 插入3號數(shù)據(jù)
location_no = 'D07-07'
parameter_desc = '3號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault3)
# 插入4號數(shù)據(jù)
location_no = 'D07-08'
parameter_desc = '4號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault4)
# 8點位采集+水尺
if str(ip) == ''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D08'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
status3 = data[2]
status4 = data[3]
#print("status1,status2,status3,status4:", status1, status2, status3, status4)
# 插入1號數(shù)據(jù)
location_no = 'D08-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D08-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
# 插入3號數(shù)據(jù)
location_no = 'D08-03'
parameter_desc = '3號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status3)
# 插入4號數(shù)據(jù)
location_no = 'D08-04'
parameter_desc = '4號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status4)
if notes == 'fault':
data=param_value
fault1 = data[4]
fault2 = data[5]
fault3 = data[6]
fault4 = data[7]
#print("fault1,fault2,fault3,fault4:", fault1, fault2, fault3, fault4)
# 插入1號數(shù)據(jù)
location_no = 'D08-05'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D08-06'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 插入3號數(shù)據(jù)
location_no = 'D08-07'
parameter_desc = '3號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault3)
# 插入4號數(shù)據(jù)
location_no = 'D08-08'
parameter_desc = '4號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault4)
# 4點位采集+水尺
if str(ip) ==''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D09'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1號數(shù)據(jù)
location_no = 'D09-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D09-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1號數(shù)據(jù)
location_no = 'D09-03'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D09-04'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
if notes == 'liquid':
data_liquid = param_value
# 插入漏液數(shù)據(jù)
location_no = 'F03'
parameter_desc = ''#根據(jù)實際情況自定義
location=''#根據(jù)實際情況自定義
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
# 4點位采集+水尺
if str(ip) ==''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D06'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1號數(shù)據(jù)
location_no = 'D06-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D06-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1號數(shù)據(jù)
location_no = 'D06-03'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D06-04'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 4點位采集+水尺
if str(ip) ==''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D05'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
# 插入1號數(shù)據(jù)
location_no = 'D05-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D05-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
# 插入1號數(shù)據(jù)
location_no = 'D05-03'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D05-04'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
if notes == 'liquid':
data_liquid = param_value
# 插入漏液數(shù)據(jù)
location_no = 'F02'
parameter_desc =''#根據(jù)實際情況自定義
location=''#根據(jù)實際情況自定義
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
# 4點位采集+水尺
if str(ip) == ''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location = ''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D03'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1號數(shù)據(jù)
location_no = 'D03-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D03-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1號數(shù)據(jù)
location_no = 'D03-03'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D03-04'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 4點位采集+水尺
if str(ip) == ''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D01'
parameter_desc = '水位值'
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
#print("status1,status2:", status1, status2)
# 插入1號數(shù)據(jù)
location_no = 'D01-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D01-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
#print("fault1,fault2:", fault1, fault2)
# 插入1號數(shù)據(jù)
location_no = 'D01-03'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D01-04'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
# 4點位采集+水尺
if str(ip) == ''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
location =''#根據(jù)實際情況自定義
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D02'
parameter_desc = '水位值'
print( 'water:',param_value)
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'status':
data=param_value
status1 = data[0]
status2 = data[1]
print("status1,status2:", status1, status2)
# 插入1號數(shù)據(jù)
location_no = 'D02-01'
parameter_desc = '1號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status1)
# 插入2號數(shù)據(jù)
location_no = 'D02-02'
parameter_desc = '2號泵浦狀態(tài)'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, status2)
if notes == 'fault':
data=param_value
fault1 = data[2]
fault2 = data[3]
print("fault1,fault2:", fault1, fault2)
# 插入1號數(shù)據(jù)
location_no = 'D02-03'
parameter_desc = '1號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault1)
# 插入2號數(shù)據(jù)
location_no = 'D02-04'
parameter_desc = '2號泵浦故障'
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, fault2)
if notes == 'liquid':
data_liquid = param_value
print('liquid:'.data_liquid)
# 插入漏液數(shù)據(jù)
location_no = 'F01'
parameter_desc =''#根據(jù)實際情況自定義
location=''#根據(jù)實際情況自定義
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
# 2點位采集+水尺
if str(ip) == ''#根據(jù)實際情況自定義:
# 只有1號端口
# 水位尺數(shù)據(jù)
# 采集模塊數(shù)據(jù)
# 數(shù)據(jù)庫連接信息
types = '防汛'
create_time = datetime.datetime.now()
# 模擬數(shù)據(jù)
if notes == 'water':
location_no = 'D10'
parameter_desc = '水位值'
location =''#根據(jù)實際情況自定義
# 插入數(shù)據(jù)
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)
if notes == 'liquid':
data_liquid = param_value
# 插入漏液數(shù)據(jù)
location_no = 'F04'
parameter_desc = ''#根據(jù)實際情況自定義
location = ''#根據(jù)實際情況自定義
insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, data_liquid)
def data_address(data):
extracted_data = data[6:8]
binary_data = bin(int(extracted_data, 16))[2:].zfill(8)
reversed_data = binary_data[::-1]
return reversed_data
#CRC校驗
def calculate_crc(data):
crc = 0xFFFF
polynomial = 0xA001
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= polynomial
else:
crc >>= 1
return crc
def calculate_checksum(data):
# 將輸入數(shù)據(jù)轉(zhuǎn)換為字節(jié)列表
data_bytes = bytes.fromhex(data)
# 計算校驗碼
crc_value = calculate_crc(data_bytes)
crc_bytes = crc_value.to_bytes(2, 'big') # 將校驗碼轉(zhuǎn)換為2字節(jié)的大端字節(jié)序
checksum = ''.join(format(byte, '02x') for byte in reversed(crc_bytes)) # 轉(zhuǎn)換為十六進制字符串,并反轉(zhuǎn)字節(jié)順序
# 將校驗碼插入到原數(shù)據(jù)之后
result = data + checksum
return result
if __name__ == '__main__':
configure_logging()
cleanup_logs()
log_filepath = create_new_log()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
oldtime = datetime.datetime.today()
# 創(chuàng)建服務(wù)端套接字對象
tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 設(shè)置端口復(fù)用,使程序退出后端口馬上釋放
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# 綁定端口
tcp_server.bind(("", 8050))
# 設(shè)置監(jiān)聽
tcp_server.listen(128)
# 循環(huán)等待客戶端連接請求
while True:
tcp_client_1, tcp_client_address = tcp_server.accept()
tcp_client_1.settimeout(10)
# 創(chuàng)建多線程對象
thd = threading.Thread(target=client_request_1, args=(tcp_client_1, tcp_client_address))
# #print(thd)
# 設(shè)置守護主線程
thd.setDaemon(True)
# 啟動子線程對象
thd.start()
# 關(guān)閉服務(wù)器套接字
# tcp_server.close()`
到了這里,關(guān)于用Python采用Modbus-Tcp的方式讀取485電子水尺數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!