国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

第四節(jié) zookeeper集群與分布式鎖

這篇具有很好參考價值的文章主要介紹了第四節(jié) zookeeper集群與分布式鎖。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

1. Zookeeper集群操作

1.1 客戶端操作zk集群

1.2 模擬集群異常操作

1.3 curate客戶端連接zookeeper集群

2. Zookeeper實戰(zhàn)案例

2.1 創(chuàng)建項目引入依賴

2.2 獲取zk客戶端對象

2.3 常用API

2.4 客戶端向服務(wù)端寫入數(shù)據(jù)流程

2.5 服務(wù)器動態(tài)上下線、客戶端動態(tài)監(jiān)聽

2.6 測試

3.Zookeeper分布式鎖

3.1 什么是分布式鎖

3.2 Zookeeper分布式鎖分析

3.3 分布式鎖實現(xiàn)


1. Zookeeper集群操作

1.1 客戶端操作zk集群

1) 第一步啟動集群,啟動后查看Zookeeper進程。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

jps命令 作用是顯示當(dāng)前所有java 進程的pid 的命令,QuorumPeerMain是zookeeper集群的啟動入口類

2) 客戶端連接

  • 連接集群所有客戶端

[root@localhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183 ? ? ? ? ? ? ? ? ?

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

連接集群單個客戶端

# 連接2181
[root@localhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181 
?
# 連接2182
[root@localhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2182
?
# 在2181中創(chuàng)建節(jié)點
[zk: 192.168.58.200:2181(CONNECTED) 0] create /test2
?
# 在2182中查詢,發(fā)現(xiàn)數(shù)據(jù)已同步
[zk: 192.168.58.200:2182(CONNECTED) 0] ls /
[test1, test2, zookeeper]

以上兩種方式的區(qū)別在于:

  • 如果只連接單個客戶端,如果當(dāng)前連接的服務(wù)器掛掉,當(dāng)前客戶端連接也會掛掉,連接失敗。

  • 如果是連接所有客戶端的形式,則允許集群中半數(shù)以下的服務(wù)掛掉!當(dāng)半數(shù)以上服務(wù)掛掉才會停止服務(wù),可用性更高一點!

3)集群節(jié)點信息查看

集群中的節(jié)點信息被存放在每一個節(jié)點/zookeeper/config/目錄下

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

1.2 模擬集群異常操作

Leader選舉

  • Serverid : 服務(wù)器ID

    • 三臺服務(wù) 編號分別是 1 2 3

    • 編號越大在選擇算法中權(quán)重越大

  • Zxid: 數(shù)據(jù)ID

    • 服務(wù)器中存放的最大的數(shù)據(jù)ID, 值越大數(shù)據(jù)越新

  • 在Leader選舉的過程中 如果某臺Zookeeper獲得了超過半數(shù)的選票,就可以當(dāng)選Leader

(1)首先我們先測試如果是從服務(wù)器掛掉,會怎么樣

把3號服務(wù)器停掉,觀察1號和2號,發(fā)現(xiàn)狀態(tài)并沒有變化

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop
?
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

由此得出結(jié)論,3個節(jié)點的集群,從服務(wù)器掛掉,集群正常

(2)我們再把1號服務(wù)器(從服務(wù)器)也停掉,查看2號(主服務(wù)器)的狀態(tài),發(fā)現(xiàn)已經(jīng)停止運行了。

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop
?
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

由此得出結(jié)論,3個節(jié)點的集群,2個從服務(wù)器都掛掉,主服務(wù)器也無法運行。因為可運行的機器沒有超過集群總數(shù)量的半數(shù)。

(3)我們再次把1號服務(wù)器啟動起來,發(fā)現(xiàn)2號服務(wù)器又開始正常工作了。而且依然是領(lǐng)導(dǎo)者。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

(4)我們把3號服務(wù)器也啟動起來,把2號服務(wù)器停掉,停掉后觀察1號和3號的狀態(tài)。

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop
?
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

發(fā)現(xiàn)新的leader產(chǎn)生了~

由此我們得出結(jié)論,當(dāng)集群中的主服務(wù)器掛了,集群中的其他服務(wù)器會自動進行選舉狀態(tài),然后產(chǎn)生新得leader 。

(5)我們再次測試,當(dāng)我們把2號服務(wù)器重新啟動起來啟動后,會發(fā)生什么?2號服務(wù)器會再次成為新的領(lǐng)導(dǎo)嗎?我們看結(jié)果

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
?
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

我們會發(fā)現(xiàn),2號服務(wù)器啟動后依然是跟隨者(從服務(wù)器),3號服務(wù)器依然是領(lǐng)導(dǎo)者(主服務(wù)器),沒有撼動3號服務(wù)器的領(lǐng)導(dǎo)地位。

由此我們得出結(jié)論,當(dāng)領(lǐng)導(dǎo)者產(chǎn)生后,再次有新服務(wù)器加入集群,不會影響到現(xiàn)任領(lǐng)導(dǎo)者。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

1.3 curate客戶端連接zookeeper集群

public class CuratorCluster {
?
 ? ?//zookeeper連接
 ? ?private final static String CLUSTER_CONNECT = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183";
?
 ? ?//session超時時間
 ? ?private static final int sessionTimeoutMs = 60 * 1000;
?
 ? ?//連接超時時間
 ? ?private static final int connectionTimeoutMs = 5000;
?
 ? ?private static CuratorFramework client;
?
 ? ?public static String getClusterConnect() {
 ? ? ? ?return CLUSTER_CONNECT;
 ?  }
?
 ? ?@Before
 ? ?public void init(){
?
 ? ? ? ?// 重試策略
 ? ? ? ?RetryPolicy retryPolicy =new ExponentialBackoffRetry(3000,10);
?
 ? ? ? ?// zookeeper連接
 ? ? ? ?client = CuratorFrameworkFactory.builder()
 ? ? ? ? ? ? ?  .connectString(getClusterConnect())
 ? ? ? ? ? ? ?  .sessionTimeoutMs(60*1000)
 ? ? ? ? ? ? ?  .connectionTimeoutMs(15*1000)
 ? ? ? ? ? ? ?  .retryPolicy(retryPolicy)
 ? ? ? ? ? ? ?  .namespace("mashibing") ?//當(dāng)前程序創(chuàng)建目錄的根目錄
 ? ? ? ? ? ? ?  .build();
?
 ? ? ? ?// 添加監(jiān)聽器
 ? ? ? ?client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
 ? ? ? ? ? ? ? ?System.out.println("連接成功!");
 ? ? ? ? ?  }
 ? ? ?  });
?
 ? ? ? ?client.start();
 ?  }
