2010-11-6

jms topoc和queue的区别

ms规范里的两种message传输方式Topic和Queue,两者的对比如下表():

 

 

Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点
有无状态 topic数据默认不落地,是无状态的。

Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。

完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

 


--
we drink green tea

jms 详解

1、P2P模型 
在P2P模型中,有下列概念:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。 
 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中) 
 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。 
 接收者在成功接收消息之后需向队列应答成功 
如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模型。 
举例: 
//注册消息监听器,当有消息发送过来的时候会调用onMessage方法(实现MessageListener 接口) 
Java代码 
  1. import javax.ejb.ActivationConfigProperty;  
  2. import javax.ejb.MessageDriven;  
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7.   
  8. @MessageDriven(activationConfig={  
  9.             @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),  
  10.             @ActivationConfigProperty(propertyName="destination", propertyValue="queue/myqueue")  
  11.     }  
  12. )  
  13. public class QueueMessageBean implements MessageListener {  
  14.   
  15.     public void onMessage(Message msg) {  
  16.         //共有下面几种消息类型  
  17.         //1 Text  
  18.         //2 Map  
  19.         //3 Object  
  20.         //4 stream  
  21.         //5 byte  
  22.         TextMessage txtMsg = (TextMessage)msg;  
  23.         String s = "";  
  24.         try {  
  25.             s = txtMsg.getText();  
  26.         } catch (JMSException e) {  
  27.             e.printStackTrace();  
  28.         }  
  29.         System.out.println("QueueMessageBean接收到了消息:" + s);  
  30.     }  
  31. }  
  32. //客户端调用  
  33. import javax.jms.Message;  
  34. import javax.jms.MessageProducer;  
  35. import javax.jms.Queue;  
  36. import javax.jms.QueueConnection;  
  37. import javax.jms.QueueConnectionFactory;  
  38. import javax.jms.QueueSession;  
  39. import javax.naming.InitialContext;  
  40.   
  41.   
  42. public class Test {  
  43.     public static void main(String[] args) throws Exception {  
  44.     InitialContext ctx = new InitialContext();  
  45.     //获得QueueConnectionFactory对象  
  46.     QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");  
  47.     //创建QueueConnection对像   
  48.     QueueConnection connection = factory.createQueueConnection();  
  49.     //创建会话  
  50.     //arg1:与事物有关,true表示最后提交,false表示自动提交  
  51.     //arg2:表示消息向中间件发送确认通知,这里采用的是自动通知的类型  
  52.     QueueSession session = (QueueSession) connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);  
  53.     //取得destination  
  54.     Queue queue = (Queue) ctx.lookup("queue/myqueue");  
  55.     //消息生产者  
  56.     MessageProducer sender = session.createProducer(queue);  
  57.     //定义消息  
  58.     Message msg = session.createTextMessage("消息来了");  
  59.     //发送消息  
  60.     sender.send(queue, msg);  
  61.     session.close();  
  62.     connection.close();  
  63.           
  64.     }  
  65. }  


2、Pub/Sub模式 
在Pub/Sub模型中,有下列概念: 主题(Topic)、发布者(Publisher)、订阅者(Subscriber)。客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。 
 每个消息可以有多个消费者 
 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅之后,才能消费发布者的消息,而且,为了消费消息,订阅者必须保持运行的状态。 
当然,为了缓和这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。 
如果你希望发送的消息可以不被做任何处理、或者被一个消费者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。 

