源碼將於最後一篇文章給出下載
監測數據采集物聯網應用開發步驟(6)
串口(COM)通訊開發
本章節測試使用了 Configure Virtual Serial Port Driver 虛擬串口工具和本人自寫的串口調試工具,請自行 baidu 下載對應工具
在com.zxy.common.Com_Para.py中添加如下內容
# RS232 serial port list. Entries are separated by ";" and each entry has five
# comma-separated fields: port name, baud rate, data bits, parity code, and an
# index name (A, B, C, D, ...) that identifies the port.
# Init_Page.Start_ComPort skips any entry that does not have exactly 5 fields.
# Linux example:   /dev/ttyS0,9600,8,0,A;/dev/ttyS1,9600,8,0,B
# Windows example: COM1,9600,8,0,A;COM2,9600,8,2,B
ComPortList = ""
# Global table of serial ports: index name (str) -> serial port device object.
htComPort = {}
在com.zxy.main.Init_Page.py中添加如下內容
@staticmethod
def Start_ComPort():
    """Open every serial port described in ``Com_Para.ComPortList``.

    ``ComPortList`` is split on ";" into entries and each entry on "," into
    fields. An entry is used only when it has exactly five fields
    (port, baud rate, data bits, parity code, index name) and its index name
    is not yet registered in ``Com_Para.htComPort``. For each such entry a
    ``ComDev`` is created and stored in ``Com_Para.htComPort`` under the
    index name.

    A failure on one entry is printed and does not stop the remaining
    entries from being processed.
    """
    # iIndex counts every entry (including skipped ones), starting at 1; it is
    # passed to ComDev as the device index.
    for iIndex, temComPort in enumerate(Com_Para.ComPortList.split(";"), start=1):
        temComPortInfo = temComPort.split(",")
        try:
            # NOTE(review): GetHashTableNone presumably returns None for a
            # missing key -- confirm against Com_Fun.
            if len(temComPortInfo) == 5 and Com_Fun.GetHashTableNone(Com_Para.htComPort, temComPortInfo[4]) is None:
                temCD = ComDev(
                    temComPortInfo[0],
                    int(temComPortInfo[1]),
                    int(temComPortInfo[2]),
                    int(temComPortInfo[3]),
                    iIndex,
                )
                temCD.attPortName = temComPortInfo[4]
                Com_Fun.SetHashTable(Com_Para.htComPort, temComPortInfo[4], temCD)
        except Exception as e:
            # Best effort: report the failing port and continue with the next.
            print("com link error:COM"+temComPortInfo[0]+"==>" + repr(e)+"=>"+str(e.__traceback__.tb_lineno))
