ros實現(xiàn)將智能小車數(shù)據(jù)通過TCP/IP發(fā)送到上位機網(wǎng)關
這里主要注意:
將浮點型數(shù)據(jù)*1000轉(zhuǎn)換成int型數(shù)據(jù)然后分字節(jié)儲存(另取一個字節(jié)作為符號位)。沒有用struct.pack進行字節(jié)流打包,原因是不同平臺,字節(jié)流的打包浮點型數(shù)據(jù)不一樣,又要加上包頭等校驗信息,所以這里分別用單字節(jié)保存在列表中,然后sock.sendall(struct.pack(“B”*len(list_send),list_send))
學到的點
1、
在 Python 中, 是一個解包運算符(unpacking operator)。它可以應用于可迭代對象,如列表、元組等,并將可迭代對象中的元素解包為單獨的參數(shù)。
在給定的代碼中,*list_send 使用解包運算符 * 將列表 list_send 中的元素作為單獨的參數(shù)傳遞給 struct.pack 函數(shù)。這意味著 struct.pack 函數(shù)將接收列表中的每個元素作為單獨的參數(shù)。如果列表 list_send 包含三個元素 [1, 2, 3],那么 *list_send 將被解包為 1, 2, 3。
這種解包操作是為了確保 struct.pack 函數(shù)能夠接收正確數(shù)量的參數(shù),并將它們打包為字節(jié)流發(fā)送到 socket 連接。
B是無符號8位
len(list_send)是鏈表長度(列表中元素個數(shù))
2、
這里sock.connect((TCP_IP, TCP_PORT))會報錯異常終止程序運行我們用try except就非常好
while not connect:
try:
sock.connect((TCP_IP, TCP_PORT))
print("TCP/IP連接成功")
except:
time_connect+=1
print("TCP/IP連接失敗次數(shù):%d" % time_connect)
time.sleep(10)
3、def send_data(car_subscriber): 寫子函數(shù)時這個形參隨便寫調(diào)用傳入的是什么就是什么類型
4、轉(zhuǎn)16位高低8位分別存儲文章來源:http://www.zghlxwxcb.cn/news/detail-816222.html
int16_num=int(abs(raw_data.twist.linear.y)*1000)&0xFFFF
list_raw[4]=int16_num&0xFF
list_raw[5]=(int16_num>>8)&0xFF
5、創(chuàng)建套接字 固定IP和端口否則端口會自動分配文章來源地址http://www.zghlxwxcb.cn/news/detail-816222.html
local_ip='192.168.0.11'
local_port=8090
TCP_IP = '192.168.1.81'
TCP_PORT = 8090
# 創(chuàng)建TCP/IP客戶端套接字 AF_INET是指 IPv4 地址族 參數(shù)是套接字(socket)類型,用于指定 socket 的傳輸協(xié)議是基于流(stream)的,也就是 TCP/IP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((local_ip,local_port))
# 獲取實際分配的IP地址和端口號
actual_ip, actual_port = sock.getsockname()
# 打印實際分配的IP地址和端口號
print("Successfully bound to {}:{}".format(actual_ip, actual_port))
#!/usr/bin/env python
# encoding: utf-8
import binascii
from sqlite3 import connect
from threading import local
import time
import rospy
from std_msgs.msg import Float32
from sensor_msgs.msg import Imu, BatteryState
import struct
import socket
from geometry_msgs.msg import Twist
from time import sleep
#當前車頭方向
x_yaw=0
# 設置TCP/IP服務器地址和端口號
local_ip='192.168.0.11'
local_port=8090
TCP_IP = '192.168.1.81'
TCP_PORT = 8090
""" TCP_IP = '192.168.0.2'
TCP_PORT = 8888 """
# 創(chuàng)建TCP/IP客戶端套接字 AF_INET是指 IPv4 地址族 參數(shù)是套接字(socket)類型,用于指定 socket 的傳輸協(xié)議是基于流(stream)的,也就是 TCP/IP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((local_ip,local_port))
# 獲取實際分配的IP地址和端口號
actual_ip, actual_port = sock.getsockname()
# 打印實際分配的IP地址和端口號
print("Successfully bound to {}:{}".format(actual_ip, actual_port))
connect = False
time_connect=0
while not connect:
try:
sock.connect((TCP_IP, TCP_PORT))
print("TCP/IP連接成功")
connect=True
except:
time_connect+=1
print("TCP/IP連接失敗次數(shù):%d" % time_connect)
time.sleep(10)
# 訂閱小車速度、IMU角速度、電壓和電量
class RosTcpIPPublisher:
def __init__(self):
self.twist =Twist()
self.twist.linear.x=0.0
self.twist.linear.y=0.0
self.twist.linear.z=0.0
self.twist.angular.x = 0.0
self.twist.angular.y = 0.0
self.twist.angular.z =0.0
self.battery = 0.0
rospy.Subscriber('/vel_raw', Twist, self.vel_callback)
rospy.Subscriber('voltage', Float32, self.battery_callback)
def vel_callback(self, msg):
self.twist.linear.x = msg.linear.x
self.twist.linear.y = msg.linear.y
self.twist.linear.z = msg.linear.z
self.twist.angular.x = msg.angular.x
self.twist.angular.y = msg.angular.y
self.twist.angular.z = msg.angular.z
def battery_callback(self, msg):
self.battery = msg.data
# 打包數(shù)據(jù)并發(fā)送到TCP/IP服務器
def send_data(car_subscriber):
while not rospy.is_shutdown():
list_data=trans_data(car_subscriber)
list_send=clc_data(list_data)
# 發(fā)送數(shù)據(jù)到TCP/IP服務器
sock.sendall(struct.pack("B"*len(list_send),*list_send))
sleep(1)
def trans_data(raw_data):
list_raw=[0]*19
if raw_data.twist.linear.x>0:
list_raw[0]=1
else:
list_raw[0]=0
if raw_data.twist.linear.y>0:
list_raw[3]=1
else:
list_raw[3]=0
if raw_data.twist.linear.z>0:
list_raw[6]=1
else:
list_raw[6]=0
if raw_data.twist.angular.x>0:
list_raw[9]=1
else:
list_raw[9]=0
if raw_data.twist.angular.y>0:
list_raw[12]=1
else:
list_raw[12]=0
if raw_data.twist.angular.z>0:
list_raw[15]=1
else:
list_raw[15]=0
int16_num=int(abs(raw_data.twist.linear.x)*1000)&0xFFFF
list_raw[1]=int16_num&0xFF
list_raw[2]=(int16_num>>8)&0xFF
int16_num=int(abs(raw_data.twist.linear.y)*1000)&0xFFFF
list_raw[4]=int16_num&0xFF
list_raw[5]=(int16_num>>8)&0xFF
int16_num=int(abs(raw_data.twist.linear.z)*1000)&0xFFFF
list_raw[7]=int16_num&0xFF
list_raw[8]=(int16_num>>8)&0xFF
int16_num=int(abs(raw_data.twist.angular.x)*1000)&0xFFFF
list_raw[10]=int16_num&0xFF
list_raw[11]=(int16_num>>8)&0xFF
int16_num=int(abs(raw_data.twist.angular.y)*1000)&0xFFFF
list_raw[13]=int16_num&0xFF
list_raw[14]=(int16_num>>8)&0xFF
int16_num=int(abs(raw_data.twist.angular.z)*1000)&0xFFFF
list_raw[16]=int16_num&0xFF
list_raw[17]=(int16_num>>8)&0xFF
list_raw[18]=int(raw_data.battery*10)&0xFF
return list_raw
def clc_data(list_data):
count=0
#i=0
total_sum=0
current=list_data
list_send=[0]*0x26
list_send[0]=0xCC
list_send[1]=0xEE
#總長
list_send[2]=0x26
list_send[3]=0x00
#8Byte
for i in range(8):
list_send[i+4]=0x00
#節(jié)點類型
list_send[12]=0x88
list_send[13]=0x13
#指令
list_send[14]=0x01
#功能字
#車輛編號
list_send[15]=0x01
count=len(list_data)
for i in range(count):
list_send[i+16]=list_data[i]
#數(shù)據(jù)長度
len_L=20
len_H=00
list_send[35]=len_L
list_send[36]=len_H
for i in range(37):
total_sum+=list_send[i]
total_sum=total_sum%256
list_send[37]=total_sum
return list_send
if __name__ == '__main__':
rospy.init_node('ros_tcpip_publish',anonymous=False)
try:
car_subscriber = RosTcpIPPublisher()
sleep(2)
send_data(car_subscriber)
rospy.spin()
sock.close()
except Exception as e:
rospy.loginfo("ROS_TCP/IP_ERROR!!!")
print("發(fā)生異常",str(e))
到了這里,關于智能小車(八)ros實現(xiàn)將智能小車數(shù)據(jù)通過TCP/IP發(fā)送到上位機網(wǎng)關的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!