消息发布订阅
简介
当前 Y9 大多数系统均基于微服务架构进行设计与开发,按照业务逻辑的不同被拆分成一组可以独立开发与部署的微服务,服务之间通过 HTTP 调用和 RPC 这两种轻量级通信方式来实现相互通信。但这两种通信方式都属于同步点对点通信的范畴,会造成服务之间存在高耦合、请求易阻塞以及请求无缓冲等缺陷。随着服务规模的不断扩张以及服务请求量的急剧增加,这些缺陷也被不断地放大从而影响着整个系统的性能和稳定性,例如组织架构的人员同步和系统操作日志的存储等,因此必须采用松耦合的、支持系统动态加入和退出的数据交换手段,而许多消息中间件的发布订阅模式正式解决这个问题的框架,如 redis、kafka、mqtt、activeMQ、rabbitMQ、Hazelcast 等都提供 publish/subscribe 通讯模式,消息发布者(producer)、消息中间件(broker)、消息订阅者(consumer)构成了三者之间的关系:
与 broker 连接的是客户端(client),既可以充当发布者、也可以充当消费者角色,他们通过主题(topic)建立传输通道,发布者往某个 topic 发布消息(消息格式自己定义,只要消费者能够理解即可,因此一般采用JSON格式传输消息),消费者监听同一个 topic,在监听器里接收到数据,然后在自己的系统中保存这些数据,以便后续利用。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
activeMQ、rabbitMQ由于支持多种协议(jms、amqp、stomp、mqtt、openwire等),性能不太好,因此没有采用。redis基于内存操作,速度最快,kafka基于文件存储消息,吞吐量大,适合传输类似日志类的大批量信息,mqtt主要用于物联网的数据传输,协议简单,便于在物联设备上实现,故Y9平台主要使用kafka作为消息中间件。
功能引入
Y9提供了消息发布模块:
risenet-y9boot-starter-publish-kafka
及消息订阅模块:
risenet-y9boot-starter-listener-kafka
消息发布模块主要做的工作是将 topic 消息通过 PUBLISH 命令发送至消息中间件中,其中 risenet-y9boot-starter-publish-kafka 组件主要实现 risenet-y9boot-common-util 组件中的接口类:Y9PublishService。
统一封装了四种消息类型:
1、Y9MessageOrg:组织身份消息,其topic标题为:y9_org_event,该消息类型主要为组织身份相关的消息类,例如:新增人员、修改人员等消息。
2、Y9MessageCommon:公共类消息,其topic标题为:y9_common_event,该消息类型主要为公共类型的消息,例如:租户数据源更新消息、application 属性值刷新。
3、Y9MessageTask:办件类消息,其topic标题为:y9_task_event,该消息类型主要为工作流组件发布的消息,例如新增办件、办结件等。
maven pom.xml 修改
(1)修改pom.xml
<repositories>
<repository>
<id>y9-internet-repo</id>
<url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<!-- <updatePolicy>always</updatePolicy> -->
<!-- always,daily(default),interval:XXX,never -->
</snapshots>
</repository>
</repositories>
(2)将 risenet-y9boot-starter-publish-kafka 添加到 pom.xml 的依赖列表里.
<dependency>
<groupId>net.risesoft</groupId>
<artifactId>risenet-y9boot-starter-publish-kafka</artifactId>
<version>[最新版本]</version>
</dependency>
修改属性文件 application.properties
spring.kafka.producer.bootstrap-servers=localhost:9092
#启用消息发布
y9.feature.publish.kafka.enabled=true
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
y9:
feature:
publish:
kafka:
enabled: true
使用示例
package com.example;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import net.risesoft.enums.platform.OrgTypeEnum;
import net.risesoft.model.platform.OrgUnit;
import net.risesoft.model.platform.Organization;
import net.risesoft.y9.Y9LoginUserHolder;
import net.risesoft.y9.pubsub.Y9PublishService;
import net.risesoft.y9.pubsub.constant.Y9OrgEventTypeConst;
import net.risesoft.y9.pubsub.message.Y9MessageOrg;
@RestController
@RequestMapping("/")
public class MainController {
@Resource(name = "y9PublishService")
private Y9PublishService y9PublishService;
/**
* 发布新增事件
*/
@GetMapping(value = "/publish")
public String publishSyncDataSourceEvent() {
Organization org = new Organization();
org.setId("1");
org.setName("1");
org.setOrgType(OrgTypeEnum.ORGANIZATION);
org.setTabIndex(1);
Y9MessageOrg<OrgUnit> msg =
new Y9MessageOrg<>(org, Y9OrgEventTypeConst.ORGANIZATION_ADD, Y9LoginUserHolder.getTenantId());
y9PublishService.publishMessageOrg(msg);
return "publish success";
}
}
示例代码
码云地址:https://gitee.com/risesoft-y9/y9-core
该示例代码路径地址:https://gitee.com/risesoft-y9/y9-core/tree/main/y9-digitalbase-example/risenet-y9demo-sync-kafka