博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第十一章 企业项目开发--消息队列activemq
阅读量:6670 次
发布时间:2019-06-25

本文共 20477 字,大约阅读时间需要 68 分钟。

注意:本章代码基于 

代码的github地址:

 

消息队列是分布式系统中实现RPC的一种手段。

1、消息队列的基本使用流程

假设:

  • 我们有这样一个需求,当每注册一个admin的之后,就写一条日志log数据到数据库。

分析:

  • 在实际中,我们是不会把日志直接写入数据库的,因为日志数据通常是庞大的,而且日志的产生是频繁的,如果我们使用数据库存储日志,哪怕是使用异步存储,也是极耗性能的。在企业中,对于日志的处理方式很多,比较简单的一种是,日志直接产生于nginx或后端服务器(eg.resin),我们写一个定时任务,每隔一段时间,将产生的日志文件使用shell和Python进行正则过滤,取出有用信息,之后进行处理统计,最后将处理后的数据写入数据库。

在这里我们作为演示,,每当注册一个admin的之后,我们异步写一条日志log数据到数据库。

下边的举例也是对代码的解释。

  • server1:部署ssmm0-userManagement
  • server2:部署ssmm0-rpcWeb
  • server3:部署消息队列服务器

当server1执行一个"http://localhost:8080/admin/register?username=canglang25&password=1457890"操作,即向数据库插一条admin信息时,同时将日志log信息写入server3,之后不会等待log信息被server2消费掉就直接返回(异步);

server2循环接收server3中的消息队列中的消息,并将这些log消息写入数据库。

 

