對于非常大的比如上百G的大文件讀取,單線程讀是非常非常慢的,需要考慮用多線程讀,多個線程讀同一個文件時不用加鎖的,每個線程打開一個獨(dú)立的文件句柄
多線程讀同一個文件實(shí)現(xiàn)思路
思路1
- 先打開一個文件句柄,獲取整個文件大小
file_size
- 確定要采用線程讀取的部分大小
read_size
和多線程的個數(shù)thread_num
,算出平均每個線程要讀取的大小為read_size/thread_num=each_size
- 計算出每個線程讀取的位置
start_pos
和它下一個線程的讀取位置next_pos
- 對于每個線程來說,讀取時的情況可以有如下種情況:
-
start_pos
等于0(整個文件都采用多線程讀取),這種情況下直接用getline
讀取,直到讀取某一行后讀取指針位置超過next_pos
-
start_pos
>0, 讀取當(dāng)前位置所在的字符,如果字符恰好為\n
,則直接用getline
讀取,直到讀取某一行后讀取指針位置超過next_pos
-
start_pos
>0, 讀取當(dāng)前位置所在的字符,如果字符不為\n
,則先用getline
讀取一行,假設(shè)讀取這行后新的位置為cur_pos
,如果cur_pos >= next_pos
則這個線程直接退出,不讀取任何數(shù)據(jù),因為這個線程的下一個線程會和它讀取同一行,這一行的內(nèi)容應(yīng)該有下一個線程讀取; 如果cur_pos < next_pos
則當(dāng)前讀取的這一行直接丟棄(因為這一行交給了上一個線程來讀取), 直接從下一行開始用getline
讀取,直到讀取某一行后讀取指針位置超過next_pos
- 最后代碼還要計算剩下的部分,因為文件大小
read_size
不一定能整除線程個數(shù)thread_num
,剩下的部分應(yīng)該全部交給主線程來讀
這個思路實(shí)現(xiàn)起來容易出bug,需要保證每一個線程至少能讀取一個完整的行
源碼實(shí)現(xiàn)
可能有bug,但是功能基本實(shí)現(xiàn)
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/spdlog.h"
#include <chrono>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
using namespace std;
void init_log()
{
try
{
auto new_logger = spdlog::basic_logger_mt("new_default_logger", "test.log", true);
spdlog::set_default_logger(new_logger);
spdlog::info("new logger log start");
}
catch (const spdlog::spdlog_ex &ex)
{
std::cout << "Log init failed: " << ex.what() << std::endl;
}
}
void thread_read_file(int tid, const string &file_path, std::streampos start_pos, std::streampos next_pos, int each_size)
{
ifstream file(file_path.c_str(), ios::in);
if (!file.good())
{
file.close();
spdlog::info("線程{} 打開文件{}失敗", tid, file_path);
return;
}
file.seekg(start_pos, ios::beg);
//
string text;
if (start_pos != 0)
{
char cur_ch = 0;
// spdlog::info("讀取前{}", file.tellg());
file.read(&cur_ch, 1); //會讓指針向后移動一個字節(jié)
// spdlog::info("讀取后{}", file.tellg());
if (start_pos == 115)
{
spdlog::info("tid={},115={}", tid, cur_ch);
}
if (cur_ch != '\n')
{
getline(file, text);
spdlog::info("線程{},跳過{}", tid, text);
if (file.tellg() >= next_pos)
{
/*
1. 如果線程起始位置不為換行符,則要跳過本行,本行內(nèi)容交給上一個線程讀取,如果跳過本行后的讀取位置(一定是換行符)>=下一個線程的起始位置,
如果位置等于下一個線程起始位置,說明下個線程起始位置是換行符,下一行內(nèi)容應(yīng)該由下一個線程讀取;如果位置>下一個線程起始位置,同樣本行內(nèi)容由上一個線程
讀取,下一行內(nèi)容也不用本線程讀取,可能是下一個線程讀取
*/
spdlog::info("線程{} start_pos={},next_pos={},each_size={} 起始位置不是\\n,讀取一行后的指針位置{}>=next_pos,不需要讀取內(nèi)容",
tid, start_pos, next_pos, each_size, file.tellg());
file.close();
return;
}
}
else
{
file.seekg(-1, ios::cur);
}
// spdlog::info("線程{} cur_ch={}", tid, cur_ch);
}
std::streampos cur_pos = file.tellg();
while (cur_pos < next_pos && getline(file, text))
{
/*
1. cur_pos始終指向每一行的行尾,如果cur_pos=next_pos則說明next_pos是行尾,則接下來的一行應(yīng)該由
下一個線程讀,所以這里是cur_pos < next_pos,而不是cur_pos <= next_pos
*/
int cur_line_len = file.tellg() - cur_pos;
spdlog::info("線程{} start_pos={},next_pos={},each_size={},本行開始pos={},本行結(jié)束pos={},本行讀長={},text={}",
tid, start_pos, next_pos, each_size, cur_pos, file.tellg(), cur_line_len, text);
cur_pos = file.tellg();
}
spdlog::info("線程{} start_pos={},next_pos={},each_size={},結(jié)束時cur_pos={},總共區(qū)間長度為{}\n", tid, start_pos, next_pos, each_size, cur_pos, cur_pos - start_pos);
file.close();
return;
}
void test_detach(const string &file_path)
{
// for (int i = 0; i < 10; ++i)
// {
// std::thread th(thread_read_file, i, file_path);
// th.detach();
// }
}
void test_join(const string &file_path)
{
//確定文件長度
ifstream file(file_path.c_str(), ios::in);
//把指針指到文件末尾求出文件大小
int file_size = file.seekg(0, ios::end).tellg();
file.close();
int thread_nums = 50; //線程個數(shù)
int each_size = file_size / thread_nums; //平均每個線程讀取的字節(jié)數(shù)
std::streampos start_pos = 0, next_pos = 0; //每個線程讀取位置的起始和下一個線程讀取的起始位置
vector<std::thread> vec_threads; //線程列表
spdlog::info("thread_nums={},each_size={},file_size={}", thread_nums, each_size, file_size);
int t_id = 0; //線程id
for (; t_id < thread_nums; ++t_id)
{
next_pos += each_size;
std::thread th(thread_read_file, t_id, file_path, start_pos, next_pos, each_size);
vec_threads.emplace_back(std::move(th)); // push_back() is also OK
start_pos = next_pos;
}
if (file_size % thread_nums != 0)
{
thread_read_file(t_id, file_path, start_pos, file_size, each_size);
}
for (auto &it : vec_threads)
{
it.join();
}
}
int main()
{
init_log();
string file_path = "./1.txt";
// test_detach(file_path);
// std::this_thread::sleep_for(std::chrono::seconds(1)); // wait for detached threads done
test_join(file_path);
return 0;
}
思路2
- 整體思路和方法1一樣,只是讀取的時候不是按照位置來判斷每個線程應(yīng)該讀取多少,而是統(tǒng)計每個線程讀取的長度
- 每次移動位置指針時,記錄一下移動的位置,因為每個線程應(yīng)該讀取的平均長度已經(jīng)提前計算,只要線程讀取的數(shù)據(jù)超過了平均大小,或者讀取到了文件末尾就結(jié)束
源碼實(shí)現(xiàn)
沒有bug,可以適應(yīng)多個線程被分配到同一行的情況,但是每個線程讀取的大小必須>0
#include <chrono>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
using namespace std;
void thread_read_file(int tid, const string &file_path, int start_pos, int next_pos, int each_size)
{
ifstream file(file_path.c_str(), ios::in);
if (!file.good())
{
stringstream ss;
ss << "Thread " << tid << " failed to open file: " << file_path << endl;
cout << ss.str();
return;
}
file.seekg(start_pos, ios::beg);
//
string text;
stringstream ss;
if (start_pos != 0)
{
char cur_ch;
file.read(&cur_ch, 1);
// ss << "Thread " << tid << ", cur_ch=" << cur_ch << endl;
if (cur_ch != '\n')
{
getline(file, text);
}
}
while (getline(file, text) && start_pos <= next_pos)
{
ss << "Thread " << tid << ", start_pos=" << start_pos << ";next_pos="
<< next_pos << ";each_size=" << each_size << ": " << text << endl;
cout << ss.str();
start_pos = file.tellg();
}
file.close();
return;
}
void test_detach(const string &file_path)
{
// for (int i = 0; i < 10; ++i)
// {
// std::thread th(thread_read_file, i, file_path);
// th.detach();
// }
}
void test_join(const string &file_path)
{
//確定文件長度
ifstream file(file_path.c_str(), ios::in);
//把指針指到文件末尾求出文件大小
int file_size = file.seekg(0, ios::end).tellg();
file.close();
int thread_nums = 10;
int each_size = file_size / thread_nums;
int start_pos = 0, next_pos = 0;
vector<std::thread> vec_threads;
int t_id = 0;
for (; t_id < thread_nums; ++t_id)
{
next_pos += each_size;
std::thread th(thread_read_file, t_id, file_path, start_pos, next_pos, each_size);
vec_threads.emplace_back(std::move(th)); // push_back() is also OK
start_pos = next_pos;
}
if (file_size % thread_nums != 0)
{
thread_read_file(t_id, file_path, start_pos, next_pos, each_size);
}
for (auto &it : vec_threads)
{
it.join();
}
}
int main()
{
string file_path = "./1.txt";
// test_detach(file_path);
// std::this_thread::sleep_for(std::chrono::seconds(1)); // wait for detached threads done
test_join(file_path);
return 0;
}
運(yùn)行結(jié)果
文章來源:http://www.zghlxwxcb.cn/news/detail-837610.html
本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布!文章來源地址http://www.zghlxwxcb.cn/news/detail-837610.html
到了這里,關(guān)于c++多線程按行讀取同一個每行長度不規(guī)則文件的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!