原文:https://blog.mickeyzzc.tech/posts/ebpf/deepflow-agent-proto-dev
MongoDB 目前使用廣泛,但是缺乏有效的可觀測能力。DeepFlow 在可觀測能力上是很優(yōu)秀的解決方案,但是卻缺少了對 MongoDB 協(xié)議的支持。該文是為 DeepFlow 擴展了 MongoDB 協(xié)議解析,增強 MongoDB 生態(tài)的可觀測能力,簡要描述了從協(xié)議文檔分析到在 DeepFlow 內(nèi)實現(xiàn)代碼解析的過程拆解。
0x0: 如何分析一個協(xié)議(MongoDB)
協(xié)議文檔的分析思路
首先要從官方網(wǎng)站找到協(xié)議解析的文檔,在協(xié)議文檔《mongodb-wire-protocol#standard-message-header》中,可以看到 MongoDB 的協(xié)議頭結(jié)構(gòu)體描述如下:
struct MsgHeader {
int32 messageLength; // total message size, including this
int32 requestID; // identifier for this message
int32 responseTo; // requestID from the original request
// (used in responses from the database)
int32 opCode; // message type
}
上述結(jié)構(gòu)代碼理解為下圖所示:
??注意,在協(xié)議文檔《mongodb-wire-protocol》有一段說明,MongoDB 協(xié)議是用了字節(jié)小端順序:
Byte Ordering
All integers in the MongoDB wire protocol use little-endian byte order: that is, least-significant
接下來從實際的抓包看一下實際的數(shù)據(jù)是長什么樣子的:
0000 a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00
0010 00 00 00 00 00 8e 00 00 00 01 6f 6b 00 00 00 00
0020 00 00 00 f0 3f 11 6f 70 65 72 61 74 69 6f 6e 54
0030 69 6d 65 00 01 00 00 00 bc 1d c3 64 03 24 63 6c
0040 75 73 74 65 72 54 69 6d 65 00 58 00 00 00 11 63
0050 6c 75 73 74 65 72 54 69 6d 65 00 01 00 00 00 bc
0060 1d c3 64 03 73 69 67 6e 61 74 75 72 65 00 33 00
0070 00 00 05 68 61 73 68 00 14 00 00 00 00 29 12 d4
0080 7f 78 52 55 42 04 29 2f b7 36 85 39 c1 47 66 05
0090 de 12 6b 65 79 49 64 00 01 00 00 00 8c d2 e4 63
00a0 00 00 00
上述的抓包數(shù)據(jù)簡單拆解到如下信息:
- 字段
messageLength
為a3 00 00 00
:即 消息長度為a3
- 字段
requestID
為0a 50 88 48
:即 請求ID為4888500a
- 字段
responseTo
為23 00 00 00
:即 對ID為23
的響應(yīng) - 字段
opCode
為dd 07 00 00
:即 命令號為7dd
,十進制是2013
,對應(yīng)協(xié)議文檔中的OP_MSG
指令
MongoDB 協(xié)議操作碼說明表
操作碼名稱 | 操作碼 | 操作碼說明 | 額外說明 |
---|---|---|---|
OP_COMPRESSED | 2012 | 使用壓縮 | |
OP_MSG | 2013 | Send a message using the standard format. Used for both client requests and database replies. | |
OP_REPLY | 1 | 通過responseTo指定響應(yīng)客戶端請求。 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.| |
OP_UPDATE | 2001 | 更新文檔 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_INSERT | 2002 | 插入文檔 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
RESERVED | 2003 | 略 | |
OP_QUERY | 2004 | 查詢文檔 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_GET_MORE | 2005 | 略 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_DELETE | 2006 | 刪除文檔 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_KILL_CURSORS | 2007 | 略 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
對最常見的操作碼 OP_MSG
分析
從協(xié)議文檔 《mongodb-wire-protocol#op_msg》 查看 OP_MSG
的結(jié)構(gòu)體:
OP_MSG {
MsgHeader header; // standard message header
uint32 flagBits; // message flags
Sections[] sections; // data sections
optional<uint32> checksum; // optional CRC-32C checksum
}
OP_MSG
需要關(guān)注的解碼內(nèi)容在 Sections
,只需要判斷 kind
為 0
和 1
的情況,其中:
- 0:后面直接用
BSON
解碼 - 1:先偏移
int32
和c_string
占用的byte
后,用BSON
解碼后面的內(nèi)容
從實際抓包看一下原始數(shù)據(jù)。如下所示,MongoDB協(xié)議的操作碼 OP_MSG
內(nèi)容從第十六(從0開始數(shù),后續(xù)文檔統(tǒng)一按此規(guī)律)字節(jié)開始:
0000 a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00
0010 00 00 00 00
00
8e 00 00 00 01 6f 6b 00 00 00 00
0020 00 00 00 f0 3f 11 6f 70 65 72 61 74 69 6f 6e 54
0030 69 6d 65 00 01 00 00 00 bc 1d c3 64 03 24 63 6c
0040 75 73 74 65 72 54 69 6d 65 00 58 00 00 00 11 63
0050 6c 75 73 74 65 72 54 69 6d 65 00 01 00 00 00 bc
0060 1d c3 64 03 73 69 67 6e 61 74 75 72 65 00 33 00
0070 00 00 05 68 61 73 68 00 14 00 00 00 00 29 12 d4
0080 7f 78 52 55 42 04 29 2f b7 36 85 39 c1 47 66 05
0090 de 12 6b 65 79 49 64 00 01 00 00 00 8c d2 e4 63
00a0 00 00 00
不需要關(guān)心字段 flagBits
,偏移4個字節(jié)后從第四個字節(jié)判斷字段 kind
類型。由此判斷后面為 BSON
結(jié)構(gòu)數(shù)據(jù)。
到這里我們已經(jīng)基本了解到 MongoDB 協(xié)議的數(shù)據(jù)結(jié)構(gòu)和解碼思路了,接下來我們開始在 DeepFlow Agent 中嘗試實現(xiàn)解碼觀察。
0x1: 在 DeepFlow Agent 擴展一個協(xié)議解析采集
DeepFlow Agent 的開發(fā)文檔概要
前提, DeepFlow Agent 的原生開發(fā)需要掌握 Rust 語言的基礎(chǔ)開發(fā)能力。
接下來先參考官方文檔《HOW_TO_SUPPORT_YOUR_PROTOCOL_CN》了解幾個關(guān)鍵信息:
-
L7Protocol
用于標識協(xié)議常量
源碼位置:deepflow/agent/crates/public/src/l7_protocol.rs
-
L7ProtocolParser
主要用于協(xié)議判斷和解析出L7ProtocolInfo
(七層協(xié)議的基礎(chǔ)結(jié)構(gòu)信息)
源碼位置:deepflow/agent/src/common/l7_protocol_log.rs
-
L7ProtocolInfo
由L7ProtocolParser
解析出來,并且用于后續(xù)會話聚合
源碼位置:deepflow/agent/src/common/l7_protocol_info.rs
-
L7ProtocolInfoInterface
七層協(xié)議結(jié)構(gòu)L7ProtocolInfo
都需要實現(xiàn)這個接口來處理特征邏輯
源碼位置:deepflow/agent/src/common/l7_protocol_info.rs
-
L7ProtocolSendLog
統(tǒng)一發(fā)送到deepflow-server
的結(jié)構(gòu)
源碼位置:deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs
在 DeepFlow Agent 中開發(fā)的大致步驟:
- 在
deepflow/agent/crates/public/src/l7_protocol.rs
添加對應(yīng)協(xié)議名稱和協(xié)議號。 -
L7ProtocolParser::parse_payload()
需要返回L7ProtocolInfo
,所以需要先定義一個結(jié)構(gòu),實現(xiàn)L7ProtocolInfoInterface
接口并且添加到L7ProtocolInfo
這個枚舉。 - 實現(xiàn)
L7ProtocolParserInterface
接口,并添加到deepflow/agent/src/common/l7_protocol_log.rs
中的impl_protocol_parser!
宏。 - 在
deepflow-server
中只需增加一個常量用于搜索提示即可。
代碼指引
-
定義一個協(xié)議,并用一個常量標識:
源碼位置:
deepflow/agent/crates/public/src/l7_protocol.rs
,DeepFlow Agent 通過遍歷所有支持協(xié)議判斷一個流的應(yīng)用層協(xié)議。
這里說明一下,由于業(yè)界的通用應(yīng)用協(xié)議沒有一個約束字段來定義應(yīng)用協(xié)議類型,所以在大量網(wǎng)絡(luò)包是通過遍歷已知協(xié)議解碼邏輯來判斷應(yīng)用層協(xié)議的。pub enum L7Protocol { #[num_enum(default)] Unknown = 0, Other = 1, // HTTP Http1 = 20, Http2 = 21, Http1TLS = 22, Http2TLS = 23, // RPC Dubbo = 40, Grpc = 41, SofaRPC = 43, FastCGI = 44, // SQL MySQL = 60, PostgreSQL = 61, // NoSQL Redis = 80, + MongoDB = 81, // MQ Kafka = 100, MQTT = 101, // INFRA DNS = 120, Custom = 127, Max = 255, }
impl From<String> for L7Protocol { fn from(l7_protocol_str: String) -> Self { let l7_protocol_str = l7_protocol_str.to_lowercase(); match l7_protocol_str.as_str() { "http" | "https" => Self::Http1, "dubbo" => Self::Dubbo, "grpc" => Self::Grpc, "fastcgi" => Self::FastCGI, "custom" => Self::Custom, "sofarpc" => Self::SofaRPC, "mysql" => Self::MySQL, + "mongodb" => Self::MongoDB, "postgresql" => Self::PostgreSQL, "redis" => Self::Redis, "kafka" => Self::Kafka, "mqtt" => Self::MQTT, "dns" => Self::DNS, _ => Self::Unknown, } } }
-
為新協(xié)議準備解析邏輯
定義結(jié)構(gòu)體:
在deepflow/agent/src/flow_generator/protocol_logs/
該路徑下找一個目錄建立相關(guān)的協(xié)議解析邏輯代碼文件,該案例的代碼文件放在上述目錄下的sql/mongo.rs
。pub struct MongoDBInfo { msg_type: LogMessageType, #[serde(rename = "req_len")] pub req_len: u32, #[serde(rename = "resp_len")] pub resp_len: u32, 參考“deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs” // 準備要處理的結(jié)構(gòu)體。 // 其中“request_id”、“response_id”、“op_code”和“op_code_name”是 // 從mongodb header解析出來的關(guān)鍵信息。 #[serde(rename = "request_id")] pub request_id: u32, #[serde(rename = "response_id")] pub response_id: u32, #[serde(rename = "op_code")] pub op_code: u32, #[serde(skip)] pub op_code_name: String, “request”、“response”和“response_code”是 // 從mongodb協(xié)議主體內(nèi)容解析出來的所需信息。 #[serde(rename = "request_resource"] pub request: String, #[serde(skip)] pub response: String, #[serde(rename = "response_code"] pub response_code: i32, #[serde(rename = "response_status")] pub status: L7ResponseStatus, }
-
實現(xiàn)
L7ProtocolParserInterface
-
先看源碼結(jié)構(gòu)邏輯(以下只顯示需處理函數(shù),不需處理的保留默認邏輯即可)
#[enum_dispatch] pub trait L7ProtocolParserInterface { fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool; // 協(xié)議解析 fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult>; // 返回協(xié)議號和協(xié)議名稱,由于的bitmap使用u128,所以協(xié)議號不能超過128. // 其中 crates/public/src/l7_protocol.rs 里面的 pub const L7_PROTOCOL_xxx 是已實現(xiàn)的協(xié)議號. // =========================================================================================== // return protocol number and protocol string. because of bitmap use u128, so the max protocol number can not exceed 128 // crates/public/src/l7_protocol.rs, pub const L7_PROTOCOL_xxx is the implemented protocol. fn protocol(&self) -> L7Protocol; // l4是tcp時是否解析,用于快速過濾協(xié)議 // ============================== // whether l4 is parsed when tcp, use for quickly protocol filter fn parsable_on_tcp(&self) -> bool { true } // l4是udp是是否解析,用于快速過濾協(xié)議 // ============================== // whether l4 is parsed when udp, use for quickly protocol filter fn parsable_on_udp(&self) -> bool { true } // return perf data fn perf_stats(&mut self) -> Option<L7PerfStats>; }
-
解碼協(xié)議的第一步是如何識別協(xié)議,代碼中需處理
L7ProtocolParserInterface::check_payload()
邏輯 -
定義
MongoDB
協(xié)議頭并解碼// 定義MongoDB協(xié)議頭結(jié)構(gòu)體,并對必要信息字段一一解碼 #[derive(Clone, Debug, Default, Serialize)] pub struct MongoDBHeader { length: u32, request_id: u32, response_to: u32, op_code: u32, op_code_name: String, } impl MongoDBHeader { fn decode(&mut self, payload: &[u8]) -> isize { // 對payload前16位以MongoDBHeader結(jié)構(gòu)解碼,判斷是否符合MongoDB的協(xié)議 } fn is_request(&self) -> bool { // 解碼op_code判斷是否request } pub fn get_op_str(&self) -> &'static str { // 解碼op_code出對應(yīng)文本描述 } }
-
在
L7ProtocolParserInterface::check_payload()
調(diào)用MongoDB
協(xié)議頭解碼邏輯
在此過程,把protocol(&self)
和parsable_on_udp(&self)
也一并處理。impl L7ProtocolParserInterface for MongoDBLog { fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool { let mut header = MongoDBHeader::default(); header.decode(payload); return header.is_request(); } fn protocol(&self) -> L7Protocol { L7Protocol::MongoDB } // udp協(xié)議的跳過解碼 fn parsable_on_udp(&self) -> bool {false} }
-
第一步的效果展示
到這一步的解碼將會得到如下展示效果,接下來還需要對具體的協(xié)議操作碼做進一步解碼。 -
解碼協(xié)議的第二步是對關(guān)鍵指令定義結(jié)構(gòu)體和解碼接口邏輯實現(xiàn),對應(yīng)處理是
L7ProtocolParserInterface::parse_payload()
代碼實現(xiàn),這里以OP_MSG
為例 -
定義
OP_MSG
操作碼的結(jié)構(gòu)體并解碼#[derive(Clone, Debug, Default, Serialize)] pub struct MongoOpMsg { flag: u32, sections: Sections, checksum: Option<u32>, } impl MongoOpMsg { fn decode(&mut self, payload: &[u8]) -> Result<bool> { // 略過偏移邏輯 let _ = sections.decode(&payload); self.sections = sections; Ok(true) } }
-
對
OP_MSG
操作碼中業(yè)務(wù)需要關(guān)注的字段Sections
做進一步解碼#[derive(Clone, Debug, Default, Serialize)] struct Sections { kind: u8, kind_name: String, // kind: 0 mean doc doc: Document, // kind: 1 mean body size: Option<i32>, c_string: Option<String>, } impl Sections { pub fn decode(&mut self, payload: &[u8]) -> Result<bool> { match self.kind { 0 => {// Body} 1 => {// Doc} 2 => {// Internal} _ => {// Unknown} } Ok(true) } }
-
處理
L7ProtocolParserInterface::parse_payload
,返回L7ProtocolInfo
#[derive(Clone, Debug, Default, Serialize)] pub struct MongoDBLog { info: MongoDBInfo, #[serde(skip)] perf_stats: Option<L7PerfStats>, } impl L7ProtocolParserInterface for MongoDBLog { fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> { let mut info = MongoDBInfo::default(); self.parse(payload, param.l4_protocol, param.direction, &mut info)?; // 解碼得到L7ProtocolInfo } } impl MongoDBLog { fn parse(&mut self,payload:&[u8],proto:IpProtocol,dir:PacketDirection,info:&mut MongoDBInfo,)-> Result<bool> { // 解碼指令獲取請求和響應(yīng)等信息} // command decode match info.op_code { _OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => { // OP_MSG let mut msg_body = MongoOpMsg::default(); // TODO: Message Flags msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?; } } } }
-
為
MongoDBInfo
實現(xiàn)L7ProtocolInfoInterface
impl L7ProtocolInfoInterface for MongoDBInfo { fn session_id(&self) -> Option<u32> { // 這里返回流標識id,例如 http2 返回 streamid,dns 返回 transaction id,如果沒有就返回 None } fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> { // 這里的self必定是請求,other必定是響應(yīng) if let L7ProtocolInfo::MongoDBInfo(other) = other { self.merge(other); } Ok(()) } fn app_proto_head(&self) -> Option<AppProtoHead> { // 這里返回一個 AppProtoHead 結(jié)構(gòu),返回 None 直接丟棄這段數(shù)據(jù) Some(AppProtoHead { proto: L7Protocol::MongoDB, }) } fn is_tls(&self) -> bool { self.is_tls } }
-
為
MongoDBInfo
實現(xiàn)L7ProtocolSendLog
impl From<MongoDBInfo> for L7ProtocolSendLog { fn from(f: MongoDBInfo) -> Self { let log = L7ProtocolSendLog { // 這里需要把 info 轉(zhuǎn)換成統(tǒng)一的發(fā)送結(jié)構(gòu) L7ProtocolSendLog }; return log; } } // 參考源碼來自:deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs pub struct L7ProtocolSendLog { pub req_len: Option<u32>, pub resp_len: Option<u32>, pub row_effect: u32, pub req: L7Request, pub resp: L7Response, pub version: Option<String>, pub trace_info: Option<TraceInfo>, pub ext_info: Option<ExtendedInfo>, }
-
把實現(xiàn)
L7ProtocolParserInterface
的接口,添加到deepflow/agent/src/common/l7_protocol_log.rs
中的impl_protocol_parser!
宏。impl_protocol_parser! { pub enum L7ProtocolParser { // http have two version but one parser, can not place in macro param. // custom must in frist so can not place in macro DNS(DnsLog), SofaRPC(SofaRpcLog), MySQL(MysqlLog), Kafka(KafkaLog), Redis(RedisLog), + MongoDB(MongoDBLog), PostgreSQL(PostgresqlLog), Dubbo(DubboLog), FastCGI(FastCGILog), MQTT(MqttLog), // add protocol below } }
-
第二步的效果
-
通過
perf_states
統(tǒng)計記錄QPS
、耗時
和異常
情況impl L7ProtocolParserInterface for MongoDBLog { fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> { let mut info = MongoDBInfo::default(); self.parse(payload, param.l4_protocol, param.direction, &mut info)?; // 解碼得到L7ProtocolInfo info.cal_rrt(param, None).map(|rrt| { info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); // 耗時 }); } impl MongoDBLog { fn parse(&mut self,payload:&[u8],proto:IpProtocol,dir:PacketDirection,info:&mut MongoDBInfo,) -> Result<bool> { // 解碼指令獲取請求和響應(yīng)等信息 if header.is_request() { + self.perf_stats.as_mut().map(|p: &mut L7PerfStats| p.inc_req()); // 請求記錄 } else { + self.perf_stats.as_mut().map(|p| p.inc_resp()); // 響應(yīng)記錄 } match info.op_code { _OP_REPLY if payload.len() > _HEADER_SIZE => { let mut msg_body = MongoOpReply::default(); msg_body.decode(&payload[_HEADER_SIZE..])?; if !msg_body.reply_ok { + self.perf_stats.as_mut().map(|p| p.inc_resp_err());// 異常記錄 } } } } }
效果如圖:
-
最后在 deepflow-server 補充服務(wù)端的協(xié)議識別
以下兩部分內(nèi)容在代碼文件server/libs/datatype/flow.go
中type L7Protocol uint8 const ( L7_PROTOCOL_UNKNOWN L7Protocol = 0 L7_PROTOCOL_OTHER L7Protocol = 1 L7_PROTOCOL_HTTP_1 L7Protocol = 20 L7_PROTOCOL_HTTP_2 L7Protocol = 21 L7_PROTOCOL_HTTP_1_TLS L7Protocol = 22 L7_PROTOCOL_HTTP_2_TLS L7Protocol = 23 L7_PROTOCOL_DUBBO L7Protocol = 40 L7_PROTOCOL_GRPC L7Protocol = 41 L7_PROTOCOL_SOFARPC L7Protocol = 43 L7_PROTOCOL_FASTCGI L7Protocol = 44 L7_PROTOCOL_MYSQL L7Protocol = 60 L7_PROTOCOL_POSTGRE L7Protocol = 61 L7_PROTOCOL_REDIS L7Protocol = 80 + L7_PROTOCOL_MONGODB L7Protocol = 81 L7_PROTOCOL_KAFKA L7Protocol = 100 L7_PROTOCOL_MQTT L7Protocol = 101 L7_PROTOCOL_DNS L7Protocol = 120 L7_PROTOCOL_CUSTOM L7Protocol = 127 )
func (p L7Protocol) String() string { formatted := "" switch p { case L7_PROTOCOL_HTTP_1: formatted = "HTTP" case L7_PROTOCOL_DNS: formatted = "DNS" case L7_PROTOCOL_MYSQL: formatted = "MySQL" case L7_PROTOCOL_POSTGRE: formatted = "PostgreSQL" case L7_PROTOCOL_REDIS: formatted = "Redis" + case L7_PROTOCOL_MONGODB: + formatted = "MongoDB" case L7_PROTOCOL_DUBBO: formatted = "Dubbo" case L7_PROTOCOL_GRPC: formatted = "gRPC" case L7_PROTOCOL_CUSTOM: formatted = "Custom" case L7_PROTOCOL_OTHER: formatted = "Others" default: formatted = "N/A" } return formatted }
server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol
# Value , DisplayName , Description 0 , N/A , 1 , Others , 20 , HTTP , 21 , HTTP2 , 22 , HTTP1_TLS , 23 , HTTP2_TLS , 40 , Dubbo , 41 , gRPC , 43 , SOFARPC , 44 , FastCGI , 60 , MySQL , 61 , PostgreSQL , 80 , Redis , + 81 , MongoDB , 100 , Kafka , 101 , MQTT , 120 , DNS , 127 , Custom ,
-
到這里已經(jīng)完成 DeepFlow Agent 的原生協(xié)議擴展了,參考《# 完整指南:如何編譯、打包和部署二次開發(fā)的 DeepFlow 》編譯程序發(fā)布即可。
如果想快速實現(xiàn)一個協(xié)議采集解析,或者不熟悉Rust語言呢?我們還有一個選擇,就是利用Wasm插件快速擴展協(xié)議解碼。
0x2: 利用 Wasm 插件擴展 DeepFlow 的協(xié)議采集
該案例是用 Wasm 擴展 Kafka 協(xié)議支持 Topic
的實踐。
首先還是參考Kafka的官方文檔對Kafka協(xié)議做一個簡單的分析
Kafka協(xié)議分析
Kafka
的 Header
和 Data
概覽
Kafka的Fetch API
Kafka的Produce API
Kafka 協(xié)議 DeepFlow Agent 原生解碼:
截止到 v6.3.x 版本,DeepFlow Agent 對 Kafka的原生解碼如下圖所示,還不支持 Topic 字段的解碼,
且API的解碼還沒有版本號。
接下來的插件開發(fā)主要解決 Topic字 段的解碼放在 resource 展示,同時把 API 的版本號也解析出來。
DeepFlow Agent 的 Wasm 插件
參考官方插件文檔《 wasm-plugin》,需要注意兩點:
-
DeepFlow Agent 通過遍歷所有支持協(xié)議判斷一個流的應(yīng)用層協(xié)議,順序是:
HTTP
->Wasm Hook
->DNS
-> … -
需要使用
Go
版本不低于1.21
并且tinygo
版本需要不低于0.29
Wasm Go SDK 的框架
-
先對框架有一個大概的認識,如下代碼所示,整個框架邏輯都在以下五個接口函數(shù)。
package main import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk" // 定義結(jié)構(gòu),需要實現(xiàn) sdk.Parser 接口 type plugin struct {} func (p plugin) HookIn() []sdk.HookBitmap {return []sdk.HookBitmap{}} // HookIn() 包含 HOOK_POINT_HTTP_REQ 時,http 請求解析完成返回之前會調(diào)用。 // HttpReqCtx 包含了 BaseCtx 和已經(jīng)解析出來的一些 http 頭部 func (p plugin) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action { return sdk.HttpReqActionAbortWithResult(nil, trace, attr) } func (p plugin) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {return sdk.ActionNext()} func (p plugin) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) {return 0, "ownwasm"} func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{}) } // main 需要注冊解析器 func main() { sdk.SetParser(plugin{}) }
-
DeepFlow Agent 會遍歷所有插件調(diào)用對應(yīng)的 Export 函數(shù),但是遍歷的行為可以通過返回值控制
返回值 說明 sdk.ActionNext() 停止當前插件,直接執(zhí)行下一個插件 sdk.ActionAbort() 停止當前插件并且停止遍歷 sdk.ActionAbortWithErr(err) 停止當前插件,打印錯誤日志并且停止遍歷 sdk.HttpActionAbortWithResult() Agent 停止遍歷并且提取相應(yīng)返回結(jié)果 sdk.ParseActionAbortWithL7Info() Agent 停止遍歷并且提取相應(yīng)返回結(jié)果 ??注意:
因為該案例不涉及HTTP
協(xié)議的處理,所以OnHttpReq()
和OnHttpResp()
直接使用sdk.ActionNext()
跳過即可。
該案例也不會用到sdk.HttpActionAbortWithResult()
。 -
HookBitmap
的三個hook
點hook點 說明 HOOK_POINT_HTTP_REQ 表示 http 請求解析完成返回之前 HOOK_POINT_HTTP_RESP 表示 http 響應(yīng)解析完成返回之前 HOOK_POINT_PAYLOAD_PARSE 表示協(xié)議的判斷和解析 ??注意:因為該案例不涉及
HTTP
協(xié)議的處理,所以HOOK_POINT_HTTP_REQ
和HOOK_POINT_HTTP_RESP
在該案例也不會用到。
插件代碼指引
-
梳理后的 Kafka 協(xié)議的 Wasm 插件代碼框架
package main import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk" // 定義結(jié)構(gòu),需要實現(xiàn) sdk.Parser 接口 type kafkaParser struct {} func (p kafkaParser) HookIn() []sdk.HookBitmap { return []sdk.HookBitmap{sdk.HOOK_POINT_PAYLOAD_PARSE} } // 跳過HTTP協(xié)議處理 func (p kafkaParser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {return sdk.ActionNext()} func (p kafkaParser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {return sdk.ActionNext()} // 協(xié)議判斷檢查 func (p kafkaParser) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) {return 100, "kafka"} // 協(xié)議解碼 func (p kafkaParser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{}) } // main 需要注冊解析器 func main() {sdk.SetParser(plugin{})}
-
協(xié)議識別
??注意:以下代碼注釋func (p kafkaParser) OnCheckPayload(ctx *sdk.ParseCtx) (uint8, string) { // 跳過UDP協(xié)議數(shù)據(jù) if ctx.L4 != sdk.TCP { return 0, "" } // 如果環(huán)境有標準規(guī)范的端口約定,插件中指定端口會減少協(xié)議數(shù)據(jù)的遍歷,優(yōu)化解碼時cpu等資源消耗 if ctx.DstPort < 9092 || ctx.DstPort > 9093 { return 0, "" } // 讀取抓包數(shù)據(jù) payload, err := ctx.GetPayload() if err != nil { sdk.Error("get payload fail: %v", err) return 0, "" } // 引用"github.com/segmentio/kafka-go/protocol"來解碼 bl, err := protocol.ReadAll(protocol.NewBytes(payload)) if err != nil { sdk.Error("read payload fail: %v", err) return 0, "" } b, _ := decodeHeader(bl) if !b { return 0, "" } return WASM_KAFKA_PROTOCOL, "kafka" }
-
協(xié)議 API 解碼
-
官方代碼框架
OnParsePayload()
的邏輯如下func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { // ctx.L7 就是 OnCheckPayload 返回的協(xié)議號,可以先根據(jù)4層協(xié)議或協(xié)議號過濾。 if ctx.L4 != sdk.TCP {return sdk.ActionNext()} payload, err := ctx.GetPayload() if err != nil {return sdk.ActionAbortWithErr(err)} // the parse logic here // ... /* 關(guān)于 L7ProtocolInfo 結(jié)構(gòu): type L7ProtocolInfo struct { ReqLen *int // 請求長度 例如 http 的 content-length RespLen *int // 響應(yīng)長度 例如 http 的 content-length RequestID *uint32 // 子流的id標識,例如 http2 的 stream id,dns 的 transaction id Req *Request Resp *Response Trace *Trace // 跟蹤信息 Kv []KeyVal // 對應(yīng) attribute } type Request struct { ReqType string // 對應(yīng)請求類型 Domain string // 對應(yīng)請求域名 Resource string // 對應(yīng)請求資源 Endpoint string // 對應(yīng) endpoint } type Response struct { Status RespStatus // 對應(yīng)響應(yīng)狀態(tài) Code *int32 // 對應(yīng)響應(yīng)碼 Result string // 對應(yīng)響應(yīng)結(jié)果 Exception string // 對應(yīng)響應(yīng)異常 }*/ return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{}) }
-
Topic 字段解碼的代碼邏輯
func (p kafkaParser) OnParsePayload(ctx *sdk.ParseCtx) sdk.Action { // the parse logic here // ... // 解碼 header base size : // req_len(int32) + api_key(int16) + api_ver(int16) + c_id(int32) + client_len(int16) // = 14 var header_offset = 14 + header.clientLen var topic_size int16 = 0 var topic_name = "" switch protocol.ApiKey(header.apikey) { case protocol.Produce: topic_size, topic_name = decodeProduce(header.apiversion, payload[header_offset:]) case protocol.Fetch: topic_size, topic_name = decodeFetch(header.apiversion, payload[header_offset:]) } if topic_size == 0 { return sdk.ActionNext() } req = &sdk.Request{ ReqType: protocol.ApiKey(header.apikey).String() + "_v" + strconv.Itoa(int(header.apiversion)), Resource: topic_name, } return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{ { RequestID: &id, ReqLen: &length, Req: req, }, }) }
-
加載插件和效果展示
執(zhí)行如下命令編譯插件,通過CTL方式加載插件
tinygo build -o build/topic.wasm -target wasi -panic=trap -scheduler=none -no-debug ./wasm/kafka/topic.go
deepflow-ctl plugin create --type wasm --image build/topic.wasm --name topic
準備好 DeepFlow Agent 的配置文件增加如下配置。注意,DeepFlow Agent 可以加載多個 Wasm 插件。
############
## plugin ##
############
## wasm plugin need to load in agent
wasm-plugins:
- mongo
- topic
執(zhí)行命令更新配置
deepflow-plugin git:(main) ? deepflow-ctl agent-group-config update -f g-d2d06af17e.yaml
當 DeepFlow Agent 日志出現(xiàn)如下圖黃字體內(nèi)容,即加載成功。
在 Grafana 上,可以看到原生的 Kafka 協(xié)議被覆蓋,出現(xiàn)了幾個變化:
-
Protocol
字段從Kafka
變成Custom
-
Request type
字段的API
多了版本號 -
Request resource
字段出現(xiàn)了Topic
信息
0x3: 結(jié)語
最后對比一下兩個協(xié)議擴展的方式,要注意??的是:
- 兩者都存在一個共性問題,就是每增加一個協(xié)議,識別協(xié)議解碼的效率相對降低
- 可以通過配置的方式減少需解碼的協(xié)議數(shù)量
原生Rust擴展
-
優(yōu)點:文章來源:http://www.zghlxwxcb.cn/news/detail-811832.html
- 運行時的資源占用比插件低
- 支持的功能比插件的豐富,且定制性更靈活
-
缺點:文章來源地址http://www.zghlxwxcb.cn/news/detail-811832.html
- 在語言方面的開發(fā)難度比插件的大
- 相對插件開發(fā)而言,新增協(xié)議需要改動的地方較多,還涉及到 Server 的一小部分代碼
Wasm插件擴展
-
優(yōu)點:
- 用 Golang 開發(fā)相對 Rust 語言難度較低
- 可在運行時通過 CLI 方式加載
- 擴展性強
-
缺點:
- Go 的標準庫和第三方庫有一定的限制,且調(diào)試難度大,導(dǎo)致插件異常較難排除
- 由于Wasm本身限制等問題,導(dǎo)致功能相對 Rust 原生開發(fā)較弱
- 資源增加,特別是內(nèi)存方面。
0x5: 附錄
- DeepFlow 協(xié)議開發(fā)文檔
- DeepFlow的Wasm 插件系統(tǒng)
- 使用 DeepFlow Wasm 插件實現(xiàn)業(yè)務(wù)可觀測性
- MongoDB協(xié)議文檔
- Kafka協(xié)議文檔
到了這里,關(guān)于eBPF系列之:DeepFlow 擴展協(xié)議解析實踐(MongoDB協(xié)議與Kafka協(xié)議)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!