//注册消息监听器,当有消息发送过来的时候会调用onMessage方法(实现MessageListener 接口) 
Java代码 
  1. import javax.ejb.ActivationConfigProperty;  
  2. import javax.ejb.MessageDriven;  
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7.   
  8. @MessageDriven(activationConfig={  
  9.             @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),  
  10.             @ActivationConfigProperty(propertyName="destination", propertyValue="topic/myTopic")  
  11.     }  
  12. )  
  13. public class TopicMessageBean implements MessageListener {  
  14.   
  15.     public void onMessage(Message msg) {  
  16.         //共有下面几种消息类型  
  17.         //1 Text  
  18.         //2 Map  
  19.         //3 Object  
  20.         //4 stream  
  21.         //5 byte  
  22.         TextMessage txtMsg = (TextMessage)msg;  
  23.         String s = "";  
  24.         try {  
  25.             s = txtMsg.getText();  
  26.         } catch (JMSException e) {  
  27.             e.printStackTrace();  
  28.         }  
  29.         System.out.println("TopicMessageBean接收到了消息:" + s);  
  30.     }  
  31. }  
  32.   
  33. //客户端测试  
  34. import javax.jms.MessageProducer;  
  35. import javax.jms.Topic;  
  36. import javax.jms.TopicConnection;  
  37. import javax.jms.TopicConnectionFactory;  
  38. import javax.jms.TopicSession;  
  39. import javax.naming.InitialContext;  
  40.   
  41.   
  42. public class Test {  
  43.     public static void main(String[] args) throws Exception {  
  44.     InitialContext ctx = new InitialContext();  
  45.     //获得QueueConnectionFactory对象  
  46.     TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");  
  47.     //创建QueueConnection对像   
  48.     TopicConnection connection = factory.createTopicConnection();  
  49.     //创建会话  
  50.     //arg1:与事物有关,true表示最后提交,false表示自动提交  
  51.     //arg2:表示消息向中间件发送确认通知,这里采用的是自动通知的类型  
  52.     TopicSession session = (TopicSession) connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);  
  53.     //取得destination  
  54.     Topic queue = (Topic) ctx.lookup("topic/myTopic");  
  55.     //消息生产者  
  56.     MessageProducer publisher = session.createProducer(queue);  
  57.     //定义消息  
  58.     Message msg = session.createTextMessage("消息来了");  
  59.     //发送消息  
  60.     publisher.send(queue, msg);  
  61.     session.close();  
  62.     connection.close();  
  63.           
  64.     }  
  65. }  


二种模型的实现结果:对于p2p模型的每个消息只能有一个消费者  如果我们定义二个消息接受者的Bean那么只能有一端会接收到消息。当你把部署在Jboss中的消息接收Bean去掉以后,然后发送消息 此时消息在队列中,一旦你重新部署他会立刻就接收到刚刚发送的消息所以它没有时间的依赖性, pub/sub模型可以有多个消费者 在这个模型中如果我们定义多个接收消息的Bean当我们在客户端发送消息的时候二个bean都会接收到消息,所以他有多个消费者 但是如果你把Jboss部署中的消息接收bean去掉之后,发送消息。然后在重新部署,那么消息也无法接收到,所以说他有时间的依赖性。 

//代码中几个概念的理解 
Connection Factory 
创建Connection对象的工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。 

Destination 
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。 

所以,Destination实际上就是两种类型的对象:Queue、Topic。 

可以通过JNDI来查找Destination。 

Connection: 
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。 
Session: 
Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。 
消息生产者: 
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息! 
消息消费者: 
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以通过session的createDurableSubscriber方法来创建持久化的订阅者。 
MessageListener: 
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。 

MDB介绍: 
对客户端来说,message-driven bean就是异步消息的消费者。当消息到达之后,由容器负责调用MDB。客户端发送消息到destination,MDB作为一个MessageListener接收消息


--
we drink green tea

ActiveMQ jms 详解

