一 單接口
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import random
import datetime
import requests
import threading
import time
class Presstest(object):
    """Minimal multi-threaded HTTP stress tester for a single endpoint.

    The run is driven by module-level settings that must exist before
    ``run()`` is called: ``THREAD_NUM`` (number of worker threads),
    ``ONE_WORKER_NUM`` (requests issued by each thread), ``LOOP_SLEEP``
    (pause in seconds after each request) and ``ERROR_NUM`` (failure
    counter, incremented in place).
    """

    # Default headers; each instance gets its own copy.
    headers = {
        'Content-Type': 'application/json; charset=UTF-8'
    }
    # Serialises updates of the module-level ERROR_NUM counter.
    _error_lock = threading.Lock()

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session.

        Args:
            login_url: URL the login request is posted to.
            press_url: URL of the endpoint under test.
            phone: Account identifier sent as ``mobile`` on login.
            password: Password sent as ``pwd`` on login.
        """
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Copy the defaults: login() adds the token to the session headers
        # and must not leak it into the dict shared by every instance.
        self.session.headers = dict(self.headers)

    def login(self):
        """Log in and store the returned token in the session headers.

        Raises:
            RuntimeError: If the login response carries no ``data.token``.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        body = res.json()
        token = (body.get('data') or {}).get('token')
        if not token:
            # Without a token every later request would run unauthenticated,
            # so stop here instead of producing a misleading test run.
            raise RuntimeError("login failed, no token in response: %r" % (body,))
        self.session.headers['token'] = token

    def _record_error(self):
        """Increment the module-level ERROR_NUM counter under the lock."""
        global ERROR_NUM
        with self._error_lock:
            ERROR_NUM += 1

    def testinterface(self):
        """POST the fixed payload to ``press_url`` once.

        A failure is recorded when the request or JSON decoding raises, or
        when the response's ``code`` field is not 0.
        """
        # self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
        data = {"id": 418}
        try:
            print('開始調(diào)接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            response = self.session.post(self.press_url, data=json.dumps(data))
            body = response.json()
            if body.get('code') != 0:
                print(body)
                self._record_error()
        except Exception as e:
            # Best effort: any failure counts as one error and the run goes on.
            print(e)
            self._record_error()

    def testonework(self):
        """Body of one worker thread: ONE_WORKER_NUM requests in sequence."""
        for _ in range(ONE_WORKER_NUM):
            self.testinterface()
            time.sleep(LOOP_SLEEP)

    def run(self):
        """Run THREAD_NUM worker threads to completion and print a summary."""
        t1 = time.time()
        workers = [
            threading.Thread(target=self.testonework, name="T" + str(i), daemon=True)
            for i in range(THREAD_NUM)
        ]
        for t in workers:
            t.start()
        for t in workers:
            t.join()
        t2 = time.time()

        total = THREAD_NUM * ONE_WORKER_NUM
        elapsed = t2 - t1
        # Guard both divisions: total can be 0 and elapsed can round to 0.
        per_request = elapsed / total if total else 0.0
        per_second = 1 / per_request if per_request > 0 else 0.0
        print("===============壓測結(jié)果===================")
        print("URL:", self.press_url)
        print("任務(wù)數(shù)量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", total)
        print("總耗時(shí)(秒):", elapsed)
        print("每次請求耗時(shí)(秒):", per_request)
        print("每秒承載請求數(shù):", per_second)
        print("錯(cuò)誤數(shù)量:", ERROR_NUM)
if __name__ == '__main__':
    # Run settings; Presstest reads these as module-level globals.
    THREAD_NUM, ONE_WORKER_NUM = 1, 5  # worker threads, requests per thread
    LOOP_SLEEP = 0.1  # pause after each request, in seconds
    ERROR_NUM = 0  # failure counter, incremented during the run

    # Target endpoints and the account used to obtain a token.
    login_url = 'http://test/sale/login/login'
    press_url = 'http://test/sale/hsOrder/afterExamineAdopt'
    phone = "150000000"
    password = "123456"

    # Authenticate once, then start the threaded run.
    obj = Presstest(login_url, press_url, phone, password)
    obj.login()
    obj.run()
二 多接口參數化(實現多接口參數化并發,data和url必須一一對應,且THREAD_NUM并發線程數不能大于url_list長度)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import random
import datetime
import requests
import threading
import time
class Presstest(object):
    """Multi-threaded HTTP stress tester that hits several endpoints at once.

    Each worker thread is given one (url, data) pair. On every iteration it
    posts that pair and then also posts the fixed payload to ``press_url``.

    The run is driven by module-level settings that must exist before
    ``run()`` is called: ``THREAD_NUM`` (number of worker threads),
    ``ONE_WORKER_NUM`` (iterations per thread), ``LOOP_SLEEP`` (pause in
    seconds after each iteration) and ``ERROR_NUM`` (failure counter,
    incremented in place).
    """

    # Default headers; each instance gets its own copy.
    headers = {
        'Content-Type': 'application/json; charset=UTF-8'
    }
    # Serialises updates of the module-level ERROR_NUM counter.
    _error_lock = threading.Lock()

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session.

        Args:
            login_url: URL the login request is posted to.
            press_url: URL posted to by ``testinterface``.
            phone: Account identifier sent as ``mobile`` on login.
            password: Password sent as ``pwd`` on login.
        """
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Copy the defaults: login() adds the token to the session headers
        # and must not leak it into the dict shared by every instance.
        self.session.headers = dict(self.headers)

    def login(self):
        """Log in and store the returned token in the session headers.

        Raises:
            RuntimeError: If the login response carries no ``data.token``.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        body = res.json()
        token = (body.get('data') or {}).get('token')
        if not token:
            # Without a token every later request would run unauthenticated,
            # so stop here instead of producing a misleading test run.
            raise RuntimeError("login failed, no token in response: %r" % (body,))
        self.session.headers['token'] = token

    def _record_error(self):
        """Increment the module-level ERROR_NUM counter under the lock."""
        global ERROR_NUM
        with self._error_lock:
            ERROR_NUM += 1

    def _post_and_check(self, url, data):
        """POST ``data`` as JSON to ``url``; record an error unless code == 0."""
        try:
            response = self.session.post(url, data=json.dumps(data))
            body = response.json()
            if body.get('code') != 0:
                print(body)
                self._record_error()
        except Exception as e:
            # Best effort: any failure counts as one error and the run goes on.
            print(e)
            self._record_error()

    def testinterface(self):
        """POST the fixed payload to ``press_url`` once."""
        # self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
        data = {"id": 418}
        print('開始調(diào)接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
        self._post_and_check(self.press_url, data)

    def testinterface2(self, url, data):
        """POST ``data`` to ``url`` once.

        Args:
            url: Endpoint to call.
            data: JSON-serialisable request body.
        """
        # self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
        print('開始調(diào)接口2222222:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
        print("接口請求入?yún)rl,data==", url, data)
        self._post_and_check(url, data)

    def testonework(self, url, data):
        """Body of one worker thread.

        Each of the ONE_WORKER_NUM iterations posts ``data`` to ``url`` and
        then posts the fixed payload to ``press_url``.
        """
        for _ in range(ONE_WORKER_NUM):
            self.testinterface2(url, data)
            self.testinterface()
            time.sleep(LOOP_SLEEP)

    def run(self, url_list=None, data_list=None):
        """Run THREAD_NUM worker threads concurrently and print a summary.

        Every thread receives a distinct, randomly chosen (url, data) pair.

        Args:
            url_list: Endpoints to call; defaults to the three built-in URLs.
            data_list: Request bodies, matched to ``url_list`` by position;
                defaults to the three built-in payloads.

        Raises:
            ValueError: If the two lists differ in length, or THREAD_NUM
                exceeds the number of available pairs.
        """
        if data_list is None:
            data_list = [{"orderId": 1194, "cause": "讓人"},
                         {"orderId": 1193, "cause": "讓人"},
                         {"orderId": 1192, "cause": "讓人"}]
        if url_list is None:
            url_list = ["http://test/api1",
                        "http://test/api2",
                        "http://test/api3"]
        if len(url_list) != len(data_list):
            raise ValueError("url_list and data_list must have the same length")
        if THREAD_NUM > len(url_list):
            raise ValueError("THREAD_NUM must not exceed the number of urls")

        t1 = time.time()
        list_arr = list(range(len(data_list)))
        print("index========", list_arr)
        # Pass the callable and its arguments separately. Writing
        # target=self.testonework(...) would run the worker right here on the
        # calling thread and hand Thread a None target.
        workers = [
            threading.Thread(
                target=self.testonework,
                args=(url_list[index], data_list[index]),
                name="T" + str(i),
                daemon=True,
            )
            for i, index in enumerate(random.sample(list_arr, THREAD_NUM))
        ]
        for t in workers:
            t.start()
        for t in workers:
            t.join()
        t2 = time.time()

        # These figures are per loop iteration; each iteration sends two
        # requests (testinterface2 followed by testinterface).
        total = THREAD_NUM * ONE_WORKER_NUM
        elapsed = t2 - t1
        # Guard both divisions: total can be 0 and elapsed can round to 0.
        per_request = elapsed / total if total else 0.0
        per_second = 1 / per_request if per_request > 0 else 0.0
        print("===============壓測結(jié)果===================")
        print("URL:", self.press_url)
        print("任務(wù)數(shù)量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", total)
        print("總耗時(shí)(秒):", elapsed)
        print("每次請求耗時(shí)(秒):", per_request)
        print("每秒承載請求數(shù):", per_second)
        print("錯(cuò)誤數(shù)量:", ERROR_NUM)
if __name__ == '__main__':
    # Run settings; Presstest reads these as module-level globals.
    THREAD_NUM, ONE_WORKER_NUM = 3, 1  # worker threads, iterations per thread
    LOOP_SLEEP = 0  # pause after each iteration, in seconds
    ERROR_NUM = 0  # failure counter, incremented during the run

    # Target endpoints and the account used to obtain a token.
    login_url = 'http://test/login'
    press_url = 'http://test/afterExamineAdopt'
    phone = "1500000000"
    password = "123456"

    # Authenticate once, then start the threaded run.
    obj = Presstest(login_url, press_url, phone, password)
    obj.login()
    obj.run()
三 多接口并發調用方法二
import datetime
import json
import requests
import threading
import time
def post_request(url, data):
    """POST ``data`` as a JSON body to ``url`` and print the reply and timing.

    Args:
        url: Endpoint to call.
        data: JSON-serialisable request payload.
    """
    headers = {"Content-Type": "application/json;charset=UTF-8"}
    payload = json.dumps(data)
    print('開始調(diào)接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    # Time only the HTTP call, using a monotonic clock, so console output and
    # system clock adjustments do not distort the reported duration.
    started = time.perf_counter()
    response = requests.post(url, data=payload, headers=headers)
    duration = time.perf_counter() - started
    print('調(diào)用結(jié)束:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    print("Response from", url, ":", data, response.text)
    print("Request duration:", duration * 1000, "ms")
# Request bodies, matched to ``urls`` by position.
data = [
    {"orderId": 1194, "cause": "讓人"},
    {"orderId": 1193, "cause": "讓人"},
    {"orderId": 1192, "cause": "讓人"},
]
urls = [
    "http://test/api1",
    "http://test/api2",
    "http://test/api3",
]

# One thread per (url, payload) pair; start them all, then wait for each.
threads = []
for endpoint, payload in zip(urls, data):
    worker = threading.Thread(target=post_request, args=(endpoint, payload))
    threads.append(worker)
    worker.start()
for worker in threads:
    worker.join()
四 多接口同時并發(相當于集合點)(異步實現集合點)文章來源:http://www.zghlxwxcb.cn/news/detail-738112.html
import asyncio
import datetime
import aiohttp
async def make_request(session, url, data):
    """POST ``data`` as JSON to ``url`` and return the decoded JSON reply.

    Args:
        session: Client session used to send the request.
        url: Endpoint to call.
        data: JSON-serialisable request body.

    Returns:
        The response body parsed as JSON.
    """
    print('開始調(diào)接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    async with session.post(url, json=data) as response:
        # content_type=None switches off the Content-Type check, so the body
        # is decoded whether the server labels it text/html (the only value
        # accepted before) or application/json.
        return await response.json(content_type=None, encoding='utf-8')
async def run_concurrent_requests(urls, datas, max_concurrent_requests,
                                  token="BEE010F2A6D0696DD90A82FF28B21AF2"):
    """POST every (url, data) pair, with a cap on requests in flight.

    Args:
        urls: Endpoints to call; ``urls[i]`` is paired with ``datas[i]``.
        datas: Request bodies, one per request.
        max_concurrent_requests: Upper bound on simultaneous requests.
        token: Value sent in the ``token`` header of every request.

    Returns:
        List of decoded responses, in the same order as ``datas``.

    Raises:
        ValueError: If ``urls`` is shorter than ``datas`` or
            ``max_concurrent_requests`` is less than 1.
    """
    if len(urls) < len(datas):
        raise ValueError("urls must contain one entry for every item in datas")
    if max_concurrent_requests < 1:
        # A zero-sized semaphore would block the first request forever.
        raise ValueError("max_concurrent_requests must be at least 1")
    if not datas:
        return []

    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "token": token,
    }
    sem = asyncio.Semaphore(max_concurrent_requests)

    async def bounded(url, data):
        # Hold a semaphore slot for the whole lifetime of one request.
        async with sem:
            return await make_request(session, url, data)

    # Default headers must be given to the constructor; assigning to
    # session.headers afterwards does not configure the session.
    async with aiohttp.ClientSession(headers=headers) as session:
        # zip stops at the shorter input, i.e. one request per item in datas.
        tasks = [asyncio.ensure_future(bounded(url, data))
                 for url, data in zip(urls, datas)]
        return await asyncio.gather(*tasks)
if __name__ == '__main__':
    # Five requests against the same endpoint, each with its own payload.
    urls = ['http://test/SubmitSettlement'] * 5
    datas = [{"idArr": [2086]}, {"idArr": [2090]}, {"idArr": [2089]}, {"idArr": [2086]}, {"idArr": [2090]}]
    max_concurrent_requests = 5
    # asyncio.run creates and closes the event loop itself; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    responses = asyncio.run(run_concurrent_requests(urls, datas, max_concurrent_requests))
    print(responses)
文章來源地址http://www.zghlxwxcb.cn/news/detail-738112.html
到了這里,關于Python接口并發壓力測試(單接口,多接口參數化)+異步aiohttp的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!