A. spring 集成activitymq持久化文件怎么设置
ActiveMQ另问题要软件能挂掉挂掉怕怕挂掉信息给丢所本节析几种持久化式:
、持久化文件
ActiveMQ默认支持种式要发消息设置消息持久化
打安装目录配置文件:
D:\ActiveMQ\apache-activemq\conf\activemq.xml越80行发现默认配置项:
注意使用kahaDB基于文件支持事务消息存储器靠高性能扩展消息存储器
设计初衷使用简单并尽能快KahaDB索引使用transaction log并且所destination使用index测试表明:用于产环境支持1万active connection每connection独立queue该表现已经足矣应付部需求
再发送消息候改变第二参数:
MsgDeliveryMode.Persistent
Message保存式2种
PERSISTENT:保存磁盘consumer消费message删除
NON_PERSISTENT:保存内存消费message清除
注意:堆积消息太能导致内存溢
打产者端发送消息:
wps30F4.tmp
启消费者端同管理界面查看:
wps3105.tmp
发现消息等待没持久化ActiveMQ宕机重启消息丢失我现修改文件持久化重启ActiveMQ消费者仍能够收消息
wps3106.tmp
二、持久化数据库
我支持Mysql例先载mysql-connector-java-5.1.34-bin.jar包放:
D:\ActiveMQ\apache-activemq\lib目录
打并修改配置文件:
复制代码
<beans
xmlns=""
xmlns:xsi=""
xsi:schemaLocation=" /spring-beans.xsd
/activemq-core.xsd">
file:${activemq.conf}/credentials.properties
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
<!--
The element is used to configure the ActiveMQ broker.
-->
<!-- The is used to prevent
slow topic consumers to block procers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
-->
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
-->
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
-->
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down procers. For more information, see:
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
-->
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
J
B. 如何实现activemq的topic的持久订阅
持久订阅,就要记录消费者的名字了。
张三说,我是张三,有馒头给我留着,我回来拿;
李四说,我是李四,有馒头给我留着,我回来拿;
参考例子如下:
// 创建connection
connection = connectionFactory.createConnection();
connection.setClientID("bbb"); //持久订阅需要设置这个。
connection.start();
// 创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建destination
Topic topic = session.createTopic("userSyncTopic"); //Topic名称
//MessageConsumer consumer = session.createConsumer(topic); //普通订阅
MessageConsumer consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅
C. activemq 实现消息持久化到数据库的时候。这个我做到了。消息是保存到表中了。但是重新启动activemq之后,
队列在程序执行连接的时候会自动创建,不会应该使用的
D. activemq 持久消息和不持久的区别
持久传输和非持久传输最大的区别是:采用持久传输时,传输的消息会保存到磁盘中(messages are persisted to disk/database),即“存储转发”方式。先把消息存储到磁盘中,然后再将消息“转发”给订阅者。
采用非持久传输时,发送的消息不会存储到磁盘中。
采用持久传输时,当Borker宕机 恢复后,消息还在。采用非持久传输,Borker宕机重启后,消息丢失。比如,当生产者将消息投递给Broker后,Broker将该消息存储到磁盘中,在Broker将消息发送给Subscriber之前,Broker宕机了,如果采用持久传输,Broker重启后,从磁盘中读出消息再传递给Subscriber;如果采用非持久传输,这条消息就丢失了。
E. rabbitmq消息真的可以持久化吗
可以,它默认会持久化到本地磁盘。所以数据不会丢失。1453857833 Windows C++ 超级简单封装,一个发送接口,一个接收接口,数据用vector<string>存放数据
F. rabbitmq保证消息不丢失
我是在网上粘贴复制的。
mq原则
数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。
2.丢失数据场景
丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景,
(1)rabbitmq
A:生产者弄丢了数据生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rabbitmq自己丢了数据如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。
3.如何防止消息丢失
(1)rabbitmqA:生产者丢失消息①:可以选择使用rabbitmq提供是事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物
channel.txSelect();//开启事物try{//发送消息}catch(Exection e){channel.txRollback();//回滚事物//重新提交}复制代码
缺点:rabbitmq事物已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
②:可以开启confirm模式。在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
//开启confirmchannel.confirm();//发送成功回调public void ack(String messageId){}// 发送失败回调public void nack(String messageId){//重发该消息}复制代码
二者不同事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。
B:rabbitmq自己弄丢了数据设置消息持久化到磁盘。设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
G. 用java代码如何设置activemq消息持久化到数据库中
ActiveMQ持久化消息的二种方式;
1、持久化为文件
这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了。涉及到的配置和代码有:
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
procer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);
2、持久化为MySql
首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar
接下来修改配置文件
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>
在配置文件中的broker节点外增加
<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
从配置中可以看出数据库的名称是activemq,需要手动在MySql中增加这个库。
然后重新启动消息队列,会发现多了3张表
1:activemq_acks
2:activemq_lock
3:activemq_msgs
H. activeMQ能不能配置多个实例的负载均衡,并只连接一个MySQL,将消息持久化存入到一个数据库中
ActiveMQ持久化消息的二种方式;1、持久化为文件这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了。涉及到的配置和代码有:procer.Send(request,MsgDeliveryMode.Persistent,level,TimeSpan.MinValue);2、持久化为MySql首先需要
I. activemq 怎么处理持久化
如果消息是持久化的,activemq收到消息后会存储在持久性cursor中。对于非持久化消息,会存储在File Cursor中。从名称上File Cursor是持久性cursor,实际上activemq把FilePendingMessageCursor作为非持久性cursor。
File Cursor首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中,这样就不会因为有大量消息没有被消费而导致OOM。
当消息consumer启动时,mq收到消息会激活broker上的DurableTopicSubscrition,该subscription会检查当前的消息cursor里是否有消息,如果有就会dispatch这些消息,这样consumer就能收到之前发送的消息了。转载,仅供参考。
J. activemq 可以持久化多长时间
如果persist=true会一直保存,server重启也不会丢失,除非被消费。