創建串口設備管理類com.zxy.comport.ComDev.py
#! python3
# -*- coding: utf-8 -*-
'''
Created on 2017年05月10日
@author: zxyong 13738196011
'''
import datetime,threading,time,serial
from com.zxy.common.Com_Fun import Com_Fun
from com.zxy.adminlog.UsAdmin_Log import UsAdmin_Log
from com.zxy.common import Com_Para
from com.zxy.z_debug import z_debug
#監測數據采集物聯網應用--串口設備管理
class ComDev(z_debug):
    """Serial (COM) port device manager.

    Wraps one ``serial.Serial`` port. A background thread
    (``OnDataReceived``) accumulates incoming bytes in ``attReturnValue``;
    the ``WritePort*`` methods clear that buffer, write a request and
    optionally wait for the reply to arrive.
    """

    attIndex = 0
    attPort = 0
    attBaudrate = 9600
    attBytesize = 8
    attSerial = None
    # Timeout in seconds; set to 10 to make the effect visible while testing.
    attTimeout = 10
    # Bytes received since the last write (None when nothing has arrived).
    attReturnValue = None
    attPortName = ""
    # Special plug-in handling.
    attProtocol = ""
    # Data to send back.
    attSendValue = None
    # Thread lock. This class-level lock is only a fallback; __init__ gives
    # every instance its own lock so ports do not block one another.
    attLock = threading.Lock()

    # Parity code from the configuration -> pyserial parity letter.
    # 0 = none, 1 = even, 2 = odd, 3 = mark, 4 = space.
    _PARITY_BY_CODE = {"0": "N", "1": "E", "2": "O", "3": "M", "4": "S"}

    def __init__(self, inputPort, inputBaudrate, inputBytesize, inputparity, inputIndex):
        """Create the serial port object and start receiving.

        Args:
            inputPort: port name passed to ``serial.Serial``.
            inputBaudrate: baud rate.
            inputBytesize: number of data bits.
            inputparity: parity code 0-4 (see ``_PARITY_BY_CODE``); any other
                value falls back to no parity.
            inputIndex: numeric index used in the reader thread name.
        """
        self.attPort = inputPort
        self.attBaudrate = inputBaudrate
        self.attBytesize = inputBytesize
        temParity = self._PARITY_BY_CODE.get(str(inputparity), "N")
        # Per-instance lock: a reply wait on one port must not stall others.
        self.attLock = threading.Lock()
        self.attSerial = serial.Serial(
            port=self.attPort,
            baudrate=self.attBaudrate,
            bytesize=self.attBytesize,
            parity=temParity,
            stopbits=1,
        )
        self.attSerial.timeout = self.attTimeout
        self.attIndex = inputIndex
        self.OpenSeriaPort()

    def _LogException(self, e):
        """Report an exception through ``debug_in`` with its line number."""
        temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
        # The methods may be invoked with the class itself as ``self``; in
        # that case debug_in is unbound and needs the class passed explicitly.
        if isinstance(self, type):
            self.debug_in(self, temMsg)
        else:
            self.debug_in(temMsg)

    def _EnsureOpen(self):
        """Try to open the port if needed; return True when it is open."""
        if not self.attSerial.isOpen():
            self.OpenSeriaPort()
        return self.attSerial.isOpen()

    def _WriteAndWait(self, inputByte, replyComplete):
        """Write ``inputByte`` and poll until ``replyComplete`` or timeout.

        ``replyComplete`` is called with the current ``attReturnValue``.
        Returns the number of bytes written, or 0 if the port is not open.
        """
        if not self._EnsureOpen():
            return 0
        # ``with`` guarantees the lock is released even if write() raises.
        with self.attLock:
            self.attReturnValue = None
            temNumber = self.attSerial.write(inputByte)
            # Monotonic clock: unaffected by system clock adjustments.
            temDeadline = time.monotonic() + self.attTimeout
            while not replyComplete(self.attReturnValue) and time.monotonic() <= temDeadline:
                time.sleep(0.2)
        return temNumber

    # Open the serial port
    def OpenSeriaPort(self):
        """Open the port if it is closed and start a reader thread.

        Returns:
            True on success, False if an exception occurred.
        """
        try:
            if not self.attSerial.isOpen():
                self.attSerial.open()
            # The reader thread is started on every call: serial.Serial()
            # already opens the port in __init__, so starting it only after
            # an explicit open() would never start it on first use.
            t = threading.Thread(target=self.OnDataReceived, name="ComPortTh" + str(self.attIndex))
            t.start()
            uL = UsAdmin_Log(Com_Para.ApplicationPath, str("ComPortTh" + str(self.attIndex)))
            uL.SaveFileDaySub("thread")
            print("Open ComPortTh" + str(self.attIndex)+" COM:"+str(self.attSerial.port)+" "+Com_Fun.GetTimeDef()+" lenThreads:"+str(len(threading.enumerate())))
            return True
        except Exception as e:
            self._LogException(e)
            return False

    # Close the serial port
    def CloseSeriaPort(self):
        """Close the port if it is open.

        Returns:
            True on success, False if an exception occurred.
        """
        try:
            if self.attSerial.isOpen():
                self.attSerial.close()
            return True
        except Exception as e:
            self._LogException(e)
            return False

    # Send data without waiting for a reply
    def WritePortDataImmed(self, inputByte):
        """Write ``inputByte`` and return after a fixed 0.2 s pause.

        Returns:
            Number of bytes written, 0 if the port is not open, -1 on error.
        """
        try:
            if not self._EnsureOpen():
                return 0
            with self.attLock:
                self.attReturnValue = None
                temNumber = self.attSerial.write(inputByte)
                time.sleep(0.2)
            return temNumber
        except Exception as e:
            self._LogException(e)
            return -1

    # Reply is bytes terminated by an end flag
    def WritePortDataFlag(self, inputByte, EndFlag):
        """Write ``inputByte`` and wait for a reply ending with ``EndFlag``.

        Waits until ``attReturnValue`` ends with ``EndFlag`` (encoded with
        ``Com_Para.U_CODE``) or ``attTimeout`` seconds have passed. The reply
        itself is left in ``attReturnValue``.

        Returns:
            Number of bytes written, 0 if the port is not open, -1 on error.
        """
        try:
            # Encode once; compare on bytes so multi-byte flags match too.
            temEnd = EndFlag.encode(Com_Para.U_CODE)
            return self._WriteAndWait(
                inputByte,
                lambda temValue: temValue is not None and temValue.endswith(temEnd),
            )
        except Exception as e:
            self._LogException(e)
            return -1

    # Reply is bytes
    def WritePortData(self, inputByte):
        """Write ``inputByte`` and wait until any reply bytes arrive.

        Waits until ``attReturnValue`` is no longer None or ``attTimeout``
        seconds have passed. The reply itself is left in ``attReturnValue``.

        Returns:
            Number of bytes written, 0 if the port is not open, -1 on error.
        """
        try:
            return self._WriteAndWait(inputByte, lambda temValue: temValue is not None)
        except Exception as e:
            self._LogException(e)
            return -1

    # Receive data
    def OnDataReceived(self):
        """Reader loop: append incoming bytes to ``attReturnValue``.

        Runs while the port is open. When no bytes are waiting it sleeps for
        one second before polling again. On an exception the error is
        reported and ``attReturnValue`` is reset to None.
        """
        try:
            while self.attSerial.isOpen():
                temNum = self.attSerial.inWaiting()
                if temNum > 0:
                    temData = self.attSerial.read(temNum)
                    if self.attReturnValue is None:
                        self.attReturnValue = temData
                    else:
                        self.attReturnValue = self.attReturnValue + temData
                else:
                    time.sleep(1)
        except Exception as e:
            self._LogException(e)
            self.attReturnValue = None
