您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

springcloud微服务架构开发实战:分布式消息总线

时间:2020-09-29 09:27:40  来源:  作者:

消息总线的定义

前面在1.4.2节中强调过,在微服务架构中,经常会使用REST 服务或基于消息的通信机制。

在3.6节中也详细介绍了消息通信的实现方式。消息总线就是一种基于消息的通信机制。

消息总线是一种通信工具,可以在机器之间互相传输消息、文件等,它扮演着—种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向。发送端只需要向消息总线发出消息,而不用管消息被如何转发。

Spring Cloud Bus通过轻量消息代理连接各个分布的节点。管理和传播所有分布式项目中的消息,本质是利用了MQ的广播机制在分布式的系统中传播消息,目前常用的有Kafka和RabbitMQ等。

springcloud微服务架构开发实战:分布式消息总线

 

消息总线常见的设计模式

在消息总线中,常见的设计模式有点对点模式及订阅/发布模式。

1.点对点(P2P)

点对点模式包含三个角色。

  • 消息队列( Queue )。
  • 生产者( Producer ) 。
  • 消费者(Consumer )。

点对点模式中的每个消息都被发送到一个特定的队列,消费者从队列中获取消息。队列保留着消息,直到它们被消费或超时。图16-1展示了点对点模式的运行流程图。

springcloud微服务架构开发实战:分布式消息总线

 

点对点模式具有以下特点。

  • 每个消息只有一个消费者,即消息一旦被消费,就不在消息队列中了。
  • 生产者和消费者之间在时间上没有依赖性,也就是说当生产者发送了消息之后,不管消费者有没有正在运行,都不会影响到消息被发送到队列。
  • 消费者在成功接收消息之后需向队列应答成功,这样消息队列才能知道消息是否被成功消费。

2.订阅/发布(PublSub )

订阅/发布模式包含三个角色。

  • 主题(Topic )。
  • 发布者(Publisher )。
  • 订阅者(Subscriber )。

订阅/发布模式中,多个发布者将消息发送到对应的主题,系统将这些消息传递给多个订阅者。图16-2展示了订阅/发布模式的运行流程图。

springcloud微服务架构开发实战:分布式消息总线

 

订阅/发布模式具有以下特点。

  • 每个消息可以有多个消费者。
  • 主题可以被认为是消息的传输中介,发布者发布消息到主题,订阅者从主题订阅消息。
  • 主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

消息总线的意义

在微服务架构中,经常会使用REST服务作为服务间的通信机制。REST以其轻量、简单、易理解而著称,但这种通信机制也并非适合所有的场景。例如,在一些高并发、高可靠、实时的场景,则需要消息总线来帮忙。

概括起来,消息总线具有以下几个优点。

1.实时性高

与REST 服务的“请求—响应”模式不同,消息总线的实时性非常高。使用了消息总线,生产者一方只要把消息往队列里一扔,就可以立马返回,响应用户了。无须等待处理结果,实现了异步处理。

同时,对于消费者而言,消费者对于消息的到达感知也非常及时。消费者会对消息总线进行监听,只要有消息进入队列,就可以马上得到通知。这种优势是REST 服务所不能具备的。在REST服务中,要想及时获取到更新通知,就不得不进行轮询。这往往非常低效。

2生产者与消费者解耦

在消息总线中,生产者负责将消息发送到队列中,而消费者把消息从队列中取出来。生产者无须等待消费者启动,消费者也无须关心生产者是否已经处于就绪状态。所以,这种模式能很好地实现生产者与消费者的解耦。

然而,如果是在REST服务中,服务调用方必须等待服务的提供方准备好了才能调用,否则就会调用失败。

3.故障率低

消息总线拥有对其他通信方式更高的成功率。一方面,生产者与消费者之间实现了解耦,所以,生产者与消费者之间不存在强关联关系,即便是生产者或消费者任意一方掉线了,也不会影响消息最终的送达;另一方面,消息总线往往会结合数据库来实现消息的持久化,并设置状态标识。只有消息消费成功,才会去修改状态标识。

消息总线同时还承担着缓冲区的作用。大量业务消息首先会进入消息队列进行缓存,消息的消费者可以根据自己的处理能力来进行消费,所以不管消息的数据量有多少,都不会对消费者造成冲击。

消息总线常见的实现方式

《分布式系统常用技术及案例分析》一书列举了非常多的流行的、开源的分布式消息服务,如Apache ActiveMQ、RabbitMQ、Apache RocketMQ、Apache Kafka等。这些消息中间件都实现了点对点模式及订阅/发布模式等常见的消息模式。

