功能: 实现基本的tcp server端、client端,并引入threading, 保证两端可以任意连接、断开连接,保证两端的稳定运行
IP说明: server不输入IP,默认为本机的IP,client需要输入要连接的server端的IP
端口说明:server、client端保持一致
ADB调试说明:在连接数据线的情况下,PC安装ADB调试工具,android端打开ADB调试权限,输入adb forward tcp:12345 tcp:8888, 其中12345为PC端口, 8888为android端口。参数链接:adb forward 参考
server code:文章来源:http://www.zghlxwxcb.cn/news/detail-703304.html
from socket import *
import threading
import time
from typing import Optional
class DataSend(threading.Thread):
    """TCP server thread that answers every received chunk with a greeting.

    The thread binds to ``port`` on all local interfaces and serves one
    client at a time. When a client disconnects (cleanly or abruptly) its
    socket is closed and the thread goes back to ``accept()``, so clients can
    connect and disconnect repeatedly without stopping the server.
    """

    # Listening socket; stays None until run() creates it.
    tcp_server: Optional[socket] = None

    def __init__(self, port):
        """Remember the TCP port to listen on; the socket is created in run().

        Args:
            port: TCP port number passed to ``bind()``.
        """
        super().__init__(name="data send")
        self.port = port

    def run(self):
        """Listen on ``self.port`` and serve clients one after another, forever.

        Raises:
            OSError: if binding, listening or accepting fails. The listening
                socket is closed before the exception leaves the thread.
        """
        self.tcp_server = socket(AF_INET, SOCK_STREAM)
        try:
            # An empty host string binds to every local interface.
            self.tcp_server.bind(('', self.port))
            print('begin listen')
            self.tcp_server.listen(3)
            print('end listen')
            while True:
                print('begin accept')
                clientsocket, address = self.tcp_server.accept()
                print(address, 'accepted!')
                self._serve_client(clientsocket, address)
        finally:
            self.tcp_server.close()

    def _serve_client(self, clientsocket, address):
        """Greet one client for every chunk it sends until it disconnects.

        The client socket is always closed on return, whether the peer closed
        the connection cleanly or the connection failed.
        """
        # Message sent back to the client for each chunk received.
        message = 'hello! Thank you for connecting to the server' + "\r\n"
        reply = message.encode('utf-8')
        try:
            while True:
                msg_bits = clientsocket.recv(1024)
                if not msg_bits:
                    # recv() returning b'' means the peer closed the
                    # connection; stop instead of writing to a dead socket.
                    print(address, 'disconnected')
                    break
                # errors='replace' keeps a malformed or split UTF-8 sequence
                # from raising and terminating the server thread.
                print(msg_bits.decode('utf-8', errors='replace'))
                # sendall() retries until the whole reply has been written.
                clientsocket.sendall(reply)
        except OSError as exc:
            # Covers reset, aborted and broken-pipe errors alike: a failure
            # on one client connection must not stop the server.
            print(address, 'connection lost: {}'.format(exc))
        finally:
            clientsocket.close()
if __name__ == '__main__':
    # Launch the server thread on TCP port 12345.
    data_send = DataSend(12345)
    data_send.start()
    # Keep the main thread idling, checking once per second, for as long as
    # the server thread is still running.
    while True:
        if not data_send.is_alive():
            break
        time.sleep(1)
client code:文章来源地址http://www.zghlxwxcb.cn/news/detail-703304.html
import threading
from socket import *
import time
from typing import Optional
class DataRec(threading.Thread):
    """TCP client thread that keeps a connection to a server alive.

    The thread connects to ``ip:port``, then repeatedly sends a greeting and
    prints the server's reply. Whenever the connection fails or the server
    closes it, the socket is discarded and the thread retries until a new
    connection is established.
    """

    # Connected socket, or None while disconnected.
    tcp_client: Optional[socket]

    def __init__(self, ip, port, retry_interval=1.0, send_interval=1.0):
        """Store the connection settings; no socket is opened here.

        Args:
            ip: Address of the server to connect to.
            port: TCP port of the server.
            retry_interval: Seconds to wait before each connection attempt.
            send_interval: Seconds to wait before each greeting is sent.
        """
        super().__init__(name="data rec")
        self.ip = ip
        self.port = port
        self.retry_interval = retry_interval
        self.send_interval = send_interval
        self.tcp_client = None

    def wait_tcp_connect(self):
        """Block until ``self.tcp_client`` holds a connected socket.

        Returns immediately if a socket is already stored. Otherwise a fresh
        socket is created for every attempt, and a socket whose ``connect()``
        fails is closed before the next try.
        """
        while self.tcp_client is None:
            time.sleep(self.retry_interval)
            candidate = socket(AF_INET, SOCK_STREAM)
            try:
                print('try to init client {}:{}'.format(self.ip, self.port))
                candidate.connect((self.ip, self.port))
            except OSError:
                # Release the failed socket explicitly rather than leaving
                # it for the garbage collector.
                candidate.close()
                print("client init failed, waiting for server!")
            else:
                self.tcp_client = candidate
                print('client inited!')

    def run(self):
        """Send a greeting and print the reply in a loop, reconnecting on failure."""
        self.wait_tcp_connect()
        while True:
            try:
                time.sleep(self.send_interval)
                self.tcp_client.sendall('hello from client'.encode('utf-8'))
                msg_bits = self.tcp_client.recv(1024 * 8)
                if not msg_bits:
                    # recv() returning b'' means the server closed the
                    # connection; handle it like any other connection loss.
                    raise ConnectionError('connection closed by server')
                # errors='replace' keeps a malformed or split UTF-8 sequence
                # from raising and terminating the client thread.
                msg_str = msg_bits.decode('utf-8', errors='replace')
                print("rec: {}".format(msg_str))
            except OSError as msg:
                print('client rec msg catch error({} - {})'.format(type(msg).__name__, msg))
                self.tcp_client.close()
                self.tcp_client = None
                self.wait_tcp_connect()
if __name__ == '__main__':
    # Launch the client thread against a server on this machine, port 12345.
    data_rec = DataRec('127.0.0.1', 12345)
    data_rec.start()
    # Keep the main thread idling, checking once per second, for as long as
    # the client thread is still running.
    while True:
        if not data_rec.is_alive():
            break
        time.sleep(1)
到了这里,关于python tcp server client示例代码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!