Goal
Call a method with a given topic and consumer group to view, in real time, the consumption status of each partition under that topic (total number of messages, number of messages consumed, and number of messages not yet consumed). In Kafka terms, the total corresponds to each partition's log-end offset, the consumed count to the group's committed offset, and the pending count to their difference, i.e. the lag.
Utility class
package com.utils.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumeLagMonitorUtils {
    // Topic
    public static final String TOPIC_NAME = "topicName";
    // Consumer group
    public static final String GROUP_ID_CONFIG = "groupId";
    // For a cluster, separate broker addresses with commas.
    public static final String KAFKA_BROKER_LIST = "127.0.0.1:6667";

    public static Properties getConsumeProperties(String groupId, String bootstrapServer) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("bootstrap.servers", bootstrapServer);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    public List<Map<String, Object>> topicAndPartitionDetails(
            String bootstrapServer,
            String groupId,
            String topic
    ) {
        List<Map<String, Object>> result = new ArrayList<>();
        Map<Integer, Long> endOffsetMap = new HashMap<>();
        Map<Integer, Long> commitOffsetMap = new HashMap<>();
        Properties consumeProps = getConsumeProperties(groupId, bootstrapServer);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps);
        try {
            // Build the list of TopicPartitions for the topic
            List<TopicPartition> topicPartitions = new ArrayList<>();
            List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
            for (PartitionInfo partitionInfo : partitionsFor) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            // Log-end offset (log size) of each partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition partition : endOffsets.keySet()) {
                endOffsetMap.put(partition.partition(), endOffsets.get(partition));
            }
            for (Integer partitionId : endOffsetMap.keySet()) {
                System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s",
                        System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId)));
            }
            // Look up the committed (consumed) offset of each partition.
            // committed() returns null when the group has never committed an offset
            // for the partition, so treat that case as offset 0.
            for (TopicPartition topicAndPartition : topicPartitions) {
                OffsetAndMetadata committed = consumer.committed(topicAndPartition);
                commitOffsetMap.put(topicAndPartition.partition(), committed == null ? 0L : committed.offset());
            }
            // Accumulate lag across all partitions
            long lagSum = 0L;
            if (endOffsetMap.size() == commitOffsetMap.size()) {
                for (Integer partition : endOffsetMap.keySet()) {
                    long endOffset = endOffsetMap.get(partition);
                    long commitOffset = commitOffsetMap.get(partition);
                    long diffOffset = endOffset - commitOffset;
                    lagSum += diffOffset;
                    Map<String, Object> partitionMap = new HashMap<>();
                    // Topic
                    partitionMap.put("topic", topic);
                    // Consumer group
                    partitionMap.put("groupId", groupId);
                    // Partition id
                    partitionMap.put("partition", partition);
                    // Log-end offset
                    partitionMap.put("endOffset", endOffset);
                    // Committed offset
                    partitionMap.put("commitOffset", commitOffset);
                    // Backlog (lag) of this partition
                    partitionMap.put("diffOffset", diffOffset);
                    result.add(partitionMap);
                }
            } else {
                System.out.println("Partition information for topic " + topic + " is incomplete.");
            }
        } finally {
            consumer.close();
        }
        return result;
    }
    public static void main(String[] args) {
        List<Map<String, Object>> list = new KafkaConsumeLagMonitorUtils().topicAndPartitionDetails(
                KAFKA_BROKER_LIST,
                GROUP_ID_CONFIG,
                TOPIC_NAME
        );
        for (Map<String, Object> map : list) {
            map.forEach((k, v) -> System.out.println(k + "=" + v));
            System.out.println("========================");
        }
    }
}
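The utility above joins the cluster as a regular consumer just to read offsets. A lighter-weight alternative, if you are on kafka-clients 2.5 or later, is Kafka's AdminClient, which can read a group's committed offsets and the partitions' log-end offsets without creating a consumer at all. The class below is a minimal sketch of that approach, not part of the original utility; only the Kafka API calls are real, the class and method names are made up for illustration.

package com.utils.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KafkaAdminLagSketch {

    /**
     * Returns partition -> lag for every partition the group has committed offsets for.
     * Minimal sketch; error handling and partitions without any commit are left out.
     */
    public static Map<TopicPartition, Long> lagByPartition(String bootstrapServer, String groupId)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets of the consumer group
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            // Latest (log-end) offsets of the same partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    admin.listOffsets(request).all().get();
            // lag = log-end offset - committed offset
            Map<TopicPartition, Long> lag = new HashMap<>();
            committed.forEach((tp, meta) -> lag.put(tp, latest.get(tp).offset() - meta.offset()));
            return lag;
        }
    }
}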
Batch monitoring
package com.utils.kafka;
import java.util.*;
public class KafkaConsumeLagMonitor {
    // Kafka broker address (IP:port)
    public static final String KAFKA_BROKER_LIST = "127.0.0.1:6667";

    static List<Map<String, Object>> topicList = new ArrayList<>();

    // Two topics are monitored here.
    static {
        // Air quality
        Map<String, Object> airMap = new HashMap<>();
        airMap.put("topic", "air");
        airMap.put("groupId", "air_minute_group");
        topicList.add(airMap);
        // Water quality
        Map<String, Object> waterMap = new HashMap<>();
        waterMap.put("topic", "water");
        waterMap.put("groupId", "water_minute_group");
        topicList.add(waterMap);
    }
    /**
     * As soon as any partition's backlog exceeds lagLimit, stop checking and return a warning.
     *
     * @param lagLimit backlog threshold that triggers a warning
     * @return warning message, or null if no partition exceeds the threshold
     */
    public static String isLazy(long lagLimit) {
        for (Map<String, Object> map : topicList) {
            List<Map<String, Object>> list = new KafkaConsumeLagMonitorUtils().topicAndPartitionDetails(
                    KAFKA_BROKER_LIST,
                    map.get("groupId").toString(),
                    map.get("topic").toString()
            );
            for (Map<String, Object> partitionItem : list) {
                for (Map.Entry<String, Object> entry : partitionItem.entrySet()) {
                    String k = entry.getKey();
                    Object v = entry.getValue();
                    System.out.println(k + "=" + v);
                    if ("diffOffset".equals(k) && Long.parseLong(v.toString()) > lagLimit) {
                        return "Topic " + map.get("topic") + " has a message backlog: partition "
                                + partitionItem.get("partition") + " has " + v + " pending messages.";
                    }
                }
                System.out.println("========================");
            }
        }
        return null;
    }
    public static void main(String[] args) {
        String lazy = isLazy(1000);
        System.out.println(lazy);
    }
}
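To make the check genuinely real-time, isLazy has to run on a schedule rather than once from main. Below is a minimal sketch of a periodic check with ScheduledExecutorService; it is not part of the original code, and sendAlert is a hypothetical hook to be replaced with whatever notification channel you use.

package com.utils.kafka;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class KafkaLagScheduler {

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Check the backlog once a minute; alert when any partition exceeds 1000 pending messages.
        scheduler.scheduleAtFixedRate(() -> {
            String warnMsg = KafkaConsumeLagMonitor.isLazy(1000);
            if (warnMsg != null) {
                sendAlert(warnMsg);
            }
        }, 0, 1, TimeUnit.MINUTES);
    }

    // Hypothetical alert hook -- replace with mail/SMS/webhook integration as needed.
    private static void sendAlert(String message) {
        System.out.println("[ALERT] " + message);
    }
}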