`
zhangnianli
  • 浏览: 4053 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

activeMQ文档

概述

JMS Java程序提供了一种创建、发送、接收和读取企业消息系统中消息的通用方法。它们的底层实现技术各不相同,比如Sun MQ, IBM MQ,BEA MQ,Apache ActiveMQ等。它们是通过定义被管理的对象来实现。被管理的对象是由管理员通过使用JMS系统提供者的管理工具创建和定制,然后被JMS客户端使用。JMS客户端通过接口来调用这些被管理的对象,从而具备跨平台特性

ActiveMQ的优势:

1、实现JMS1.1规范,支持J2EE1.4以上

2、可运行于任何jvm和大部分web容器(ActiveMQ works great in any JVM

3、支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT等等)

4、支持多种协议(stompopenwireREST

5、良好的spring支持(ActiveMQ has great Spring Support

6、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ.

7、与OpenJMSJbossMQ等开源jms provider相比,ActiveMQApache的支持,持续发展的优势明显。

相关类介绍:

l  ConnectionFactory——客户端使用这个被管理对象来创建一个Connection

l  Connection——一个到JMS 提高商的活动连接。

l  Destination——封装了消息目的地标识的被管理对象。

l  Session——一个用于发送和接收消息的单线程上下文。

l  MessageProducer——一个由Session 创建用于往目的地发送消息的对象。

l  MessageConsumer——一个由Session 创建用于接收发送到目的地的消息的对象。

这些对象之间的关系如下图所示:

ActiveMQ下两种模式比较:

两种模式:Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者)

点对点模式语义:

一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该messageconsumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。

发布/订阅者模式语义:

一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

 

下图ClientAClientB分别对应上面两种模式:

部署服务器

Windows:

1http://activemq.apache.org/activemq-510-release.html,下载5.5.0 Windows

2、下载后直接解压, 运行bin目录下的activemq.bat文件。

3、 访问http://localhost:8161/admin,进入后台管理,如下图:

4、点击上图标题栏中的QueuesTopics,可以分别进入点对点或广播模式的服务状态页面。

5、程序访问URLtcp://localhost:61616

Linux

1http://activemq.apache.org/activemq-510-release.html,下载5.5.0 linux/unix

2、解压缩安装文件到运行目录, 使用命令 tar -xzvf 文件名.tar.gz

3、启动、关闭方式:./bin/activemq start/stop

4、访问http://119.255.194.51:8161/admin,进入后台管理,类似于上图。

5、程序访问URLtcp://119.255.194.51:61616

设置程序访问用户名和密码:

l  设置证书文件,放用户名和密码:${activemq.base}/conf/credentials.properties

activemq.username=logcd  

activemq.password=028cd 

l  connectionFactory中,使用用户名和密码。

<!--加载属性配置文件,activemq.xml中如果不存在则添加--> 

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 

     <property name="locations"> 

        <value>file:///${activemq.base}/conf/credentials.properties</value> 

     </property>        

  </bean>

<!--Broker中,配置插件, ,位置在transportConnectors标签之前就可以了--> 

    <plugins>   

        <simpleAuthenticationPlugin>   

            <users>   

                <authenticationUser username="${activemq.username}"

 password="${activemq.password}" groups="users,admins"/>   

            </users>   

        </simpleAuthenticationPlugin>   

    </plugins>

l  配置simpleAuthenticationPlugin,简单认证插件

ConnectionFactory cf = new ActiveMQConnectionFactory("logcd""028cd""tcp://127.0.0.1:61616"

设置控制台管理页面访问密码

需要密码验证:打开conf/jetty.xml文件,找到 

 <bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">

  <property name="name" value="BASIC" />

  <property name="roles" value="admin" />

  <property name="authenticate" value="false" />  #将“false”改为“true”即可

 </bean>

 

 设定密码:打开conf/jetty-realm.properties文件,找到

 admin: admin,admin    

 用户名:密码,权限

开发流程

点对点(PTP)模式开发流程:

1、生产者(producer)开发流程(ProducerTool.java):

1.1 创建Connection:根据urluserpassword创建一个jms Connection

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory (user, password, url);

Connection connection = connectionFactory.createConnection ();

1.2 创建Session

connection基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。

Session session = connection.createSession (false, Session.AUTO_ACKNOWLEDGE);

1.3 创建Destination对象:

需指定其对应的主题(subject)名称,producerconsumer将根据subject来发送/接收对应的消息。

Destination destination = session.createTopic (subject);

1.4 创建MessageProducer

根据Destination创建MessageProducer对象,同时设置其持久模式。

MessageProducer producer = session.createProducer (destination);

producer.setDeliveryMode (DeliveryMode.NON_PERSISTENT);

1.5 发送消息到队列(Queue):

封装TextMessage消息,使用MessageProducersend方法将消息发送出去。

TextMessage msg = session.createTextMessage (message);

connection.start ();

producer.send (msg);

 

2、消费者(consumer)开发流程(ConsumerTool.java):

2.1 实现MessageListener接口:

消费者类须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。

  TextMessage txtMsg = (TextMessage) message;

String msg = txtMsg.getText ();

2.2 创建Connection

根据urluserpassword创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId

2.3 创建SessionDestination

ProducerTool.java中的流程类似,不再赘述。

2.4创建replyProducer【可选】:

可以用来将消息处理结果发送给producer

2.5 创建MessageConsumer

根据Destination创建MessageConsumer对象,并监听发布者发布的消息

MessageConsumer   consumer = session.createConsumer (destination);

consumer.setMessageListener (this);

connection.start ();

广播模式(pub/sub)开发流程

PTP消息模式类似。

参考下载的压缩包中example中的例子(TopicPublisher.java/TopicListener.java)。

持久化操作(以MySql为例)

持久化操作分为四种:JdbcPersistenceJournaledJDBC AmqPersistence(默認方式),kahaPersistence

持久化操作时注意,一定先订阅主题,让服务器知道此主题要持久化.这样在以后的操作中先发布先订阅就没有关系了.

JdbcPersistence

pure JDBC without a journal, 目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL,Oracle, Postgresql, SQLServer, Sybase

 

1、 修改activemq.xml文件中的默认存储方式:

如果下面代码已存在,放开注释即可:

<persistenceAdapter>

    <jdbcPersistenceAdapter dataSource="#MySQL-DS"/>

</persistenceAdapter>

2、 配置MySQL数据源,在</broker>节点后面,增加MySQL数据源配置:

如果下面代码已存在,放开注释即可

<!-- MySQL DataSource -->

<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>

    <property name="username" value="misc_root"/>

    <property name="password" value="misc_root_pwd"/>

    <property name="poolPreparedStatements" value="true"/>

</bean>

3、 手动创建数据库:activemq,与上面URL是的相同.这时启动服务.会发现多了3张数据表: activemq_acks, | activemq_lock  | activemq_msgs,activemq_msgs中会看到持久化后的信息。

4、 配置文件修改好之后,将mysql JDBC驱动包mysql-connector-java-5.1.9.jar放到%ACTIVEMQ_HOME%\lib\下。

5、 程序代码如下注意:

a.创建生产者对象(MessagProducer)时,设置为持久模式,代码如下:

    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

b.创建消费者对象(MessageConsumer)时,设置为持久订阅,代码如下:

    consumer = session.createDurableSubscriber ((Topic) destination, "mysub");

jdbcPersistenceAdapter 节点可配置的属性如下:

 

属性                       默认值                      描述

adapter                                             指定持久存储的DB

cleanupPeriod               300000                 指定删除DB中已收到确认信

                                                    息的消息的时间周期(ms)

createTablesOnStartup       true                    是否在启动的时候创建相应的表

databaseLocker             DefaultDatabaseLocker instance

防止多个broker同时访问DB的锁实例

dataDirectory               activemq-data           指定Derby存储数据文件的路径

dataSource                  #derby                  指定要引用的数据源

lockAcquireSleepInterval    1000                    等待获取锁的时间长度(ms)

lockKeepAlivePeriod         30000                   定期向lock表中写入当前时间,

指定了锁的存活时间

useDatabaseLock             true                    在主从配置的时候是否使用排他锁

transactionIsolation        Connection.TRANSACTION_READ_UNCOMMITTED

指定事务隔离级别

 

注意:当消息发布时,消息会自动存储到数据库中,如果消息被订阅者消费,默认的是五分钟会自动删除表中的数据,如果服务器down掉后也不会影响。

 

JournaledJDBC

Journal with JDBC, ActiveMQ中,Journal持久方式提供了快速可靠的性能。Journal文件中的数据用于在ActiveMQ发生崩溃,下次重启时恢复消息的依据。当Region将消息存储到MessageStore的时候,store会将这条消息存储到Journal文件中,同样的,订阅者发回的应答消息也会存储在Journal文件中。在checkPoint时间点到来时,MessageStore将其内存中未发送的消息都存储到数据库中,将已发送并且得到应答的消息从数据库中删除,同时将Journal中的数据清空。这样就保证了任何时刻,未得到客户应答的消息都保存在硬盘介质——要么在Journal文件中,要么在数据库中。

 

1、 修改activemq.xml文件中的默认存储方式:

<persistenceFactory> 

<journaledJDBC  journalLogFiles="5"  dataSource="#mysql-ds" /> 

</persistenceFactory>

其它的配置如上面,和JdbcPersistence的主要区别在于:

在消息消费者能跟上生产者的速度时,journal文件能大大减少需要写入到DB中的消息。举个例子:生产者产生了10000个消息,这10000个消息会保存到journal文件中,但是消费者的速度很快,在journal文件还未同步到DB之前,以消费了9900个消息。那么后面就只需要写入100个消息到DB了。如果消费者不能跟上生产者的速度,journal文件可以使消息以批量的方式写入DB中,JDBC驱动进行DB写入的优化。从而提升了性能。

2、 journaledJDBC 节点的有以下可配置属性:

属性                        默认值                  描述

adapter                                         指定持久存储的DB

createTablesOnStartup       true                是否在启动的时候创建相应的表

dataDirectory               activemq-data       指定Derby存储数据文件的路径

dataSource                  #derby              指定要引用的数据源

journalArchiveDirectory                         指定归档日志文件存储的路径

journalLogFiles             2                   指定日志文件的数量

journalLogFileSize              20MB                指定每个日志文件的大小

journalThreadPriority           10                  指定写入日志的线程的优先级

useDatabaseLock                 true                在主从配置的时候是否使用排他锁

useJournal                      true                指定是否使用日志

AmqPersistence

把消息存儲到data\kr-store\data目錄下的二進制文件中,AMQ Message StoreActiveMQ5.0缺省的持久化存储。Message commands被保存到transactional journal(由rolling data logs组成)。Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的文件大小是20M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。

以下是其配置的一个例子:

${activemq.base}/data替代掉Foo

<!-- default persistence -->

<persistenceAdapter>

<amqPersistenceAdapter syncOnWrite="false" directory=" Foo" maxFileLength="20 mb"/>

</persistenceAdapter>

kahaPersistence

把消息存儲到data\kr-store\data目錄下的二進制文件中,保存文件的路徑可以修改,Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data  logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

以下是其配置的一个例子:

${activemq.base}/data替代掉Foo      

<persistenceAdapter>

<kahaPersistenceAdapter directory="Foo" maxDataFileLength="33554432"/>

</persistenceAdapter>

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics