1. 官網下載ActiveMQ包,官網地址:Index of /dist/activemq
這里講述的是apache-activemq-5.16.4這個版本。版本請勿隨意選用,因為其與JDK版本有相互依賴關系,如下圖:
2. 解壓使用
2.1 將下載好的包解壓到本地磁盤:
2.2 啟動并訪問。進入bin文件夾,雙擊打開“activemq.bat”。
如遇到打開“activemq.bat”黑框一閃而過時,請用命令方式啟動。啟動命令:
activemq-admin.bat start
瀏覽器訪問:http://127.0.0.1:8161/,用戶名和密碼都是admin
3. 依賴引入
在pom.xml中增加相關依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.16.4</version>
</dependency>
4. 相關代碼
Constant.java
/**
 * Connection settings shared by the message sender and the message receiver.
 *
 * <p>All values are compile-time constants; they are declared {@code final} so
 * that no code can reassign the broker address or credentials at runtime.
 */
public class Constant {
    /** User name handed to the ActiveMQ connection factory. */
    public static final String USER = "admin";
    /** Password handed to the ActiveMQ connection factory. */
    public static final String PASSWORD = "admin";
    /** Broker URL handed to the ActiveMQ connection factory. */
    public static final String URL = "failover://tcp://127.0.0.1:61616";
    /** Name of the queue that messages are sent to and consumed from. */
    public static final String SUBJECT = "XXX-MSG";
}
MsgTypeEnum.java
/**
 * Kinds of JMS message a caller can ask the sender to produce.
 *
 * <p>Each constant carries a short key and a human-readable description. Both
 * fields are {@code final}: enum constants are shared singletons, so a writable
 * public field would let any code change them for the whole program.
 */
public enum MsgTypeEnum {
    TEXT("text", "文本信息"),
    // MAP("map", "Map信息"),
    // STREAM("stream", "流信息"),
    OBJECT("object", "對象信息"),
    BYTES("byte", "字節(jié)信息");

    /** Short identifier of the message kind. */
    public final String key;
    /** Human-readable description of the message kind. */
    public final String value;

    /**
     * Creates an enum constant.
     *
     * @param key   short identifier of the message kind
     * @param value human-readable description of the message kind
     */
    MsgTypeEnum(String key, String value) {
        this.key = key;
        this.value = value;
    }
}
MqMessage.java
import lombok.Data;
import java.io.Serializable;
/**
 * Serializable payload with a name and an id, used as the body of object messages.
 *
 * <p>No accessors are written out here; the receiver calls {@code getName()},
 * which is presumably generated by Lombok's {@code @Data} — confirm Lombok is
 * on the build path.
 */
@Data
public class MqMessage implements Serializable {
/**
* Serialization version ID.
*/
private static final long serialVersionUID = 7543452786622377175L;
// Display name carried by the message.
private String name;
// Numeric identifier carried by the message.
private Integer id;
}
MqSender.java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.Serializable;
/**
 * Sends JMS messages to the ActiveMQ queue named by {@code Constant.SUBJECT}.
 *
 * <p>A sender holds at most one connection, session and producer at a time.
 * {@link #close()} releases them; a later {@link #initialize()} creates new ones.
 */
public class MqSender {
    private Destination destination = null;
    private Connection conn = null;
    private Session session = null;
    private MessageProducer producer = null;

    /**
     * Returns the current connection.
     *
     * @return the connection created by {@link #initialize()}, or {@code null} if the
     *         sender has not been initialized or has been closed
     */
    public Connection getConn() {
        return conn;
    }

    /**
     * Creates the connection, session, queue destination and producer.
     *
     * <p>Calling this method while the sender is already initialized does nothing.
     * Without that guard a second call would replace the fields and leave the
     * first connection open with no reference left to close it.
     *
     * @throws Exception if any of the JMS objects cannot be created
     */
    public void initialize() throws Exception {
        if (conn != null) {
            return;
        }
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(Constant.USER, Constant.PASSWORD, Constant.URL);
        Connection newConn = connectionFactory.createConnection();
        try {
            // Non-transacted session (first argument is false) with automatic acknowledgement.
            Session newSession = newConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Message destination: a queue. Use session.createTopic(...) for a topic instead.
            Destination newDestination = newSession.createQueue(Constant.SUBJECT);
            MessageProducer newProducer = newSession.createProducer(newDestination);
            // Messages are not persisted by the broker.
            newProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            conn = newConn;
            session = newSession;
            destination = newDestination;
            producer = newProducer;
        } catch (Exception setupFailure) {
            // Do not leave a half-built connection open.
            try {
                newConn.close();
            } catch (JMSException closeFailure) {
                setupFailure.addSuppressed(closeFailure);
            }
            throw setupFailure;
        }
    }

    /**
     * Sends one message of the requested kind and then closes the sender.
     *
     * <p>The sender is initialized first if necessary, and it is closed on both
     * the success path and the failure path.
     *
     * @param msgTypeEnum kind of JMS message to build; must not be {@code null}
     * @param msgObject   payload: a {@code String} for {@code TEXT}, a
     *                    {@code Serializable} for {@code OBJECT}; for {@code BYTES} a
     *                    {@code byte[]} is written as is and any other object is
     *                    written as the bytes of its {@code toString()}
     * @throws Exception if initialization, message creation or sending fails
     */
    public void sendMessage(MsgTypeEnum msgTypeEnum, Object msgObject) throws Exception {
        try {
            java.util.Objects.requireNonNull(msgTypeEnum, "msgTypeEnum");
            initialize();
            // Starting an already started connection is harmless.
            conn.start();
            producer.send(createMessage(msgTypeEnum, msgObject));
        } catch (Exception sendFailure) {
            try {
                close();
            } catch (JMSException closeFailure) {
                sendFailure.addSuppressed(closeFailure);
            }
            throw sendFailure;
        }
        close();
    }

    /** Builds the JMS message that corresponds to the requested kind. */
    private Message createMessage(MsgTypeEnum msgTypeEnum, Object msgObject) throws JMSException {
        switch (msgTypeEnum) {
            case BYTES: {
                BytesMessage bytesMessage = session.createBytesMessage();
                // NOTE: text is encoded with the platform default charset; the
                // receiving side must decode with the same charset.
                byte[] payload = (msgObject instanceof byte[])
                        ? (byte[]) msgObject
                        : msgObject.toString().getBytes();
                bytesMessage.writeBytes(payload);
                return bytesMessage;
            }
            // Map message example (disabled):
            // case MAP:
            // MapMessage msg = session.createMapMessage();
            // msg.setBoolean("boolean", true);
            // msg.setShort("short", (short) 0);
            // msg.setLong("long", 123456);
            // msg.setString("MapMessage", "ActiveMQ Map Message!");
            // producer.send(msg);
            // break;
            case OBJECT: {
                ObjectMessage objectMessage = session.createObjectMessage();
                objectMessage.setObject((Serializable) msgObject);
                return objectMessage;
            }
            // Stream message example (disabled):
            // case STREAM:
            // StreamMessage msg1 = session.createStreamMessage();
            // msg1.writeBoolean(false);
            // msg1.writeLong(1234567890);
            // producer.send((StreamMessage) msg1);
            // break;
            case TEXT: {
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText((String) msgObject);
                return textMessage;
            }
            default:
                // Fail loudly instead of silently sending nothing.
                throw new IllegalArgumentException("Unsupported message type: " + msgTypeEnum);
        }
    }

    /**
     * Closes the producer, session and connection and clears the fields.
     *
     * <p>All three are attempted even if an earlier one fails. Calling this on a
     * sender that is not initialized does nothing.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        MessageProducer openProducer = producer;
        Session openSession = session;
        Connection openConn = conn;
        producer = null;
        session = null;
        destination = null;
        conn = null;
        try {
            if (openProducer != null) {
                openProducer.close();
            }
        } finally {
            try {
                if (openSession != null) {
                    openSession.close();
                }
            } finally {
                if (openConn != null) {
                    openConn.close();
                }
            }
        }
    }
}
MqAccept.java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Enumeration;
/**
 * Receives JMS messages from the ActiveMQ queue named by {@code Constant.SUBJECT}
 * and prints their content to standard output.
 */
public class MqAccept implements MessageListener {
    private Destination dest = null;
    private Connection conn = null;
    private Session session = null;
    private MessageConsumer consumer = null;
    // Released by onMessage() once a message has been handled and awaited by
    // startReceiveMessage(). A latch gives safe cross-thread signalling, which
    // the plain boolean flag polled every five seconds did not.
    private final java.util.concurrent.CountDownLatch firstMessageHandled =
            new java.util.concurrent.CountDownLatch(1);

    /**
     * Creates the connection, session, queue destination and consumer.
     *
     * @throws Exception if any of the JMS objects cannot be created
     */
    private void initialize() throws Exception {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(Constant.USER, Constant.PASSWORD, Constant.URL);
        conn = connectionFactory.createConnection();
        // Non-transacted session (first argument is false) with automatic acknowledgement.
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Source of the messages: a queue. Use session.createTopic(...) for a topic instead.
        dest = session.createQueue(Constant.SUBJECT);
        // The session creates the consumer that reads from the destination.
        consumer = session.createConsumer(dest);
    }

    /**
     * Connects, registers this object as the listener and blocks until one
     * message has been handled without a {@code JMSException}.
     *
     * <p>The connection is closed before this method returns, whether it returns
     * normally or with an exception.
     *
     * @throws JMSException if a JMS operation fails
     * @throws Exception    if initialization fails or the waiting thread is interrupted
     */
    public void startReceiveMessage() throws JMSException, Exception {
        try {
            initialize();
            conn.start();
            consumer.setMessageListener(this);
            // Wait until onMessage() reports a handled message.
            firstMessageHandled.await();
        } catch (Exception receiveFailure) {
            try {
                close();
            } catch (JMSException closeFailure) {
                receiveFailure.addSuppressed(closeFailure);
            }
            throw receiveFailure;
        }
        close();
    }