[1]
在介绍ActiveMQ之前,首先简要介绍一下JMS规范。
JMS的简介:
(1)
JMS(Java Message Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。JMS 使您能够通过消息收发服务从一个 JMS 客户机向另一个 JML 客户机交流消息。

JMS是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ.

(2)
JMS典型的应用场景:
操作可异步执行.
发email了,   发msn消息了. 
或者一些比较耗时的操作,   比如要在某目录下生成一个大报表.   操作者把指令发出去就完事.

[2]
JMS的基本构件:
(1)
Broker
什么是Broker呢?可以把JMS Brokers 看成是服务器端。这个服务器可以独立运行.也可以随着其他容器
以内嵌方式云心,如下配置:
使用显示的Java代码创建
BrokerService broker = new BrokerService();
// configure the broker
broker.addConnector("tcp://localhost:61616");
broker.start();
使用BrokerFacotry创建
BrokerService broker = BrokerFactory.getInstance().createBroker(someURI);
使用Spring Bean创建
<bean id=”broker” class=”org.apache.activemq.xbean.BrokerFactoryBean”>
    <property name=”config” value=”classpath:org/apache/activemq/xbean/activemq.xml” />
    <property name=”start” value=”true” />
</bean>
还可以使用XBean或Spring 2.0等多种配置方式配置,
通过ActiveMQConnectionFactory还可以隐含的创建内嵌的broker,这个broker就不是一个独立的服务了。
<bean id=”jmsTemplate” class=”org.springframework.jms.core.JmsTemplate”>
    <property name=”connectionFactory” ref=”jmsFactory”/>
    <property name=”defaultDestination” ref=”destination” />
    <property name=”destinationResolver” ref=”默认是DynamicDestionResolver” />
    <property name=”pubSubDomain”><value>true or false默认是false,
       false是QueneDestination, true是TopicDestination</value>
</bean>
上面的defaultDestination是指默认发送和接收的目的地,我们也可以不指定,而是通过目的地名称让jmsTemplate自动帮我们创建.

(2)
1 连接工厂
连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
2 连接
JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。
3 会话
JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
(3)
目的地:
目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种
消息传递域:Point-to-Point消息(P2P),点对点;发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)
两者的区别:
  P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。
        P2P消息,每个消息只能有一个消费者。
  Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。
    Pub/Sub,每个消息可以有多个消费者。
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
(3)
3.1
消息生产者
消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
3.2
消息消费者
消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种
方法之一:
? 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。(异步操作)
? 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
3.3
消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:
简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 
(MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
(4)
JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。

[3]
ActiveMQ简介:
ActiveMQ 是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。
安装
http://activemq.apache.org/download.html 下载5.0.0发行包,解压即可,
启动
window环境运行解压目录下的/bin/activemq.bat 
测试
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动
window环境运行 netstat -an|find "61616"
监控
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。
admin:
http://127.0.0.1:8161/admin/
demo:http://127.0.0.1:8161/demo/ 
点击demo应用中的“ Market data publisher ”,就会发一些测试的消息。转到admin页面的topics menu下面(queue和topic的区别见 
http://andyao.javaeye.com/blog/153173 ),可以看到消息在增长。
ActiveMQ5.0的配置文件在解压目录下的/conf目录下面。主要配置文件为activemq.xml.

[4]
实例一:(没有结合spring框架)
public class QueueProducer {
/*
* 创建的简图
ConnectionFactory---->Connection--->Session--->Message
Destination + Session------------------------------------>Producer
Destination + Session------------------------------------>MessageConsumer
*/ 
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Queue queue;
//设置回复的目的地
Queue replyQueue;
// MessageProducer:消息发送者
MessageProducer producer;
MessageConsumer replyer;
connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD,
    "tcp://192.168.1.191:61616");
try {
   // 构造从工厂得到连接对象
   connection = connectionFactory.createConnection();
   // 启动
   connection.start();
   // 获取操作连接
   session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
   // 创建队列: 可以在
http://localhost:8161/admin/queue.jsp中看到
   queue=new ActiveMQQueue("jason.queue2");
   replyQueue=new ActiveMQQueue("jason.replyQueue");
   // 得到消息生成者【发送者】:需要由Session和Destination来创建
   producer = session.createProducer(queue);
   // 创建消息
   TextMessage message = session.createTextMessage("jason学习ActiveMq 发送的消息");
   //在消息中设置回复的目的地,
   //对方用MessageProducer sender=session.createProducer(message.getJMSReplyTo());创建回复者
   message.setJMSReplyTo(replyQueue);
   // 发送一个non-Persistent的消息
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
   producer.send(message);
            // 发送一个Persistent的消息
   producer.setDeliveryMode(DeliveryMode.PERSISTENT);
   producer.send(session.createTextMessage("这是一个Persistent的消息!重启JMS,仍可获取"));
   System.out.println("发送消息:jason学习ActiveMq 发送的消息");
   System.out.println("这是一个Persistent的消息!重启JMS,仍可获取");
   
   //用回复的目的地定义回复接收者,且设置侦听
   replyer=session.createConsumer(replyQueue);
   replyer.setMessageListener
   (
    new MessageListener()
    {
     public void onMessage(Message message)
     {
      try {
       TextMessage txtmess = (TextMessage) message;
          System.out.println("consumer的回复内容是: "+txtmess.getText());
      } catch (Exception e) {
       e.printStackTrace();
      }
     }
    } 
   );
   session.commit();
} catch (Exception e) {
   e.printStackTrace();
} finally {
   try {
    if (null != connection)
     connection.close();
   } catch (Throwable ignore) {
   }
}
}
}
接收者:
//public class Receiver { 
public class QueueConsumer implements MessageListener{ 
public static void main(String[] args)    