?
 ? ?//創(chuàng)建節(jié)點
 ? ?public void createIfNeed(String path) throws Exception {
 ? ? ? ?Stat stat = client.checkExists().forPath(path);
 ? ? ? ?if(stat == null){
 ? ? ? ? ? ?String s = client.create().forPath(path);
 ? ? ? ? ? ?System.out.println("創(chuàng)建節(jié)點: " + s);
 ? ? ?  }
 ?  }
?
?
 ? ?//從集群中獲取數(shù)據(jù)
 ? ?@Test
 ? ?public void testCluster() throws Exception {
 ?
 ? ? ? ?createIfNeed("/test");
?
 ? ? ? ?//每隔一段時間 獲取一次數(shù)據(jù)
 ? ? ? ?while(true){
 ? ? ? ? ? ?byte[] data = client.getData().forPath("/test");
 ? ? ? ? ? ?System.out.println(new String(data));
?
 ? ? ? ? ? ?TimeUnit.SECONDS.sleep(5);
 ? ? ?  }
 ?  }
}

在集群中的任意服務(wù)器節(jié)點,為test設(shè)置數(shù)據(jù)

[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 2] set /mashibing/test 12345

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

2. Zookeeper實戰(zhàn)案例

2.1 創(chuàng)建項目引入依賴

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

2.2 獲取zk客戶端對象

public class ZkClientTest {

