朋也的博客 » 首页 » 文章
作者:朋也
日期:2018-11-08
类别:ActiveMQ学习笔记
版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
第一节提到了消息提供者在创建session的时候第一个参数是事务的意思,如果将其改成 true ,那么当发送完消息后就要 commit() 一下,消息才能发出
public class Producer {
public static void main(String[] args) throws JMSException {
// 省略了的代码...
// 如果session不支持事件,就是FALSE,支持事务就是true
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 省略了的代码...
// 上面开启了事务,这里就要提交一下
session.commit();
// 当然如果上面发送消息出问题了,也可以通过 rollback() 回滚掉
// session.rollback();
}
}
消息消费者在消费消息的时候 session 的事务就没有意义了,true, false 都一样的消费,不过为了保持一致,还是建议都写成一样的,既:提供者开启事务,消费者也开启事务
创建session的时候,第二个参数是签收模式,有以下几种
TextMessage.acknowledge()
方法才会签收(消费)MessageProducer的send方法最多有5个参数
void send(
Destination destination,
Message message,
int deliveryMode,
int priority,
long timeToLive)
throws JMSException;
Destination destination = session.createQueue("amq-demo");
activemq.xml
里进行配置才能生效,找到 policyEntries
标签,在里面加上 <policyEntry queue="amq-demo" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>
即可MessageConsumer是session创建的,一般给一个Destination即可,不过它还有两个参数
// 创建队列消费者
MessageConsumer createConsumer(
Destination destination,
java.lang.String messageSelector,
boolean NoLocal)
throws JMSException;
// 创建主题消费者
TopicSubscriber createDurableSubscriber(
Topic topic,
String name,
String messageSelector,
boolean noLocal)
throws JMSException;
MapMessages
类似Java的实体类这里说一下MessageSelector
举见个例子
// 选择名字为 z3 的消息
String selector1 = "name = 'z3'";
// 选择年龄大于20的消息
String selector2 = "age > 20";
// 选择名字为 z3 和 年龄大于20的消息
String selector3 = "name = 'z3' AND age > 20";
MessageConsumer consumer = session.createConsumer(destination, selector1);
消息的接收方式,通过 receive()
方法来接收,该方法是阻塞的,一般会用一个监听来实现
HelloWorld里接收方式是通过 while(true){}
来实现的,这种代码不推荐
while(true) {
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("消费数据:" + msg.getText());
}
下面是通过监听的方式来接收消息
public class Consumer {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("amq-demo");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MsgListener());
// while(true) {
// TextMessage msg = (TextMessage) consumer.receive();
// System.out.println("消费数据:" + msg.getText());
// }
}
static class MsgListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("消费数据:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
看起来是不是优雅多了 : )
创建临时消息
消息提供者还可以创建临时消息,通过 session.createTemporaryQueue()
方法创建,这种方式创建的消息,当connection关闭之后,消息也就没有了
除了队列,ActiveMQ还可以创建主题,消费者可以通过订阅主题来消费消息,用法跟队列基本一致,下面上代码
提供者
public class Producer {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
Connection connection = connectionFactory.createConnection();
connection.start();
// 如果session不支持事件,就是FALSE,支持事务就是true
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("amq-topic-demo");
MessageProducer producer = session.createProducer(null);
for (int i = 0; i < 100; i++) {
TextMessage msg = session.createTextMessage("生产消息" + i);
producer.send(destination, msg);
}
// session.commit();
// session.rollback();
connection.close();
}
}
消费者1 与 消费者2 是一样的,把日志内容改一下就可以了
public class Consumer1 {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("amq-topic-demo");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MsgListener());
}
static class MsgListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("消费者1 消费数据:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
这个启动就要先启动所有的消费者,然后再启动提供者,这样提供者发布的主题消息,消费者才能进行消费,运行一下,可以看到消费者1跟消费者2都消费了提供者提供的消息