    QueueConsumer re=new QueueConsumer(); 
    //循环只是为了让程序每2秒进行一次连接侦听是否有消息可以获取. 
    while(true)     
    { 
      re.consumeMessage(); 
      try { 
        Thread.sleep(2000); 
      } catch (InterruptedException e) { 
        e.printStackTrace(); 
      } 
    } 
    //对于主动接收的,只须直接执行:re.consumeMessage();即可. 
    //其中的while(true),会一次性将所有的消息获取过来. 

    
public void consumeMessage() 

    ConnectionFactory connectionFactory; 
    Connection connection = null; 
    Session session; 
    Queue queue; 
    MessageConsumer consumer;

    connectionFactory = new ActiveMQConnectionFactory( 
        ActiveMQConnection.DEFAULT_USER, 
        ActiveMQConnection.DEFAULT_PASSWORD, 
        "tcp://192.168.1.191:61616"); 
    try { 
      connection = connectionFactory.createConnection(); 
      connection.start(); 
      session = connection.createSession(Boolean.FALSE, 
          Session.AUTO_ACKNOWLEDGE); 
      queue=new ActiveMQQueue("jason.queue2"); 
      consumer = session.createConsumer(queue); 
        
      // 接受消息方式一:主动的去接受消息,用consumer.receive
      //只能获取一条消息 -->不采用 
//      TextMessage message = (TextMessage) consumer.receive(1000); 
//      if (null != message) { 
//        System.out.println("收到消息" + message.getText()); 
//      }    
      //可以不断循环,获取所有的消息.--->关键. 
//      while (true) { 
//        TextMessage message = (TextMessage) consumer.receive(1000); 
//        if (null != message) { 
//          System.out.println("收到消息" + message.getText()); 
//        } else { 
//          break;    //没有消息时,退出 
//        } 
//      } 
        
      /*接受消息方式二:基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。 
      */ 
      consumer.setMessageListener(this); 
       

    } catch (Exception e) { 
      e.printStackTrace(); 
    } finally { 
      try { 
        if (null != connection) 
          connection.close(); 
      } catch (Throwable ignore) { 
      } 
    }    

    
public void onMessage(Message message) 

    try { 
      if (message instanceof TextMessage) { 
        TextMessage txtmess = (TextMessage) message; 
        System.out.println("收到的消息是:" + txtmess.getText()); 
      //回复发送者
        MessageProducer sender=session.createProducer(message.getJMSReplyTo());
    sender.send(session.createTextMessage("已收到你的消息"));
      } 
      else 
        System.out.println("收到的消息是:" + message); 
    } catch (Exception e) { 
    }     

}
说明:
(2)
VM Transport
VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
TCP Transport
TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
tcp://hostname:port?transportOptions
tcp://localhost:61616
(3)
3.1
启动activeMQ后,用户创建的queues会被保存在activeMQ解压目录下的\data\kr-store\data中.
3.2
创建queue,可以在代码中创建,也可以直接进入
http://localhost:8161/admin--->点击queue--->在上面的field中填下你要创建的queue名-->点击创建即可.
3.3
若用户创建的queue,不是持久化的,则在重启activeMQ后,数据文件中的内容会被清空,但文件仍存在.
(4)
messageProducer发送消息后,会在保存在目的地,即上面的queue中,也即就是在\data\kr-store\data目录下的文件中.
messageReceiver(连接到相同目的地的接收者),不需要立即接收.只要activeMQ的服务端不关闭,当运行接收者,连接到activeMQ的服务端时,就可以获取activeMQ服务端上已发送的消息.
发送/接收的消息情况及数量及消息的内容与处理(删除),可以在
http://localhost:8161/admin/queue.jsp中查看,操作.
(5)
可以将activeMQ的服务端放于一PC中,发送者位于另一PC,接收者也位于另一PC中.
只要:tcp://activeMQ的服务端IP:activeMQ的服务端口,进行连接即可.
(6)
queue消息,只被消费一次.

topic的实例(无结合spring)
public class TopicTest {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = factory.createConnection();
        connection.start();
        //创建一个Topic
        Topic topic= new ActiveMQTopic("testTopic");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //注册消费者1
        MessageConsumer comsumer1 = session.createConsumer(topic);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //注册消费者2
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //创建一个生产者,然后发送多个消息。
        MessageProducer producer = session.createProducer(topic);
        for(int i=0; i<10; i++){
            producer.send(session.createTextMessage("Message:" + i));
        }
    }
}
输出如下:
Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2


--
we drink green tea