
HDFS Write Flow Source Code Analysis (Part 1): The Client


環(huán)境為hadoop 3.1.3

1. Client

The following code creates a file and writes to it.

public void create() throws URISyntaxException, IOException, InterruptedException {
        // configuration
        Configuration conf = new Configuration();
        // get the file system
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.157.128:9000"), conf, "root");
        // create the file and write data to it
        FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));
        out.write("Hello, HDFS".getBytes());
        out.flush();
        // close the file system
        fs.close();
    }
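
Note that the example never calls out.close() directly: fs.close() closes the DistributedFileSystem, which in turn closes any output streams still open on it. Section (3) below traces what happens when the output stream is closed.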

Configuration loads Hadoop's configuration. Below is its static initializer block, where the familiar configuration file names appear.

static{
    //print deprecation warning if hadoop-site.xml is found in classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    if(cL.getResource("hadoop-site.xml")!=null) {
      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
          "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
          + "mapred-site.xml and hdfs-site.xml to override properties of " +
          "core-default.xml, mapred-default.xml and hdfs-default.xml " +
          "respectively");
    }
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
  }
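
As a quick check of this loading order, here is a minimal sketch (assuming a vanilla classpath where core-site.xml overrides nothing; ConfDemo is an illustrative name, not from the article):

import org.apache.hadoop.conf.Configuration;

public class ConfDemo {
    public static void main(String[] args) {
        // Constructing a Configuration triggers the static block above:
        // core-default.xml is loaded first, then core-site.xml overrides it.
        Configuration conf = new Configuration();
        // "fs.defaultFS" is "file:///" in core-default.xml unless a
        // core-site.xml on the classpath overrides it with an hdfs:// URI.
        System.out.println(conf.get("fs.defaultFS"));
    }
}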

FileSystem is the abstract base class for Hadoop file systems and has many implementations (see the figure below); DistributedFileSystem is the concrete implementation for HDFS.
[Figure: the FileSystem implementation class hierarchy]
Here we focus on DistributedFileSystem.

(1) File Creation and Pipeline Setup

FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));

The line above creates the file /root/test3.txt and obtains an output stream for it. After several hops, it lands in DistributedFileSystem's create() method.

public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);  // metric
    Path absF = fixRelativePart(f);  // resolve to an absolute path
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        /*
        * Main work:
        * 1. Make the create RPC call to the NameNode to create the file
        * 2. Start the DataStreamer for the subsequent data transfer
        */
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);  // wrap and return as HdfsDataOutputStream
      }

	  // invoked to retry against the resolved FileSystem when a symlink is encountered
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }

DFSClient's create() method does two main things: it makes the create RPC call to the NameNode to create the file, and it starts the DataStreamer for the subsequent data transfer.

public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    // check that the client is still open
    checkOpen();
    // apply the permission (default rw-r--r--)
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    // nodes to prefer as DataNodes for this file's blocks
    String[] favoredNodeStrs = null;
    if (favoredNodes != null) {
      favoredNodeStrs = new String[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeStrs[i] = 
            favoredNodes[i].getHostName() + ":" 
                         + favoredNodes[i].getPort();
      }
    }
    /*
    * 1. The create RPC call
    * 2. Starting the DataStreamer
    */
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        favoredNodeStrs);
    // begin the file lease (held by the client while it writes the file)
    beginFileLease(result.getFileId(), result);
    return result;
  }

Here we focus on the DFSOutputStream.newStreamForCreate() method.

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // the create RPC call
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    // start the DataStreamer
    out.start();
    return out;
  }

The create RPC call happens first. Next we focus on DFSOutputStream.start().

  private synchronized void start() {
    streamer.start();
  }

This starts the DataStreamer thread, which sends the data to the DataNodes, so we focus on DataStreamer.run().

public void run() {
      long lastPacket = Time.now();
      TraceScope traceScope = null;
      if (traceSpan != null) {
        traceScope = Trace.continueSpan(traceSpan);
      }
      while (!streamerClosed && dfsClient.clientRunning) {

        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) {
          try {
            // the ResponseProcessor handles responses from the downstream DataNodes
            response.close();
            response.join();
            response = null;
          } catch (InterruptedException  e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
        }

        Packet one;
        try {
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
            doSleep = processDatanodeError();
          }

          // dataQueue holds Packets (Block (128MB) -> Packet (64KB) -> Chunk (512B data + 4B checksum))
          // Block: storage unit; Packet: transfer unit; Chunk: checksum unit
          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.now();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                && dataQueue.size() == 0 && 
                (stage != BlockConstructionStage.DATA_STREAMING || // DATA_STREAMING means the pipeline is up and data is flowing
                 stage == BlockConstructionStage.DATA_STREAMING && 
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try {
                dataQueue.wait(timeout);  // wait to be woken (a packet has been filled, so dataQueue is no longer empty)
              } catch (InterruptedException  e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
              doSleep = false;
              now = Time.now();
            }
            
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              // heartbeat packet
              one = createHeartbeatPacket();
            } else {
              one = dataQueue.getFirst(); // regular data packet
            }
          }
          assert one != null;

          // get new block from namenode.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            /*
            * Main work:
            * nextBlockOutputStream()
            * 1. Send the addBlock RPC to the NameNode to add a new block to the file;
            *    the NameNode allocates and returns the DataNodes that will store it
            * 2. Connect to the first DataNode (chain replication)
            * setPipeline()
            * 3. Record the nodes participating in chain replication and related info
            */
            setPipeline(nextBlockOutputStream());
            // start the ResponseProcessor (receives acks from the first DataNode in the pipeline) and set the stream state to DATA_STREAMING
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          }

		  // the last packet would run past the block boundary
          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " + 
                lastByteOffsetInBlock +
                " Aborting file " + src);
          }

		  // before sending the last packet of a block, wait until all other packets have been received by the DataNodes
		  // this ensures a block is received in full and prevents packets from different blocks awaiting acks at the same time
          if (one.lastPacketInBlock) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                try {
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                } catch (InterruptedException  e) {
                  DFSClient.LOG.warn("Caught exception ", e);
                }
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            //
            // put the stream into closing state: the next block may be stored on
            // different DataNodes, so this pipeline is no longer useful and can simply be closed
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          }
          
          // send the packet
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            // if it is not a heartbeat packet, move it from the pending-send queue (dataQueue)
            // to the pending-ack queue (ackQueue) to await the DataNodes' responses
            if (!one.isHeartbeatPacket()) {
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          try {
            // send the packet to the DataNode
            one.writeTo(blockStream);
            blockStream.flush();   
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN. If a failed or restarting node has already
            // been recorded by the responder, the following call will have no 
            // effect. Pipeline recovery can handle only one node error at a
            // time. If the primary node fails again during the recovery, it
            // will be taken out then.
            tryMarkPrimaryDatanodeFailed();
            throw e;
          }
          lastPacket = Time.now();
          
          // update bytesSent
          // update the offset within the block of the data sent so far
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) {
            bytesSent = tmpBytesSent;
          }

          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }

          // Is this block full?
          // if the packet just sent was the last one in the block, keep waiting until it is acked;
          // at that point every packet of the block has been acked (the last packet is only sent after all earlier ones are acked)
          if (one.lastPacketInBlock) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

			/*
			* 1. Stop the ResponseProcessor
			* 2. Close the output streams
			* 3. Reset the pipeline
			* 4. Set the stream state back to PIPELINE_SETUP_CREATE, so a connection is established again when the next block is transferred
			*/
            endBlock();
          }
          if (progress != null) { progress.progress(); }

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
            Thread.sleep(artificialSlowdown); 
          }
        } catch (Throwable e) {
          // Log warning if there was a real error.
          if (restartingNodeIndex == -1) {
            DFSClient.LOG.warn("DataStreamer Exception", e);
          }
          if (e instanceof IOException) {
            setLastException((IOException)e);
          } else {
            setLastException(new IOException("DataStreamer Exception: ",e));
          }
          hasError = true;
          if (errorIndex == -1 && restartingNodeIndex == -1) {
            // Not a datanode issue
            streamerClosed = true;
          }
        }
      }
      if (traceScope != null) {
        traceScope.close();
      }

	  // release any resources that could not be released earlier
      closeInternal();
    }
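
Before moving on, the Block -> Packet -> Chunk comment above is worth making concrete. A hedged back-of-the-envelope sketch (assuming the defaults quoted in the comments: 128MB blocks, 512B data chunks, 127 chunks per packet; BlockMathDemo is an illustrative name):

public class BlockMathDemo {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;      // 128MB data block (assumed default)
        int bytesPerChunk = 512;                  // data bytes per chunk
        int chunksPerPacket = 127;                // as quoted in the comments above
        long chunks = blockSize / bytesPerChunk;  // 262144 chunks per block
        long packets = (chunks + chunksPerPacket - 1) / chunksPerPacket;  // round up
        System.out.println(chunks + " chunks, " + packets + " packets");
        // prints: 262144 chunks, 2065 packets
    }
}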

Here we mainly look at nextBlockOutputStream(), which establishes the connection, and initDataStreaming(), which starts the ResponseProcessor.

    private LocatedBlock nextBlockOutputStream() throws IOException {
      LocatedBlock lb = null;
      DatanodeInfo[] nodes = null;
      StorageType[] storageTypes = null;
      int count = dfsClient.getConf().nBlockWriteRetry;
      boolean success = false;
      ExtendedBlock oldBlock = block;
      do {
        hasError = false;
        lastException.set(null);
        errorIndex = -1;
        success = false;

        long startTime = Time.now();
        // DataNodes that must not be chosen to store this block
        DatanodeInfo[] excluded =
            excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
            .keySet()
            .toArray(new DatanodeInfo[0]);
        block = oldBlock;
        /* Send the addBlock RPC to the NameNode to:
        * 1. Create a new block and add it to the file
        * 2. Choose and order the DataNodes that will store the block, returned in lb
        */
        lb = locateFollowingBlock(startTime,
            excluded.length > 0 ? excluded : null);
        block = lb.getBlock();
        block.setNumBytes(0);
        bytesSent = 0;
        accessToken = lb.getBlockToken();
        nodes = lb.getLocations();
        storageTypes = lb.getStorageTypes();

        //
        // Connect to the first DataNode in the list (the head of the replication chain).
        //
        success = createBlockOutputStream(nodes, storageTypes, 0L, false);

        if (!success) {
          DFSClient.LOG.info("Abandoning " + block);
          dfsClient.namenode.abandonBlock(block, fileId, src,
              dfsClient.clientName);
          block = null;
          DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
          /*
          * Note: if the connection fails, the unreachable DataNode is added to excludedNodes,
          * which is passed along on the next addBlock call so the NameNode will not
          * assign this block a DataNode the client cannot reach
          */
          excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
        }
      } while (!success && --count >= 0);

      if (!success) {
        throw new IOException("Unable to create new block.");
      }
      return lb;
    }

nextBlockOutputStream() asks the NameNode for a new block, obtains the chain of DataNodes that will store it, and connects to the first DataNode in the chain. Here we look at createBlockOutputStream().

    private boolean createBlockOutputStream(DatanodeInfo[] nodes,
        StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
      if (nodes.length == 0) {
        DFSClient.LOG.info("nodes are empty for write pipeline of block "
            + block);
        return false;
      }
      Status pipelineStatus = SUCCESS;
      String firstBadLink = "";
      boolean checkRestart = false;
      if (DFSClient.LOG.isDebugEnabled()) {
        for (int i = 0; i < nodes.length; i++) {
          DFSClient.LOG.debug("pipeline = " + nodes[i]);
        }
      }

      // persist blocks on namenode on next flush
      persistBlocks.set(true);

      int refetchEncryptionKey = 1;
      while (true) {
        boolean result = false;
        DataOutputStream out = null;
        try {
          assert null == s : "Previous socket unclosed";
          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
          // establish the socket connection
          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
          
          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
          InputStream unbufIn = NetUtils.getInputStream(s);
          IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
          unbufOut = saslStreams.out;
          unbufIn = saslStreams.in;
          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          blockReplyStream = new DataInputStream(unbufIn);
  
          //
          // Xmit header info to datanode
          //
  
          BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;

          // We cannot change the block length in 'block' as it counts the number
          // of bytes ack'ed.
          ExtendedBlock blockCopy = new ExtendedBlock(block);
          blockCopy.setNumBytes(blockSize);

          // send the request to set up the pipeline (a WRITE_BLOCK op)
          new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
              dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
              nodes.length, block.getNumBytes(), bytesSent, newGS,
              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
  
          // receive the ack for the connection setup
          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
              PBHelper.vintPrefixed(blockReplyStream));
          pipelineStatus = resp.getStatus();
          firstBadLink = resp.getFirstBadLink();
          
          // Got an restart OOB ack.
          // If a node is already restarting, this status is not likely from
          // the same node. If it is from a different node, it is not
          // from the local datanode. Thus it is safe to treat this as a
          // regular node error.
          if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
            restartingNodeIndex == -1) {
            checkRestart = true;
            throw new IOException("A datanode is restarting.");
          }
          if (pipelineStatus != SUCCESS) {
            if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException(
                  "Got access token error for connect ack with firstBadLink as "
                      + firstBadLink);
            } else {
              throw new IOException("Bad connect ack with firstBadLink as "
                  + firstBadLink);
            }
          }
          assert null == blockStream : "Previous blockStream unclosed";
          blockStream = out;
          result =  true; // success
          restartingNodeIndex = -1;
          hasError = false;
        } catch (IOException ie) {
          if (restartingNodeIndex == -1) {
            DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
          }
          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
                + "encryption key was invalid when connecting to "
                + nodes[0] + " : " + ie);
            // The encryption key used is invalid.
            refetchEncryptionKey--;
            dfsClient.clearDataEncryptionKey();
            // Don't close the socket/exclude this node just yet. Try again with
            // a new encryption key.
            continue;
          }
  
          // find the datanode that matches
          if (firstBadLink.length() != 0) {
            for (int i = 0; i < nodes.length; i++) {
              // NB: Unconditionally using the xfer addr w/o hostname
              if (firstBadLink.equals(nodes[i].getXferAddr())) {
                errorIndex = i;
                break;
              }
            }
          } else {
            assert checkRestart == false;
            errorIndex = 0;
          }
          // Check whether there is a restart worth waiting for.
          if (checkRestart && shouldWaitForRestart(errorIndex)) {
            restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
                Time.now();
            restartingNodeIndex = errorIndex;
            errorIndex = -1;
            DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
                nodes[restartingNodeIndex]);
          }
          hasError = true;
          setLastException(ie);
          result =  false;  // error
        } finally {
          if (!result) {
            IOUtils.closeSocket(s);
            s = null;
            IOUtils.closeStream(out);
            out = null;
            IOUtils.closeStream(blockReplyStream);
            blockReplyStream = null;
          }
        }
        return result;
      }
    }

When the DataNode receives the connection request, it creates a DataXceiver for the block; this will be covered later on the DataNode side. Now back to initDataStreaming().

    private void initDataStreaming() {
      this.setName("DataStreamer for file " + src +
          " block " + block);
      // initialize the ResponseProcessor (telling it which DataNodes to collect acks from)
      response = new ResponseProcessor(nodes);
      // start the ResponseProcessor
      response.start();
      // set the stream state to DATA_STREAMING
      stage = BlockConstructionStage.DATA_STREAMING;
    }

This method mainly starts the ResponseProcessor thread, which collects acks from the DataNodes, so we focus on ResponseProcessor.run().

      public void run() {

        setName("ResponseProcessor for block " + block);
        // used to deserialize ack messages
        PipelineAck ack = new PipelineAck();

        while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
          // process responses from datanodes.
          try {
            // read an ack from the pipeline
            long begin = Time.monotonicNow();
            // deserialize the ack message
            ack.readFields(blockReplyStream);
            long duration = Time.monotonicNow() - begin;
            if (duration > dfsclientSlowLogThresholdMs
                && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
              DFSClient.LOG
                  .warn("Slow ReadProcessor read fields took " + duration
                      + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
                      + ack + ", targets: " + Arrays.asList(targets));
            } else if (DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("DFSClient " + ack);
            }

			// the sequence number of the acked packet
            long seqno = ack.getSeqno();
            // processes response status from datanodes.
            // since HDFS uses chain replication, the ack from the first node in the chain
            // aggregates the acks of every node in the chain; here each one is checked to
            // see whether any DataNode failed to replicate
            for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
              final Status reply = ack.getReply(i);
              // Restart will not be treated differently unless it is
              // the local node or the only one in the pipeline.
              if (PipelineAck.isRestartOOBStatus(reply) &&
                  shouldWaitForRestart(i)) {
                restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
                    Time.now();
                setRestartingNodeIndex(i);
                String message = "A datanode is restarting: " + targets[i];
                DFSClient.LOG.info(message);
               throw new IOException(message);
              }
              // node error
              if (reply != SUCCESS) {
                setErrorIndex(i); // first bad datanode
                throw new IOException("Bad response " + reply +
                    " for block " + block +
                    " from datanode " + 
                    targets[i]);
              }
            }
            
            assert seqno != PipelineAck.UNKOWN_SEQNO : 
              "Ack for unknown seqno should be a failed ack: " + ack;
            if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
              continue;
            }

            // a success ack for a data packet
            Packet one;
            synchronized (dataQueue) {
              one = ackQueue.getFirst();
            }
            /*
            * Each block is transferred by a single DataStreamer thread and handled on the
            * DataNode by a single DataXceiver thread. Since the underlying channel is FIFO,
            * the sequence numbers must line up; messages cannot arrive out of order.
            */
            if (one.seqno != seqno) {
              throw new IOException("ResponseProcessor: Expecting seqno " +
                                    " for block " + block +
                                    one.seqno + " but received " + seqno);
            }
            isLastPacketInBlock = one.lastPacketInBlock;

            // Fail the packet write for testing in order to force a
            // pipeline recovery.
            if (DFSClientFaultInjector.get().failPacket() &&
                isLastPacketInBlock) {
              failPacket = true;
              throw new IOException(
                    "Failing the last packet for testing.");
            }
              
            // update bytesAcked
            block.setNumBytes(one.getLastByteOffsetBlock());

			// remove the packet from the ack queue and wake up threads waiting on dataQueue
            synchronized (dataQueue) {
              lastAckedSeqno = seqno;
              ackQueue.removeFirst();
              dataQueue.notifyAll();

              one.releaseBuffer(byteArrayManager);
            }
          } catch (Exception e) {
            if (!responderClosed) {
              if (e instanceof IOException) {
                setLastException((IOException)e);
              }
              hasError = true;
              // If no explicit error report was received, mark the primary
              // node as failed.
              tryMarkPrimaryDatanodeFailed();
              synchronized (dataQueue) {
                dataQueue.notifyAll();
              }
              if (restartingNodeIndex == -1) {
                DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
                     + " for block " + block, e);
              }
              responderClosed = true;
            }
          }
        }
      }

(2) Writing Data

out.write("Hello, HDFS".getBytes());

首先進(jìn)入FilterOutputStream.write()。

    public void write(byte b[]) throws IOException {
        write(b, 0, b.length);
    }

然后進(jìn)入DataOutputStream.write()。

    public synchronized void write(byte b[], int off, int len)
        throws IOException
    {
        out.write(b, off, len);
        incCount(len);
    }

進(jìn)入FSDataOutputStream.write()。

    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
      position += len;                            // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }

進(jìn)入FSOutputSummer.write()

  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    
    checkClosed();
    
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }

    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {  // note the call to write1()
    }
  }

進(jìn)入FSOutputSummer.write1()。

  private int write1(byte b[], int off, int len) throws IOException {
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      // if the buffer is empty and the data to write is at least as large as the buffer,
      // checksum the user buffer directly and write the chunks
      final int length = buf.length;
      writeChecksumChunks(b, off, length);
      return length;
    }
    
    // copy user data to local buffer
    // if the buffer is not empty, first compute its remaining space
    // and copy that much of the user data into it
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      // the buffer is full; flush it
      flushBuffer();
    } 
    return bytesToCopy;
  }

This method writes chunks; the key call is writeChecksumChunks(), which is also the key call inside flushBuffer().

  private void writeChecksumChunks(byte b[], int off, int len)
  throws IOException {
    // 計(jì)算校驗(yàn)和
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    // sum.getBytesPerChecksum() is typically 512 bytes
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      // length of this chunk
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      // offset of the checksum; with CRC32C each checksum is 4 bytes
      // checksums and chunk data are stored separately in the Packet, so this offset locates where the checksum goes
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      // write the chunk
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  }
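
To make the index arithmetic of the loop above concrete, here is a hedged, runnable trace with made-up input sizes (1300 bytes of user data, 512-byte chunks, 4-byte checksums; ChunkLayoutDemo is an illustrative name):

public class ChunkLayoutDemo {
    public static void main(String[] args) {
        int len = 1300, bytesPerChecksum = 512, checksumSize = 4;  // illustrative values
        for (int i = 0; i < len; i += bytesPerChecksum) {
            int chunkLen = Math.min(bytesPerChecksum, len - i);    // same math as above
            int ckOffset = i / bytesPerChecksum * checksumSize;
            System.out.println("chunkLen=" + chunkLen + ", ckOffset=" + ckOffset);
        }
        // prints: chunkLen=512, ckOffset=0
        //         chunkLen=512, ckOffset=4
        //         chunkLen=276, ckOffset=8
        // i.e. checksums are packed contiguously while chunk data has its own region
    }
}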

關(guān)鍵關(guān)注writeChunk()。

  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

	// 如果當(dāng)前Packet為空,就創(chuàng)建Packet
    if (currentPacket == null) {
      // packetSize is 65532 (~64KB); chunksPerPacket is 127 (a packet holds 127 chunks)
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }

	// write the checksums (the checksum region spans bytes 33 to 33 + 4 * 127; 4 * 127 = checksum size * chunk count)
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // write the chunk data (the data region spans bytes 33 + 4 * 127 to 65532)
    currentPacket.writeData(b, offset, len);
    currentPacket.numChunks++;
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    // if the packet or the block is full, enqueue the current packet on dataQueue
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }

	  // enqueue the packet on dataQueue
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }
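
The sizes quoted in the comments above reconcile as follows, under the assumption that the quoted packetSize of 65532 counts only the chunk payload (data plus checksums) and not the packet header (PacketSizeDemo is an illustrative name):

public class PacketSizeDemo {
    public static void main(String[] args) {
        int chunkData = 512;        // data bytes per chunk
        int checksum = 4;           // CRC32C checksum bytes per chunk
        int chunksPerPacket = 127;  // as quoted in the comments above
        // 127 * (512 + 4) = 65532, matching the quoted packetSize
        System.out.println(chunksPerPacket * (chunkData + checksum));
    }
}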

Focus on waitAndQueueCurrentPacket().

  private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
      try {
      // If queue is full, then wait till we have enough space
      while (!closed && dataQueue.size() + ackQueue.size()  > dfsClient.getConf().writeMaxPackets) {
        try {
          dataQueue.wait();
        } catch (InterruptedException e) {
          // If we get interrupted while waiting to queue data, we still need to get rid
          // of the current packet. This is because we have an invariant that if
          // currentPacket gets full, it will get queued before the next writeChunk.
          //
          // Rather than wait around for space in the queue, we should instead try to
          // return to the caller as soon as possible, even though we slightly overrun
          // the MAX_PACKETS length.
          Thread.currentThread().interrupt();
          break;
        }
      }
      checkClosed();
      // 將當(dāng)前Packet加入dataQueue
      queueCurrentPacket();
      } catch (ClosedChannelException e) {
      }
    }
  }

Continuing into queueCurrentPacket().

  private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      // 當(dāng)前Packet加入dataQueue
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.seqno;
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
      }
      currentPacket = null;
      dataQueue.notifyAll();
    }
  }

At this point the write path is complete.
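
As a recap, the handoff performed by queueCurrentPacket(), DataStreamer.run(), and ResponseProcessor.run() is the classic two-queue producer/consumer pattern. The following is a minimal, self-contained sketch of that pattern (illustrative code, not HDFS source); note that, as in HDFS, both queues are guarded by dataQueue's monitor:

import java.util.ArrayDeque;
import java.util.Deque;

public class TwoQueueDemo {
    static final Deque<String> dataQueue = new ArrayDeque<>();
    static final Deque<String> ackQueue = new ArrayDeque<>();

    // writer side: the equivalent of queueCurrentPacket()
    static void queuePacket(String pkt) {
        synchronized (dataQueue) {
            dataQueue.addLast(pkt);
            dataQueue.notifyAll();          // wake the streamer
        }
    }

    // streamer side: take the head packet and park it on ackQueue before sending
    static String takeForSend() throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait(1000);       // same wait/notify monitor HDFS uses
            }
            String pkt = dataQueue.removeFirst();
            ackQueue.addLast(pkt);
            return pkt;
        }
    }

    // responder side: the equivalent of ResponseProcessor on a successful ack
    static void ackHead() {
        synchronized (dataQueue) {          // both queues share dataQueue's monitor
            ackQueue.removeFirst();
            dataQueue.notifyAll();          // unblock waitAndQueueCurrentPacket()
        }
    }

    public static void main(String[] args) throws Exception {
        queuePacket("packet-0");
        String sent = takeForSend();        // moved from dataQueue to ackQueue
        ackHead();                          // ack received; packet released
        System.out.println("sent and acked: " + sent);
    }
}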

(3) Closing the Output Stream

fs.close();

Finally, after the output stream is closed (DistributedFileSystem.close() closes it), a complete() RPC is made to tell the NameNode that the file is finished. Start with DFSOutputStream.close().

  public synchronized void close() throws IOException {
    if (closed) {
      IOException e = lastException.getAndSet(null);
      if (e == null)
        return;
      else
        throw e;
    }

    try {
      flushBuffer();       // flush from all upper layers

      if (currentPacket != null) { 
        waitAndQueueCurrentPacket();
      }

      if (bytesCurBlock != 0) {
        // send an empty packet to mark the end of the block
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
      }

      flushInternal();             // flush all data to Datanodes
      // get last block before destroying the streamer
      ExtendedBlock lastBlock = streamer.getBlock();
      closeThreads(false);
      // the complete() RPC call
      completeFile(lastBlock);
      dfsClient.endFileLease(fileId);
    } catch (ClosedChannelException e) {
    } finally {
      closed = true;
    }
  }

This method releases all resources associated with the output stream: it finishes sending any chunks still in flight, stops the DataStreamer and ResponseProcessor threads, and so on. Focus on completeFile().

  private void completeFile(ExtendedBlock last) throws IOException {
    long localstart = Time.now();
    long localTimeout = 400;
    boolean fileComplete = false;
    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
    while (!fileComplete) {
      // complete() rpc
      fileComplete =
          dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
      if (!fileComplete) {
        final int hdfsTimeout = dfsClient.getHdfsTimeout();
        if (!dfsClient.clientRunning ||
              (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
            String msg = "Unable to close file because dfsclient " +
                          " was unable to contact the HDFS servers." +
                          " clientRunning " + dfsClient.clientRunning +
                          " hdfsTimeout " + hdfsTimeout;
            DFSClient.LOG.info(msg);
            throw new IOException(msg);
        }
        try {
          if (retries == 0) {
            throw new IOException("Unable to close file because the last block"
                + " does not have enough number of replicas.");
          }
          retries--;
          Thread.sleep(localTimeout);
          localTimeout *= 2;
          if (Time.now() - localstart > 5000) {
            DFSClient.LOG.info("Could not complete " + src + " retrying...");
          }
        } catch (InterruptedException ie) {
          DFSClient.LOG.warn("Caught exception ", ie);
        }
      }
    }
  }

Here the complete() RPC is issued to notify the NameNode that the file is finished.
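
A quick, hedged calculation of the retry budget above (assuming the default of 5 for dfs.client.block.write.locateFollowingBlock.retries): the sleep doubles from 400 ms each round, so the client sleeps up to 400 + 800 + 1600 + 3200 + 6400 = 12400 ms in total before giving up:

public class CompleteRetryDemo {
    public static void main(String[] args) {
        long timeout = 400, total = 0;                   // matches localTimeout above
        for (int retries = 5; retries > 0; retries--) {  // assumed default: 5 retries
            total += timeout;                            // sleep before the next attempt
            timeout *= 2;                                // exponential backoff
        }
        System.out.println(total + " ms");               // prints: 12400 ms
    }
}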

2. Server Side

HDFS Write Flow Source Code Analysis (Part 2): The NameNode
HDFS Write Flow Source Code Analysis (Part 3): The DataNode


相關(guān)文章

  • 31.Netty源碼之客戶端啟動流程

    31.Netty源碼之客戶端啟動流程

    如果看了服務(wù)器端的啟動流程,這里簡單看下就可以了。 java package io.netty.server; ? import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; ? ? public final class EchoClient { ?

    2024年02月12日
    瀏覽(29)
  • 【Kafka源碼走讀】Admin接口的客戶端與服務(wù)端的連接流程

    【Kafka源碼走讀】Admin接口的客戶端與服務(wù)端的連接流程

    注:本文對應(yīng)的kafka的源碼的版本是trunk分支。寫這篇文章的主要目的是當(dāng)作自己閱讀源碼之后的筆記,寫的有點(diǎn)凌亂,還望大佬們海涵,多謝! 最近在寫一個(gè)Web版的kafka客戶端工具,然后查看Kafka官網(wǎng),發(fā)現(xiàn)想要與Server端建立連接,只需要執(zhí)行 方法即可,但其內(nèi)部是如何工作

    2024年02月16日
    瀏覽(27)
  • 【Zookeeper源碼走讀】第三章 服務(wù)器處理客戶端請求的流程

    前一篇文章,已經(jīng)大致介紹了Server的啟動流程,在NIOServerCnxnFactory.start()方法中,啟動了多個(gè)線程,其中就有接收socket報(bào)文的線程,代碼如下: 注意這里,acceptThread是接收socket的線程(AcceptThread),acceptThread的初始化是在NIOServerCnxnFactory.configure()中實(shí)現(xiàn)的: NIOServerCnxnFactory.confi

    2024年02月02日
    瀏覽(28)
  • java 客戶端操作HDFS

    部署包win版本 源碼包zip包 lib整合:共121個(gè)jar包 $HADOOP_PREFIX/share/hadoop/{common,hdfs,mapreduce,yarn,tools}/{lib,.}*.jar ?將windows版本hadoop/bin/hadoop.dll 放到c:/windows/system32下 hadoop的bin和sbin目錄放PATH中+HADOOP_HOME+HADOOP_USER_NAME=root 安裝插件 配置 重啟電腦?。。。。。。〖虞dhadoop.dll 創(chuàng)建java p

    2024年02月10日
    瀏覽(87)
  • HDFS之Java客戶端操作

    HDFS之Java客戶端操作

    Hadoop版本:Hadoop-3.1.3 Linux版本:CentOS7.5 IDE工具:IntelliJ IDEA(Windows環(huán)境下) HDFS的Java客戶端操作也是入門Hadoop開發(fā)的學(xué)習(xí)重點(diǎn) 下載依賴 https://github.com/steveloughran/winutils 根據(jù)自己的Hadoop安裝版本選擇對應(yīng)版本的依賴,如果需要更高版本的依賴,請從以下地址下載 https://github.

    2024年02月12日
    瀏覽(46)
  • hbase優(yōu)化:客戶端、服務(wù)端、hdfs

    hbase優(yōu)化 一.讀優(yōu)化 1.客戶端: 2.服務(wù)器: 3.列簇:是否過多、 是否使用布隆過濾器:任何業(yè)務(wù)都應(yīng)該設(shè)置Bloomfilter,通常設(shè)置為row就可以,除非確認(rèn)業(yè)務(wù)隨機(jī)查詢類型為row+cf,可以設(shè)置為rowcol 是否設(shè)置ttl 4.hdfs優(yōu)化: 二、寫優(yōu)化 是否需要寫WAL?WAL是否需要同步寫入 用批量

    2024年02月14日
    瀏覽(37)
  • 【大數(shù)據(jù)】HDFS客戶端命令行(hdfs dfs)詳細(xì)使用說明

    【大數(shù)據(jù)】HDFS客戶端命令行(hdfs dfs)詳細(xì)使用說明

    hadoop分布式文件系統(tǒng)客戶端命令行操作 全局變量說明 Path 路徑支持正則表達(dá)式 通配符 名稱 匹配 * 星號 匹配0或多個(gè)字符 ? 問號 匹配單一字符 [ab] 字符類別 匹配{a,b}中的一個(gè)字符 [^ab] 非字符類別 匹配不是{a,b}中的一個(gè)字符 [a-b] 字符范圍 匹配一個(gè)在{a,b}范圍內(nèi)的 字符(包括

    2024年02月09日
    瀏覽(34)
  • 【HDFS】ResponseProcessor線程詳解以及客戶端backoff反壓

    ResponseProcessor如何處理datanode側(cè)發(fā)過來的packet ack的 客戶端側(cè)backoff邏輯。 ResponseProcessor:主要功能是處理來自datanode的響應(yīng)。當(dāng)一個(gè)packet的響應(yīng)到達(dá)時(shí),會把這個(gè)packet從ackQueue里移除。

    2024年02月11日
    瀏覽(35)
  • c語言實(shí)現(xiàn)https客戶端 源碼+詳細(xì)注釋(OpenSSL下載,visual studio編譯器環(huán)境配置)

    c語言實(shí)現(xiàn)https客戶端 源碼+詳細(xì)注釋(OpenSSL下載,visual studio編譯器環(huán)境配置)

    請參考:openssl下載安裝教程 步驟:官網(wǎng)下載-安裝到選定目錄-配置環(huán)境變量-打開命令窗口檢查是否安裝成功 注意: 打開命令窗口(快捷鍵 win + r ,在彈出窗口內(nèi)輸入cmd按回車),輸入命令openssl version如果顯示openssl版本則表示安裝成功。 我出現(xiàn)的問題:明明安裝上了卻顯示

    2024年04月16日
    瀏覽(56)
  • 【HDFS】每天一個(gè)RPC系列----complete(二):客戶端側(cè)

    【HDFS】每天一個(gè)RPC系列----complete(二):客戶端側(cè)

    上圖給出了最終會調(diào)用到complete RPC的客戶端側(cè)方法鏈路(除去Router那條線了)。 org.apache.hadoop.hdfs.DFSOutputStream#completeFile(org.apache.hadoop.hdfs.protocol.ExtendedBlock): 下面這個(gè)方法在complete rpc返回true之前,會進(jìn)行重試,直到超過最大重試次數(shù)拋異常。 另外需要注意的是,這個(gè)方法在

    2024年02月13日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包