背景
現(xiàn)有一個百萬行數(shù)據(jù)的csv格式文件,需要在兩分鐘之內(nèi)存入數(shù)據(jù)庫。
方案
方案一:多線程+協(xié)程+異步MySql
方案二:多線程+MySql批量插入
代碼
1,先通過pandas讀取所有csv數(shù)據(jù)存入列表。
2,設置N個線程,將一百萬數(shù)據(jù)均分為N份,以start,end傳遞給線程以切片的方法讀取區(qū)間數(shù)據(jù)(建議為16個線程)
3,方案二 線程內(nèi)以 executemany 方法批量插入所有數(shù)據(jù)。
4,方案一 線程內(nèi)使用異步事件循環(huán)遍歷所有數(shù)據(jù)異步插入。
5,方案一純屬沒事找事型。
方案二文章來源:http://www.zghlxwxcb.cn/news/detail-515875.html
import threading
import pandas as pd
import asyncio
import time
import aiomysql
import pymysql
data=[]
error_data=[]
def run(start,end):
global data
global error_data
print("start"+threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
mysdb = getDb("*", *, "*", "*", "*")
cursor = mysdb.cursor()
sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
cursor.executemany(sql,data[start:end])
mysdb.commit()
mysdb.close()
print("end" + threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def csv_file_read_use_pd(csvFile):
csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
csv_result = csv_result.fillna(value="None")
result = csv_result.values.tolist()
return result
class MyDataBase:
def __init__(self,host=None,port=None,username=None,password=None,database=None):
self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)
def close(self):
self.db.close()
def getDb(host,port,username,password,database):
MyDb = MyDataBase(host, port, username, password,database)
return MyDb.db
def main(csvFile):
global data #獲取全局對象 csv全量數(shù)據(jù)
#讀取所有的數(shù)據(jù) 將所有數(shù)據(jù)均分成 thread_lens 份 分發(fā)給 thread_lens 個線程去執(zhí)行
thread_lens=20
csv_result=csv_file_read_use_pd(csvFile)
day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
for item in csv_result:
item.insert(0,day)
data=csv_result
thread_exe_count_list=[] #線程需要執(zhí)行的區(qū)間
csv_lens=len(csv_result)
avg = csv_lens // thread_lens
remainder=csv_lens % thread_lens
# 0,27517 27517,55,034
nowIndex=0
for i in range(thread_lens):
temp=[nowIndex,nowIndex+avg]
nowIndex=nowIndex+avg
thread_exe_count_list.append(temp)
thread_exe_count_list[-1:][0][1]+=remainder #余數(shù)分給最后一個線程
# print(thread_exe_count_list)
#th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
for i in range(thread_lens):
sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
sub_thread.start()
sub_thread.join()
time.sleep(3)
if __name__=="__main__":
#csv_file_read_use_pd("分公司箱型箱量.csv")
main("分公司箱型箱量.csv")
方案一文章來源地址http://www.zghlxwxcb.cn/news/detail-515875.html
import threading
import pandas as pd
import asyncio
import time
import aiomysql
data=[]
error_data=[]
async def async_basic(loop,start,end):
global data
global error_data
print("start"+threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
conn = await aiomysql.connect(
host="*",
port=*,
user="*",
password="*",
db="*",
loop=loop
)
day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
async with conn.cursor() as cursor:
for item in data[start:end]:
params=[day]
params.extend(item)
try:
x=await cursor.execute(sql,params)
if x==0:
error_data.append(item)
print(threading.current_thread().name+" result "+str(x))
except Exception as e:
print(e)
error_data.append(item)
time.sleep(10)
pass
await conn.close()
#await conn.commit()
#關閉連接池
# pool.close()
# await pool.wait_closed()
print("end" + threading.current_thread().name)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def csv_file_read_use_pd(csvFile):
csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
csv_result = csv_result.fillna(value="None")
result = csv_result.values.tolist()
return result
def th(start,end):
loop = asyncio.new_event_loop()
loop.run_until_complete(async_basic(loop,start,end))
def main(csvFile):
global data #獲取全局對象 csv全量數(shù)據(jù)
#讀取所有的數(shù)據(jù) 將所有數(shù)據(jù)均分成 thread_lens 份 分發(fā)給 thread_lens 個線程去執(zhí)行
thread_lens=20
csv_result=csv_file_read_use_pd(csvFile)
data=csv_result
thread_exe_count_list=[] #線程需要執(zhí)行的區(qū)間
csv_lens=len(csv_result)
avg = csv_lens // thread_lens
remainder=csv_lens % thread_lens
# 0,27517 27517,55,034
nowIndex=0
for i in range(thread_lens):
temp=[nowIndex,nowIndex+avg]
nowIndex=nowIndex+avg
thread_exe_count_list.append(temp)
thread_exe_count_list[-1:][0][1]+=remainder #余數(shù)分給最后一個線程
print(thread_exe_count_list)
#th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
for i in range(thread_lens):
sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
sub_thread.start()
time.sleep(3)
if __name__=="__main__":
#csv_file_read_use_pd("分公司箱型箱量.csv")
main("分公司箱型箱量.csv")
到了這里,關于Python 大批量寫入數(shù)據(jù) 百萬級別的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!