GitHub source: https://github.com/zhangchuangiie/SimpleKafka
SimpleKafka (a utility class wrapping the Kafka client)
A tool built on top of the Kafka client that makes Kafka development far more efficient
Features:
- Wraps the common Kafka client operations; there is no configuration to maintain and no client to initialize, so a single line of code is genuinely all it takes
- Connection-pool maintenance is encapsulated inside the utility class, so multithreaded callers do not need to manage a set of clients either
Usage:
Just integrate the single file KafkaUtil.java and change the Kafka server address inside it.
Typical examples:
- Synchronous produce: LinkedHashMap<String, Object> recordMeta = KafkaUtil.sendToKafka("RULEa93304e6d844000", "222", "aaaa");
- Asynchronous produce: KafkaUtil.sendToKafkaAsync("RULEa93304e6d844000", "222", "aaaa");
- Consume data: ArrayList<LinkedHashMap<String, Object>> buffer = KafkaUtil.recvFromKafka("RULEa93304e6d844000", "group1");
- Reset offsets: KafkaUtil.resetOffsetToEarliest("RULEa93304e6d844000", "group1");
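The calls above return plain Java collections rather than Kafka client objects. As a self-contained sketch (no broker involved), the snippet below builds one record in the LinkedHashMap shape that recvFromKafka is documented to return and reads the fields back out; the values mirror the examples above and are illustrative only:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;

// Self-contained sketch: one record in the LinkedHashMap shape that
// recvFromKafka is documented to return, plus the typical field extraction
// a consumer would do. Values are taken from the examples in this README.
public class RecordShapeDemo {

    // Mirrors one element of the ArrayList returned by recvFromKafka.
    static LinkedHashMap<String, Object> sampleRecord() {
        LinkedHashMap<String, Object> rec = new LinkedHashMap<>();
        rec.put("topic", "RULEa93304e6d844000");
        rec.put("key", "222");
        rec.put("value", "aaaa");
        rec.put("partition", 1);
        rec.put("offset", 681L);
        return rec;
    }

    public static void main(String[] args) {
        ArrayList<LinkedHashMap<String, Object>> buffer = new ArrayList<>();
        buffer.add(sampleRecord());
        for (LinkedHashMap<String, Object> rec : buffer) {
            System.out.println(rec.get("topic") + "#" + rec.get("partition")
                    + "@" + rec.get("offset") + " -> " + rec.get("value"));
        }
    }
}
```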
Interface overview:
- kafkaListTopics: list topics
- createTopic: create a topic
- delTopic: delete a topic
- partitionsTopic: list a topic's partitions, with partition and replica counts
- delGroupId: delete a consumer group (groupId)
- descCluster: list the nodes in the cluster
- kafkaConsumerGroups: list consumer groups
- kafkaConsumerGroups: list the active consumers of a given topic
- sendToKafka: produce data to the given topic; synchronous, returns e.g. {"topic":"RULEa93304e6d844000","partition":1,"offset":681}
- sendToKafkaAsync: produce data to the given topic; asynchronous, with the default callback
- sendToKafkaAsync: produce data to the given topic; asynchronous, with a custom callback
- recvFromKafka: consume data from the given topic under a groupId, returns e.g. [{"topic":"RULEa93304e6d844000","key":"222","value":"aaaa","partition":1,"offset":681}]
- recvFromKafkaByOffset: consume from the given topic and partition starting at the given offset
- recvFromKafkaByTimestamp: consume the data at or after the given timestamp from the given topic and partition
- resetOffsetToTimestamp: reset the given topic's offsets to the given timestamp
- resetOffsetToEarliest: reset the given topic's offsets to the earliest position
- resetOffsetToLatest: reset the given topic's offsets to the latest position; typically used to skip dirty test data
- consumerPositions: get the current consumption offsets, e.g. {"partitionNum":2,"dataNum":1,"lagNum":0,"positions":[{"partition":0,"begin":0,"end":0,"current":0,"current1":0,"size":0,"lag":0},{"partition":1,"begin":681,"end":682,"current":682,"current1":682,"size":1,"lag":0}]}
- topicSize: get per-partition data volume details for the given topic, e.g. [{"partition": 0,"begin": 65,"end": 65,"size": 0}]
- topicSizeAll: get per-partition data volume details for all topics
- topicSizeStatistics: get aggregate data volume statistics for the given topic, e.g. {"partitionNum":5452,"dataNum":41570647}
- topicSizeStatisticsAll: get aggregate data volume statistics for all topics, e.g. {"topicNum":2550,"partitionNum":5452,"dataNum":41570647}
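The consumerPositions sample above suggests how its per-partition fields relate (this is inferred from the example values, not stated in the source): size = end - begin, lag = end - current, and the top-level dataNum/lagNum are the sums across partitions. A minimal check in Java:

```java
// Inferred relationships among the consumerPositions fields (derived from
// the sample values in this README, not from the source code): a partition's
// size is end - begin and its lag is end - current.
public class PositionMath {

    static long size(long begin, long end) {
        return end - begin;
    }

    static long lag(long current, long end) {
        return end - current;
    }

    public static void main(String[] args) {
        // Partition 1 in the sample: begin=681, end=682, current=682.
        System.out.println("size=" + size(681, 682)); // 1, matching the sample
        System.out.println("lag=" + lag(682, 682));   // 0, matching the sample
    }
}
```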
Interface list:
- kafkaListTopics: List kafkaListTopics()
- createTopic: void createTopic(String topic)
- delTopic: void delTopic(String topic)
- partitionsTopic: List partitionsTopic(String topic)
- delGroupId: void delGroupId(String groupId)
- descCluster: List descCluster()
- kafkaConsumerGroups: List kafkaConsumerGroups()
- kafkaConsumerGroups: List kafkaConsumerGroups(String topic)
- sendToKafka: LinkedHashMap<String, Object> sendToKafka(String topic, String key, String value)
- sendToKafkaAsync: void sendToKafkaAsync(String topic, String key, String value)
- sendToKafkaAsync: void sendToKafkaAsync(String topic, String key, String value, Callback callback)
- recvFromKafka: ArrayList<LinkedHashMap<String, Object>> recvFromKafka(String topic, String groupId)
- recvFromKafkaByOffset: ArrayList<LinkedHashMap<String, Object>> recvFromKafkaByOffset(String topic, String groupId, int partition, long offset)
- recvFromKafkaByTimestamp: ArrayList<LinkedHashMap<String, Object>> recvFromKafkaByTimestamp(String topic, String groupId, int partition, long timestamp)
- resetOffsetToTimestamp: boolean resetOffsetToTimestamp(String topic, String groupId, long timestamp)
- resetOffsetToEarliest: boolean resetOffsetToEarliest(String topic, String groupId)
- resetOffsetToLatest: boolean resetOffsetToLatest(String topic, String groupId)
- consumerPositions: List<LinkedHashMap<String, Object>> consumerPositions(String topic, String groupId)
- topicSize: List<LinkedHashMap<String, Object>> topicSize(String topic)
- topicSizeAll: LinkedHashMap<String, Object> topicSizeAll()
- topicSizeStatistics: LinkedHashMap<String, Object> topicSizeStatistics(String topic)
- topicSizeStatisticsAll: LinkedHashMap<String, Object> topicSizeStatisticsALL()
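The four-argument sendToKafkaAsync takes a Kafka producer Callback. To stay self-contained (no kafka-clients jar on the classpath, no broker), the sketch below uses a stand-in interface with the same success/failure shape; the real org.apache.kafka.clients.producer.Callback receives a RecordMetadata instead of a String:

```java
// Stand-in for org.apache.kafka.clients.producer.Callback: exactly one of
// metadata/exception is non-null when the producer completes a send. The
// JSON string passed below is illustrative, copied from this README.
public class CallbackDemo {

    interface SendCallback {
        void onCompletion(String metadata, Exception exception);
    }

    // Formats the outcome the way a logging callback typically would.
    static String outcome(String metadata, Exception exception) {
        return exception != null
                ? "send failed: " + exception.getMessage()
                : "acked: " + metadata;
    }

    public static void main(String[] args) {
        SendCallback cb = (metadata, exception) ->
                System.out.println(outcome(metadata, exception));
        // Simulate the producer acknowledging the record asynchronously.
        cb.onCompletion(
                "{\"topic\":\"RULEa93304e6d844000\",\"partition\":1,\"offset\":681}",
                null);
    }
}
```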
Demo application:
To show what the utility enables, an HTTP message-queue service was built on top of it. In just a few dozen lines of code it implements publish/subscribe based on tag content; the service is in APIKafka.java, with a client example in ClientKafka.java.
Producers can attach arbitrary tags, and consumers subscribe with expression-based conditions; expressions support AND/OR/NOT, set membership lookups, and substring matching on strings.
Message replay and message-statistics queries are also supported.
Together this covers the basic requirements of streaming message retrieval.
In APIKafka, tags are open-ended: any JSON is accepted, keys need no prior definition, and values need not come from an enumeration. Consumers subscribe by expression using the open-source expression language OGNL, which provides AND/OR/NOT, object property access, and array/collection access, plus Java expressions such as contains, startsWith, endsWith, and length, and matches for regular expressions. This satisfies the various matching requirements of streaming message retrieval.
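To illustrate what one subscription expression boils down to at evaluation time, the hand-rolled predicate below encodes a single concrete condition (region equals "cn" AND the payload contains "aaa") in plain Java. The tag name region is invented for this example; in APIKafka the predicate would be compiled from an OGNL expression rather than written by hand:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hand-rolled equivalent of one subscription expression. The tag name
// "region" is made up for this sketch; APIKafka evaluates OGNL expressions
// against the message's open-ended JSON tags instead.
public class TagFilterDemo {

    // True when the tags and payload satisfy:
    // region == "cn" AND value contains "aaa".
    static boolean matches(Map<String, Object> tags, String value) {
        return "cn".equals(tags.get("region")) && value.contains("aaa");
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Object> tags = new LinkedHashMap<>();
        tags.put("region", "cn");
        System.out.println(matches(tags, "aaaa")); // this sample message matches
    }
}
```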
Contact:
For questions, contact: zhangchuang@iie.ac.cn