activeMQ文档
概述
JMS 为Java程序提供了一种创建、发送、接收和读取企业消息系统中消息的通用方法。它们的底层实现技术各不相同,比如Sun MQ, IBM MQ,BEA MQ,Apache ActiveMQ等。它们是通过定义被管理的对象来实现。被管理的对象是由管理员通过使用JMS系统提供者的管理工具创建和定制,然后被JMS客户端使用。JMS客户端通过接口来调用这些被管理的对象,从而具备跨平台特性
ActiveMQ的优势:
1、实现JMS1.1规范,支持J2EE1.4以上
2、可运行于任何jvm和大部分web容器(ActiveMQ works great in any JVM)
3、支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT等等)
4、支持多种协议(stomp,openwire,REST)
5、良好的spring支持(ActiveMQ has great Spring Support)
6、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ.)
7、与OpenJMS、JbossMQ等开源jms provider相比,ActiveMQ有Apache的支持,持续发展的优势明显。
相关类介绍:
l ConnectionFactory——客户端使用这个被管理对象来创建一个Connection。
l Connection——一个到JMS 提高商的活动连接。
l Destination——封装了消息目的地标识的被管理对象。
l Session——一个用于发送和接收消息的单线程上下文。
l MessageProducer——一个由Session 创建用于往目的地发送消息的对象。
l MessageConsumer——一个由Session 创建用于接收发送到目的地的消息的对象。
这些对象之间的关系如下图所示:
ActiveMQ下两种模式比较:
两种模式:Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者)
点对点模式语义:
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
发布/订阅者模式语义:
一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
下图ClientA和ClientB分别对应上面两种模式:
部署服务器
Windows:
1、http://activemq.apache.org/activemq-510-release.html,下载5.5.0 Windows。
2、下载后直接解压, 运行bin目录下的activemq.bat文件。
3、 访问http://localhost:8161/admin,进入后台管理,如下图:
4、点击上图标题栏中的Queues或Topics,可以分别进入点对点或广播模式的服务状态页面。
5、程序访问URL:tcp://localhost:61616
Linux
1、http://activemq.apache.org/activemq-510-release.html,下载5.5.0 linux/unix。
2、解压缩安装文件到运行目录, 使用命令 tar -xzvf 文件名.tar.gz。
3、启动、关闭方式:./bin/activemq start/stop
4、访问http://119.255.194.51:8161/admin,进入后台管理,类似于上图。
5、程序访问URL:tcp://119.255.194.51:61616
设置程序访问用户名和密码:
l 设置证书文件,放用户名和密码:${activemq.base}/conf/credentials.properties
activemq.username=logcd
activemq.password=028cd
l 在connectionFactory中,使用用户名和密码。
<!--加载属性配置文件,在activemq.xml中如果不存在则添加-->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:///${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<!--在Broker中,配置插件, ,位置在transportConnectors标签之前就可以了-->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}"
password="${activemq.password}" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
l 配置simpleAuthenticationPlugin,简单认证插件
ConnectionFactory cf = new ActiveMQConnectionFactory("logcd", "028cd", "tcp://127.0.0.1:61616")
设置控制台管理页面访问密码
需要密码验证:打开conf/jetty.xml文件,找到
<bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="admin" />
<property name="authenticate" value="false" /> #将“false”改为“true”即可
</bean>
设定密码:打开conf/jetty-realm.properties文件,找到
admin: admin,admin
用户名:密码,权限
开发流程
点对点(PTP)模式开发流程:
1、生产者(producer)开发流程(ProducerTool.java):
1.1 创建Connection:根据url,user和password创建一个jms Connection。
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory (user, password, url);
Connection connection = connectionFactory.createConnection ();
1.2 创建Session:
在connection基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
Session session = connection.createSession (false, Session.AUTO_ACKNOWLEDGE);
1.3 创建Destination对象:
需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息。
Destination destination = session.createTopic (subject);
1.4 创建MessageProducer:
根据Destination创建MessageProducer对象,同时设置其持久模式。
MessageProducer producer = session.createProducer (destination);
producer.setDeliveryMode (DeliveryMode.NON_PERSISTENT);
1.5 发送消息到队列(Queue):
封装TextMessage消息,使用MessageProducer的send方法将消息发送出去。
TextMessage msg = session.createTextMessage (message);
connection.start ();
producer.send (msg);
2、消费者(consumer)开发流程(ConsumerTool.java):
2.1 实现MessageListener接口:
消费者类须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText ();
2.2 创建Connection:
根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。
2.3 创建Session和Destination:
与ProducerTool.java中的流程类似,不再赘述。
2.4创建replyProducer【可选】:
可以用来将消息处理结果发送给producer。
2.5 创建MessageConsumer:
根据Destination创建MessageConsumer对象,并监听发布者发布的消息
MessageConsumer consumer = session.createConsumer (destination);
consumer.setMessageListener (this);
connection.start ();
广播模式(pub/sub)开发流程
与PTP消息模式类似。
参考下载的压缩包中example中的例子(TopicPublisher.java/TopicListener.java)。
持久化操作(以MySql为例)
持久化操作分为四种:JdbcPersistence,JournaledJDBC ,AmqPersistence(默認方式),kahaPersistence
持久化操作时注意,一定先订阅主题,让服务器知道此主题要持久化.这样在以后的操作中先发布先订阅就没有关系了.
JdbcPersistence:
pure JDBC without a journal, 目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL,Oracle, Postgresql, SQLServer, Sybase。
1、 修改activemq.xml文件中的默认存储方式:
如果下面代码已存在,放开注释即可:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
2、 配置MySQL数据源,在</broker>节点后面,增加MySQL数据源配置:
如果下面代码已存在,放开注释即可
<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="misc_root"/>
<property name="password" value="misc_root_pwd"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
3、 手动创建数据库:activemq,与上面URL是的相同.这时启动服务.会发现多了3张数据表: activemq_acks, | activemq_lock | activemq_msgs,在activemq_msgs中会看到持久化后的信息。
4、 配置文件修改好之后,将mysql JDBC驱动包mysql-connector-java-5.1.9.jar放到%ACTIVEMQ_HOME%\lib\下。
5、 程序代码如下注意:
a.创建生产者对象(MessagProducer)时,设置为持久模式,代码如下:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
b.创建消费者对象(MessageConsumer)时,设置为持久订阅,代码如下:
consumer = session.createDurableSubscriber ((Topic) destination, "mysub");
jdbcPersistenceAdapter 节点可配置的属性如下:
属性 默认值 描述
adapter 指定持久存储的DB
cleanupPeriod 300000 指定删除DB中已收到确认信
息的消息的时间周期(ms)
createTablesOnStartup true 是否在启动的时候创建相应的表
databaseLocker DefaultDatabaseLocker instance
防止多个broker同时访问DB的锁实例
dataDirectory activemq-data 指定Derby存储数据文件的路径
dataSource #derby 指定要引用的数据源
lockAcquireSleepInterval 1000 等待获取锁的时间长度(ms)
lockKeepAlivePeriod 30000 定期向lock表中写入当前时间,
指定了锁的存活时间
useDatabaseLock true 在主从配置的时候是否使用排他锁
transactionIsolation Connection.TRANSACTION_READ_UNCOMMITTED
指定事务隔离级别
注意:当消息发布时,消息会自动存储到数据库中,如果消息被订阅者消费,默认的是五分钟会自动删除表中的数据,如果服务器down掉后也不会影响。
JournaledJDBC:
Journal with JDBC, 在ActiveMQ中,Journal持久方式提供了快速可靠的性能。Journal文件中的数据用于在ActiveMQ发生崩溃,下次重启时恢复消息的依据。当Region将消息存储到MessageStore的时候,store会将这条消息存储到Journal文件中,同样的,订阅者发回的应答消息也会存储在Journal文件中。在checkPoint时间点到来时,MessageStore将其内存中未发送的消息都存储到数据库中,将已发送并且得到应答的消息从数据库中删除,同时将Journal中的数据清空。这样就保证了任何时刻,未得到客户应答的消息都保存在硬盘介质——要么在Journal文件中,要么在数据库中。
1、 修改activemq.xml文件中的默认存储方式:
<persistenceFactory>
<journaledJDBC journalLogFiles="5" dataSource="#mysql-ds" />
</persistenceFactory>
其它的配置如上面,和JdbcPersistence的主要区别在于:
在消息消费者能跟上生产者的速度时,journal文件能大大减少需要写入到DB中的消息。举个例子:生产者产生了10000个消息,这10000个消息会保存到journal文件中,但是消费者的速度很快,在journal文件还未同步到DB之前,以消费了9900个消息。那么后面就只需要写入100个消息到DB了。如果消费者不能跟上生产者的速度,journal文件可以使消息以批量的方式写入DB中,JDBC驱动进行DB写入的优化。从而提升了性能。
2、 journaledJDBC 节点的有以下可配置属性:
属性 默认值 描述
adapter 指定持久存储的DB
createTablesOnStartup true 是否在启动的时候创建相应的表
dataDirectory activemq-data 指定Derby存储数据文件的路径
dataSource #derby 指定要引用的数据源
journalArchiveDirectory 指定归档日志文件存储的路径
journalLogFiles 2 指定日志文件的数量
journalLogFileSize 20MB 指定每个日志文件的大小
journalThreadPriority 10 指定写入日志的线程的优先级
useDatabaseLock true 在主从配置的时候是否使用排他锁
useJournal true 指定是否使用日志
AmqPersistence
把消息存儲到data\kr-store\data目錄下的二進制文件中,AMQ Message Store是ActiveMQ5.0缺省的持久化存储。Message commands被保存到transactional journal(由rolling data logs组成)。Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的文件大小是20M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。
以下是其配置的一个例子:
${activemq.base}/data替代掉Foo
<!-- default persistence -->
<persistenceAdapter>
<amqPersistenceAdapter syncOnWrite="false" directory=" Foo" maxFileLength="20 mb"/>
</persistenceAdapter>
kahaPersistence
把消息存儲到data\kr-store\data目錄下的二進制文件中,保存文件的路徑可以修改,Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
以下是其配置的一个例子:
${activemq.base}/data替代掉Foo
<persistenceAdapter>
<kahaPersistenceAdapter directory="Foo" maxDataFileLength="33554432"/>
</persistenceAdapter>
分享到:
相关推荐
消息中间件的学习笔记
NULL 博文链接:https://sswh.iteye.com/blog/1974169
NULL 博文链接:https://sswh.iteye.com/blog/1974131
NULL 博文链接:https://sswh.iteye.com/blog/1974156
activeMQ学习笔记,JMS有两种传递消息的方式。标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务...
NULL 博文链接:https://yingzhuo.iteye.com/blog/1566635
mq的方式有两种:点到点和发布/订阅,全部有代码,可以直接运行
activemq学习笔记 activemq学习笔记 activemq学习笔记 activemq学习笔记 activemq学习笔记 activemq学习笔记
active mq的学习手册,例子等。你可以一步步地上手。
ActiveMQ+In+Action学习笔记
这是我学习activeMQ时总结的文档,其中有实例,还有一些我遇到的问题。
脑图内容涵盖视频的99%的笔记,含有自己编写的代码文件,外加了自己对一些问题的测试与回答。 消息中间件之ActiveMQ 消息中间件已经成为互联网企业应用系统内部通信的核心手段,是目前企业内主流标配技术, 它...
JMS教程,学习笔记,基于XML和JMS的异构数据交换集成
Linux面试专题及答案+ActiveMQ消息中间件面试专题+Java基础面试题+MySQL性能优化的21个最佳实践+微服务面试专题及答案+深入理解java虚拟机+设计模式面试专题及答案+开源框架面试专题及答案+并发编程及答案+Spring...
activeMq,rabbitMq,activity工作流,docker,dubbo,netty,rpc,springcloud,zookeeper学习笔记
NULL 博文链接:https://yuxisanren.iteye.com/blog/1912587
Java架构面试专题(含答案)和学习笔记.rar
Java架构面试专题汇总(含答案)和学习笔记 ActiveMQ消息中间件面试专题; Dubbo面试及答案(上); Java基础面试题; 等资料。
ActiveMQ技术的基础详细学习笔记,总结了ActiveMQ的各个知识点,可以用来复习以及对基础知识的巩固,对新人的学习很有帮助。