    /**
     * Prints the content of a received message according to its JMS type and
     * then releases the thread waiting in {@link #startReceiveMessage()}.
     *
     * <p>If reading the message throws a {@code JMSException}, the stack trace is
     * printed and the waiting thread is not released.
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        try {
            if (msg instanceof TextMessage) {
                TextMessage message = (TextMessage) msg;
                System.out.println("------Received TextMessage------");
                System.out.println(message.getText());
            } else if (msg instanceof MapMessage) {
                MapMessage message = (MapMessage) msg;
                System.out.println("------Received MapMessage------");
                System.out.println(message.getLong("long"));
                System.out.println(message.getBoolean("boolean"));
                System.out.println(message.getShort("short"));
                System.out.println(message.getString("MapMessage"));
                System.out.println("------Received MapMessage for while------");
                Enumeration<?> names = message.getMapNames();
                while (names.hasMoreElements()) {
                    Object name = names.nextElement();
                    System.out.println(message.getObject(name.toString()));
                }
            } else if (msg instanceof StreamMessage) {
                StreamMessage message = (StreamMessage) msg;
                System.out.println("------Received StreamMessage------");
                System.out.println(message.readString());
                System.out.println(message.readBoolean());
                System.out.println(message.readLong());
            } else if (msg instanceof ObjectMessage) {
                System.out.println("------Received ObjectMessage------");
                ObjectMessage message = (ObjectMessage) msg;
                // NOTE(review): ActiveMQ restricts which packages may be deserialized
                // from ObjectMessage payloads; confirm MqMessage's package is trusted.
                Object payload = message.getObject();
                if (payload instanceof MqMessage) {
                    System.out.println("name: " + ((MqMessage) payload).getName());
                } else {
                    // Unexpected payload type: print it instead of failing the cast.
                    System.out.println(payload);
                }
            } else if (msg instanceof BytesMessage) {
                System.out.println("------Received BytesMessage------");
                BytesMessage message = (BytesMessage) msg;
                // Collect every chunk first and decode once; decoding chunk by chunk
                // breaks characters whose bytes straddle a chunk boundary.
                java.io.ByteArrayOutputStream body = new java.io.ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int length;
                while ((length = message.readBytes(chunk)) != -1) {
                    body.write(chunk, 0, length);
                }
                // Decoded with the platform default charset, as before.
                System.out.println(new String(body.toByteArray()));
            } else {
                System.out.println(msg);
            }
            firstMessageHandled.countDown();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the consumer, session and connection and clears the fields.
     *
     * <p>All three are attempted even if an earlier one fails.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        MessageConsumer openConsumer = consumer;
        Session openSession = session;
        Connection openConn = conn;
        consumer = null;
        session = null;
        dest = null;
        conn = null;
        try {
            if (openConsumer != null) {
                openConsumer.close();
            }
        } finally {
            try {
                if (openSession != null) {
                    openSession.close();
                }
            } finally {
                if (openConn != null) {
                    openConn.close();
                }
            }
        }
    }
}
MQSendMsg.java
/**
 * Convenience entry point that sends one message with a fresh {@code MqSender}.
 */
public class MQSendMsg {
    /**
     * Sends a single message and releases the sender's resources afterwards,
     * including when sending fails.
     *
     * @param type   kind of JMS message to send
     * @param msgObj payload handed to {@code MqSender.sendMessage}
     * @throws Exception if connecting, sending or closing fails
     */
    public static void sendMsg(MsgTypeEnum type, Object msgObj) throws Exception {
        MqSender sender = new MqSender();
        try {
            // sendMessage() performs the initialization and starts the connection
            // itself, so no separate initialize()/start() calls are made here.
            sender.sendMessage(type, msgObj);
        } catch (Exception sendFailure) {
            try {
                sender.close();
            } catch (Exception closeFailure) {
                sendFailure.addSuppressed(closeFailure);
            }
            throw sendFailure;
        }
        sender.close();
    }
}
5. 測試代碼
TestReceive.java
/**
 * Manual test entry point for the receiving side.
 */
public class TestReceive {
    /**
     * Creates a receiver and starts receiving.
     *
     * @param args unused
     * @throws Exception if receiving fails
     */
    public static void main(String[] args) throws Exception {
        MqAccept receiver = new MqAccept();
        receiver.startReceiveMessage();
    }
}
TestSend.java
/**
 * Manual test entry point for the sending side.
 */
public class TestSend {
    /**
     * Sends one text message through {@code MQSendMsg}.
     *
     * @param args unused
     * @throws Exception if sending fails
     */
    public static void main(String[] args) throws Exception {
        final String text = "這是測試哈123...";
        MQSendMsg.sendMsg(MsgTypeEnum.TEXT, text);
    }
}
測試發送接收功能
文章來源:http://www.zghlxwxcb.cn/news/detail-437717.html
基礎(chǔ)使用到此結(jié)束!文章來源地址http://www.zghlxwxcb.cn/news/detail-437717.html
到了這里,關于Windows下ActiveMQ的安裝和簡單使用的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!