queue增加刪除元素
- 增加元素
- add方法在添加元素的時候,若超出了度列的長度會直接拋出異常:
- put方法,若向隊(duì)尾添加元素的時候發(fā)現(xiàn)隊(duì)列已經(jīng)滿了會發(fā)生阻塞一直等待空間,以加入元素
- offer方法在添加元素時,如果發(fā)現(xiàn)隊(duì)列已滿無法添加的話,會直接返回false
- 刪除元素
- poll: 若隊(duì)列為空,返回null。
- remove:若隊(duì)列為空,拋出NoSuchElementException異常。
- take:若隊(duì)列為空,發(fā)生阻塞,等待有元素
BlockingQueue:
- 解決線程通信的問題
- 阻塞方法:put、take
其他實(shí)現(xiàn)類:
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue/ SynchronousQueue/ DelayQueue
BlockingQueue實(shí)例
package com.nowcoder.mycommunity;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable{
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
try {
for(int i = 0; i < 100; ++ i){
queue.put(i);
Thread.sleep(20);
System.out.println(Thread.currentThread().getName() + " producer" + queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
public BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
queue.take();
Thread.sleep(new Random().nextInt(1000));
System.out.println(Thread.currentThread().getName() + " consuer" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
kafka
- kafka是一個分布式的流媒體平臺
- 主要應(yīng)用:消息系統(tǒng)、日志收集、用戶行為追蹤、流式處理
- 特點(diǎn):高吞吐量、消息持久化(存放在磁盤上,btw,磁盤順序讀寫速度并不慢)、高可靠性、高擴(kuò)展性
Broker
kafka的服務(wù)器,每一臺服務(wù)器稱為一個Broker
Zookeeper
管理其他集群,包括kafka的集群??梢詥为?dú)下載
Topic/ Partition/ Offset
消息隊(duì)列可能是一對多的形式,生產(chǎn)者將一條消息放在多個隊(duì)列中,然后消費(fèi)者從各自的隊(duì)列中取消息。
下圖為一個Topic,Topic中可能會含有很多Partition,Offset為Partition的索引
Leader Replica/ Follower Replica
kafka的數(shù)據(jù)不止存儲一份,他會存為多份,即使某一個分區(qū)壞了還可以有備份。
leader Replica(祖副本):當(dāng)嘗試從分區(qū)獲取數(shù)據(jù)時,祖副本可以處理請求,返回?cái)?shù)據(jù)
Follower Replica(隨從副本):只能備份,不能響應(yīng)請求
如果祖副本掛掉,集群會從Follower Replica中選一個作為新的leader
kafka命令
官方文檔
配置
進(jìn)入到configure目錄下,修改consumer.properties
使用
進(jìn)入到kafka的目錄中文章來源:http://www.zghlxwxcb.cn/news/detail-524053.html
// 啟動zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
// 啟動kafka
> ./bin/kafka-server-start.sh config/server.properties
// --create:創(chuàng)建主題
// --bootstrap-server localhost:9092:在哪個服務(wù)器創(chuàng)建主題,kafka默認(rèn)端口為9092
// --replication-factor 1:副本為1
// --partitions 1:分區(qū)為1
// --topic test:主題的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.
// 查看該服務(wù)器上的主題
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
// 創(chuàng)建生產(chǎn)者向某個服務(wù)器的某個主題中發(fā)消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>world
// 創(chuàng)建一個消費(fèi)者,讀取某個服務(wù)器上某個主題下的消息隊(duì)列,從頭開始讀取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world
Spring整合Kafka
引入依賴
pom.xml文章來源地址http://www.zghlxwxcb.cn/news/detail-524053.html
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.7</version>
</dependency>
配置Kafka
- 配置server
- 配置consumer
# Kafka Properties
# 服務(wù)器地址
spring.kafka.bootstrap-servers==localhost:9092
#消費(fèi)者id,可以在consumer.properties查看
spring.kafka.consumer.group.id=mycommunity-consumer-group
# 是否自動提交
spring.kafka.consumer.enable-auto-commit=true
# 自動提交的時間間隔,單位毫秒
spring.kafka.consumer.auto-commit-interval=3000
訪問Kafka
- producer
- consumer
Spring整合Kafka的例子
package com.nowcoder.mycommunity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyCommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test", "hello");
kafkaProducer.sendMessage("test", "world");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer{
@Autowired
public KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content){
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer{
// 加上listener注解,Spring會自動注入
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
到了這里,關(guān)于java阻塞隊(duì)列/kafka/spring整合kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!