国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù))

這篇具有很好參考價值的文章主要介紹了基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù))。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

在實現(xiàn)ia業(yè)務(wù)服務(wù)器時需要構(gòu)建一個python-socket客戶端,1、要求能與服務(wù)器保持心跳連接,每10秒鐘發(fā)送一次心跳信號;2、要求能根據(jù)socket服務(wù)器發(fā)送的指令創(chuàng)建或終止一個定時任務(wù)。
為此以3個類實現(xiàn)該功能,分別為socket通信類(用于實現(xiàn)通信連接與任務(wù)創(chuàng)建)、任務(wù)池類(用于管理任務(wù))、任務(wù)類(用于實現(xiàn)具體任務(wù))。

1、socket通信客戶端

這里定義的MySocket類主體結(jié)構(gòu)如下圖所示,共包含4個函數(shù),2個線程(其本身繼承Thread類實現(xiàn)主任務(wù)流程——run函數(shù)、接收服務(wù)器信息并創(chuàng)建任務(wù)添加到任務(wù)池;同時又在__init__函數(shù)中將self.thread_msg類封裝為一個線程,每隔10秒鐘向socket服務(wù)器發(fā)送一次心跳包)。check_connection函數(shù)用于檢測socket是否與服務(wù)器斷開連接,在send_msg函數(shù)中調(diào)用,當發(fā)現(xiàn)客戶端掉線后則立刻進行重連。send_msg函數(shù)用于發(fā)送信息給服務(wù)器,因為run函數(shù)與thread_msg函數(shù)2個線程都需要調(diào)用連接與服務(wù)器發(fā)送數(shù)據(jù),為避免沖突故而定義為函數(shù)在內(nèi)部進行加鎖。
基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù)),項目實戰(zhàn)記錄,python,服務(wù)器,網(wǎng)絡(luò)

