Accompanying course video: [Computer Networks] Stanford's CS144 course
This section wraps up Lab Four by walking through the overall flow of a complete network interaction.
Introduction
Using tcp_ipv4.cc as the starting point, let's explore how CS144 implements the whole protocol stack.
First, tun.sh in the project root uses the ip tuntap facility to create a virtual TUN/TAP network device. Such interfaces are implemented purely in software inside the kernel; unlike ordinary network interfaces, they are not backed by physical hardware. The purpose is to simulate a realistic network environment.
A quick introduction to TUN/TAP
For background on TUN/TAP, see:
- "Virtual devices: TUN and TAP" (虛擬設備之TUN和TAP)
- The official Linux kernel documentation: the TUN/TAP driver description
TUN/TAP provides packet reception and transmission for user-space programs.
It can be viewed as a simple point-to-point or Ethernet device which, instead of receiving packets from physical media, receives them from a user-space program, and instead of sending packets via physical media, writes them to a user-space program.
To use the driver, a program must open /dev/net/tun and issue the corresponding ioctl() calls to register a network device with the kernel. The network device appears as tunXX or tapXX, depending on the options chosen. When the program closes the file descriptor, the network device and all corresponding routes disappear.
Depending on the device type chosen, the user-space program reads/writes IP packets (for tun) or Ethernet frames (for tap); which one applies is determined by the flags passed to ioctl().
- TUN is a virtual network device that emulates a layer-3 device: through it you handle network-layer packets, i.e. IP datagrams. Because it only emulates up to the IP layer, it cannot be bridged with a physical NIC and has no MAC address, but it can still communicate with a physical NIC through layer-3 forwarding.
- TAP emulates a layer-2 device and goes one level deeper: it handles data-link-layer frames, has a MAC address, can be bridged with a physical NIC, supports MAC-layer broadcast, and can also be assigned an IP address.
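To make the ioctl() registration described above concrete, here is a minimal sketch (written for this article, not part of the CS144 code) of how a user-space program typically opens a TUN device on Linux; a device name such as "tun144" would be passed as dev_name:
// Minimal sketch of opening a TUN device from user space (Linux-only).
// Illustrative only; the CS144 TunFD class wraps essentially the same steps.
#include <fcntl.h>
#include <linux/if.h>
#include <linux/if_tun.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <cstring>
#include <stdexcept>
#include <string>
int open_tun(const std::string &dev_name) {
    const int fd = ::open("/dev/net/tun", O_RDWR);
    if (fd < 0) {
        throw std::runtime_error("cannot open /dev/net/tun");
    }
    ifreq ifr{};
    // IFF_TUN: layer-3 device (IP datagrams); use IFF_TAP instead for a layer-2 (Ethernet frame) device.
    // IFF_NO_PI: do not prepend the extra packet-information header to each packet.
    ifr.ifr_flags = IFF_TUN | IFF_NO_PI;
    std::strncpy(ifr.ifr_name, dev_name.c_str(), IFNAMSIZ - 1);
    if (::ioctl(fd, TUNSETIFF, &ifr) < 0) {
        ::close(fd);
        throw std::runtime_error("TUNSETIFF failed");
    }
    return fd;  // afterwards, read() yields one IP datagram per call and write() injects one
}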
The tcp_ipv4.cc file
Once the TUN/TAP network device is set up, we move on to the main function in tcp_ipv4.cc:
int main(int argc, char **argv) {
try {
// Argument count check: argv[0] is the program name, followed by the host and port we must supply
if (argc < 3) {
show_usage(argv[0], "ERROR: required arguments are missing.");
return EXIT_FAILURE;
}
// Parse the arguments to obtain the TCPConfig, the FdAdapterConfig, the startup mode (server or client), and which network device to use
auto [c_fsm, c_filt, listen, tun_dev_name] = get_config(argc, argv);
// Use TUN/TAP to build a virtual NIC; this virtual network device operates up to the IP layer
// TunFD is the file descriptor of the tun device
// TCPOverIPv4OverTunFdAdapter wraps reading and writing IPv4 datagrams from/to the tun device
// LossyTCPOverIPv4OverTunFdAdapter uses the decorator pattern to add, on top of the former, random packet drops on write according to a previously configured loss rate
// LossyTCPOverIPv4SpongeSocket exposes a standard socket interface to the layer above
LossyTCPOverIPv4SpongeSocket tcp_socket(LossyTCPOverIPv4OverTunFdAdapter(
TCPOverIPv4OverTunFdAdapter(TunFD(tun_dev_name == nullptr ? TUN_DFLT : tun_dev_name))));
// In server mode, listen for messages on the specified port
if (listen) {
tcp_socket.listen_and_accept(c_fsm, c_filt);
} else {
// In client mode, actively establish a connection to the given server
tcp_socket.connect(c_fsm, c_filt);
}
// Keyboard input is written into the socket; readable data from the socket is printed to the screen
bidirectional_stream_copy(tcp_socket);
// Block until the _tcp_thread thread finishes
tcp_socket.wait_until_closed();
} catch (const exception &e) {
cerr << "Exception: " << e.what() << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
Initializing the configuration
Below is an annotated look at the configuration classes and the get_config function; feel free to skim:
//! Config for TCP sender and receiver
class TCPConfig {
public:
// Default capacity of the sender and receiver buffers; the capacity is the maximum amount of data a buffer can hold at any given time
static constexpr size_t DEFAULT_CAPACITY = 64000; //!< Default capacity
// Upper limit on the size of the payload carried in a TCP segment
static constexpr size_t MAX_PAYLOAD_SIZE = 1000; //!< Conservative max payload size for real Internet
// Default retransmission timeout, in milliseconds.
// When the TCP sender transmits data, it expects an acknowledgment (ACK) within this timeout; if none arrives in time, it retransmits the data
static constexpr uint16_t TIMEOUT_DFLT = 1000; //!< Default re-transmit timeout is 1 second
// Maximum number of retransmissions allowed before giving up on a packet. If the sender still has no ACK after this many attempts, it considers the connection unreliable and acts accordingly
static constexpr unsigned MAX_RETX_ATTEMPTS = 8; //!< Maximum re-transmit attempts before giving up
// Holds the initial value of the retransmission timeout, in milliseconds: how long the sender waits for an ACK before retransmitting
// Because the retransmission timeout grows dynamically under network congestion, it must be restored to this initial value whenever the retransmission counter is reset
uint16_t rt_timeout = TIMEOUT_DFLT; //!< Initial value of the retransmission timeout, in milliseconds
// Default sizes of the receive and send buffers
size_t recv_capacity = DEFAULT_CAPACITY; //!< Receive capacity, in bytes
size_t send_capacity = DEFAULT_CAPACITY; //!< Sender capacity, in bytes
// Initial sequence number; if not set, a random value is used
std::optional<WrappingInt32> fixed_isn{};
};
//! Config for classes derived from FdAdapter
class FdAdapterConfig {
public:
// Source IP address and port
Address source{"0", 0}; //!< Source address and port
// Destination IP address and port
Address destination{"0", 0}; //!< Destination address and port
// Downlink loss rate, i.e. the probability that a packet sent from the server to the client is dropped
uint16_t loss_rate_dn = 0; //!< Downlink loss rate (for LossyFdAdapter)
// Uplink loss rate, i.e. the probability that a packet sent from the client to the server is dropped
uint16_t loss_rate_up = 0; //!< Uplink loss rate (for LossyFdAdapter)
};
static tuple<TCPConfig, FdAdapterConfig, bool, char *> get_config(int argc, char **argv) {
TCPConfig c_fsm{};
FdAdapterConfig c_filt{};
char *tundev = nullptr;
int curr = 1;
bool listen = false;
// If no host and port are specified, use the default IP address and a random port
string source_address = LOCAL_ADDRESS_DFLT;
string source_port = to_string(uint16_t(random_device()()));
// Scan the options, leaving the last two positional arguments (host and port) untouched
while (argc - curr > 2) {
// Enable the server-side listen mode
if (strncmp("-l", argv[curr], 3) == 0) {
listen = true;
curr += 1;
} else if (strncmp("-a", argv[curr], 3) == 0) {
// -a specifies our own IP address
check_argc(argc, argv, curr, "ERROR: -a requires one argument.");
source_address = argv[curr + 1];
curr += 2;
} else if (strncmp("-s", argv[curr], 3) == 0) {
// -s specifies our own port number
check_argc(argc, argv, curr, "ERROR: -s requires one argument.");
source_port = argv[curr + 1];
curr += 2;
} else if (strncmp("-w", argv[curr], 3) == 0) {
// -w specifies our receive capacity (window)
check_argc(argc, argv, curr, "ERROR: -w requires one argument.");
c_fsm.recv_capacity = strtol(argv[curr + 1], nullptr, 0);
curr += 2;
} else if (strncmp("-t", argv[curr], 3) == 0) {
// -t specifies the retransmission timeout (RTO)
check_argc(argc, argv, curr, "ERROR: -t requires one argument.");
c_fsm.rt_timeout = strtol(argv[curr + 1], nullptr, 0);
curr += 2;
} else if (strncmp("-d", argv[curr], 3) == 0) {
// -d specifies which tun device (virtual NIC) to attach to
check_argc(argc, argv, curr, "ERROR: -t requires one argument.");
tundev = argv[curr + 1];
curr += 2;
} else if (strncmp("-Lu", argv[curr], 3) == 0) {
// -Lu sets the uplink loss rate, i.e. the probability that a packet sent from the client to the server is dropped
check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");
float lossrate = strtof(argv[curr + 1], nullptr);
using LossRateUpT = decltype(c_filt.loss_rate_up);
c_filt.loss_rate_up =
static_cast<LossRateUpT>(static_cast<float>(numeric_limits<LossRateUpT>::max()) * lossrate);
curr += 2;
} else if (strncmp("-Ld", argv[curr], 3) == 0) {
// -Ld sets the downlink loss rate, i.e. the probability that a packet sent from the server to the client is dropped
check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");
float lossrate = strtof(argv[curr + 1], nullptr);
using LossRateDnT = decltype(c_filt.loss_rate_dn);
c_filt.loss_rate_dn =
static_cast<LossRateDnT>(static_cast<float>(numeric_limits<LossRateDnT>::max()) * lossrate);
curr += 2;
} else if (strncmp("-h", argv[curr], 3) == 0) {
// -h prints the usage message
show_usage(argv[0], nullptr);
exit(0);
} else {
show_usage(argv[0], string("ERROR: unrecognized option " + string(argv[curr])).c_str());
exit(1);
}
}
// parse positional command-line arguments
// Was the server-side LISTEN mode enabled?
if (listen) {
// We are the server --> take the listening port from the arguments
// Configure the filter's source address as "0" (meaning: listen on all local network interfaces)
c_filt.source = {"0", argv[curr + 1]};
if (c_filt.source.port() == 0) {
show_usage(argv[0], "ERROR: listen port cannot be zero in server mode.");
exit(1);
}
} else {
// We are the client -- the destination IP address and port come from the last two arguments
c_filt.destination = {argv[curr], argv[curr + 1]};
// The client's local IP address and port can be specified with -a and -s
c_filt.source = {source_address, source_port};
}
return make_tuple(c_fsm, c_filt, listen, tundev);
}
The fd class hierarchy implemented by CS144
The main function constructs a TCPOverIPv4OverTunFdAdapter. TunFD is the file descriptor attached to the TUN device.
For a concrete use of TunFD, see app/tun.cc:
int main() {
try {
TunFD tun("tun144");
while (true) {
auto buffer = tun.read();
cout << "\n\n***\n*** Got packet:\n***\n";
hexdump(buffer.data(), buffer.size());
IPv4Datagram ip_dgram;
cout << "attempting to parse as ipv4 datagram... ";
if (ip_dgram.parse(move(buffer)) != ParseResult::NoError) {
cout << "failed.\n";
continue;
}
cout << "success! totlen=" << ip_dgram.header().len << ", IPv4 header contents:\n";
cout << ip_dgram.header().to_string();
if (ip_dgram.header().proto != IPv4Header::PROTO_TCP) {
cout << "\nNot TCP, skipping.\n";
continue;
}
cout << "\nAttempting to parse as a TCP segment... ";
TCPSegment tcp_seg;
if (tcp_seg.parse(ip_dgram.payload(), ip_dgram.header().pseudo_cksum()) != ParseResult::NoError) {
cout << "failed.\n";
continue;
}
cout << "success! payload len=" << tcp_seg.payload().size() << ", TCP header contents:\n";
cout << tcp_seg.header().to_string() << endl;
}
} catch (const exception &e) {
cout << "Exception: " << e.what() << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
The adapter hierarchy: reading and writing data on top of the custom fd classes
TCPOverIPv4OverTunFdAdapter is an IP-level wrapper interface. When a TCP segment is written through the adapter, it is automatically wrapped in an IP datagram and sent into the network device; reads work the other way around, stripping the IP header and returning the TCP segment contained inside:
// A FD adapter for IPv4 datagrams read from and written to a TUN device
class TCPOverIPv4OverTunFdAdapter : public TCPOverIPv4Adapter {
private:
TunFD _tun;
public:
//! Construct from a TunFD
explicit TCPOverIPv4OverTunFdAdapter(TunFD &&tun) : _tun(std::move(tun)) {}
//! Attempts to read and parse an IPv4 datagram containing a TCP segment related to the current connection
// Parse an IP datagram out of the bytes returned by the tun device
std::optional<TCPSegment> read() {
InternetDatagram ip_dgram;
if (ip_dgram.parse(_tun.read()) != ParseResult::NoError) {
return {};
}
// Strip the IP header and return the TCP segment
return unwrap_tcp_in_ip(ip_dgram);
}
//! Creates an IPv4 datagram from a TCP segment and writes it to the TUN device
// Prepend an IP header to the TCP segment being written, then write it to the tun device
void write(TCPSegment &seg) { _tun.write(wrap_tcp_in_ip(seg).serialize()); }
//! Access the underlying TUN device
operator TunFD &() { return _tun; }
//! Access the underlying TUN device
operator const TunFD &() const { return _tun; }
};
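The wrap_tcp_in_ip and unwrap_tcp_in_ip helpers used above are inherited from TCPOverIPv4Adapter and are not shown here. Roughly speaking, wrapping fills in the TCP ports and the IPv4 addresses from the adapter's FdAdapterConfig and serializes the segment as the datagram's payload. The following simplified sketch is reconstructed from memory of the sponge code base, so treat the exact field names and details as approximate rather than authoritative:
// Simplified sketch of wrapping a TCP segment in an IPv4 datagram (an approximation of
// TCPOverIPv4Adapter::wrap_tcp_in_ip; not a verbatim copy of the CS144 source).
InternetDatagram wrap_tcp_in_ip_sketch(TCPSegment &seg, const FdAdapterConfig &cfg) {
    // Fill in the TCP source/destination ports from the adapter's configuration
    seg.header().sport = cfg.source.port();
    seg.header().dport = cfg.destination.port();
    // Build the enclosing IPv4 datagram and set its addresses and total length
    InternetDatagram ip_dgram;
    ip_dgram.header().src = cfg.source.ipv4_numeric();
    ip_dgram.header().dst = cfg.destination.ipv4_numeric();
    ip_dgram.header().len = ip_dgram.header().hlen * 4 + seg.header().doff * 4 + seg.payload().size();
    // Serialize the TCP segment as the IP payload, computing the TCP checksum over the pseudo-header
    ip_dgram.payload() = seg.serialize(ip_dgram.header().pseudo_cksum());
    return ip_dgram;
}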
LossyTCPOverIPv4OverTunFdAdapter itself is an instantiation of the class template LossyFdAdapter, which uses the decorator pattern to enhance the adapter it holds internally: on every read and write it decides, according to the previously configured loss rate, whether to drop the datagram:
template <typename AdapterT>
class LossyFdAdapter {
private:
//! Fast RNG used by _should_drop()
std::mt19937 _rand{get_random_generator()};
//! The underlying FD adapter
AdapterT _adapter;
...
bool _should_drop(bool uplink) {
const auto &cfg = _adapter.config();
const uint16_t loss = uplink ? cfg.loss_rate_up : cfg.loss_rate_dn;
return loss != 0 && uint16_t(_rand()) < loss;
}
//! \brief Read from the underlying AdapterT instance, potentially dropping the read datagram
//! \returns std::optional<TCPSegment> that is empty if the segment was dropped or if
//! the underlying AdapterT returned an empty value
std::optional<TCPSegment> read() {
auto ret = _adapter.read();
if (_should_drop(false)) {
return {};
}
return ret;
}
//! \brief Write to the underlying AdapterT instance, potentially dropping the datagram to be written
//! \param[in] seg is the packet to either write or drop
void write(TCPSegment &seg) {
if (_should_drop(true)) {
return;
}
return _adapter.write(seg);
}
...
};
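To see how the floating-point loss rate given on the command line turns into the uint16_t comparison inside _should_drop(), consider the following small, self-contained example (the numbers are only illustrative):
// Worked example of the loss-rate mapping (illustrative, stand-alone code).
// With -Lu 0.1 on the command line, get_config stores
//   loss_rate_up = uint16_t(65535 * 0.1f) = 6553
// and _should_drop() then draws a pseudo-random uint16_t in [0, 65535]; the segment is dropped
// whenever the draw falls below the threshold, i.e. with probability roughly 6553 / 65536 ≈ 10%.
#include <cstdint>
#include <limits>
#include <random>
bool should_drop_example(std::mt19937 &rng, float lossrate) {
    const auto threshold =
        static_cast<uint16_t>(static_cast<float>(std::numeric_limits<uint16_t>::max()) * lossrate);
    return threshold != 0 && static_cast<uint16_t>(rng()) < threshold;
}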
The custom socket hierarchy
In the socket class hierarchy wrapped by CS144, the read/write interfaces live in the top-level FileDescriptor base class:
//! \param[in] limit is the maximum number of bytes to read; fewer bytes may be returned
//! \param[out] str is the string to be read
// Read data from the device or file behind this fd via the read system call
void FileDescriptor::read(std::string &str, const size_t limit) {
constexpr size_t BUFFER_SIZE = 1024 * 1024; // maximum size of a read
const size_t size_to_read = min(BUFFER_SIZE, limit);
str.resize(size_to_read);
ssize_t bytes_read = SystemCall("read", ::read(fd_num(), str.data(), size_to_read));
if (limit > 0 && bytes_read == 0) {
_internal_fd->_eof = true;
}
if (bytes_read > static_cast<ssize_t>(size_to_read)) {
throw runtime_error("read() read more than requested");
}
str.resize(bytes_read);
register_read();
}
// Write data to the device or file behind this fd via the writev system call
size_t FileDescriptor::write(BufferViewList buffer, const bool write_all) {
size_t total_bytes_written = 0;
do {
auto iovecs = buffer.as_iovecs();
const ssize_t bytes_written = SystemCall("writev", ::writev(fd_num(), iovecs.data(), iovecs.size()));
if (bytes_written == 0 and buffer.size() != 0) {
throw runtime_error("write returned 0 given non-empty input buffer");
}
if (bytes_written > ssize_t(buffer.size())) {
throw runtime_error("write wrote more than length of input buffer");
}
register_write();
buffer.remove_prefix(bytes_written);
total_bytes_written += bytes_written;
} while (write_all and buffer.size());
return total_bytes_written;
}
The custom event loop: EventLoop
CS144 wraps the poll multiplexing facility provided by Linux to build a simple event-loop mechanism, EventLoop:
//! Waits for events on file descriptors and executes corresponding callbacks.
class EventLoop {
public:
// Whether we care about read events or write events on the fd
enum class Direction : short {
In = POLLIN,
Out = POLLOUT
};
private:
using CallbackT = std::function<void(void)>;
using InterestT = std::function<bool(void)>;
// The inner class Rule is simply a record of which fd, and which events on it, the user cares about,
// together with the callbacks invoked when the event fires and when the rule is cancelled
class Rule {
public:
FileDescriptor fd;
Direction direction;
// Invoked when the event of interest fires
CallbackT callback;
// Its return value decides whether this fd currently needs to be polled
InterestT interest;
// Invoked when the fd is closed or errors out
CallbackT cancel;
// Depending on direction, returns how many times this fd has been read or written so far
unsigned int service_count() const;
};
// The set of rules (events of interest) registered by the user
std::list<Rule> _rules{};
public:
// Result of waiting for events
enum class Result {
Success, // At least one Rule was triggered.
Timeout, // No rules were triggered before timeout.
Exit // All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_event.
};
// Register an event of interest
void add_rule(
const FileDescriptor &fd,
const Direction direction,
const CallbackT &callback,
const InterestT &interest = [] { return true; },
const CallbackT &cancel = [] {});
// Wait for the next event of interest to occur --- the argument is the maximum wait time
Result wait_next_event(const int timeout_ms);
};
- add_rule: register an event of interest
void EventLoop::add_rule(const FileDescriptor &fd,
const Direction direction,
const CallbackT &callback,
const InterestT &interest,
const CallbackT &cancel) {
_rules.push_back({fd.duplicate(), direction, callback, interest, cancel});
}
- service_count: how many times the fd has been read or written so far
unsigned int EventLoop::Rule::service_count() const {
return direction == Direction::In ? fd.read_count() : fd.write_count();
}
- wait_next_event: wait for the next event of interest to occur
EventLoop::Result EventLoop::wait_next_event(const int timeout_ms) {
vector<pollfd> pollfds{};
pollfds.reserve(_rules.size());
bool something_to_poll = false;
// set up the pollfd for each rule
// Walk over every rule
for (auto it = _rules.cbegin(); it != _rules.cend();) { // NOTE: it gets erased or incremented in loop body
const auto &this_rule = *it;
// If this rule wants to read from the fd but the fd has no more data to read (EOF), invoke the rule's cancel callback
// and remove the rule from the rule set
if (this_rule.direction == Direction::In && this_rule.fd.eof()) {
// no more reading on this rule, it's reached eof
this_rule.cancel();
it = _rules.erase(it);
continue;
}
// If the fd has been closed, handle it the same way
if (this_rule.fd.closed()) {
this_rule.cancel();
it = _rules.erase(it);
continue;
}
// Check whether this rule is currently of interest; if so, add it to pollfds for the polling phase below
if (this_rule.interest()) {
// A pollfd has three fields: the fd to poll, whether we care about readable or writable events on it, and which events actually occurred
pollfds.push_back({this_rule.fd.fd_num(), static_cast<short>(this_rule.direction), 0});
something_to_poll = true;
} else {
// To keep the pollfds array aligned one-to-one with the rule list _rules, we still push a pollfd here,
// but with its events set to 0, i.e. a placeholder that is not interested in anything (errors are still reported)
pollfds.push_back({this_rule.fd.fd_num(), 0, 0}); // placeholder --- we still want errors
}
++it;
}
// quit if there is nothing left to poll --- no rule needs polling
if (not something_to_poll) {
return Result::Exit;
}
// call poll -- wait until one of the fds satisfies one of the rules (writeable/readable)
try {
// Call poll on every pollfd in the set
// Last argument: how long to wait at most if no event of interest occurs
if (0 == SystemCall("poll", ::poll(pollfds.data(), pollfds.size(), timeout_ms))) {
return Result::Timeout;
}
} catch (unix_error const &e) {
if (e.code().value() == EINTR) {
return Result::Exit;
}
}
// go through the poll results
// Walk over the poll results -- the indices of _rules and pollfds correspond one-to-one
for (auto [it, idx] = make_pair(_rules.begin(), size_t(0)); it != _rules.end(); ++idx) {
const auto &this_pollfd = pollfds[idx];
// revents holds the events that actually occurred -- check whether an error happened
const auto poll_error = static_cast<bool>(this_pollfd.revents & (POLLERR | POLLNVAL));
if (poll_error) {
throw runtime_error("EventLoop: error on polled file descriptor");
}
const auto &this_rule = *it;
// Which of the events we asked for actually occurred
const auto poll_ready = static_cast<bool>(this_pollfd.revents & this_pollfd.events);
// The hang-up event is set when the descriptor is closed or the peer closes the connection
const auto poll_hup = static_cast<bool>(this_pollfd.revents & POLLHUP);
// If the descriptor was hung up, remove this rule
if (poll_hup && this_pollfd.events && !poll_ready) {
// if we asked for the status, and the _only_ condition was a hangup, this FD is defunct:
// - if it was POLLIN and nothing is readable, no more will ever be readable
// - if it was POLLOUT, it will not be writable again
this_rule.cancel();
it = _rules.erase(it);
continue;
}
// If an event of interest did occur
if (poll_ready) {
// we only want to call callback if revents includes the event we asked for
const auto count_before = this_rule.service_count();
// Invoke the rule's callback
this_rule.callback();
// only check for busy wait if we're not canceling or exiting
if (count_before == this_rule.service_count() and this_rule.interest()) {
throw runtime_error(
"EventLoop: busy wait detected: callback did not read/write fd and is still interested");
}
}
++it; // if we got here, it means we didn't call _rules.erase()
}
return Result::Success;
}
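To make the interplay between add_rule, the interest predicates, and wait_next_event more concrete, here is a minimal, hypothetical usage sketch. It is written for this article (it assumes the sponge headers eventloop.hh and file_descriptor.hh and the overloads discussed above) and is not part of the lab's code; it simply echoes stdin back to stdout:
// Hypothetical EventLoop usage sketch: echo stdin to stdout until stdin reaches EOF.
#include "eventloop.hh"
#include "file_descriptor.hh"
#include <string>
#include <unistd.h>
int main() {
    EventLoop loop{};
    FileDescriptor input{STDIN_FILENO};
    FileDescriptor output{STDOUT_FILENO};
    std::string pending{};
    // Rule 1: when stdin is readable and our buffer is empty, read a chunk into the buffer.
    loop.add_rule(
        input,
        EventLoop::Direction::In,
        [&] { input.read(pending, 65536); },
        [&] { return pending.empty() and not input.eof(); });
    // Rule 2: when stdout is writable and the buffer is non-empty, flush the buffer.
    loop.add_rule(
        output,
        EventLoop::Direction::Out,
        [&] {
            output.write(pending);
            pending.clear();
        },
        [&] { return not pending.empty(); });
    // Keep waiting until every rule has been cancelled or lost interest (Result::Exit).
    while (loop.wait_next_event(-1) != EventLoop::Result::Exit) {
    }
    return 0;
}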
The class template TCPSpongeSocket in detail
TCPSpongeSocket itself is a class template, and a number of concrete types are instantiated from it:
using TCPOverUDPSpongeSocket = TCPSpongeSocket<TCPOverUDPSocketAdapter>;
using TCPOverIPv4SpongeSocket = TCPSpongeSocket<TCPOverIPv4OverTunFdAdapter>;
using TCPOverIPv4OverEthernetSpongeSocket = TCPSpongeSocket<TCPOverIPv4OverEthernetAdapter>;
using LossyTCPOverUDPSpongeSocket = TCPSpongeSocket<LossyTCPOverUDPSocketAdapter>;
using LossyTCPOverIPv4SpongeSocket = TCPSpongeSocket<LossyTCPOverIPv4OverTunFdAdapter>;
The important members of TCPSpongeSocket are shown below:
//! Multithreaded wrapper around TCPConnection that approximates the Unix sockets API
template <typename AdaptT>
class TCPSpongeSocket : public LocalStreamSocket {
private:
//! Stream socket for reads and writes between owner and TCP thread
LocalStreamSocket _thread_data;
protected:
//! Adapter to underlying datagram socket (e.g., UDP or IP)
AdaptT _datagram_adapter;
private:
//! Set up the TCPConnection and the event loop
void _initialize_TCP(const TCPConfig &config);
//! TCP state machine -- the one implemented in Lab Four
std::optional<TCPConnection> _tcp{};
//! eventloop that handles all the events (new inbound datagram, new outbound bytes, new inbound bytes)
// The event-loop mechanism -- compare with the select/epoll models
EventLoop _eventloop{};
//! Process events while specified condition is true
void _tcp_loop(const std::function<bool()> &condition);
//! Main loop of TCPConnection thread
void _tcp_main();
//! Handle to the TCPConnection thread; owner thread calls join() in the destructor
std::thread _tcp_thread{};
//! Construct LocalStreamSocket fds from socket pair, initialize eventloop
TCPSpongeSocket(std::pair<FileDescriptor, FileDescriptor> data_socket_pair, AdaptT &&datagram_interface);
std::atomic_bool _abort{false}; //!< Flag used by the owner to force the TCPConnection thread to shut down
bool _inbound_shutdown{false}; //!< Has TCPSpongeSocket shut down the incoming data to the owner?
bool _outbound_shutdown{false}; //!< Has the owner shut down the outbound data to the TCP connection?
bool _fully_acked{false}; //!< Has the outbound data been fully acknowledged by the peer?
...
The listen_and_accept method
Let's first look at TCPSpongeSocket's listen_and_accept method, which the server calls to listen on a port:
//! \param[in] c_tcp is the TCPConfig for the TCPConnection
//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
if (_tcp) {
throw runtime_error("listen_and_accept() with TCPConnection already initialized");
}
// Initialize the TCP connection and the event loop
_initialize_TCP(c_tcp);
_datagram_adapter.config_mut() = c_ad;
_datagram_adapter.set_listening(true);
cerr << "DEBUG: Listening for incoming connection...\n";
// Start the TCP event loop; the function passed in is the condition whose return value decides whether the loop keeps running
// This loop is only responsible for establishing the connection: once the three-way handshake finishes, it exits -- the loop function is analyzed below
_tcp_loop([&] {
const auto s = _tcp->state();
return (s == TCPState::State::LISTEN or s == TCPState::State::SYN_RCVD or s == TCPState::State::SYN_SENT);
});
cerr << "New connection from " << _datagram_adapter.config().destination.to_string() << ".\n";
// From here on, the _tcp_thread thread handles the remaining data transfer on this TCP connection; the thread is started now
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
}
The _tcp_main method
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_tcp_main() {
try {
if (not _tcp.has_value()) {
throw runtime_error("no TCP");
}
// Run the TCP event loop continuously until the TCP connection is torn down
_tcp_loop([] { return true; });
// Shut down this socket
shutdown(SHUT_RDWR);
if (not _tcp.value().active()) {
cerr << "DEBUG: TCP connection finished "
<< (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");
}
// Clear the TCPConnection stored in the optional
_tcp.reset();
} catch (const exception &e) {
cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
throw e;
}
}
_initialize_TCP: setting up the TCP connection and the event loop
_initialize_TCP is responsible for initializing the TCP connection and the event loop:
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_initialize_TCP(const TCPConfig &config) {
// Store the TCPConfig into the TCPConnection
_tcp.emplace(config);
// Set up the event loop
// There are four possible events to handle:
//
// 1) Incoming datagram received (needs to be given to
// TCPConnection::segment_received method)
//
// 2) Outbound bytes received from local application via a write()
// call (needs to be read from the local stream socket and
// given to TCPConnection::data_written method)
//
// 3) Incoming bytes reassembled by the TCPConnection
// (needs to be read from the inbound_stream and written
// to the local stream socket back to the application)
//
// 4) Outbound segment generated by TCP (needs to be
// given to underlying datagram socket)
// rule 1: read from filtered packet stream and dump into TCPConnection
// Watch for datagrams arriving from the network
_eventloop.add_rule(
// The fd being watched is essentially the tun device
_datagram_adapter,
Direction::In,
// Invoked when the event of interest fires
[&] {
// Read a segment from the tun device
auto seg = _datagram_adapter.read();
// Hand it to the TCPConnection for processing
if (seg) {
_tcp->segment_received(move(seg.value()));
}
// debugging output:
if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " has been fully acknowledged.\n";
_fully_acked = true;
}
},
// Keep polling this rule as long as the TCP connection is still active
[&] { return _tcp->active(); });
// rule 2: read from pipe into outbound buffer
// Watch for data the application wants to transmit
_eventloop.add_rule(
// Watch _thread_data -- the data channel standing between the application and the protocol stack
_thread_data,
Direction::In,
[&] {
// The application writes into _thread_data and thereby notifies the stack that there is data to send
// Read only as much outgoing data as the remaining free space of the TCP send window allows
const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
const auto len = data.size();
// Write it out via TCPConnection::write
const auto amount_written = _tcp->write(move(data));
if (amount_written != len) {
throw runtime_error("TCPConnection::write() accepted less than advertised length");
}
// If the application has closed the _thread_data channel with close(), the TCP outbound stream can be closed too
if (_thread_data.eof()) {
_tcp->end_input_stream();
// The outbound channel is now shut down
_outbound_shutdown = true;
// debugging output:
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " finished (" << _tcp.value().bytes_in_flight() << " byte"
<< (_tcp.value().bytes_in_flight() == 1 ? "" : "s") << " still in flight).\n";
}
},
// Keep polling this rule as long as the TCP connection is active, the outbound channel is not shut down, and the TCP send window is non-zero
[&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
// Invoked when the fd errors out
[&] {
_tcp->end_input_stream();
_outbound_shutdown = true;
});
// rule 3: read from inbound buffer into pipe
// Watch for reassembled in-order bytes that have not yet been delivered while the _thread_data channel is still open; if there are any, write them into _thread_data
_eventloop.add_rule(
// Watch _thread_data
_thread_data,
// We care about writability
Direction::Out,
[&] {
// Get the inbound byte stream of the TCP receiver
ByteStream &inbound = _tcp->inbound_stream();
// Write from the inbound_stream into
// the pipe, handling the possibility of a partial
// write (i.e., only pop what was actually written).
// Read out the in-order bytes that have arrived, up to 64 KiB at a time
const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
const std::string buffer = inbound.peek_output(amount_to_write);
// Write everything just read into the _thread_data pipe
const auto bytes_written = _thread_data.write(move(buffer), false);
// Bytes successfully delivered to the application can now be discarded
inbound.pop_output(bytes_written);
// If TCP has entered the four-way teardown or the connection has closed, shut down the _thread_data pipe
if (inbound.eof() or inbound.error()) {
_thread_data.shutdown(SHUT_WR);
_inbound_shutdown = true;
// debugging output:
cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
<< " finished " << (inbound.error() ? "with an error/reset.\n" : "cleanly.\n");
// Reaching this condition means this end is the active closer (in this lab, the client) and has entered the TIME_WAIT phase of the teardown
if (_tcp.value().state() == TCPState::State::TIME_WAIT) {
cerr << "DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
}
}
},
// Keep polling this rule if the TCP receiver still holds in-order bytes that have not been read, or if the inbound stream has ended (EOF or error) but the inbound side has not yet been shut down
[&] {
return (not _tcp->inbound_stream().buffer_empty()) or
((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
});
// rule 4: read outbound segments from TCPConnection and send as datagrams
// Watch whether the TCPConnection has segments waiting to be sent; if so, and _datagram_adapter is writable, send them
_eventloop.add_rule(
_datagram_adapter,
Direction::Out,
[&] {
// A non-empty segments_out queue on the TCPConnection means there are segments waiting to be transmitted
while (not _tcp->segments_out().empty()) {
// Write the segment to the adapter, which performs the actual transmission
_datagram_adapter.write(_tcp->segments_out().front());
_tcp->segments_out().pop();
}
},
// Keep polling this rule as long as segments_out is non-empty
[&] { return not _tcp->segments_out().empty(); });
}
_tcp_loop: driving the TCP event loop
The _tcp_loop function drives the TCP event loop:
//! \param[in] condition is a function returning true if loop should continue
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {
auto base_time = timestamp_ms();
// The return value of the condition function decides when the event loop stops
while (condition()) {
// Wait for the next rule to fire, returning on timeout -- the timeout is TCP_TICK_MS (10 milliseconds)
auto ret = _eventloop.wait_next_event(TCP_TICK_MS);
// Result::Exit means no rule is left to poll (or the owner requested an abort), so leave the loop
if (ret == EventLoop::Result::Exit or _abort) {
break;
}
// If the TCP connection is still active
if (_tcp.value().active()) {
// Roughly every 10 milliseconds, call TCPConnection's tick method
const auto next_time = timestamp_ms();
// Argument: how much time has passed since the previous call
_tcp.value().tick(next_time - base_time);
// Only TCPOverIPv4OverEthernetAdapter has a meaningful tick implementation -- covered in Lab Five
// All other adapters implement it as a no-op
_datagram_adapter.tick(next_time - base_time);
base_time = next_time;
}
}
}
The connect method
//! \param[in] c_tcp is the TCPConfig for the TCPConnection
//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
if (_tcp) {
throw runtime_error("connect() with TCPConnection already initialized");
}
// Initialize the TCP connection and the event loop
_initialize_TCP(c_tcp);
_datagram_adapter.config_mut() = c_ad;
cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "...\n";
// Start the three-way handshake: the client sends out the first SYN
_tcp->connect();
const TCPState expected_state = TCPState::State::SYN_SENT;
if (_tcp->state() != expected_state) {
throw runtime_error("After TCPConnection::connect(), state was " + _tcp->state().name() + " but expected " +
expected_state.name());
}
// Use the event loop to wait until the three-way handshake has completed
_tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });
cerr << "Successfully connected to " << c_ad.destination.to_string() << ".\n";
// Spawn a dedicated thread for the subsequent data transfer
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
}
The bidirectional_stream_copy method
For both the server and the client, a new thread is created after the three-way handshake specifically to run the event loop inside LossyTCPOverIPv4SpongeSocket. The main thread, meanwhile, starts a second event loop and allocates two extra buffers: one for the data the user types in and one for the data about to be printed to the screen. When the user types on stdin, the poll event registered in the event loop fires and the data is written into the local input buffer. When the TCPOverIPv4OverTunFdAdapter becomes writable, the contents of the local input buffer are written into it and ultimately transmitted to the remote end.
webget talks to real servers on the same principle: it writes IP datagrams into the tun virtual network device, injecting them into the OS protocol stack and thereby emulating how packets are actually sent.
// Copies data in both directions between stdin/stdout and a custom socket object
// stdin --> socket --> stdout
// Keyboard input is written into the socket; readable data from the socket is printed to the screen
void bidirectional_stream_copy(Socket &socket) {
constexpr size_t max_copy_length = 65536;
constexpr size_t buffer_size = 1048576;
EventLoop _eventloop{};
FileDescriptor _input{STDIN_FILENO};
FileDescriptor _output{STDOUT_FILENO};
ByteStream _outbound{buffer_size};
ByteStream _inbound{buffer_size};
bool _outbound_shutdown{false};
bool _inbound_shutdown{false};
socket.set_blocking(false);
_input.set_blocking(false);
_output.set_blocking(false);
// rule 1: read from stdin into outbound byte stream
// When stdin has readable data, write it into the _outbound stream
_eventloop.add_rule(
_input,
Direction::In,
[&] {
_outbound.write(_input.read(_outbound.remaining_capacity()));
if (_input.eof()) {
_outbound.end_input();
}
},
[&] { return (not _outbound.error()) and (_outbound.remaining_capacity() > 0) and (not _inbound.error()); },
[&] { _outbound.end_input(); });
// rule 2: read from outbound byte stream into socket
// When the socket is writable, write data from the _outbound stream into the socket
_eventloop.add_rule(
socket,
Direction::Out,
[&] {
const size_t bytes_to_write = min(max_copy_length, _outbound.buffer_size());
const size_t bytes_written = socket.write(_outbound.peek_output(bytes_to_write), false);
_outbound.pop_output(bytes_written);
if (_outbound.eof()) {
socket.shutdown(SHUT_WR);
_outbound_shutdown = true;
}
},
[&] { return (not _outbound.buffer_empty()) or (_outbound.eof() and not _outbound_shutdown); },
[&] { _outbound.end_input(); });
// rule 3: read from socket into inbound byte stream
// When the socket has readable data, read it and write it into the _inbound stream
_eventloop.add_rule(
socket,
Direction::In,
[&] {
_inbound.write(socket.read(_inbound.remaining_capacity()));
if (socket.eof()) {
_inbound.end_input();
}
},
[&] { return (not _inbound.error()) and (_inbound.remaining_capacity() > 0) and (not _outbound.error()); },
[&] { _inbound.end_input(); });
// rule 4: read from inbound byte stream into stdout
// When stdout is writable, read data out of the _inbound stream and write it to stdout
_eventloop.add_rule(
_output,
Direction::Out,
[&] {
const size_t bytes_to_write = min(max_copy_length, _inbound.buffer_size());
const size_t bytes_written = _output.write(_inbound.peek_output(bytes_to_write), false);
_inbound.pop_output(bytes_written);
if (_inbound.eof()) {
_output.close();
_inbound_shutdown = true;
}
},
[&] { return (not _inbound.buffer_empty()) or (_inbound.eof() and not _inbound_shutdown); },
[&] { _inbound.end_input(); });
// loop until completion -- each iteration blocks until the next event occurs
while (true) {
if (EventLoop::Result::Exit == _eventloop.wait_next_event(-1)) {
return;
}
}
}
TCPSpongeSocket's wait_until_closed method
wait_until_closed blocks until the _tcp_thread thread has finished:
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::wait_until_closed() {
// Shut down this socket
shutdown(SHUT_RDWR);
// Block until the _tcp_thread thread finishes
if (_tcp_thread.joinable()) {
cerr << "DEBUG: Waiting for clean shutdown... ";
_tcp_thread.join();
cerr << "done.\n";
}
}
The channel that links the main thread and the TCP thread
First, let's look at TCPSpongeSocket's constructors and destructor:
// The socketpair system call creates a pair of connected sockets within the local process.
// The pair can be used for local communication much like network sockets, but the data never traverses the network protocol stack; it is exchanged directly inside the kernel, which is more efficient.
static inline pair<FileDescriptor, FileDescriptor> socket_pair_helper(const int type) {
int fds[2];
// Concretely, socketpair creates two associated sockets that form a bidirectional communication channel:
// anything written into one socket can be read from the other, and vice versa.
SystemCall("socketpair", ::socketpair(AF_UNIX, type, 0, static_cast<int *>(fds)));
return {FileDescriptor(fds[0]), FileDescriptor(fds[1])};
}
//! \param[in] datagram_interface is the underlying interface (e.g. to UDP, IP, or Ethernet)
template <typename AdaptT>
TCPSpongeSocket<AdaptT>::TCPSpongeSocket(AdaptT &&datagram_interface)
: TCPSpongeSocket(socket_pair_helper(SOCK_STREAM), move(datagram_interface)) {}
template <typename AdaptT>
TCPSpongeSocket<AdaptT>::TCPSpongeSocket(pair<FileDescriptor, FileDescriptor> data_socket_pair,
AdaptT &&datagram_interface)
// The main (owner) thread holds one end of the channel
: LocalStreamSocket(move(data_socket_pair.first))
// The TCP thread holds the other end of the channel
, _thread_data(move(data_socket_pair.second))
, _datagram_adapter(move(datagram_interface)) {
_thread_data.set_blocking(false);
}
template <typename AdaptT>
TCPSpongeSocket<AdaptT>::~TCPSpongeSocket() {
try {
if (_tcp_thread.joinable()) {
cerr << "Warning: unclean shutdown of TCPSpongeSocket\n";
// force the other side to exit
_abort.store(true);
_tcp_thread.join();
}
} catch (const exception &e) {
cerr << "Exception destructing TCPSpongeSocket: " << e.what() << endl;
}
}
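Before tracing how data flows between the two threads, here is a tiny stand-alone demonstration (plain POSIX calls, written for this article and unrelated to the sponge classes) of the kind of socketpair channel the constructor sets up:
// Stand-alone demonstration of a socketpair() channel between two threads
// (plain POSIX; not part of the CS144 code base).
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <thread>
int main() {
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
        std::perror("socketpair");
        return 1;
    }
    // One end plays the role of the TCP thread: read a message and echo it back.
    std::thread worker([fd = fds[1]] {
        char buf[64] = {};
        const ssize_t n = ::read(fd, buf, sizeof(buf));
        if (n > 0) {
            ::write(fd, buf, static_cast<size_t>(n));
        }
        ::close(fd);
    });
    // The other end plays the role of the owner (main) thread: send, then read the echo.
    const char msg[] = "hello over socketpair";
    ::write(fds[0], msg, sizeof(msg));
    char reply[64] = {};
    ::read(fds[0], reply, sizeof(reply));
    std::printf("owner thread got back: %s\n", reply);
    worker.join();
    ::close(fds[0]);
    return 0;
}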
The main thread and the TCP thread communicate locally through the pair of connected sockets created by the socketpair system call.
- When a keyboard-input event occurs in the main thread, the typed data travels through the bidirectional channel created by socketpair to the TCP thread, which finally sends it out through the tun device; the two event loops cooperate to make this happen.
- When the tun device receives a packet from the network, it hands it to the TCP protocol stack. Once the stack has processed it and finds that the _thread_data channel is writable, it pushes the result into the channel; the socket in the main thread sees the incoming data, writes it into the _inbound stream, and, once stdout is writable, the received data is finally printed to the screen.
- Here too, the two event loops cooperate to make this work; it is worth taking a moment to trace through it.
Summary
That is my personal understanding of the overall flow of tcp_ipv4.cc, the test program for CS144 Lab Four. There may well be mistakes -- corrections in the comments are welcome. Since space is limited, not every piece of source code could be quoted and explained here, so please read along with the CS144 Lab Four sources as you go.