串口通訊測試案例MonitorDataCmd.py主文件中編寫:
在該語句下添加
# Serial port configuration: "port,baudrate,databits,parity,name" entries
# separated by ";".
Com_Para.ComPortList = "COM2,9600,8,0,A;COM4,9600,8,2,B"
# Initialise the serial port connections.
Init_Page.Start_ComPort()
# Test sending and receiving serial data.
temCP2 = Com_Fun.GetHashTable(Com_Para.htComPort,"A")  # device registered as "A" (COM2)
temCP4 = Com_Fun.GetHashTable(Com_Para.htComPort,"B")  # device registered as "B" (COM4)
temByte1 = ("AABBCCDDVV").encode(Com_Para.U_CODE)  # request string -> bytes
temByte2 = ("11223344KM").encode(Com_Para.U_CODE)  # request string -> bytes
print("開始發送串口數據")
temRec1 = temCP2.WritePortData(temByte1)  # send to COM2 and wait for a reply
print("串口2發送數據長度:"+str(temRec1))
strRec = ""
if temCP2.attReturnValue is not None:
    strRec = temCP2.attReturnValue.decode(Com_Para.U_CODE)  # bytes received on COM2
print("串口2收到數據值:"+strRec)
temRec2 = temCP4.WritePortData(temByte2)  # send to COM4 and wait for a reply
print("串口4發送數據長度:"+str(temRec2))
strRec = ""
if temCP4.attReturnValue is not None:
    strRec = temCP4.attReturnValue.decode(Com_Para.U_CODE)  # bytes received on COM4
print("串口4收到數據值:"+strRec)
串口調試測試結果:文章來源:http://www.zghlxwxcb.cn/news/detail-680722.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-680722.html
- 監測數據采集物聯網應用開發步驟(8.1)
到了這里,關於9、監測數據采集物聯網應用開發步驟(7)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持TOY模板網!