ActiveMQ初认识

是什么

        ActiveMQ是Apache推出的,一款开源的,完全支持J MS1.1和J 2EE 1.4规范的J MS  Provider实现的消息中间件 (Message Oriented Middlew are,MOM)

为什么使用

     实现J MS Provider,用来帮助实现高可用、高性能、可伸缩、 易用和安全的企业级面向消息服务的系统

主要特性

⒈ 多种语言和协议编写客户端。语言:   Jav a, C , C + + , C #, R uby , Per l, Py t hon, PHP。应用协议: O penW ir e, S t om p      R ES T , W S N ot ificat ion, XMPP, AMQ P。其中O penW ir e用于Jav a, C , C + + , C #,S t om p用于R uby , Per l, Py t hon, PHP
⒉ 完全支持JMS 1 . 1 和J2 EE  1 . 4 规范 (持久化,X A消息,事务)
⒊ 对S pr ing的支持,Ac t iv eMQ 可以很容易内嵌到使用S pr ing的系统里面去,而且也支持S pr ing2 . 0 的特性
⒋ 通过了常见J2 EE服务器(如 G er onim o, JBos s   4 , G las s F is h, W ebL ogic )的测试,其中通过JC A  1 . 5   r es our ce  adapt or s 的配置,可以让Ac t iv eMQ 可以自动的部署到任何兼容J2 EE  1 . 4  商业服务器上
⒌ 支持多种传送协议:in-V M, TC P, S S L , N IO , U DP, JG r oups , JX TA
⒍ 支持通过JDBC 和jour nal提供高速的消息持久化
⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
⒏ 支持Ajax
⒐ 支持与Ax is 的整合
⒑ 可以很容易得调用内嵌JMS   pr ov ider ,进行测试

使用场景

异构应用之间的整合
事件驱动的体系结构
企业应用集成
地理分散
信息广播
构建动态系统

spring中集成activeMQ

导入jms接口jar包和spring框架相关jar包

引入日志相关j a r包,如sl f 4j,l og4j,c ommons-l oggi ng

     因为s pr ing框架内部使用com m on- logging作为日志实现,我们为了方便代码调试,所以需要引入log4 j, 而s lf4 j是日志规范。

导入activeMQ核心jar:activemq-core-5.6.0.jar

编写spring配置文件

消息生产者spring-jms-producer.xml
< ! - -  定义连接工厂  - ->
< bean  id= " t ar get C onnec t ionF ac t or y "   c las s = " or g. apache. ac t iv em q. Ac t iv eMQ C onnec t ionF ac t or y ">
< pr oper t y   nam e= " br oker U R L "   v alue= " t cp: //localhos t : 6 1 6 1 6 "   />
< /bean>
< bean  id= " connec t ionF ac t or y "
c las s = " or g. s pr ingfr am ew or k. jm s . connec t ion. S ingleC onnec t ionF ac t or y ">
< pr oper t y   nam e= " t ar get C onnec t ionF ac t or y "   r ef= " t ar get C onnec t ionF ac t or y "   />
< /bean>
< bean  id= " jm s T em plat e"   c las s = " or g. s pr ingfr am ew or k. jm s . cor e. Jm s T em plat e">
< pr oper t y   nam e= " connec t ionF ac t or y "   r ef= " connec t ionF ac t or y "   />
< /bean>
< bean  id= " queueDes t inat ion"   c las s = " or g. apache. ac t iv em q. com m and. Ac t iv eMQ Q ueue">
< cons t r uc t or - ar g>
< v alue> queue< /v alue>
< /cons t r uc t or - ar g>
< /bean>
消息消费者spring-jms-consumer.xml
< ! - -  定义连接工厂  - ->
< bean  id= " t ar get C onnec t ionF ac t or y "   c las s = " or g. apache. ac t iv em q. Ac t iv eMQ C onnec t ionF ac t or y ">
< pr oper t y   nam e= " br oker U R L "   v alue= " tcp://localhost:61616"   />
< /bean>
< bean  id= " connec t ionF ac t or y "
c las s = " or g. s pr ingfr am ew or k. jm s . connec t ion. S ingleC onnec t ionF ac t or y ">
< pr oper t y   nam e= " t ar get C onnec t ionF ac t or y "   r ef= " t ar get C onnec t ionF ac t or y "   />
< /bean>
< bean  id= " queueDes t inat ion"   c las s = " or g. apache. ac t iv em q. com m and. Ac t iv eMQ Q ueue">
< cons t r uc t or - ar g>
< v alue> queue< /v alue>
< /cons t r uc t or - ar g>
< /bean>
< bean  id= " cons um er Mes s ageL is t ener "   c las s = " ac t iv em q. jm s . s pr ing. C ons um er Mes s ageL is t ener "  >
< /bean>
< bean  id= " jm s C ont ainer "
c las s = " or g. s pr ingfr am ew or k. jm s . lis t ener . Default Mes s ageL is t ener C ont ainer ">
< pr oper t y   nam e= " connec t ionF ac t or y "   r ef= " connec t ionF ac t or y "   />
< pr oper t y   nam e= " des t inat ion"   r ef= " queueDes t inat ion"   />
< pr oper t y   nam e= "m es s ageL is t ener "   r ef= " cons um er Mes s ageL is t ener "   />
< ! - -  消息接收超时时间,单位毫秒  - ->
< pr oper t y   nam e= " r eceiv eT im eout "   v alue= " 6 0 0 0 0 " />
< ! - -  是否开启消息事务  - ->
< pr oper t y   nam e= " s es s ionT r ans ac t ed"   v alue= " fals e"   />
< ! - -AU TO _ AC KN O W L EDG E
C L I EN T _ AC KN O W L EDG E
DU PS _ O K_ AC KN O W L EDG E
S ES S IO N _ TR AN S AC T ED
- ->
< pr oper t y   nam e= " s es s ionAcknow ledgeModeN am e"   v alue= " C L I EN T _ AC KN O W L EDG E"   />
< /bean>

