在實現(xiàn)ia業(yè)務(wù)服務(wù)器時需要構(gòu)建一個python-socket客戶端,1、要求能與服務(wù)器保持心跳連接,每10秒鐘發(fā)送一次心跳信號;2、要求能根據(jù)socket服務(wù)器發(fā)送的指令創(chuàng)建或終止一個定時任務(wù)。
為此以3個類實現(xiàn)該功能,分別為socket通信類(用于實現(xiàn)通信連接與任務(wù)創(chuàng)建)、任務(wù)池類(用于管理任務(wù))、任務(wù)類(用于實現(xiàn)具體任務(wù))。
1、socket通信客戶端
這里定義的MySocket類主體結(jié)構(gòu)如下圖所示,共包含4個函數(shù),2個線程(其本身繼承Thread類實現(xiàn)主任務(wù)流程——run函數(shù)、接收服務(wù)器信息并創(chuàng)建任務(wù)添加到任務(wù)池
;同時又在__init__函數(shù)中將self.thread_msg類封裝為一個線程,每隔10秒鐘向socket服務(wù)器發(fā)送一次心跳包
)。check_connection函數(shù)用于檢測socket是否與服務(wù)器斷開連接,在send_msg函數(shù)中調(diào)用,當發(fā)現(xiàn)客戶端掉線后則立刻進行重連。send_msg函數(shù)用于發(fā)送信息給服務(wù)器,因為run函數(shù)與thread_msg函數(shù)2個線程都需要調(diào)用連接與服務(wù)器發(fā)送數(shù)據(jù),為避免沖突故而定義為函數(shù)在內(nèi)部進行加鎖。
#socket客戶端
class MySocket(Thread):
def __init__(self,config):
super().__init__()
# 1.創(chuàng)建套接字
self.tcp_socket = socket(AF_INET,SOCK_STREAM)
self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客戶端開啟心跳維護
# 2.準備連接服務(wù)器,建立連接
self.serve_ip = config["serve_ip"]#當前"118.24.111,149"
self.serve_port = config["serve_port"] #端口當前7900
self.sleep_time = config["sleep_time"]
print("connect to : ",self.serve_ip,self.serve_port)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
self.lock = threading.RLock()
self.taskpool=TaskPool()
task_msg=threading.Thread(target=self.thread_msg)
task_msg.daemon = True
task_msg.start()
#定時發(fā)送信息
def run(self):
while True:
a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
a=a.decode('utf-8')
print("------主線程-----",a)
jdata=json.loads(a)
#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
task=OCRTask(jdata)
self.taskpool.append(task)
json_data={
"type":"OCR_STATE_ACK",
"timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
"streamAddr": jdata["streamAddr"]
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
self.send_msg(data)
def check_connection(self):
try:
self.tcp_socket.getpeername()
return True
except socket.error:
return False
#定時發(fā)送心跳信息
def thread_msg(self):
while True:
#message=input('You can say:')
#json標注的模板
json_data={
"timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
"type":"HEARBEAT"
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
#進行定時發(fā)送
self.send_msg(data)
# self.lock.acquire()
# self.tcp_socket.send(data)#將發(fā)送的數(shù)據(jù)進行編碼
# self.lock.release()
try:
#進行定時發(fā)送
self.lock.acquire()
a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
self.lock.release()
time.sleep(self.sleep_time)
print("ack: ",a.decode('utf-8'))
except ConnectionRefusedError:
print('服務(wù)器拒絕本次連接!?。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except TimeoutError:
print('連接超時?。。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
print('智能終端無網(wǎng)絡(luò)連接!?。。?!')
def send_msg(self,msg):
if self.check_connection() is False:
print('服務(wù)器掉線!?。。。?)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
try:
#進行定時發(fā)送
self.lock.acquire()
self.tcp_socket.send(msg)
self.lock.release()
except ConnectionRefusedError:
print('服務(wù)器拒絕本次連接?。。。。?)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except TimeoutError:
print('連接超時?。。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
print('智能終端無網(wǎng)絡(luò)連接?。。。?!')
2、任務(wù)池實現(xiàn)
任務(wù)池的實現(xiàn)代碼如下所示,主要包含3個函數(shù)(其中將remove_task封裝為一個子線程,用于實時移除已經(jīng)完成計算任務(wù)的線程),append函數(shù)用于將新創(chuàng)建的任務(wù)添加大任務(wù)池pool中,stop函數(shù)用于停止并移除正在運行中的任務(wù)。
其具體實現(xiàn)代碼如下所示,其作為MySocket類中的一個成員屬性,每當MySocket接收到服務(wù)器信息創(chuàng)建任務(wù)ocrtask后都調(diào)用TaskPool.append(ocrtask)將任務(wù)添加到任務(wù)池中。由任務(wù)池管理任務(wù)的聲明周期,具體可見其append函數(shù)可以啟動task或終止task。remove_task線程會自動將已經(jīng)完成的任務(wù)移除。
#ocr任務(wù)線程池
class TaskPool:
def __init__(self,sleep_time=0.5):
self.pool=[]
self.sleep_time=sleep_time
task_msg=threading.Thread(target=self.remove_task)
task_msg.daemon = True
task_msg.start()
#刪除已經(jīng)結(jié)束的任務(wù)
def remove_task(self):
while True:
names=[]
for task in self.pool:
if task.get_count()==0: #生存時間為0,認為該任務(wù)已經(jīng)完成需要被刪除
task.stop()
self.pool.remove(task)
else:
names.append(task.taskname)
if len(names)>0:
print(names)
time.sleep(self.sleep_time)
def append(self,ocrtask):
if ocrtask.state==0:
#終止任務(wù)
self.stop(ocrtask)
else:
#啟動任務(wù)
ocrtask.start()
self.pool.append(ocrtask)
#終止任務(wù)
def stop(self,ocrtask):
for task in self.pool:
if task.taskname==ocrtask.taskname:
task.stop()
self.pool.remove(task)
3、具體任務(wù)線程
任務(wù)的實現(xiàn)代碼如下所示,其支持3中任務(wù)模式,使用state區(qū)分任務(wù),state為0-停止識別,1-連續(xù)識別count張,2-持續(xù)識別(故而在state為2時將count設(shè)置的特別大)。這里以count控制任務(wù)的運行,任務(wù)每運行一次count減少1。當count小于等于0,則表示任務(wù)運行完成。在TaskPool的remove_task中檢測到count為0時則會自動刪除任務(wù)。
#ocr任務(wù)
class OCRTask(Thread):
def __init__(self,json):
super().__init__()
self.streamAddr=json["streamAddr"]
self.state=json["state"] # 0-停止識別,1-連續(xù)識別count張,2-持續(xù)識別
if json["state"]==2:
self.count=9999999999999999999999999
else:
self.count=json["count"]
if "taskname" in json.keys():
self.taskname=json["taskname"]
else:
self.taskname=json["streamAddr"]
self.jsonname=json["jsonname"]
self.lock = threading.RLock()
def run(self):
while self.get_count()>0:
print('run %s'%self.taskname,end='*')
time.sleep(2)
self.lock.acquire()
self.count-=1
self.lock.release()
print('%s finish!! '%self.taskname)
#獲取任務(wù)的生存時間
def get_count(self):
self.lock.acquire()
now_count=self.count
self.lock.release()
#削減count
return now_count
#停止任務(wù)
def stop(self):
self.lock.acquire()
self.count=-1
self.lock.release()
#停止任務(wù)
pass
4、完整代碼與使用效果
完整代碼如下所示
from socket import *
import time,json
import yaml
import threading,struct
from threading import Thread
def hex_to_bytes(hex_str):
"""
:param hex_str: 16進制字符串
:return: byte_data 字節(jié)流數(shù)據(jù)
"""
bytes_data = bytes()
while hex_str :
"""16進制字符串轉(zhuǎn)換為字節(jié)流"""
temp = hex_str[0:2]
s = int(temp, 16)
bytes_data += struct.pack('B', s)
hex_str = hex_str[2:]
return bytes_data
# 讀取Yaml文件方法
def read_yaml(yaml_path):
with open(yaml_path, encoding="utf-8", mode="r") as f:
result = yaml.load(stream=f,Loader=yaml.FullLoader)
return result
#ocr任務(wù)
class OCRTask(Thread):
def __init__(self,json):
super().__init__()
self.streamAddr=json["streamAddr"]
self.state=json["state"] # 0-停止識別,1-連續(xù)識別count張,2-持續(xù)識別
if json["state"]==2:
self.count=9999999999999999999999999
else:
self.count=json["count"]
if "taskname" in json.keys():
self.taskname=json["taskname"]
else:
self.taskname=json["streamAddr"]
self.jsonname=json["jsonname"]
self.lock = threading.RLock()
def run(self):
while self.get_count()>0:
print('run %s'%self.taskname,end='*')
time.sleep(2)
self.lock.acquire()
self.count-=1
self.lock.release()
print('%s finish!! '%self.taskname)
#獲取任務(wù)的生存時間
def get_count(self):
self.lock.acquire()
now_count=self.count
self.lock.release()
#削減count
return now_count
#停止任務(wù)
def stop(self):
self.lock.acquire()
self.count=-1
self.lock.release()
#停止任務(wù)
pass
#ocr任務(wù)線程池
class TaskPool:
def __init__(self,sleep_time=0.5):
self.pool=[]
self.sleep_time=sleep_time
task_msg=threading.Thread(target=self.remove_task)
task_msg.daemon = True
task_msg.start()
#刪除已經(jīng)結(jié)束的任務(wù)
def remove_task(self):
while True:
names=[]
for task in self.pool:
if task.get_count()==0:
task.stop()
self.pool.remove(task)
else:
names.append(task.taskname)
if len(names)>0:
print(names)
time.sleep(self.sleep_time)
def append(self,ocrtask):
if ocrtask.state==0:
#終止任務(wù)
self.stop(ocrtask)
else:
#啟動任務(wù)
ocrtask.start()
self.pool.append(ocrtask)
#終止任務(wù)
def stop(self,ocrtask):
for task in self.pool:
if task.taskname==ocrtask.taskname:
task.stop()
self.pool.remove(task)
#socket客戶端
class MySocket(Thread):
def __init__(self,config):
super().__init__()
# 1.創(chuàng)建套接字
self.tcp_socket = socket(AF_INET,SOCK_STREAM)
self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客戶端開啟心跳維護
# 2.準備連接服務(wù)器,建立連接
self.serve_ip = config["serve_ip"]#當前"118.24.111,149"
self.serve_port = config["serve_port"] #端口當前7900
self.sleep_time = config["sleep_time"]
print("connect to : ",self.serve_ip,self.serve_port)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
self.lock = threading.RLock()
self.taskpool=TaskPool()
task_msg=threading.Thread(target=self.thread_msg)
task_msg.daemon = True
task_msg.start()
#定時發(fā)送信息
#通信線程-用于接收服務(wù)器的指令
def run(self):
while True:
a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
a=a.decode('utf-8')
print("------主線程-----",a)
jdata=json.loads(a)
#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
task=OCRTask(jdata)
self.taskpool.append(task)
json_data={
"type":"OCR_STATE_ACK",
"timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
"streamAddr": jdata["streamAddr"]
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
self.send_msg(data)
#檢測socket連接是否斷開
def check_connection(self):
try:
self.tcp_socket.getpeername()
return True
except socket.error:
return False
#定時發(fā)送心跳信息--子線程
def thread_msg(self):
while True:
#message=input('You can say:')
#json標注的模板
json_data={
"timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
"type":"HEARBEAT"
}
#print( json_data)
message = json.dumps(json_data)
data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
data=hex_to_bytes(data)
#進行定時發(fā)送
self.send_msg(data)
# self.lock.acquire()
# self.tcp_socket.send(data)#將發(fā)送的數(shù)據(jù)進行編碼
# self.lock.release()
try:
#進行定時發(fā)送
self.lock.acquire()
a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
self.lock.release()
time.sleep(self.sleep_time)
print("ack: ",a.decode('utf-8'))
except ConnectionRefusedError:
print('服務(wù)器拒絕本次連接!?。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except TimeoutError:
print('連接超時?。。。。?)
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
print('智能終端無網(wǎng)絡(luò)連接?。。。?!')
#發(fā)送信息
def send_msg(self,msg):
if self.check_connection() is False:
print('服務(wù)器掉線?。。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
try:
#進行定時發(fā)送
self.lock.acquire()
self.tcp_socket.send(msg)
self.lock.release()
except ConnectionRefusedError:
print('服務(wù)器拒絕本次連接?。。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except TimeoutError:
print('連接超時?。。。?!')
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
except OSError:
self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 連接服務(wù)器,建立連接,參數(shù)是元組形式
print('智能終端無網(wǎng)絡(luò)連接?。。。?!')
if "__main__"==__name__:
#進行定時通信測試
config=read_yaml("config.yaml")
socket_client=MySocket(config)
socket_client.start()
使用效果如下所示,這里基于socket調(diào)試工具作為客戶端文章來源:http://www.zghlxwxcb.cn/news/detail-835803.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-835803.html
到了這里,關(guān)于基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!