2、消息队列的作用

  • 异步
  • 解耦:server1(消息生产者服务器)和server3(消息消费者服务器)没有直接联系
  • 削峰填谷:当大量请求涌入应用服务器时,应用服务器如果处理不过来,就将这些请求先放入队列,之后再从队列中取出请求慢慢处理(秒杀的一种处理方式

 

3、消息队列的两种方式

  • P2P
    • 消息生产者产生的消息只能由一个消息消费者消费
    • 基于队列queue
    • 执行流程
      • 生产者:创建连接工厂-->创建连接-->启动连接-->创建session-->创建队列,创建生产者,创建消息-->发送消息
      • 消费者:创建连接工厂-->创建连接-->启动连接-->创建session-->创建队列,创建消费者-->接收消息
  • 发布-订阅
    • 消息生产者产生的消息可以由所有订阅了(监听了)该消息的消费者消费
    • 基于主题topic
    • 执行流程
      • 生产者:创建连接工厂-->创建连接-->启动连接-->创建session-->创建topic,创建消息发布者,创建消息-->发布消息
      • 消费者:创建连接工厂-->创建连接-->启动连接-->创建session-->创建topic,创建消息订阅者-->消息订阅者通过监听器接收消息

 

4、实例(基于P2P实现)

4.1、整体代码结构:

 

4.2、模块依赖关系

注:箭头的指向就是当前模块所依赖的模块。(eg.rpcWeb依赖data)

  • userManagement:用户管理模块--war
  • rpcWeb:rpc测试模块(这里用于模拟接收处理消息的应用)--war
  • cache:缓存模块--jar
  • rpc:rpc模块(包含mq/mina/netty)--jar
  • data:数据处理模块--jar
  • common:通用工具类模块--jar

 

4.3、代码

代码整体没变,只列出部分新增代码,完整代码从文首的github进行clone即可。

4.3.1、ssmm0

pom.xml

common
cache
rpc
data
userManagement
rpcWeb
org.slf4j
slf4j-log4j12
1.5.11
View Code

说明:只列出部分新增的代码。

注意:

  • activemq必须配置slf4j-log4j12,而该jar也会被所有的模块用到(因为所有的模块都需要打日志),至于该模块的版本号的选择我们可以根据"启动activemq,并运行自己的程序"从eclipse的console窗口的打印信息来选择。
  • slf4j-log4j12这个jar在pom.xml中引入到依赖池中后,还需要进行实际依赖
  • module部分最好按照依赖关系从底向上排列,这样在"compile"的时候不容易出错

 

4.3.2、ssmm0-common

pom.xml

1 
2
4 5
4.0.0
6 7
8
9
com.xxx
10
ssmm0
11
1.0-SNAPSHOT
12
13 14
com.xxx.ssmm0
15
ssmm0-common
16 17
ssmm0-common
18
jar
19 20
21
22
23
org.bouncycastle
24
bcprov-jdk15on
25
26
27
28
commons-codec
29
commons-codec
30
31
32
View Code

DateUtil:

1 package com.xxx.util; 2  3 import java.text.DateFormat; 4 import java.text.SimpleDateFormat; 5 import java.util.Date; 6  7 /** 8  * 线程安全的日期类工具 9  */10 public class DateUtil {11     private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";12     private static ThreadLocal
threadLocal = new ThreadLocal
();13 14 /**15 * 获取DateFormat实例16 */17 public static DateFormat getDateFormat() {18 DateFormat df = threadLocal.get();//从threadLocal中获取当前线程的DateFormat实例副本19 if(df==null){
//如果当前线程实例为null,说明该线程第一次使用该方法20 df = new SimpleDateFormat(DATE_FORMAT);//创建df实例21 threadLocal.set(df);//将df实例放置到threadLocal中去22 }23 return df;24 }25 26 /**27 * 将Date格式化为String字符串28 */29 public static String formatDate(Date date) {30 return getDateFormat().format(date);31 }32 33 /**34 * 获取当前时间35 * @return 字符串(eg.2001-11-12 12:23:34)36 */37 public static String getCurrentTime() {38 //第一种方式39 //return formatDate(new Date());40 41 //第二种方式(也是最推荐的方式)42 DateFormat df = getDateFormat();43 return df.format(System.currentTimeMillis());44 45 //第三种方式46 /*Calendar c = Calendar.getInstance();47 return c.get(Calendar.YEAR)+"-"+c.get(Calendar.MONTH)+"-"+c.get(Calendar.DATE)48 +"-"+c.get(Calendar.HOUR)+"-"+c.get(Calendar.MINUTE)+"-"+c.get(Calendar.SECOND);*/49 }50 51 /*****************测试*****************/52 /*public static void main(String[] args) {53 System.out.println(getCurrentTime());54 }*/55 }
View Code

注意:

  • jdk的SimpleDateFormat类是一个线程不安全的类,一般情况下只要不设置为static型类变量就可以了,但是更安全的做法是使用ThreadLocal类包装一下(如代码所示),当然也可以使用其他的日期工具。
  • 获取当前时间有三种方式(如代码所示),最推荐的是第二种

PropUtil:即之前的FileUtil

 

4.3.3、ssmm0-rpc

pom.xml

1 
2
4 5
4.0.0
6 7
8
9
com.xxx
10
ssmm0
11
1.0-SNAPSHOT
12
13 14
com.xxx.ssmm0
15
ssmm0-rpc
16 17
ssmm0-rpc
18
jar
19 20
21
22
23
24
com.xxx.ssmm0
25
ssmm0-common
26
1.0-SNAPSHOT
27
28
29
30
org.apache.activemq
31
activemq-all
32
5.5.0
33
34
35
View Code

rpc_config.properties

#activemq配置activemq.queueURL=tcp://127.0.0.1:61616activemq.queueName=adminQueue
View Code

说明:

  • 这里直接将数据配置在这里了,实际上可以将数据配置到ssmm0的根pom.xml中去。

ActiveMQP2PUtil:基于P2P的activemq的消息收发工具类

1 package com.xxx.rpc.mq.util;  2   3 import java.io.Serializable;  4 import java.util.Properties;  5   6 import javax.jms.Connection;  7 import javax.jms.ConnectionFactory;  8 import javax.jms.DeliveryMode;  9 import javax.jms.Destination; 10 import javax.jms.JMSException; 11 import javax.jms.Message; 12 import javax.jms.MessageConsumer; 13 import javax.jms.MessageProducer; 14 import javax.jms.ObjectMessage; 15 import javax.jms.Session; 16  17 import org.apache.activemq.ActiveMQConnection; 18 import org.apache.activemq.ActiveMQConnectionFactory; 19  20 import com.xxx.rpc.mq.handler.MessageHandler; 21 import com.xxx.util.PropUtil; 22  23 /** 24  * activemq p2p 工具类  25  */ 26 public class ActiveMQP2PUtil { 27     private static final String RPC_CONFIG_FILE = "rpc_config.properties"; 28     private static String queueURL;        //队列所在的URL 29     private static String queueName;    //队列名称 30     private static ConnectionFactory connectionFactory;    //连接工厂 31      32     static{ 33         Properties props = PropUtil.loadProps(RPC_CONFIG_FILE); 34         queueURL = props.getProperty("activemq.queueURL", "tcp://127.0.0.1:61616"); 35         System.out.println(queueURL); 36         queueName = props.getProperty("activemq.queueName", "adminQueue"); 37         connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  38                                                           ActiveMQConnection.DEFAULT_PASSWORD,  39                                                           queueURL); 40     } 41      42     /** 43      * 发送消息 44      */ 45     public static void sendMessage(Serializable message){ 46         Connection conn = null; 47         try { 48             conn = connectionFactory.createConnection();//创建连接 49             conn.start();//启动连接 50             Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session 51             Destination destination = session.createQueue(queueName);//创建队列 52             MessageProducer producer = session.createProducer(destination); 53             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//消息设置为非持久化 54             ObjectMessage msg = session.createObjectMessage(message);//创建消息:createObjectMessage()该方法的入参是Serializable型的 55             producer.send(msg);//发送消息 56             session.commit();//提交消息 57         } catch (JMSException e) { 58             e.printStackTrace(); 59         }finally{ 60             if(conn!=null){ 61                 try { 62                     conn.close(); 63                 } catch (JMSException e) { 64                     e.printStackTrace(); 65                 } 66             } 67         } 68     } 69      70     /** 71      * 接收消息 72      * @param handler 自定义的消息处理器 73      */ 74     public static void receiveMessage(MessageHandler handler){ 75         Connection conn = null; 76         try { 77             conn = connectionFactory.createConnection();//创建连接 78             conn.start();//启动连接 79             Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session 80             Destination destination = session.createQueue(queueName);//创建队列 81             MessageConsumer consumer = session.createConsumer(destination);//创建消息消费者 82             while(true){//死循环接收消息 83                 Message msg = consumer.receive();//接收消息 84                 if(msg!=null){ 85                     handler.handle(msg);//处理消息 86                     //System.out.println(msg); 87                 } 88             } 89         } catch (JMSException e) { 90             e.printStackTrace(); 91         }finally{ 92             if(conn!=null){ 93                 try { 94                     conn.close(); 95                 } catch (JMSException e) { 96                     e.printStackTrace(); 97                 } 98             } 99         }100     }101     102     /*public static void main(String[] args) {103         sendMessage("hello world3");104     }*/105 }
View Code

说明:

  • 对照P2P的执行流程来看代码
  • 关于static块的执行时机,可以去看 
    • 在我们启动spring容器时,上述的static块不执行,只有第一次使用到该类的时候才执行
    • 假设我们为该类添加了注解@Component,那么该类会由spring容器来管理,在spring初始化bean之后就会执行该static块(也就是说spring容器启动时,执行static块)
    • 若将该类不添加如上注解,直接实现接口InitializingBean,并且将static代码块中的信息写到afterPropertiesSet()方法中,则spring容器启动时,执行static块
  • 对于消息的接收,这里采用了循环等待机制(即死循环),也可以使用事件通知机制
  • 关于activemq的其他内容之后再说

MessageHandler:消息处理器接口(其实现类是对接收到的消息进行处理的真正部分)

1 package com.xxx.rpc.mq.handler; 2  3 import javax.jms.Message; 4  5 /** 6  * 消息处理器接口 7  */ 8 public interface MessageHandler { 9     public void handle(Message message);10 }
View Code

 

4.3.4、ssmm0-data

pom.xml

1 
2
4 5
4.0.0
6 7
8
9
com.xxx
10
ssmm0
11
1.0-SNAPSHOT
12
13 14
com.xxx.ssmm0
15
ssmm0-data
16 17
ssmm0-data
18
jar
19 20
21
22
23
24
mysql
25
mysql-connector-java
26
27
28
29
org.apache.tomcat
30
tomcat-jdbc
31
32
33
34
org.mybatis
35
mybatis
36
37
38
org.mybatis
39
mybatis-spring
40
41
42
43
javax.servlet
44
javax.servlet-api
45
46
47
48
com.google.guava
49
guava
50
14.0.1
51
52
53
54
com.xxx.ssmm0
55
ssmm0-cache
56
1.0-SNAPSHOT
57
58
59
60
com.xxx.ssmm0
61
ssmm0-rpc
62
1.0-SNAPSHOT
63
64
65
View Code

Log:日志模型类

package com.xxx.model.log;import java.io.Serializable;/** * 日志 */public class Log implements Serializable {    private static final long serialVersionUID = -8280602625152351898L;        private String operation;   // 执行的操作    private String currentTime; // 当前时间    public String getOperation() {        return operation;    }    public void setOperation(String operation) {        this.operation = operation;    }    public String getCurrentTime() {        return currentTime;    }    public void setCurrentTime(String currentTime) {        this.currentTime = currentTime;    }}
View Code

注意:

  • 需要实现序列化接口,在activemq中的消息需要序列化和反序列化

说明:对应的数据库表

LogMapper

1 package com.xxx.mapper.log; 2  3 import org.apache.ibatis.annotations.Insert; 4  5 import com.xxx.model.log.Log; 6  7 /** 8  * 日志Mapper 9  */10 public interface LogMapper {11     12     /**13      * 这里需要注意的是,current_time是数据库的保留参数,两点注意:14      * 1、最好不要用保留参数做变量名15      * 2、如果不经意间已经用了,那么保留参数需要用``括起来(`-->该符号是英文状态下esc键下边的那个键)16      * @param log17      * @return18      */19     @Insert("INSERT INTO log(operation, `current_time`) VALUES(#{operation},#{currentTime})")20     public int insertLog(Log log);21 22 }
View Code

注意:由于疏忽,在创建数据库的时候,属性"当前时间"取名为"current_time",没注意到该词是MySQL的关键字(即保留字)。

  • 最好不要用关键字做变量名
  • 如果不经意间已经用了,那么保留参数需要用``括起来(`-->该符号是英文状态下esc键下边的那个键)

LogDao:

1 package com.xxx.dao.log; 2  3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Repository; 5  6 import com.xxx.mapper.log.LogMapper; 7 import com.xxx.model.log.Log; 8  9 /**10  * 日志DAO11  */12 @Repository13 public class LogDao {14 15     @Autowired16     private LogMapper logMapper;17     /***************注解*****************/18     public boolean insertLog(Log log){19         return logMapper.insertLog(log)==1?true:false;20     }21     22 }
View Code

LogMessageHandler:MessageHandler的实现类,对接收到的log消息进行具体的操作

1 package com.xxx.service.log; 2  3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.ObjectMessage; 6  7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.stereotype.Component; 9 10 import com.xxx.dao.log.LogDao;11 import com.xxx.model.log.Log;12 import com.xxx.rpc.mq.handler.MessageHandler;13 14 15 /**16  * 日志处理器(更适合放在data层)17  * 因为:18  * 1、data依赖于rpc,而rpc不依赖于data,所以如果该类放在rpc层,并且该类需要用到数据库操作(eg.将日志写入数据库),那么就不好办了19  * 2、rpc层说白了,就是一些rpc工具类,实际上与业务无关,与业务有关的,我们可以抽取到该部分来20  */21 @Component22 public class LogMessageHandler implements MessageHandler {23 24     @Autowired25     private LogDao logDao;26     27     public void handle(Message message) {28         System.out.println(logDao);29         ObjectMessage objMsg = (ObjectMessage)message;30         try {31             Log log = (Log)objMsg.getObject();32             logDao.insertLog(log);//将日志写入数据库33         } catch (JMSException e) {34             e.printStackTrace();35         }36 37     }38 39 }
View Code

说明:

  • 该类相当于一个service
  • 该类放在data模块而不是rpc模块,其接口放在了rpc模块,原因:
    • data依赖于rpc,而rpc不依赖于data,所以如果该类放在rpc层,并且该类需要用到数据库操作(eg.将日志写入数据库),那么就不好办了
    • rpc层说白了,就是一些rpc工具类,实际上与业务无关,与业务有关的,我们可以抽取到该部分来

AdminService:

1     /** 2      * 测试activeMQ 3      *  4      * 消息生产者做的事:(部署在服务器A) 5      * 1)添加一个用户 6      * 2)用户添加成功后, 7      * 2.1)创建一个Log(日志类)实例 8      * 2.2)将该日志实例作为消息发送给消息队列 9      * 10      * 消息消费者做的事:(部署在服务器B)11      * 1)从队列接收消息12      * 2)用日志处理器对消息进行操作(将该消息写入数据库)13      */14     public boolean register(Admin admin) {15         boolean isRegisterSuccess = adminDao.register(admin);16         if(isRegisterSuccess) {17             Log log = new Log();18             log.setOperation("增加一个用户");19             log.setCurrentTime(DateUtil.getCurrentTime());20             21             ActiveMQP2PUtil.sendMessage(log);//将消息发送到消息服务器(即activeMQ服务器),不需要等待消息处理结果,直接向下执行22         }23         return isRegisterSuccess;24     }
View Code

说明:

  • 该类只修改了以上方法
  • 将消息发送到消息服务器(即activeMQ服务器),不需要等待消息处理结果,直接向下执行(体现异步

 

4.3.5、ssmm0-rpcWeb

pom.xml

1 
2
4 5
4.0.0
6 7
8
9
com.xxx
10
ssmm0
11
1.0-SNAPSHOT
12
13 14
com.xxx.ssmm0
15
ssmm0-rpcWeb
16 17
ssmm0-rpcWeb
18
war
19 20
21
22
23
24
com.xxx.ssmm0
25
ssmm0-data
26
1.0-SNAPSHOT
27
28
29
30
org.springframework
31
spring-web
32
33
34
org.springframework
35
spring-webmvc
36
37
38
View Code

spring.xml

View Code

web.xml

dispatcherServlet
org.springframework.web.servlet.DispatcherServlet
contextConfigLocation
classpath:spring.xml
1
dispatcherServlet
/
encodingFilter
org.springframework.web.filter.CharacterEncodingFilter
encoding
UTF-8
forceEncoding
true
encodingFilter
/*
/index.jsp
View Code

MessageReceiver:死循环从队列接收消息并将消息传给消息处理器实现类(LogMessageHandler)处理

1 package com.xxx.web.mq; 2  3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6  7 import com.xxx.rpc.mq.util.ActiveMQP2PUtil; 8 import com.xxx.service.log.LogMessageHandler; 9 10 /**11  * 用于接收消息的测试类12  */13 @Controller14 @RequestMapping("/mq")15 public class MessageReceiver {16     17     @Autowired18     private LogMessageHandler handler;19     20     @RequestMapping("/receive")21     public void receiveMessage() {22         ActiveMQP2PUtil.receiveMessage(handler);23     }24     25 }
View Code
  • 该类相当于一个controller

 

5、测试

5.1、安装activemq

1)下载解压"apache-activemq-5.5.0-bin.zip",之后,若是32bit机器,进入"E:\activemq-5.5.0\bin\win32"下,双击"activemq.bat"即可。(当然,如果双击无法启动,可能有其他进程占用61616端口,查一下是哪一个进程,然后去服务中关掉即可)

2)启动服务后,在浏览器输入"http://127.0.0.1:8161/admin/queues.jsp",看到队列页面,则安装并启动成功,该页面是一个队列消息的监控页面,包括

  • 队列名称:Name
  • 当下有多少消息在队列中等待消费:Number Of Pending Messages
  • 有几个消费者:Number Of Consumers
  • 从启动activemq服务到现在一共入队了多少消息:Messages Enqueued
  • 从启动activemq服务到现在一共出队了多少消息:Messages Dequeued
  • Number Of Pending Messages + Messages Dequeued = Messages Enqueued

 

5.2、运行ssmm0-userManagement

浏览器执行"http://localhost:8080/admin/register?username=canglang25&password=1457890"

注意:这里使用了8080端口

 

5.3、运行ssmm0-rpcWeb

浏览器执行"http://localhost:8081/mq/receive"

注意:

  • 这里使用了8081端口
  • 执行该URL后,浏览器会一直在转圈(即一直在等待接收消息),直到关闭jetty服务器

 

说明:jetty在不同的端口下可以同时启动,在同一端口下后边启动的服务会覆盖之前启动的服务

 

6、总结

  • 消息队列入门简单,想要完全掌握很难
  • 关于git的基本使用查看《progit中文版》

转载地址:http://cloxo.baihongyu.com/

你可能感兴趣的文章
转载:Objective-C中的 instancetype 和 id 关键字
查看>>
【Linux】Ubuntu 代理配置
查看>>
GIT 使用
查看>>
mac的safari浏览器调试h5
查看>>
如何通过PHP判断年份是否是闰年----两种方法
查看>>
基于canvas的仪表盘效果
查看>>
JavaScript原生实现《贪吃蛇》
查看>>
js函数式编程
查看>>
APP手工测试01-app专项测试要点-测试、开发环境-敏捷开发
查看>>
1、创业维艰 第一章-从革命者到风险资本家
查看>>
Android类库常用类库一览
查看>>
分布式文件系统HDFS 练习
查看>>
JavaScript闭包理解
查看>>
第一个python程序
查看>>
1-4-之字形打印矩阵
查看>>
nodejs封装http请求
查看>>
【OpenCV】直线检测
查看>>
线段树
查看>>
PHP 构造函数的重载
查看>>
UBUNTU 添加删除用户
查看>>