0. Requirements
Pass camera image data between different processes or different languages, for example grabbing camera frames in Java code and handing them to algorithm code written in Python. Two implementations are given here, one over HTTP and one through Redis, and their transfer speeds are compared.
As a sample, both sides are implemented in Python; the runtime environment is Ubuntu 18.04.
1. Passing image data over HTTP
1.1 Sending the image data
- Approach: create two threads. One uses OpenCV to read the camera feed from its RTSP address; the other converts each frame into a byte stream and sends it over HTTP.
- Implementation
# coding=utf-8
# File: http_send.py
import requests
import base64
import cv2
import time
import threading
from queue import LifoQueue


class rtspRead:  # read frames from an RTSP address
    def __init__(self, rtsp, port):
        self.rtsp = rtsp  # RTSP address of the camera
        self.addr = "http://127.0.0.1:{}/image_post".format(port)  # local HTTP endpoint
        self.frameQueue = LifoQueue()  # queue of video frames
        self.frameLock = threading.Lock()  # lock protecting the frame queue
        self.threadFlag = True  # flag that keeps both threads running

    def start(self):
        t1 = threading.Thread(target=self.sendFrame, args=(), daemon=True)
        t2 = threading.Thread(target=self.readFrame, args=(), daemon=True)
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    def sendFrame(self):  # send frames over HTTP
        num = 0  # intended for averaging the send-to-receive time over 100 frames (unused here; see the timing sketch after this listing)
        while self.threadFlag:
            time.sleep(0.01)
            is_get_frame = False  # no frame fetched from the queue yet
            self.frameLock.acquire()
            if self.frameQueue.qsize():
                frame = self.frameQueue.get()
                is_get_frame = True  # got a frame from the queue
            self.frameLock.release()
            if is_get_frame:
                # frame is an ndarray; encode it into a JPEG byte stream
                # (replace 'jpg' with another extension for a different format)
                img_str = cv2.imencode('.jpg', frame)[1].tobytes()
                # base64-encode the bytes-like object so it can travel as a form field
                img_data = base64.b64encode(img_str)
                data = {'img': img_data}
                resp = requests.post(self.addr, data=data)  # send the frame and get the reply from http_receive.py
                print("result:", resp.text)

    def readFrame(self):  # read frames from the RTSP stream
        self.cap = cv2.VideoCapture(self.rtsp)
        if self.cap.isOpened():
            time.sleep(0.01)
            print("capture opened successfully!")
            while self.threadFlag:
                ret, frame = self.cap.read()
                if ret:
                    self.frameLock.acquire()
                    while self.frameQueue.qsize() > 3:  # keep only the most recent frames in the queue
                        self.frameQueue.get()
                    self.frameQueue.put(frame)
                    self.frameLock.release()
        else:
            print("failed to open capture!")
            self.threadFlag = False
        self.cap.release()


if __name__ == '__main__':
    rtsp_read = rtspRead("rtsp://xx:xx@xx", 9322)
    rtsp_read.start()
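The sender above declares a num counter intended for averaging the send-to-receive time over 100 frames but never uses it. Below is a minimal sketch, not part of the original code, of how that measurement could look as a drop-in replacement for sendFrame in the rtspRead class: because requests.post only returns after http_receive.py has processed the frame and replied, timing the call approximates the send-to-receive latency (the threshold and print format are assumptions).

    def sendFrame(self):  # sketch: send frames over HTTP and report the average round-trip time
        num = 0
        total_time = 0.0
        while self.threadFlag:
            time.sleep(0.01)
            frame = None
            self.frameLock.acquire()
            if self.frameQueue.qsize():
                frame = self.frameQueue.get()
            self.frameLock.release()
            if frame is None:
                continue
            img_str = cv2.imencode('.jpg', frame)[1].tobytes()
            img_data = base64.b64encode(img_str)
            send_time = time.time()
            requests.post(self.addr, data={'img': img_data})  # returns once the receiver has replied
            total_time += time.time() - send_time
            num += 1
            if num >= 100:
                print("100 frames sent, average {:.4f}s per frame, about {} fps".format(
                    total_time / 100, round(100 / total_time, 2)))
                num = 0
                total_time = 0.0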
1.2 Receiving and displaying the image data
- Approach: receive the HTTP request with Flask, convert the received byte stream into a numpy array and then into an OpenCV image. A separate thread picks up the OpenCV image and displays it.
- Implementation
# coding=utf-8
# File: http_receive.py
from flask import Flask, request
import base64
import numpy as np
import cv2
import threading
from queue import LifoQueue
import time


class RtspPlay():
    def __init__(self):
        self.frame = None  # latest video frame
        self.threadFlag = True  # flag that keeps the display thread running

    def start(self):
        t1 = threading.Thread(target=self.play, args=(), daemon=True)
        t1.start()
        # t1.join()

    def play(self):
        starttime = time.time()
        while self.threadFlag:
            time.sleep(0.01)
            print("display thread running")
            if time.time() - starttime > 260:  # stop automatically after 260 seconds
                self.threadFlag = False
            if self.frame is not None:
                print("showing frame")
                cv2.imshow('http_pic', self.frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        cv2.destroyAllWindows()

    def setFrame(self, img):
        print("updating frame")
        self.frame = img.copy()


rtspPlay = RtspPlay()
rtspPlay.start()
app = Flask(__name__)


@app.route('/image_post', methods=['POST'])
def img_post():
    if request.method == 'POST':
        # fetch the image field
        img_base64 = request.form.get('img')
        # decode base64 back into the JPEG byte stream
        img_data = base64.b64decode(img_base64)
        img_array = np.frombuffer(img_data, np.uint8)  # wrap the bytes in a numpy array
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)  # decode into an OpenCV image
        mat = cv2.resize(img, (600, 600))
        print(mat.shape)
        rtspPlay.setFrame(mat)
        time.sleep(0.01)
    return 'receive img success'


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=9322)
1.3 Test
# first terminal window: start the receiving process
python http_receive.py
# second terminal window: start the sending process
python http_send.py
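If no camera is available, the receiving side can also be checked on its own by posting a single image to the endpoint. A small sketch, where ./test.jpg is a placeholder for any local JPEG:

# coding=utf-8
# one-off check of the /image_post endpoint without a camera
# (./test.jpg is a hypothetical local image; the port matches http_receive.py)
import base64
import requests

with open('./test.jpg', 'rb') as f:
    img_data = base64.b64encode(f.read())  # same payload format as http_send.py

resp = requests.post('http://127.0.0.1:9322/image_post', data={'img': img_data})
print(resp.text)  # should print "receive img success"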
2. Passing image data through Redis
2.1 Sending the image data
- Approach: create two threads. One uses OpenCV to read the camera feed from its RTSP address; the other converts each frame into a byte stream and sends it through Redis. Concretely, Redis is an in-memory database that stores data as key-value pairs and passes messages through a publish/subscribe mechanism, so the frame's byte stream is written into Redis under a key and a message announcing it is published, which achieves the transfer.
- Implementation
# coding=utf-8
# File: redis_send.py
import redis
import cv2
import time
import base64
import threading
from queue import LifoQueue


class redisSendPic:  # redis publisher
    def __init__(self, cameraip):
        print("redis init ...")
        self.r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # connect to the local redis server
        self.topic = 'img0'  # channel the receiver subscribes to
        self.cameraip = cameraip

    def send(self, img):
        # encode the frame as JPEG, base64 it, store it under the camera key,
        # then publish the key name on the channel
        img_str = cv2.imencode('.jpg', img)[1].tobytes()
        data = base64.b64encode(img_str)
        self.r.set(self.cameraip, data)
        self.r.publish(self.topic, self.cameraip)

    def getResult(self):
        result = self.r.get('result')
        return result

    def delete(self):
        self.r.delete('result')
        self.r.delete(self.cameraip)


class rtspRead:  # read frames from an RTSP address
    def __init__(self, rtsp, cameraip):
        self.rtsp = rtsp
        self.cameraip = cameraip
        self.frameQueue = LifoQueue()  # queue of video frames
        self.frameLock = threading.Lock()  # lock protecting the frame queue
        self.threadFlag = True  # flag that keeps both threads running

    def start(self):
        t1 = threading.Thread(target=self.sendFrame, args=(), daemon=True)
        t2 = threading.Thread(target=self.readFrame, args=(), daemon=True)
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    def sendFrame(self):  # send frames through redis
        self.sendpic = redisSendPic(self.cameraip)
        self.sendpic.r.set('result', 'start')
        num = 0  # average the send-to-receive time and frame rate over 100 frames
        total_time = 0
        while self.threadFlag:
            time.sleep(0.01)
            is_get_frame = False  # no frame fetched from the queue yet
            self.frameLock.acquire()
            if self.frameQueue.qsize():
                frame = self.frameQueue.get()
                is_get_frame = True  # got a frame from the queue
            self.frameLock.release()
            result = self.sendpic.getResult()  # check whether the receiver asked us to stop
            if result is not None and str(result, 'utf-8') == 'stop':
                self.threadFlag = False
                break
            if is_get_frame:
                self.sendpic.r.delete('receive_time')  # clear the previous receive timestamp
                send_time = time.time()  # time the frame was sent
                self.sendpic.send(frame)  # send the frame
                receive_time = self.sendpic.r.get('receive_time')  # timestamp written by the receiver
                while receive_time is None:  # wait until the receiver has picked the frame up
                    time.sleep(0.01)
                    receive_time = self.sendpic.r.get('receive_time')
                    result = self.sendpic.getResult()  # check whether the receiver asked us to stop
                    if result is not None and str(result, 'utf-8') == 'stop':
                        self.threadFlag = False
                        break
                if receive_time is not None:
                    total_time += float(str(receive_time, 'utf-8')) - send_time
                    num += 1
                    if num >= 100:
                        print("100 frames sent and received, average {}s per frame, average speed {} fps".format(total_time / 100, round(100 / total_time, 2)))
                        num = 0
                        total_time = 0
        self.sendpic.delete()

    def readFrame(self):  # read frames from the RTSP stream
        self.cap = cv2.VideoCapture(self.rtsp)
        if self.cap.isOpened():
            time.sleep(0.01)
            print("capture opened successfully!")
            while self.threadFlag:
                ret, frame = self.cap.read()
                if ret:
                    self.frameLock.acquire()
                    while self.frameQueue.qsize() > 3:  # keep only the most recent frames in the queue
                        self.frameQueue.get()
                    self.frameQueue.put(frame)
                    self.frameLock.release()
        else:
            print("failed to open capture!")
            self.threadFlag = False
        self.cap.release()


if __name__ == '__main__':
    rtsp_read = rtspRead("rtsp://xx:xx@xx", 'xx')
    rtsp_read.start()
2.2 Receiving and displaying the image data
- Approach: use Redis message listening; when a message arrives announcing that a frame has been stored, fetch the byte-stream data, convert it into an OpenCV image and display it.
- Implementation
# coding=utf-8
# File: redis_receive.py
import redis
import time
import cv2
import base64
import numpy as np

r = redis.Redis(host='127.0.0.1', port=6379, db=0)
ps = r.pubsub()
charecter = "img"
ps.subscribe(charecter + str(0))  # subscribe to the channel the sender publishes on
is_first = True
for item in ps.listen():
    print("get message, ", item)
    if is_first:
        r.set('receive_time', str(time.time()))  # timestamp for the first (subscribe) message
        is_first = False
    start = time.time()
    if item['type'] == 'message' and item['data'] is not None:
        img_base64 = r.get(str(item['data'], 'utf-8'))  # the message payload is the key holding the frame
        img_data = base64.b64decode(img_base64)  # decode base64 back into the JPEG byte stream
        img_array = np.frombuffer(img_data, np.uint8)  # wrap the bytes in a numpy array
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)  # decode into an OpenCV image
        r.set('receive_time', str(time.time()))  # timestamp used by the sender to measure latency
        mat = cv2.resize(img, (600, 600))
        print(mat.shape)
        cv2.imshow('redis_pic', mat)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        print("cost time:", time.time() - start)
cv2.destroyAllWindows()
r.set("result", str("stop"))
2.3 Test
# first terminal window: start the receiving process
python redis_receive.py
# second terminal window: start the sending process
python redis_send.py
To stop the processes, simply press the q key in the display window.
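Both scripts assume a redis-server instance listening on 127.0.0.1:6379. If the test does not start, a quick connectivity check with the same redis-py package can rule out the server; a small sketch, not part of the original code:

# connectivity check for the local redis server assumed by redis_send.py / redis_receive.py
import redis

try:
    redis.Redis(host='127.0.0.1', port=6379, db=0).ping()
    print("redis server is reachable")
except redis.exceptions.ConnectionError as err:
    print("cannot reach redis server:", err)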
3. Comparison
Overall, with the camera view being displayed, both approaches achieve real-time playback: the Redis approach runs at roughly 14 frames per second and the HTTP approach at roughly 10 frames per second.
To raise the speed, the base64 encoding step can be dropped; if only the transfer itself matters, the visualization can be removed as well, which should push the transfer rate up further.
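To make the first suggestion concrete: Redis values are arbitrary byte strings, so the raw JPEG bytes can be stored and read back directly. A sketch of only the changed lines, where r, cameraip and item refer to the objects already defined in redis_send.py and redis_receive.py:

# sender side (sketch): store the raw JPEG bytes, no base64 step
img_str = cv2.imencode('.jpg', img)[1].tobytes()
r.set(cameraip, img_str)       # Redis accepts raw bytes as a value
r.publish('img0', cameraip)

# receiver side (sketch): decode the raw bytes directly
img_data = r.get(str(item['data'], 'utf-8'))
img_array = np.frombuffer(img_data, np.uint8)
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)

This mainly saves the roughly one-third size overhead of base64 plus the extra encode/decode passes on both sides.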