编写生产者消费者代码,进行消息发送接收测试


性能优化

尽量使用非持久化消息

当你的消息没必要持久化时,可以使用非持久化消息来提升消息发送性能,可以通过MessageProducer.setDeliverMode( )来设置消息持久化,默认为t r ue即持久化到硬盘。如果设置为fals e,则重新启动MQ后,消息会丢失。非持久化消息的发送效率比持久化消息高一个数量级,因为持久化消息需要阻塞等待消息队列的回执消息且磁盘读写IO

通过事务批量发送消息

//第一个参数设置为t r ue,开启消息事务
connec t ion. c r eat eS es s ion(Boolean.TRUE,   S es s ion. AU TO _ AC KN O W L EDG E);
Topic   t opic  =   s es s ion. c r eat eTopic (" T es t . T r ans ac t ions " );
Mes s agePr oducer   pr oducer  =   s es s ion. c r eat ePr oducer (t opic );
int   count  = 0 ;
Dat e  s t ar t Dat e =   new  Dat e();
for   (int   i = 0 ;   i <   5 0 0 0 0 ;   i+ + )  {
Mes s age m es s age =   s es s ion. c r eat eT ex t Mes s age("m es s age  "  +   i);
pr oducer . s end(m es s age);
if ( i!=0 && i%2000==0) {
session.commit( ) ;
}
}
每2 0 0 0 条消息批量提交一次

TCP协议调优
设置消息队列的内存空间
ActiveMQ消息预取限制
消息异步分派

      所谓消息异步分派就是直接将消息发送给消费者,而不是将消息先发送到消息队列,然后消费者从队列里去取。将alw ay s Ay nc设置为fals e,表示如果仅仅只有一个s es s ion关联到connec t ion,在这种场景下,将直接从t r ans por t 层把消息直接发送到Mes s age  C ons um er  线程上。

消息压缩

Ac t iv eMQ C onnec t ionF ac t or y   connec t ionF ac t or y  =   new   Ac t iv eMQ C onnec t ionF ac t or y ();
connec t ionF ac t or y . setUseCompression(t r ue);
或者连接U RL 里设置
t cp: //linux . y ida. com : 6 1 6 1 6 ?jms.useCompression= t r ue

使用bytesMessage
消息确认机制优化
消息处理添加线程池

publicclassSmsMoPoolimplementsMessageListener {privatefinalstatic Logger logger = LoggerFactory.getLogger(SmsMoPool.clas
s);
private DefaultEventPubliser moEventPublisher;
privatefinal EventFactory eventFactory = new DefaultEventFactory();
private DefaultDataGather dataGather;
private ExecutorService pool = Executors.newFixedThreadPool(5);
@Override
publicvoid onMessage(final Message message) {
pool.execute(new Runnable() {
@Override
publicvoid run() {
final ObjectMessage msg = (ObjectMessage) message;
Serializable obj = null;
try {
obj = msg.getObject();
} catch (JMSException e) {
logger.error("从消息队列获取消息异常", e);
}
if (obj != null) {
dataGather.incrementDateCount(MasEntityConstants.TRAFFIC_SMS_MO_IN);
AgentToServerReq req = (AgentToServerReq) obj;
Event event = eventFactory.createMoEvent(req);
moEventPublisher.publishEvent(event);
}
}
});
}
}

不用频繁建立和关闭连接
对session对象池化
关闭connection的copyMessageonSend

connec t ionF ac t or y . s et C opy Mes s ageO nS end(fals e);
或者
ac t iv em q. xm l的< t r ans por t C onnec t or > 元素上设置copy Mes s ageO nS end= " fals e"

生产者/消费者流控
设置ActiveMQ消息可以使用的最大内存/磁盘空间
ActiveMQ的JVM参数调优

对比kafka分布式实现




相关内容推荐