#socket客戶端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.創(chuàng)建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客戶端開啟心跳維護
        # 2.準備連接服務(wù)器,建立連接
        self.serve_ip = config["serve_ip"]#當前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口當前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定時發(fā)送信息
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
            a=a.decode('utf-8')
            print("------主線程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定時發(fā)送心跳信息
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json標注的模板
            json_data={  
                "timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #進行定時發(fā)送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#將發(fā)送的數(shù)據(jù)進行編碼
            # self.lock.release()
            try:
                #進行定時發(fā)送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服務(wù)器拒絕本次連接!?。。?!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
            except TimeoutError:
                print('連接超時?。。。?!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
                print('智能終端無網(wǎng)絡(luò)連接!?。。?!')

    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服務(wù)器掉線!?。。。?)
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        try:
            #進行定時發(fā)送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服務(wù)器拒絕本次連接?。。。。?)
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        except TimeoutError:
            print('連接超時?。。。?!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
            print('智能終端無網(wǎng)絡(luò)連接?。。。?!')

2、任務(wù)池實現(xiàn)

任務(wù)池的實現(xiàn)代碼如下所示,主要包含3個函數(shù)(其中將remove_task封裝為一個子線程,用于實時移除已經(jīng)完成計算任務(wù)的線程),append函數(shù)用于將新創(chuàng)建的任務(wù)添加大任務(wù)池pool中,stop函數(shù)用于停止并移除正在運行中的任務(wù)。
基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù)),項目實戰(zhàn)記錄,python,服務(wù)器,網(wǎng)絡(luò)
其具體實現(xiàn)代碼如下所示,其作為MySocket類中的一個成員屬性,每當MySocket接收到服務(wù)器信息創(chuàng)建任務(wù)ocrtask后都調(diào)用TaskPool.append(ocrtask)將任務(wù)添加到任務(wù)池中。由任務(wù)池管理任務(wù)的聲明周期,具體可見其append函數(shù)可以啟動task或終止task。remove_task線程會自動將已經(jīng)完成的任務(wù)移除。

#ocr任務(wù)線程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #刪除已經(jīng)結(jié)束的任務(wù)
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0: #生存時間為0,認為該任務(wù)已經(jīng)完成需要被刪除
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #終止任務(wù)
            self.stop(ocrtask)
        else:
            #啟動任務(wù)
            ocrtask.start()
            self.pool.append(ocrtask)

    #終止任務(wù)
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

3、具體任務(wù)線程

任務(wù)的實現(xiàn)代碼如下所示,其支持3中任務(wù)模式,使用state區(qū)分任務(wù),state為0-停止識別,1-連續(xù)識別count張,2-持續(xù)識別(故而在state為2時將count設(shè)置的特別大)。這里以count控制任務(wù)的運行,任務(wù)每運行一次count減少1。當count小于等于0,則表示任務(wù)運行完成。在TaskPool的remove_task中檢測到count為0時則會自動刪除任務(wù)。

#ocr任務(wù)
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止識別,1-連續(xù)識別count張,2-持續(xù)識別
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #獲取任務(wù)的生存時間
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削減count
        return now_count

    #停止任務(wù)
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任務(wù)
        pass

4、完整代碼與使用效果

完整代碼如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Thread
 
def hex_to_bytes(hex_str):
    """
    :param hex_str: 16進制字符串
    :return: byte_data 字節(jié)流數(shù)據(jù)
    """
    bytes_data = bytes()
    while hex_str :
        """16進制字符串轉(zhuǎn)換為字節(jié)流"""
        temp = hex_str[0:2]
        s = int(temp, 16)
        bytes_data += struct.pack('B', s)
        hex_str = hex_str[2:]
    return bytes_data

# 讀取Yaml文件方法
def read_yaml(yaml_path):
    with open(yaml_path, encoding="utf-8", mode="r") as f:
        result = yaml.load(stream=f,Loader=yaml.FullLoader)
        return result

#ocr任務(wù)
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止識別,1-連續(xù)識別count張,2-持續(xù)識別
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #獲取任務(wù)的生存時間
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削減count
        return now_count

    #停止任務(wù)
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任務(wù)
        pass

#ocr任務(wù)線程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #刪除已經(jīng)結(jié)束的任務(wù)
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0:
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #終止任務(wù)
            self.stop(ocrtask)
        else:
            #啟動任務(wù)
            ocrtask.start()
            self.pool.append(ocrtask)

    #終止任務(wù)
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

#socket客戶端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.創(chuàng)建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客戶端開啟心跳維護
        # 2.準備連接服務(wù)器,建立連接
        self.serve_ip = config["serve_ip"]#當前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口當前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定時發(fā)送信息
    
    #通信線程-用于接收服務(wù)器的指令
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
            a=a.decode('utf-8')
            print("------主線程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    #檢測socket連接是否斷開
    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定時發(fā)送心跳信息--子線程
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json標注的模板
            json_data={  
                "timestamp": int(time.time()*10),#時間戳放大一位和格式要求的長度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #進行定時發(fā)送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#將發(fā)送的數(shù)據(jù)進行編碼
            # self.lock.release()
            try:
                #進行定時發(fā)送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服務(wù)端的信息,最大數(shù)據(jù)為1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服務(wù)器拒絕本次連接!?。。?!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
            except TimeoutError:
                print('連接超時?。。。。?)
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
                print('智能終端無網(wǎng)絡(luò)連接?。。。?!')

    #發(fā)送信息
    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服務(wù)器掉線?。。。?!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        try:
            #進行定時發(fā)送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服務(wù)器拒絕本次連接?。。。?!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        except TimeoutError:
            print('連接超時?。。。?!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 連接服務(wù)器,建立連接,參數(shù)是元組形式
            print('智能終端無網(wǎng)絡(luò)連接?。。。?!')

if "__main__"==__name__:
    #進行定時通信測試
    config=read_yaml("config.yaml")
    socket_client=MySocket(config)
    socket_client.start()

使用效果如下所示,這里基于socket調(diào)試工具作為客戶端

基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù)),項目實戰(zhàn)記錄,python,服務(wù)器,網(wǎng)絡(luò)
基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù)),項目實戰(zhàn)記錄,python,服務(wù)器,網(wǎng)絡(luò)
基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù)),項目實戰(zhàn)記錄,python,服務(wù)器,網(wǎng)絡(luò)文章來源地址http://www.zghlxwxcb.cn/news/detail-835803.html

