一、文件寫入流程
1.相關(guān)知識(shí)點(diǎn)介紹
Pipeline管道:
?Pipeline,即管道。這是 HDFS 在上傳?件寫數(shù)據(jù)過程時(shí)采?的?種數(shù)據(jù)傳輸?式??蛻舳藢?shù)據(jù)塊寫?第?個(gè)數(shù)據(jù)節(jié)點(diǎn),第?個(gè)數(shù)據(jù)節(jié)點(diǎn)保存數(shù)據(jù)之后再將塊復(fù)制到第?個(gè)數(shù)據(jù)節(jié)點(diǎn),后者保存后將其復(fù)制到第三個(gè)數(shù)據(jù)節(jié)點(diǎn)。通俗描述 pipeline 的過程就是:Client——>DN1——>DN2—>DN3
為什么 datanode 之間采? pipeline 線性傳輸,?不是?次給三個(gè) datanode 拓?fù)涫絺鬏斈??因?yàn)閿?shù)據(jù)以管道的?式,順序的沿著?個(gè)?向傳輸,這樣能夠充分利?每個(gè)機(jī)器的帶寬,避免?絡(luò)瓶頸和?延遲時(shí)的連接,最?化推送所有數(shù)據(jù)的延時(shí)。在線性推送模式下,每臺(tái)機(jī)器所有的出?寬帶都?于以最快的速度傳輸數(shù)據(jù),?不是在多個(gè)接受者之間分配寬帶。
ACK應(yīng)答:
?ACK (Acknowledge character)即是確認(rèn)字符,在數(shù)據(jù)通信中,接收?發(fā)給發(fā)送?的?種傳輸類控制字符。表示發(fā)來的數(shù)據(jù)已確認(rèn)接收?誤。在 pipeline 管道傳輸數(shù)據(jù)的過程中,傳輸?shù)姆?向會(huì)進(jìn)? ACK 校驗(yàn),確保數(shù)據(jù)傳輸安全。
2.寫入流程
(1)客戶端調(diào)? DistributedFileSystem 對(duì)象的 create() ?法創(chuàng)建?個(gè)?件輸出流對(duì)象。
(2)DistributedFileSystem 對(duì)象向 NameNode 發(fā)起 RPC 調(diào)?,NameNode 檢查該?件是否已經(jīng)存在,以及客戶端是否有權(quán)限新建?件。如果這些檢查通過,NameNode 就會(huì)為創(chuàng)建新?件記錄?條記錄。否則,?件創(chuàng)建失敗并向客戶端拋出 IOException。
(3)DistributedFileSystem 向客戶端返回 FSDataOutputStream 輸出流對(duì)象。由此客戶端可以開始寫?數(shù)據(jù)。FSDataOutputStream 封裝?個(gè) DFSOutputStream 對(duì)象,負(fù)責(zé)處理 DataNode 和 NameNode 之間的通信。
(4)客戶端調(diào)? FSDataOutputStream 對(duì)象的 write() ?法寫數(shù)據(jù)。DFSOutputStream 將數(shù)據(jù)分成?個(gè)個(gè)數(shù)據(jù)包(packet 默認(rèn) 64KB),并寫??個(gè)稱之為數(shù)據(jù)隊(duì)列(Data queue)的內(nèi)部隊(duì)列。DFSOutputStream 有?個(gè)內(nèi)部類DataStreamer,?于請(qǐng)求 NameNode 挑選出適合存儲(chǔ)數(shù)據(jù)副本的?組 DataNode。這?組DataNode 采? pipeline 機(jī)制做數(shù)據(jù)的發(fā)送。默認(rèn)是 3 副本存儲(chǔ)。
(5)Pipeline 傳遞數(shù)據(jù)給 DataNode。DataStreamer 將數(shù)據(jù)包流式傳輸?shù)?pipeline 的第?個(gè) datanode,該DataNode存儲(chǔ)數(shù)據(jù)包并將它發(fā)送到 pipeline 的第?個(gè) DataNode。同樣,第?個(gè) DataNode 存儲(chǔ)數(shù)據(jù)包并且發(fā)送給第三個(gè)(也是最后?個(gè))DataNode。
(6)DFSOutputStream 也維護(hù)著?個(gè)內(nèi)部數(shù)據(jù)包隊(duì)列來等待 DataNode 的收到確認(rèn)回執(zhí),稱之為確認(rèn)隊(duì)列(ackqueue),收到 pipeline 中所有 DataNode 確認(rèn)信息后,該數(shù)據(jù)包才會(huì)從確認(rèn)隊(duì)列刪除。管道上的數(shù)據(jù)節(jié)點(diǎn)按反向順序返回確認(rèn)信息,最終由管道中的第?個(gè)數(shù)據(jù)節(jié)點(diǎn)將整條管道的確認(rèn)信息發(fā)送給客戶端。
(7)客戶端完成寫?,調(diào)? close() ?法關(guān)閉?件輸出流。
(8)通知 NameNode ?件寫?成功。
(9)在?個(gè)塊被寫?期間可能會(huì)出現(xiàn)多個(gè) DataNode 同時(shí)發(fā)?故障。只要寫?了指定的最?副本數(shù)(dfs.namenode.replication.min 的默認(rèn)值為 1),寫操作就會(huì)判定成功。數(shù)據(jù)塊其他副本可以在集群中異步復(fù)制,直到達(dá)到其?標(biāo)副本數(shù)(dfs.replication 的默認(rèn)值為 3)。
3.代碼實(shí)現(xiàn)
// ?件寫?
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileInputStream;
import java.io.IOException;
public class HDFSWriteDemo {
??public static void main(String[] args) throws IOException {
????// 設(shè)置客戶端?戶身份:root 具備在 hdfs 的讀寫權(quán)限
????System.setProperty("HADOOP_USER_NAEM", "root");
????// 創(chuàng)建 conf 對(duì)象
????Configuration conf = new Configuration();
????// 設(shè)置分布?件系統(tǒng),默認(rèn)為本地?件系統(tǒng)file:///
????conf.set("fs.defaultFS", "hdfs://node01:9000");
????// 創(chuàng)建 FileSystem 對(duì)象
????FileSystem fs = FileSystem.get(conf);
????// 設(shè)置?件輸出路徑
????Path path = new Path("/mydir/data.txt");
????// 調(diào)? create ?法創(chuàng)建?件
????FSDataOutputStream out = fs.create(path);
????// 創(chuàng)建本地輸?流
????FileInputStream in = new FileInputStream("D:\\data.txt");
????// IO ?具類實(shí)現(xiàn)流對(duì)拷?
????IOUtils.copy(in, out);
????// 關(guān)閉連接
????fs.close();
?}
}
二、文件讀取流程
1.讀取流程
?(1)HDFS Client 調(diào)?抽象類 FileSystem.get() 獲取?個(gè) DistributedFileSystem 對(duì)象,然后調(diào)?
DistributedFileSystem.open() 打開要讀取的?件。
(2)DistributedFileSystem 向 NameNode 發(fā)起 RPC 調(diào)?,獲得?件的數(shù)據(jù)塊信息。NameNode 返回?cái)?shù)據(jù)塊 ID 及 存儲(chǔ)數(shù)據(jù)塊的 DataNode 地址列表,該列表按照數(shù)據(jù)塊 DataNode 與客戶端的?絡(luò)拓?fù)渚嚯x進(jìn)?排序。文章來源:http://www.zghlxwxcb.cn/news/detail-735150.html
(3)DistributedFileSystem 在底層調(diào)? ClientProtocol.open(),返回?個(gè) FSDataInputStream 對(duì)象?于讀取數(shù)據(jù)。FSDataInputStream 底層封裝了 DFSInputStream 對(duì)象,負(fù)責(zé)管理 DataNode 和 NameNode 之間的 I/O。
(4)客戶端對(duì) DFSInputStream 調(diào)? read() ?法。隨即 DFSInputStream 連接與客戶端距離最近的 NameNode,通過對(duì)數(shù)據(jù)流反復(fù)調(diào)?read()?法,把數(shù)據(jù)從 DataNode 傳輸?shù)娇蛻舳恕?br> (5)當(dāng)數(shù)據(jù)塊讀取完畢時(shí),DFSInputStream 將關(guān)閉與該 DataNode 的連接,然后尋找下?個(gè)塊的最佳DataNode。這些操作對(duì)?戶來說是透明的。所以?戶感覺起來它?直在讀取?個(gè)連續(xù)的流。
(6)當(dāng)客戶端讀取完數(shù)據(jù)時(shí),調(diào)? FSDataInputStream.close() ?法關(guān)閉輸?流。
(7)如果 DFSInputStream 與 DataNode 通信時(shí)遇到錯(cuò)誤,它將嘗試該塊的下?個(gè)最接近的 DataNode 讀取數(shù)據(jù),并記住發(fā)?故障的 DataNode,保證以后不會(huì)反復(fù)讀取該 DataNode 后續(xù)的塊。此外,DFSInputStream也會(huì)通過校驗(yàn)和(checksum)確認(rèn)從 DataNode 發(fā)來的數(shù)據(jù)是否完整。如果發(fā)現(xiàn)有損壞的塊,DFSInputStream 會(huì)嘗試從其他DataNode 讀取該塊的副本,也會(huì)將被損壞的塊報(bào)告給 NameNode。文章來源地址http://www.zghlxwxcb.cn/news/detail-735150.html
2.相關(guān)代碼
// ?件讀取
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileOutputStream;
import java.io.IOException;
public class HDFSReadDemo {
??public static void main(String[] args) throws IOException {
????// 設(shè)置客戶端?戶身份:root 具備在 hdfs 的讀寫權(quán)限
????System.setProperty("HADOOP_USER_NAEM", "root");
????// 創(chuàng)建 conf 對(duì)象
????Configuration conf = new Configuration();
????// 設(shè)置分布?件系統(tǒng),默認(rèn)為本地?件系統(tǒng)file:///
????conf.set("fs.defaultFS", "hdfs://node01:9000");
????// 創(chuàng)建 FileSystem 對(duì)象
????FileSystem fs = FileSystem.get(conf);
????// 調(diào)? open ?法讀取?件
????FSDataInputStream in = fs.open(new Path("/data.txt"));
????// 創(chuàng)建本地輸出流
????FileOutputStream out = new FileOutputStream("D:\\data.txt");
????// IO ?具類實(shí)現(xiàn)流對(duì)拷?
????IOUtils.copy(in, out);
????// 關(guān)閉連接
????fs.close();
?}
}
到了這里,關(guān)于HDFS的文件寫入和文件讀取流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!