以下例子演示的是使用ActiveMQ实现生产者—消费者的JAVA实现方式。

生产者程序Producer.java:

public class Producer{
private static final Logger LOGGER=LoggerFactory.getLogger (Producer.
class);
private static final string BROKER_URE = ActiveMQConnection.DEFAULT_
BROKER URL;private static final String SUBJECT= "waylau-queue";
public static void main (String[] args) throws JMSException f
//初始化连接工厂
ConnectionFactory connectionFactory= new ActiveMQConnection
Factory(BROKER_URL);
//获得连接
Connection conn = connectionFactory.createConnection();
//启动连接
conn.start(;
//创建session,第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
Session session = conn.createSession(false, Session.AUTO_
ACKNOWLEDGE);
//创建队列
Destination dest = session.createQueue(SUBJECT);
//createTopic方法用来创建Topic
//session.createTopic ("TOPIC");
//通过session 可以创建消息的生产者
MessageProducer producer = session.createProducer(dest);
for(int i=0;i<100;i++){
//初始化一个MQ消息
TextMessage message= session.createTextMessage ("Welcome to
waylau.com"+i);
//发送消息
producer. send(message);
LOGGER.info("send message {}",i);
//关闭 MQ 连接
conn.close();
}
}

消费者程序Consumer.java:

public class Consumer implements MessageListener {
private static finalLogger LOGGER = LoggerFactory.getLogger
(Consumer.class);
private static final String BROKER_URL = ActiveMQConnection.DEFAULT
BROKER URL;private static final string SUBJECT = "waylau-queue";
public static void main(String[] args) throws JMSExceptionf
//初始化 ConnectionFactory
ConnectionFactory connectionFactory =new ActiveMOConnection
Factory(BROKER_URL);
//创建Mo连接
Connection conn = connectionFactory.createConnection();
//启动连接
conn .start(;
//创建会话
Session session= conn.createSession (false,Session.AUTO_
ACKNOWLEDGE);
//通过会话创建目标
Destination dest = session.createQueue(SUBJECT);
//创建 MO 消息的消费者
MessageConsumer consumer = session.createConsumer(dest);
//初始化 MessageListener
consumer me=newConsumer();
//给消费者设定监听对象
consumer .setMessageListener (me);
@override
public void onMessage(Message message){
TextMessage txtMessage =(TextMessage)message;
try{
LOGGER.info("get message " + txtMessage.getText());
}catch (JMSException e) {
LOGGER.error("error {}",e));
}
}

执行命令来启动ActiveMQa:

bin/activemg start

生产者执行如下命令:

mvn clean compile exec:java -Dexec.mainClass=com.waylau.activemq.ProducerApp

输出如下。

20:12:10.807 [ActiveMQ Task-1]INEO org.apache.activemq.transport.
failover.FailoverTransport- Successfully connected to tcp://localhost:61616
20:12:10.928[main] INFOcom.waylau.activemq.Producer- send message 0
20:12:10.963 [main] INPO com.waylau.activemq.Producer- send message 1
20:12:10.992 [main] INFO com.waylau.activemq.Producer - send message 2
20:12:11.019[main] INFO com.waylau.activemq.Producer - send message 3
20:12:11.036[main] INFOcom.waylau.activemq.Producer- send message 4
20:12:11.058 [main] INFO com.waylau.activemq.producer -send message 5
20:12:11.085[main] INFOcom.waylau.activemq.Producer - send message6
20:12:11.113 [main] INFOcom.waylau.activemq.Producer - send message 7
20:12:11.141[main] INFOcom.waylau.activemq.Producer - send message 8
20:12:11.191 [main] INFO com.waylau.activemq.Producer- send message 9

消费者执行如下命令:

mvn clean compile exec:java-Dexec.mainClass=com.waylau.activemq. ConsumerApp

输出如下。

20:12:05.262[ActiveMQ Task-1] INFO org.apache.activemq.transport.
failover.FailoverTransport- Successfully connected to tcp://localhost:
61616
20:12:10.875 [ActiveMQ Session Task-1] INEOcom.waylau.activemg.Consumer -
get message welcome to waylau.com o
20:12:10.939 [ActiveMQ Session Task-1]INFO com.waylau.activemq.Consumer-
get message welcome to waylau.com 1
20:12:10.965 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
get message Welcome to waylau.com 2
20:12:10.994 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -
get message Welcome to waylau .com 3
20:12:11.020 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-
get message Welcome to waylau.com 4
20:12:11.038 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
get message Welcome to waylau.com 5
20:12:11.059 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -
get message Welcome to waylau.com6
20:12:11.086[ActiveMQ Session Task-1] INEO com.waylau.activemq. Consumer-
get message welcome to waylau.com 7
20:12:11.114[ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
get message Welcome to waylau.com 8
20:12:11.142 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-
get message Welcome to waylau.com 9

上述例子的源码,可以在 https://github.com/waylau/distributed-systems-technologies-and-cas-es-analysis网址的samples目录下找到。

Spring Cloud Bus 实现消息总线

Spring Cloud Bus通过轻量消息代理连接各个分布的节点,管理和传播所有分布式项目中的消息,本质是利用了消息中间件的广播机制在分布式的系统中传播消息。

目前Spring Cloud Bus所支持的常用的消息中间件有RabbitMQ和Kafka,使用时,只须添加spring-cloud-starter-bus-amqp或spring-cloud-starter-bus-kafka依赖即可。同时,需要确保相关的消息中间件连接配置正确。

下面是使用RabbitMQ作为Spring Cloud Bus 的application.yml配置情况。

spring:
rabbitmg:
host: mybroker .com
port:5672
username:user
password:secret

其中,spring.rabbitmq.host配置项用于指定RabbitMQ的主机位置。

Spring Cloud Bus支持消息发送到所有已监听的节点,或者某个特定服务的所有节点。同时,Spring Cloud Bus提供了一些HTTP接口/bus/*,用于触发Spring Cloud Bus内部的事件。

目前,Spring Cloud Bus主要有以下两个接口实现。

  • ./bus/env:发送键值对去更新每个节点的Spring Environment。
  • ./bus/refresh:重新加载每一个应用的配置信息,类似于/refresh。

所以,Spring Cloud Bus结合Spring Cloud Config 的使用,可以实现配置文件的自动更新。



Tags:springcloud   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
1、Feign简介Feign是一种声明式、模板化的HTTP客户端。使用Feign,可以做到声明式调用。尽管Feign目前已经不再迭代,处于维护状态,但是Feign仍然是目前使用最广泛的远程调用框架...【详细内容】
2021-06-29  Tags: springcloud  点击:(113)  评论:(0)  加入收藏
Jenkins 是目前最常用的持续集成工具,拥有近50%的市场份额,他还是很多技术团队的第一个使用的自动化工具。由此可见他的重要性!这份Jenkins宝典从入门介绍到结合Docker+SpringC...【详细内容】
2021-06-09  Tags: springcloud  点击:(145)  评论:(0)  加入收藏
环境:Spring Boot 2.3.9 + Spring Cloud Hoxton.SR8服务发现注册请参考《SpringCloud Zookeeper服务发现及负载均衡 》zookeeper安装配置请参考《Kafka(zookeeper)环境配置超级...【详细内容】
2021-04-06  Tags: springcloud  点击:(276)  评论:(0)  加入收藏
本文将详细分析SpringCloud Gateway是如何实现的。架构SpringCloud Gateway(下面简称SG)基于SpringWebFlux,整体架构如下图所示: SG定义了几个概念: 路由(Route):路由是网关的基本...【详细内容】
2020-12-18  Tags: springcloud  点击:(583)  评论:(0)  加入收藏
前言当我们的网关Gateway程序开发完成之后,需要部署到生产环境,这个时候你的程序不能是单点运行的,肯定是多节点启动(独立部署或者docker等容器部署),防止单节点故障导致整个服...【详细内容】
2020-10-19  Tags: springcloud  点击:(101)  评论:(0)  加入收藏
消息总线的定义前面在1.4.2节中强调过,在微服务架构中,经常会使用REST 服务或基于消息的通信机制。在3.6节中也详细介绍了消息通信的实现方式。消息总线就是一种基于消息的通...【详细内容】
2020-09-29  Tags: springcloud  点击:(140)  评论:(0)  加入收藏
1.分布式开发简介 分布式开发的思考点:如何可以让代码更安全;如何有效的通讯;在进行分布式处理的时候如何进行程序功能划分; web集群:考虑多用户并发访问的处理速度。业务中心:在...【详细内容】
2020-09-14  Tags: springcloud  点击:(52)  评论:(0)  加入收藏
常见微服务的消费者本节就常见的微服务的消费者进行介绍。在Java领域比较常用的消费者框架主要有HttpClient、Ribbon、Feign 等。 Apache HttpClientApache HttpClient是Apa...【详细内容】
2020-09-11  Tags: springcloud  点击:(129)  评论:(0)  加入收藏
使用SpringCloud技术栈搭建微服务集群,可以选择的组件比较多,由于有些组件已经闭源或停更,这里主要选用spring-cloud-alibaba作为我们的技术栈。 服务注册与发现: nacos-discove...【详细内容】
2020-08-19  Tags: springcloud  点击:(204)  评论:(0)  加入收藏
很多程序员每项技术单独拿出来有可能很厉害,例如:springcloud、springboot、redis、nginx、mysql、rabbitMq等,但是普遍缺乏将所有的这些技术整合到一起,从前端到后端,从开发到部...【详细内容】
2020-07-08  Tags: springcloud  点击:(111)  评论:(0)  加入收藏
▌简易百科推荐
为了构建高并发、高可用的系统架构,压测、容量预估必不可少,在发现系统瓶颈后,需要有针对性地扩容、优化。结合楼主的经验和知识,本文做一个简单的总结,欢迎探讨。1、QPS保障目标...【详细内容】
2021-12-27  大数据架构师    Tags:架构   点击:(3)  评论:(0)  加入收藏
前言 单片机开发中,我们往往首先接触裸机系统,然后到RTOS,那么它们的软件架构是什么?这是我们开发人员必须认真考虑的问题。在实际项目中,首先选择软件架构是非常重要的,接下来我...【详细内容】
2021-12-23  正点原子原子哥    Tags:架构   点击:(7)  评论:(0)  加入收藏
现有数据架构难以支撑现代化应用的实现。 随着云计算产业的快速崛起,带动着各行各业开始自己的基于云的业务创新和信息架构现代化,云计算的可靠性、灵活性、按需计费的高性价...【详细内容】
2021-12-22    CSDN  Tags:数据架构   点击:(10)  评论:(0)  加入收藏
▶ 企业级项目结构封装释义 如果你刚毕业,作为Java新手程序员进入一家企业,拿到代码之后,你有什么感觉呢?如果你没有听过多模块、分布式这类的概念,那么多半会傻眼。为什么一个项...【详细内容】
2021-12-20  蜗牛学苑    Tags:微服务   点击:(8)  评论:(0)  加入收藏
我是一名程序员关注我们吧,我们会多多分享技术和资源。进来的朋友,可以多了解下青锋的产品,已开源多个产品的架构版本。Thymeleaf版(开源)1、采用技术: springboot、layui、Thymel...【详细内容】
2021-12-14  青锋爱编程    Tags:后台架构   点击:(20)  评论:(0)  加入收藏
在了解连接池之前,我们需要对长、短链接建立初步认识。我们都知道,网络通信大部分都是基于TCP/IP协议,数据传输之前,双方通过“三次握手”建立连接,当数据传输完成之后,又通过“四次挥手”释放连接,以下是“三次握手”与“四...【详细内容】
2021-12-14  架构即人生    Tags:连接池   点击:(16)  评论:(0)  加入收藏
随着移动互联网技术的快速发展,在新业务、新领域、新场景的驱动下,基于传统大型机的服务部署方式,不仅难以适应快速增长的业务需求,而且持续耗费高昂的成本,从而使得各大生产厂商...【详细内容】
2021-12-08  架构驿站    Tags:分布式系统   点击:(23)  评论:(0)  加入收藏
本系列为 Netty 学习笔记,本篇介绍总结Java NIO 网络编程。Netty 作为一个异步的、事件驱动的网络应用程序框架,也是基于NIO的客户、服务器端的编程框架。其对 Java NIO 底层...【详细内容】
2021-12-07  大数据架构师    Tags:Netty   点击:(16)  评论:(0)  加入收藏
前面谈过很多关于数字化转型,云原生,微服务方面的文章。虽然自己一直做大集团的SOA集成平台咨询规划和建设项目,但是当前传统企业数字化转型,国产化和自主可控,云原生,微服务是不...【详细内容】
2021-12-06  人月聊IT    Tags:架构   点击:(23)  评论:(0)  加入收藏
微服务看似是完美的解决方案。从理论上来说,微服务提高了开发速度,而且还可以单独扩展应用的某个部分。但实际上,微服务带有一定的隐形成本。我认为,没有亲自动手构建微服务的经历,就无法真正了解其复杂性。...【详细内容】
2021-11-26  GreekDataGuy  CSDN  Tags:单体应用   点击:(35)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条