到了這里,關(guān)于基于python-socket構(gòu)建任務(wù)服務(wù)器(基于socket發(fā)送指令創(chuàng)建、停止任務(wù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【python】TCP socket服務(wù)器 Demo

    【python】TCP socket服務(wù)器 Demo

    目錄 一、單線程服務(wù)器 二、多線程服務(wù)器 三、多線程服務(wù)器(發(fā)送和接收分離) 說明:只能連接一個客戶端 客戶端測試結(jié)果: 服務(wù)端測試結(jié)果: ?說明:可以支持連接多個客戶端 客戶端測試結(jié)果:? 服務(wù)端測試結(jié)果: 說明:?可以支持連接多個客戶端,并且能夠做到和多

    2024年02月09日
    瀏覽(22)
  • python socket監(jiān)測服務(wù)器和客戶端連接狀態(tài)

    python socket監(jiān)測服務(wù)器和客戶端連接狀態(tài)

    服務(wù)器端和客戶端的連接狀態(tài),應(yīng)該是沒有單獨的函數(shù)返回或是接口監(jiān)測的,看網(wǎng)上很多資料說需要監(jiān)測心跳,這確實是一個普遍解決監(jiān)測狀態(tài)的辦法,但是對我的項目卻顯得有些被動,其實對一般的項目包括我的,用try…except就可以滿足,里面設(shè)置標志位,if判斷一下就可

    2024年02月13日
    瀏覽(19)
  • 【Java網(wǎng)絡(luò)編程】基于UDP-Socket 實現(xiàn)客戶端、服務(wù)器通信

    【Java網(wǎng)絡(luò)編程】基于UDP-Socket 實現(xiàn)客戶端、服務(wù)器通信

    ? 哈嘍,大家好~我是你們的老朋友: 保護小周??? 本期為大家?guī)淼氖蔷W(wǎng)絡(luò)編程的 UDP Socket 套接字,基于 UDP協(xié)議的 Socket 實現(xiàn)客戶端服務(wù)器通信 ,Socket 套接字可以理解為是,傳輸層給應(yīng)用層提供的一組 API,如此程序,確定不來看看嘛~~ 本期收錄于博主的專欄 : JavaEE_保

    2024年02月02日
    瀏覽(111)
  • C#上位機基礎(chǔ)學(xué)習(xí)_基于SOCKET實現(xiàn)與PLC服務(wù)器的TCP通信(一)

    C#上位機基礎(chǔ)學(xué)習(xí)_基于SOCKET實現(xiàn)與PLC服務(wù)器的TCP通信(一)

    測試軟件: TIA PORTAL V15.1 S7-PLCSIM ADVANCED V3.0 Visual Studio 2019 如下圖所示,打開S7-PLCSIM ADVANCED V3.0仿真軟件,新鍵一個實例,設(shè)置仿真PLC的IP地址等參數(shù),然后點擊Start激活PLC, 如下圖所示,激活PLC后,可以看到已經(jīng)存在一個實例, 如下圖所示,打開TIA PORTAL V15.1,新建一個項目,

    2023年04月15日
    瀏覽(21)
  • 基于PBS向超算服務(wù)器隊列提交任務(wù)的腳本模板與常用命令

    基于PBS向超算服務(wù)器隊列提交任務(wù)的腳本模板與常用命令

    ??本文介紹在 Linux 服務(wù)器中,通過 PBS (Portable Batch System)作業(yè)管理系統(tǒng)腳本的方式,提交任務(wù)到 服務(wù)器 隊列,并執(zhí)行任務(wù)的方法。 ??最近,需要在學(xué)校公用的超算中執(zhí)行代碼任務(wù);而和多數(shù)超算設(shè)備一樣,其也是需要通過作業(yè)隊列的方式,來提交、管理、排序不同用

    2024年04月12日
    瀏覽(18)
  • 基于k8s的web服務(wù)器構(gòu)建

    基于k8s的web服務(wù)器構(gòu)建

    項目描述/項目功能: 模擬企業(yè)里的k8s生產(chǎn)環(huán)境,部署web,nfs,harbor,Prometheus,granfa等應(yīng)用,構(gòu)建一個高可用高性能的web系統(tǒng),同時能監(jiān)控整個k8s集群的使用。 CentOS 7.9,ansible 2.9.27,Docker 2.6.0.0,Docker Compose 2.18.1,Kubernetes 1.20.6,Harbor 2.1.0,nfs v4,metrics-server 0.6.0,ingress-ngi

    2024年04月11日
    瀏覽(46)
  • 基于k8s的綜合的web服務(wù)器構(gòu)建

    基于k8s的綜合的web服務(wù)器構(gòu)建

    目錄 項目架構(gòu)圖: 項目環(huán)境: 項目描述: 項目步驟: ip規(guī)劃: 一.在三臺k8s機器上安裝部署好k8s,一臺作為master,兩臺node 安裝部署k8s node節(jié)點加入集群: master節(jié)點初始化: ?安裝Calico網(wǎng)絡(luò)插件: 二,部署nfs服務(wù),讓所有的web業(yè)務(wù)pod都取訪問,通過pv,pvc和卷掛載實現(xiàn) 1.搭建

    2024年04月13日
    瀏覽(24)
  • Python網(wǎng)絡(luò)編程實戰(zhàn):構(gòu)建TCP服務(wù)器與客戶端

    Python網(wǎng)絡(luò)編程實戰(zhàn):構(gòu)建TCP服務(wù)器與客戶端 在信息化時代,網(wǎng)絡(luò)編程是軟件開發(fā)中不可或缺的一部分。Python作為一種功能強大的編程語言,提供了豐富的網(wǎng)絡(luò)編程庫和工具,使得開發(fā)者能夠輕松構(gòu)建各種網(wǎng)絡(luò)應(yīng)用。本文將詳細介紹如何在Python中進行網(wǎng)絡(luò)編程,特別是如何使用

    2024年04月15日
    瀏覽(26)
  • 智能車上位機系統(tǒng),pyqt下的socket通信,python實現(xiàn)服務(wù)器+客戶端,文本+視頻不定長字節(jié)傳輸,超詳細,小白都能看懂

    智能車上位機系統(tǒng),pyqt下的socket通信,python實現(xiàn)服務(wù)器+客戶端,文本+視頻不定長字節(jié)傳輸,超詳細,小白都能看懂

    目錄 前言: 準備工作: 初級服務(wù)器端編寫: 中級服務(wù)器端編寫+客戶端收數(shù)據(jù)函數(shù)實現(xiàn): 數(shù)據(jù)包格式v1.0 客戶端收數(shù)據(jù)函數(shù)V1.0 客戶端分析1.0 ??? 本地測試:成功! ???? 兩臺主機測試1.0:失敗,視頻解析失敗,直接花屏了! 問題分析: 問題解決: 數(shù)據(jù)包格式V2.0 客戶端接

    2024年04月17日
    瀏覽(22)
  • 【spring authorization server系列教程】(一)入門系列,spring authorization server簡介??焖贅?gòu)建一個授權(quán)服務(wù)器(基于最新版本0.3.0)

    【spring authorization server系列教程】(一)入門系列,spring authorization server簡介??焖贅?gòu)建一個授權(quán)服務(wù)器(基于最新版本0.3.0)

    【spring authorization server系列教程】(一)入門系列,快速構(gòu)建一個授權(quán)服務(wù)器 spring authorization server是spring團隊最新的認證授權(quán)服務(wù)器,之前的oauth2后面會逐步棄用。不過到現(xiàn)在發(fā)文的時候,我看到官網(wǎng)已經(jīng)把之前oauth2倉庫廢棄了。 現(xiàn)在spring authorization server已經(jīng)到生產(chǎn)就緒階段了

    2024年02月05日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包