    private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183";
    private int sessionTimeout = 2000;

    private ZkClient zkClient;

    /**
     * 獲取zk客戶端連接
     */
    @Before
    public void Before(){

        /**
         * 參數(shù)1:服務(wù)器的IP和端口
         * 參數(shù)2:會話的超時時間
         * 參數(shù)3:會話的連接時間
         * 參數(shù)4:序列化方式
         */
        zkClient = new ZkClient(connectString, sessionTimeout, 1000 * 15, new SerializableSerializer());

    }

    @After
    public void after(){
        zkClient.close();
    }
}

2.3 常用API

  • 創(chuàng)建節(jié)點

    /**
     * 創(chuàng)建節(jié)點
     */
    @Test
    public void testCreateNode(){

        //創(chuàng)建方式 返回創(chuàng)建節(jié)點名稱
        String nodeName = zkClient.create("/node1", "lisi", CreateMode.PERSISTENT);
        System.out.println("路徑名稱為:" + nodeName);

        zkClient.create("/node2","wangwu",CreateMode.PERSISTENT_SEQUENTIAL);
        zkClient.create("/node3","hehe",CreateMode.EPHEMERAL);
        zkClient.create("/node4","haha",CreateMode.EPHEMERAL_SEQUENTIAL);

        while(true){}
    }
  • 刪除節(jié)點

    /**
     * 刪除節(jié)點
     */
    @Test
    public void testDeleteNode(){
        // 刪除沒有子節(jié)點的節(jié)點
//        boolean b1 = zkClient.delete("/node2");
//        System.out.println("刪除成功: " + b1);

        // 遞歸刪除節(jié)點信息
        boolean b2 = zkClient.deleteRecursive("/node2");
        System.out.println("刪除成功: " + b2);
    }
  • 查看節(jié)點的子節(jié)點

    /**
     * 查詢節(jié)點的子節(jié)點
     */
    @Test
    public void testFindNodes(){

        //返回指定路徑的節(jié)點信息
        List<String> ch = zkClient.getChildren("/");

        for (String c1 : ch) {
            System.out.println(c1);
        }
    }
  • 查看當(dāng)前節(jié)點的數(shù)據(jù)

    • 注意:如果出現(xiàn):org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 61616161 異常的原因是: 在shell中的數(shù)據(jù)序列化方式 和 java代碼中使用的序列化方式不一致導(dǎo)致 因此要解決這個問題只需要保證序列化一致即可 都使用相同端操作即可

    /**
     * 獲取節(jié)點數(shù)據(jù)
     */
    @Test
    public void testFindNodeData(){
        String nodeName = zkClient.create("/node3", "taotao", CreateMode.PERSISTENT);
        Object data = zkClient.readData("/node3");
        System.out.println(data);
    }
  • 查看當(dāng)前節(jié)點的數(shù)據(jù)并獲取狀態(tài)信息

    /**
     * 獲取數(shù)據(jù)以及當(dāng)前節(jié)點狀態(tài)信息
     */
    @Test
    public void testFindNodeDataAndStat(){
        Stat stat = new Stat();
        Object data = zkClient.readData("/node20000000004", stat);

        System.out.println(data);
        System.out.println(stat);
    }
  • 修改節(jié)點數(shù)據(jù)

    /**
     * 修改節(jié)點
     */
    @Test
    public void testUpdateNodeData(){

        zkClient.writeData("/node3","123456");
    }
  • 監(jiān)聽節(jié)點數(shù)據(jù)的變化

    /**
     * 監(jiān)聽節(jié)點數(shù)據(jù)
     */
    @Test
    public void testNodeChange(){

        zkClient.subscribeDataChanges("/node3", new IZkDataListener() {

            // 當(dāng)節(jié)點的值在修改時,會自動調(diào)用這個方法
            @Override
            public void handleDataChange(String nodeName, Object result) throws Exception {
                System.out.println("節(jié)點名稱: " + nodeName);
                System.out.println("節(jié)點數(shù)據(jù): " + result);
            }

            // 當(dāng)節(jié)點被刪除時,會調(diào)用該方法
            @Override
            public void handleDataDeleted(String nodeName) throws Exception {
                System.out.println("節(jié)點名稱: " + nodeName);
            }
        });

        while(true){}
    }
  • 監(jiān)聽節(jié)點目錄的變化

    /**
     * 監(jiān)聽節(jié)點目錄的變化
     */
    @Test
    public void testNodesChange(){

        zkClient.subscribeChildChanges("/node3", new IZkChildListener() {

            @Override
            public void handleChildChange(String nodeName, List<String> list) throws Exception {
                System.out.println("父節(jié)點名稱: " + nodeName);
                System.out.println("發(fā)生變更后,所有子節(jié)點名稱: ");
                for (String name : list) {
                    System.out.println(name);
                }
            }
        });

        while(true){}
    }
  • 判斷某一個節(jié)點是否存在

    //判斷節(jié)點是否存在
    @Test
    public void exist(){

        boolean exists = zkClient.exists("/node3");

        System.out.println(exists == true ? "節(jié)點存在" : "節(jié)點不存在");

    }

