high-concurrent-queue
  • Introduction
  • 消息中间件的基本概念
  • ActiveMQ
    • JMS规范
    • JAVA原生客户端
    • 与Spring集成
    • 持久化机制
    • ack机制
    • ActiveMQ修改连接的用户名密码
  • Rabbitmq
    • AMQP规范
    • 安装配置
    • 原生Java客户端使用
    • 与Spring集成
    • ack机制
    • 持久化机制
    • 消息确认进制
      • 消息确认机制(AMQP事务)
      • 消息确认机制(Confirm模式)
    • 延迟队列
      • 消费端限流
      • TTL
      • 死信队列
      • 延迟队列——消息延迟推送
      • 实现延迟任务
    • direct、topic、fanout的使用以及区别
  • 实战场景
    • 系统解耦
    • 定时消息
    • 消峰填谷
    • 分布式事务
    • mq-rpc
    • 消息分发
  • kafka
    • kafka实战
Powered by GitBook
On this page
  • 背景
  • ActiveMQ的几种存储模式选择
  • 持久化操作
  • 参考文档:

Was this helpful?

  1. ActiveMQ

持久化机制

背景

场景问题:服务器断电重启,未被消费的消息是否会在重启之后被继续消费? 两种选择:非持久性模式/持久性模式

非持久性模式:

服务器断电(关闭)之后,使用非持久性模式时,没有被消费的消息不会继续消费全部丢失; 程序会报一个连接关闭异常停止运行,继续启动服务器运行程序,不会接收任何消息。

持久性模式:

服务器断电(关闭)后,使用持久性模式时,没有被消费的消息会继续消费; 程序也会报连接关闭异常,但再次启动服务器和程序后,接收方还能继续原来的消息再次接收。

ActiveMQ的几种存储模式选择

  • AMQ消息存储 (默认的消息存储)

  • KahaDB 消息存储(提供容量的提升和恢复能力) 基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器

  • JDBC 消息存储(消息基于JDBC存储)

  • 使用Mysql或Oracle数据库存储消息

  • Memory 消息存储(基于内容的消息存储) ActiveMQ支持将消息保存到内存中,将Broker的“prsistent” 属性设置为“false”。

持久化操作

  • 持久化文件

ActiveMQ默认就支持这种方式,只要在发消息时设置消息为持久化就可以了。

打开安装目录下的配置文件:

D:\ActiveMQ\apache-activemq\conf\activemq.xml在越80行会发现默认的配置项:

<persistenceAdapter>

<kahaDB directory="${activemq.data}/kahadb"/>

</persistenceAdapter>

注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器。

 他的设计初衷就是使用简单并尽可能的快。KahaDB的索引使用一个transaction log,并且所有的destination只使用一个index,有人测试表明:如果用于生产环境,支持1万个active connection,每个connection有一个独立的queue。该表现已经足矣应付大部分的需求。

然后再发送消息的时候改变第二个参数为:

MsgDeliveryMode.Persistent

Message保存方式有2种 PERSISTENT:保存到磁盘,consumer消费之后,message被删除。 NON_PERSISTENT:保存到内存,消费之后message被清除。 注意:堆积的消息太多可能导致内存溢出。

然后打开生产者端发送一个消息:

不启动消费者端,同时在管理界面查看:

发现有一个消息正在等待,这时如果没有持久化,ActiveMQ宕机后重启这个消息就是丢失,而我们现在修改为文件持久化,重启ActiveMQ后消费者仍然能够收到这个消息。

  • 持久化为数据库

我们从支持Mysql为例,将:

  • commons-dbcp-1.4.jar

  • commons-pool-1.6.jar

  • mysql-connector-java-5.1.30.jar

D:\ActiveMQ\apache-activemq\lib目录下。

打开并修改配置文件:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
            <kahaDB directory="${activemq.data}/kahadb"/>
        -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>
   <!-- 加入的配置  --> 
  <bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value=""/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

启动ActiveMQ,分析数据表结构发现多了3张表:

activemq_acks、activemq_lock、activemq_msgs

activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

CONTAINER:消息的目的地 SUB_DEST:如果是使用static集群,这个字段会有集群其他系统的信息 CLIENT_ID:每个订阅者客户端ID SUB_NAME:订阅者名称 SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作 LAST_ACKED_ID:记录消费过的消息的id。 PRIORITY:优先级 XID: activemq_lock:用于记录哪一个Broker是Master Broker。这张表只有在集群环境中才会用到,在集群中,只能有一个Broker来接收消息,那么这个Broker就是主Broker,其他的作为从Broker,用来备份等待。

ID:主键

TIME:时间

BROKER_NAME:Broker名称

activemq_msgs:用于存储消息,Topic和Queue消息都会保存在这张表中

ID:自增主键 CONTAINER:容器名称 MSGID_PROD:消息发送者客户端的主键 MSGID_SEQ:发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数 MSG:消息本体的java序列化对象的二进制数据 PRIORITY:优先级,从0-9,数值越大优先级越高

然后分别启动生产者和消费者,在MySQL中进行消息数据查看。

  • KahaDB方式

KahaDB是从ActiveMQ 5.4开始默认的持久化插件,也是我们项目现在使用的持久化方式。

KahaDb恢复时间远远小于其前身AMQ并且使用更少的数据文件,所以可以完全代替AMQ。 kahaDB的持久化机制同样是基于日志文件,索引和缓存。

配置方式:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>

d irectory : 指定持久化消息的存储目录

journalMaxFileLength : 指定保存消息的日志文件大小,具体根据你的实际应用配置

(1)KahaDB主要特性 1、日志形式存储消息; 2、消息索引以B-Tree结构存储,可以快速更新; 3、完全支持JMS事务; 4、支持多种恢复机制;

(2)KahaDB的结构

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。 消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

Data logs: Data logs用于存储消息日志,消息的全部内容都在Data logs中。 同AMQ一样,一个Data logs文件大小超过规定的最大值,会新建一个文件。同样是文件尾部追加,写入性能很快。 每个消息在Data logs中有计数引用,所以当一个文件里所有的消息都不需要了,系统会自动删除文件或放入归档文件夹。

Metadata cache : 缓存用于存放在线消费者的消息。如果消费者已经快速的消费完成,那么这些消息就不需要再写入磁盘了。 Btree索引会根据MessageID创建索引,用于快速的查找消息。这个索引同样维护持久化订阅者与Destination的关系,以及每个消费者消费消息的指针。

Metadata store 在db.data文件中保存消息日志中消息的元数据,也是以B-Tree结构存储的,定时从Metadata cache更新数据。Metadata store中也会备份一些在消息日志中存在的信息,这样可以让Broker实例快速启动。 即便metadata store文件被破坏或者误删除了。broker可以读取Data logs恢复过来,只是速度会相对较慢些。

  • LevelDB方式

从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。

目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,可能是以后的趋势。

在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

  • AMQ方式

性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。 当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。 主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。而且由于索引巨大,一旦Broker崩溃,重建索引的速度会非常慢。

配置片段如下:

<persistenceAdapter>
     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

虽然AMQ性能略高于Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。

参考文档:

Previous与Spring集成Nextack机制

Last updated 5 years ago

Was this helpful?

https://www.cnblogs.com/binyue/p/5371479.html
https://www.cnblogs.com/madyina/p/4127312.html
https://www.jianshu.com/p/deb1816271d1
wps30F4.tmp
wps3105.tmp
wps3106.tmp