2.4 客戶端向服務(wù)端寫入數(shù)據(jù)流程

  • 寫流程之寫入請求,直接發(fā)送給Leader

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

  • 寫流程之寫入請求,發(fā)送給follower節(jié)點

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

2.5 服務(wù)器動態(tài)上下線、客戶端動態(tài)監(jiān)聽

某分布式系統(tǒng)中,主節(jié)點可以有多臺,可以動態(tài)上下線,任意一臺客戶端都能實時感知到主節(jié)點服務(wù)器的上下線。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

1)根節(jié)點下,創(chuàng)建servers節(jié)點

[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 0] 
create /servers "servers"
Created /servers

2)服務(wù)端代碼

完成服務(wù)端向zookeeper注冊、動態(tài)上下線的代碼。

/**
 * 服務(wù)端
 */
public class DistributeServer {

    private ZooKeeper client;

    // 連接信息
    private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183";

    // 超時時間
    private int sessionTimeOut = 30000;

    public static void main(String[] args) throws Exception {

        DistributeServer server = new DistributeServer();

        //1.獲取zk連接
        server.getConnect();

        //2.將服務(wù)器注冊到zk集群,args參數(shù)通過啟動 main方法時傳入即可
        server.register(args[0]);

        //3.啟動業(yè)務(wù)邏輯(線程睡眠)
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 注冊操作
     * @param hostName 將服務(wù)器注冊到zk集群時,所需的服務(wù)名稱
     */
    private void register(String hostName) throws Exception {

        /**
         * ZooDefs.Ids.OPEN_ACL_UNSAFE: 此權(quán)限表示允許所有人訪問該節(jié)點(服務(wù)器)
         * CreateMode.EPHEMERAL_SEQUENTIAL: 由于服務(wù)器是動態(tài)上下線的,上線后存在,下線后不存在,所以是臨時節(jié)點
         * 而服務(wù)器一般都是有序號的,所以是臨時、有序的節(jié)點.
         */
        String node = client.create("/servers/" + hostName,
                hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("已成功創(chuàng)建" + node + "節(jié)點");
        System.out.println(hostName + " 已經(jīng)上線");
    }


    /**
     * 獲取連接
     */
    private void getConnect() throws IOException {
        client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

2)客戶端代碼

服務(wù)端代碼寫好之后,再來完成客戶端動態(tài)監(jiān)聽zk服務(wù)端各個節(jié)點的代碼。

/**
 * 客戶端
 */
public class DistributeClient {

    private ZooKeeper zk;

    // 連接信息
    private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183";

    // 超時時間
    private int sessionTimeOut = 30000;


    public static void main(String[] args) throws Exception {

        DistributeClient client = new DistributeClient();

        //1.獲取zk連接
        client.getConnection();

        //2.監(jiān)聽 /servers下面所有的子節(jié)點變化
        client.getServerList();

        //3.業(yè)務(wù)邏輯
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 獲取連接
     */
    private void getConnection() throws Exception {
        zk = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //監(jiān)聽服務(wù)器地址的上下線
                try {
                    getServerList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 監(jiān)聽 /servers路徑下的所有子節(jié)點變化,true表示啟動監(jiān)聽器
     */
    private void getServerList() throws Exception {

        List<String> zkChildren = zk.getChildren("/servers", true);
        List<String> servers = new ArrayList<>();

        zkChildren.forEach(node -> {
            //拼接服務(wù)完整信息
            try {
                byte[] data = zk.getData("/servers/" + node, false, null);
                servers.add(new String(data));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        System.out.println(servers);
        System.out.println("=======================");
    }
}

2.6 測試

1)Zookeeper命令行,完成測試

[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/zk01 "192.168.58.200:2181"
Created /servers/zk010000000000

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/zk02 "192.168.58.200:2182"
Created /servers/zk020000000001

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

[zk: localhost:2181(CONNECTED) 3] create -e -s /servers/zk03 "192.168.58.200:2183"
Created /servers/zk030000000002

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

上面的執(zhí)行結(jié)果可以看到,在servers下面依次創(chuàng)建子結(jié)點,客戶端代碼都可以成功監(jiān)聽到。

下面我們刪除節(jié)點,看看客戶端能不能做到動態(tài)監(jiān)聽功能(也即刪除的節(jié)點不會再被監(jiān)聽到)。

[zk: localhost:2181(CONNECTED) 5] delete /servers/zk010000000000

[zk: localhost:2181(CONNECTED) 7] delete /servers/zk020000000001

[zk: localhost:2181(CONNECTED) 8] delete /servers/zk030000000002 

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

使用Java代碼來測試

  • 先啟動客戶端代碼

  • 再啟動服務(wù)端代碼

只是在服務(wù)端代碼中,我們的 register 方法中傳參用到了 args ,所以啟動之前要傳入這個參數(shù)。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

傳入 192.168.58.200 之后,可以看到服務(wù)端代碼已經(jīng)能夠?qū)崿F(xiàn)動態(tài)上線了。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

這里我們的 192.168.58.200 動態(tài)上線之后,可以看到客戶端也正常的監(jiān)聽到它了。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

轉(zhuǎn)到zk命令行中,也可以看到這臺服務(wù)器的節(jié)點信息。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

由于我們之前的服務(wù)端代碼還啟動著,此時我們再傳入新的參數(shù) 192.168.58.200:2182,那么之前的2181服務(wù)器肯定會被擠掉(這里模擬就是main方法,同一個類肯定只能同時啟動一次main方法了),那么我們看看客戶端能不能動態(tài)監(jiān)聽到2181下線、2182上線。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

服務(wù)端自然可以正常實現(xiàn)2182這臺服務(wù)器的動態(tài)上線。在客戶端代碼中可以看到List集合中已經(jīng)沒有2181了(即2181已經(jīng)下線了),而2182正常上線

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

3.Zookeeper分布式鎖

3.1 什么是分布式鎖

傳統(tǒng)單體應(yīng)用單機部署的情況下,可以使用并發(fā)處理相關(guān)的功能進行互斥控制,但是原單體單機部署的系統(tǒng)被演化成分布式集群系統(tǒng)后,由于分布式系統(tǒng)多線程、多進程并且分布在不同機器上,這將使原單機部署情況下的并發(fā)控制鎖策略失效。提出分布式鎖的概念,是為了解決跨機器的互斥機制來控制共享資源的訪問。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

3.2 Zookeeper分布式鎖分析

客戶端(對zookeeper集群而言)向zookeeper集群進行上線注冊,并在一個永久節(jié)點下創(chuàng)建有序的臨時子節(jié)點后,根據(jù)編號順序,最小順序的子節(jié)點獲取到鎖,其他子節(jié)點由小到大監(jiān)聽前一個節(jié)點。

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

當(dāng)拿到鎖的節(jié)點處理完事務(wù)后,釋放鎖,后一個節(jié)點監(jiān)聽到前一個節(jié)點釋放鎖后,立刻申請獲得鎖,以此類推

zookeeper集群 3個節(jié)點停掉2個,# zookeeper,java,zookeeper

3.3 分布式鎖實現(xiàn)

1)創(chuàng)建 Distributedlock類, 獲取與zookeeper的連接

  • 構(gòu)造方法中獲取連接

  • 添加 CountDownLatch

    CountDownLatch是具有synchronized機制的一個工具,目的是讓一個或者多個線程等待,直到其他線程的一系列操作完成。

    CountDownLatch初始化的時候,需要提供一個整形數(shù)字,數(shù)字代表著線程需要調(diào)用countDown()方法的次數(shù),當(dāng)計數(shù)為0時,線程才會繼續(xù)執(zhí)行await()方法后的其他內(nèi)容。

/**
 * 分布式鎖
 */
public class DistributedLock {

    private ZooKeeper client;

    // 連接信息
    private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183";

    // 超時時間
    private int sessionTimeOut = 30000;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    //1. 在構(gòu)造方法中獲取連接
    public DistributedLock() throws Exception {

        client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });

        //等待Zookeeper連接成功,連接完成繼續(xù)往下走
        countDownLatch.await();

        //2. 判斷節(jié)點是否存在

    }

    //3.對ZK加鎖
    public void zkLock(){
        //創(chuàng)建 臨時帶序號節(jié)點

        //判斷 創(chuàng)建的節(jié)點是否是最小序號節(jié)點,如果是 就獲取到鎖;如果不是就監(jiān)聽前一個節(jié)點
    }


    //4.解鎖
    public void unZkLock(){

        //刪除節(jié)點
    }
}

2)對zk加鎖

/**
 * 分布式鎖
 */
public class DistributedLock {

    private ZooKeeper client;

    // 連接信息
    private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183";

    // 超時時間
    private int sessionTimeOut = 30000;

    // 等待zk連接成功
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    // 等待節(jié)點變化
    private CountDownLatch waitLatch = new CountDownLatch(1);

    //當(dāng)前節(jié)點
    private String currentNode;

    //前一個節(jié)點路徑
    private String waitPath;

    //1. 在構(gòu)造方法中獲取連接
    public DistributedLock() throws Exception {

        client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //countDownLatch 連上ZK,可以釋放
                if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    countDownLatch.countDown();
                }

                //waitLatch 需要釋放 (節(jié)點被刪除并且刪除的是前一個節(jié)點)
                if(watchedEvent.getType() == Event.EventType.NodeDeleted &&
                        watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });

        //等待Zookeeper連接成功,連接完成繼續(xù)往下走
        countDownLatch.await();

        //2. 判斷節(jié)點是否存在
        Stat stat = client.exists("/locks", false);
        if(stat == null){
            //創(chuàng)建一下根節(jié)點
            client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

    }

    //3.對ZK加鎖
    public void zkLock(){
        //創(chuàng)建 臨時帶序號節(jié)點
        try {
            currentNode = client.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            //判斷 創(chuàng)建的節(jié)點是否是最小序號節(jié)點,如果是 就獲取到鎖;如果不是就監(jiān)聽前一個節(jié)點
            List<String> children = client.getChildren("/locks", false);

            //如果創(chuàng)建的節(jié)點只有一個值,就直接獲取到鎖,如果不是,監(jiān)聽它前一個節(jié)點
            if(children.size() == 1){
                return;
            }else{
                //先排序
                Collections.sort(children);

                //獲取節(jié)點名稱
                String nodeName = currentNode.substring("/locks/".length());

                //通過名稱獲取該節(jié)點在集合的位置
                int index = children.indexOf(nodeName);

                //判斷
                if(index == -1){
                    System.out.println("數(shù)據(jù)異常");
                }else if(index == 0){
                    //就一個節(jié)點,可以獲取鎖
                    return;
                }else{
                    //需要監(jiān)聽前一個節(jié)點變化
                    waitPath = "/locks/" + children.get(index-1);
                    client.getData(waitPath,true,null);

                    //等待監(jiān)聽執(zhí)行
                    waitLatch.await();
                    return;
                }
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3)zk刪除鎖

    //4.解鎖
    public void unZkLock() throws KeeperException, InterruptedException {

        //刪除節(jié)點
        client.delete(currentNode,-1);
    }

4)測試文章來源地址http://www.zghlxwxcb.cn/news/detail-828621.html

public class DistributedLockTest {
?
 ? ?public static void main(String[] args) throws Exception {
?
 ? ? ? ?final DistributedLock lock1 = new DistributedLock();
 ? ? ? ?final DistributedLock lock2 = new DistributedLock();
?
 ? ? ? ?new Thread(new Runnable() {
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void run() {
?
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?lock1.zkLock();
 ? ? ? ? ? ? ? ? ? ?System.out.println("線程1 啟動 獲取到鎖");
?
 ? ? ? ? ? ? ? ? ? ?Thread.sleep(5 * 1000);
 ? ? ? ? ? ? ? ? ? ?lock1.unZkLock();
 ? ? ? ? ? ? ? ? ? ?System.out.println("線程1 釋放鎖");
 ? ? ? ? ? ? ?  } catch (InterruptedException | KeeperException e) {
 ? ? ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  }).start();
?
 ? ? ? ?new Thread(new Runnable() {
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void run() {
?
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?lock2.zkLock();
 ? ? ? ? ? ? ? ? ? ?System.out.println("線程2 啟動 獲取到鎖");
?
 ? ? ? ? ? ? ? ? ? ?Thread.sleep(5 * 1000);
 ? ? ? ? ? ? ? ? ? ?lock2.unZkLock();
 ? ? ? ? ? ? ? ? ? ?System.out.println("線程2 釋放鎖");
 ? ? ? ? ? ? ?  } catch (InterruptedException | KeeperException e) {
 ? ? ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  }).start();
 ?  }
}

到了這里,關(guān)于第四節(jié) zookeeper集群與分布式鎖的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • CentOS7安裝Zookeeper分布式集群

    前言 ZooKeeper 是一個開源的分布式協(xié)調(diào)服務(wù)框架,主要用于分布式系統(tǒng)中的數(shù)據(jù)同步、配置管理、集群控制和命名服務(wù)等方面的處理。本文在3個系統(tǒng)為CentOS7的Linux節(jié)點上配置了Zookeeper分布式集群,安裝配置的過程均在普通用戶 sxZhang 的賬戶上完成,安裝路徑為 ~/bigdata ,3個

    2024年01月17日
    瀏覽(26)
  • 分布式集群框架——有關(guān)zookeeper的面試考點

    分布式集群框架——有關(guān)zookeeper的面試考點

    ? ? ? 當(dāng)涉及到大規(guī)模分布式系統(tǒng)的協(xié)調(diào)和管理時,Zookeeper是一個非常重要的工具。 1. 分布式協(xié)調(diào)服務(wù):Zookeeper是一個分布式協(xié)調(diào)服務(wù),它提供了一個高可用和高性能的環(huán)境,用于協(xié)調(diào)和同步分布式系統(tǒng)中的各個節(jié)點。它通過提供共享的命名空間和一致性的數(shù)據(jù)模型來簡化開

    2024年02月11日
    瀏覽(19)
  • zookeeper+kafka分布式消息隊列集群的部署

    zookeeper+kafka分布式消息隊列集群的部署

    目錄 一、zookeeper 1.Zookeeper 定義 2.Zookeeper 工作機制 3.Zookeeper 特點 4.Zookeeper 數(shù)據(jù)結(jié)構(gòu) 5.Zookeeper 應(yīng)用場景 (1)統(tǒng)一命名服務(wù) (2)統(tǒng)一配置管理 (3)統(tǒng)一集群管理 (4)服務(wù)器動態(tài)上下線 6.Zookeeper 選舉機制 (1)第一次啟動選舉機制 (2)非第一次啟動選舉機制 7.部署zookeepe

    2024年02月14日
    瀏覽(25)
  • 分布式應(yīng)用之zookeeper集群+消息隊列Kafka

    分布式應(yīng)用之zookeeper集群+消息隊列Kafka

    ? ? ? ?ZooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個開源的實現(xiàn),是Hadoop和Hbase的重要組件。它是一個為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護、域名服務(wù)、分布式同步、組服務(wù)等。為分布式框架提供協(xié)調(diào)服務(wù)的

    2024年02月06日
    瀏覽(138)
  • 分布式集群——jdk配置與zookeeper環(huán)境搭建

    分布式集群——jdk配置與zookeeper環(huán)境搭建

    分布式集群——jdk配置與zookeeper環(huán)境搭建 分布式集群——搭建Hadoop環(huán)境以及相關(guān)的Hadoop介紹 文章目錄 系列文章目錄 前言 一 zookeeper介紹與環(huán)境配置 1.1 zookeeper的學(xué)習(xí) 1.2 Zookeeper的主要功能 1.2.1 znode的節(jié)點類型 1.2.2 zookeeper的實現(xiàn) 1.3 Zookeeper的特征 zookeeper的幾種角色? 1.4 關(guān)于

    2024年02月10日
    瀏覽(95)
  • 第五節(jié) zookeeper集群與分布式鎖_2

    第五節(jié) zookeeper集群與分布式鎖_2

    1)要介紹分布式鎖,首先要提到與分布式鎖相對應(yīng)的是線程鎖。 線程鎖 :主要用來給方法、代碼塊加鎖。當(dāng)某個方法或代碼使用鎖,在同一時刻僅有一個線程執(zhí)行該方法或該代碼段。 線程鎖只在同一JVM中有效果,因為線程鎖的實現(xiàn)在根本上是依靠線程之間共享內(nèi)存實現(xiàn)的,

    2024年02月19日
    瀏覽(18)
  • 【簡單認(rèn)識zookeeper+kafka分布式消息隊列集群的部署】

    【簡單認(rèn)識zookeeper+kafka分布式消息隊列集群的部署】

    Zookeeper是一個開源的分布式的,為分布式框架提供協(xié)調(diào)服務(wù)的Apache項目。 Zookeeper從設(shè)計模式角度來理解:是一個基于觀察者模式設(shè)計的分布式服務(wù)管理框架,它負責(zé)存儲和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper就將負責(zé)通知已

    2024年02月13日
    瀏覽(24)
  • 搭建Zookeeper集群:三臺服務(wù)器,一場分布式之舞

    搭建Zookeeper集群:三臺服務(wù)器,一場分布式之舞

    歡迎來到我的博客,代碼的世界里,每一行都是一個故事 在分布式系統(tǒng)的舞臺上,Zookeeper是一位不可或缺的重要角色。如何搭建一個穩(wěn)定、高性能的Zookeeper集群,成為每位系統(tǒng)管理員和開發(fā)者的必修課。讓我們一同踏上這場集群之旅,揭秘三臺服務(wù)器背后的Zookeeper配置細節(jié),

    2024年04月28日
    瀏覽(28)
  • 解決Hadoop完全分布式集群中從節(jié)點jps沒有datanode節(jié)點問題

    解決Hadoop完全分布式集群中從節(jié)點jps沒有datanode節(jié)點問題

    當(dāng)用start-dfs.sh和start-yarn.sh后,在slave節(jié)點(從節(jié)點)中用jps命令查看進程 正常情況: 有時候可能發(fā)現(xiàn)沒有Datanode,即只有兩項(第一項和最后一項)。原因可能是 重復(fù)格式化 namenode 后,導(dǎo)致 datanode 的 clusterID 和 namenode 的 clusterID 不一致。 解決方法: 在master節(jié)點(namenode): 找到你

    2024年02月06日
    瀏覽(33)
  • 解決Hadoop偽分布式集群jps沒有datanode節(jié)點問題

    解決Hadoop偽分布式集群jps沒有datanode節(jié)點問題

    在啟動Hadoop時,通過jps目錄發(fā)現(xiàn)沒有datanode進程。

    2024年02月13日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包