消息发布订阅
简介
当前 数字底座大多数系统均基于微服务架构进行设计与开发,按照业务逻辑的不同被拆分成一组可以独立开发与部署的微服务,服务之间通过 HTTP 调用和 RPC 这两种轻量级通信方式来实现相互通信。但这两种通信方式都属于同步点对点通信的范畴,会造成服务之间存在高耦合、请求易阻塞以及请求无缓冲等缺陷。随着服务规模的不断扩张以及服务请求量的急剧增加,这些缺陷也被不断地放大从而影响着整个系统的性能和稳定性,例如组织架构的人员同步和系统操作日志的存储等,因此必须采用松耦合的、支持系统动态加入和退出的数据交换手段,而许多消息中间件的发布订阅模式正式解决这个问题的框架,如 redis、kafka、mqtt、activeMQ、rabbitMQ、Hazelcast 等都提供 publish/subscribe 通讯模式,消息发布者(producer)、消息中间件(broker)、消息订阅者(consumer)构成了三者之间的关系:
与 broker 连接的是客户端(client),既可以充当发布者、也可以充当消费者角色,他们通过主题(topic)建立传输通道,发布者往某个 topic 发布消息(消息格式自己定义,只要消费者能够理解即可,因此一般采用JSON格式传输消息),消费者监听同一个 topic,在监听器里接收到数据,然后在自己的系统中保存这些数据,以便后续利用。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

activeMQ、rabbitMQ由于支持多种协议(jms、amqp、stomp、mqtt、openwire等),性能不太好,因此没有采用。redis基于内存操作,速度最快,kafka基于文件存储消息,吞吐量大,适合传输类似日志类的大批量信息,mqtt主要用于物联网的数据传输,协议简单,便于在物联设备上实现,故数字底座主要使用kafka作为消息中间件。
功能引入
消息发布
Y9提供了消息发布模块 risenet-y9boot-starter-publish-kafka 及消息订阅模块 risenet-y9boot-starter-listener-kafka
消息发布模块主要做的工作是将 topic 消息通过 PUBLISH 命令发送至消息中间件中,其中 risenet-y9boot-starter-publish-kafka 组件主要实现 risenet-y9boot-common-util 组件中的接口类:Y9PublishService。
添加依赖
将 risenet-y9boot-starter-publish-kafka 添加到 pom.xml 的依赖列表里
<dependency>
<groupId>net.risesoft</groupId>
<artifactId>risenet-y9boot-starter-publish-kafka</artifactId>
<version>[最新正式版本]</version>
</dependency>如需使用 SNAPSHOT 版本,且 maven 没有 POM 继承链至数字底座的 y9-digitalbase-parent 则还需将有生的私服仓库地址添加到 pom.xml 文件中。
<repositories>
<repository>
<id>y9-internet-repo</id>
<url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<!-- <updatePolicy>always</updatePolicy> -->
<!-- always,daily(default),interval:XXX,never -->
</snapshots>
</repository>
</repositories>修改配置文件
spring:
kafka:
bootstrap-servers: localhost:9092
y9:
feature:
publish:
kafka:
enabled: truespring.kafka.bootstrap-servers=localhost:9092
#启用消息发布
y9.feature.publish.kafka.enabled=true示例代码
@RestController
@RequiredArgsConstructor
@Slf4j
public class PublishMessageController {
private static final String TEST_TOPIC = "test-topic";
private final Y9PublishService publishService;
@RequestMapping("/publishMessage")
public void publishMessage() {
String message = "test-message";
publishService.publishObject(message, TEST_TOPIC);
LOGGER.info("往 kafka 的主题 [{}] 发布消息 [{}]", TEST_TOPIC, message);
}
}消息订阅
risenet-y9boot-starter-listener-kafka 引入了 spring-kafka 依赖,可以通过 @KafkaListener 注解和配置 MessageListenerContainer 来接收 kafka 的 topic 的消息。
starter 中集成了组织节点同步的监听器,监听方法中将组织身份的消息发布为 spring 的 Y9EventOrg 事件,引入工程即可直接利用 spring 的事件机制处理消息,无需编写 kafka 的相关逻辑代码了。这个监听器可通过开关打开。
添加依赖
将 risenet-y9boot-starter-listener-kafka 添加到 pom.xml 的依赖列表里
<dependency>
<groupId>net.risesoft</groupId>
<artifactId>risenet-y9boot-starter-listener-kafka</artifactId>
<version>[最新正式版本]</version>
</dependency>如需使用 SNAPSHOT 版本,且 maven 没有 POM 继承链至数字底座的 y9-digitalbase-parent 则还需将有生的私服仓库地址添加到 pom.xml 文件中。
<repositories>
<repository>
<id>y9-internet-repo</id>
<url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<!-- <updatePolicy>always</updatePolicy> -->
<!-- always,daily(default),interval:XXX,never -->
</snapshots>
</repository>
</repositories>修改配置文件
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: y9demo-sync
y9:
systemName: y9demo-sync
feature:
listener:
kafka:
enabled: true
message-org-enabled: true # 启用组织节点同步spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=y9demo-sync
y9.systemName=y9demo-sync
y9.feature.listener.kafka.enabled=true
y9.feature.listener.kafka.message-org-enabled=true # 启用组织节点同步示例代码
- 订阅自定义主题消息
@RestController
@RequiredArgsConstructor
@Slf4j
public class SubscribeMessageController {
private static final String TEST_TOPIC = "test-topic";
@KafkaListener(topics = TEST_TOPIC)
public void listenMessage(String message) {
LOGGER.info("收到订阅的 kafka 主题 [{}] 的消息 [{}]", TEST_TOPIC, message);
}
}- 处理组织节点同步消息
@Component
public class Y9OrgListener implements ApplicationListener<Y9EventOrg> {
protected final Logger log = LoggerFactory.getLogger(Y9OrgListener.class);
public Y9OrgListener() {
log.info("init SpringEventListener...");
}
@Override
public void onApplicationEvent(Y9EventOrg event) {
log.info(event.getEventType());
if (Y9OrgEventTypeConst.ORGANIZATION_ADD.equals(event.getEventType())) {
Organization org = (Organization)event.getOrgObj();
log.info("--------------------------添加组织机构-------------------------------");
log.info("--------------------" + org.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.DEPARTMENT_ADD.equals(event.getEventType())) {
Department newDept = (Department)event.getOrgObj();
log.info("--------------------------添加部门-------------------------------");
log.info("--------------------" + newDept.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.GROUP_ADD.equals(event.getEventType())) {
Group group = (Group)event.getOrgObj();
log.info("--------------------------添加用户组-------------------------------");
log.info("--------------------" + group.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.POSITION_ADD.equals(event.getEventType())) {
Position position = (Position)event.getOrgObj();
log.info("--------------------------添加岗位-------------------------------");
log.info("--------------------" + position.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.PERSON_ADD.equals(event.getEventType())) {
Person person = (Person)event.getOrgObj();
log.info("--------------------------添加人员-------------------------------");
log.info("--------------------" + person.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.ORGANIZATION_UPDATE.equals(event.getEventType())) {
Organization org = (Organization)event.getOrgObj();
log.info("--------------------------修改组织机构-------------------------------");
log.info("--------------------" + org.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.DEPARTMENT_UPDATE.equals(event.getEventType())) {
Department newDept = (Department)event.getOrgObj();
log.info("--------------------------修改部门-------------------------------");
log.info("--------------------" + newDept.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.GROUP_UPDATE.equals(event.getEventType())) {
Group group = (Group)event.getOrgObj();
log.info("--------------------------修改用户组-------------------------------");
log.info("--------------------" + group.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.POSITION_UPDATE.equals(event.getEventType())) {
Position position = (Position)event.getOrgObj();
log.info("--------------------------修改岗位-------------------------------");
log.info("--------------------" + position.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.PERSON_UPDATE.equals(event.getEventType())) {
Person person = (Person)event.getOrgObj();
log.info("--------------------------修改人员-------------------------------");
log.info("--------------------" + person.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.ORGANIZATION_DELETE.equals(event.getEventType())) {
Organization org = (Organization)event.getOrgObj();
log.info("--------------------------删除组织机构-------------------------------");
log.info("--------------------" + org.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.DEPARTMENT_DELETE.equals(event.getEventType())) {
Department newDept = (Department)event.getOrgObj();
log.info("--------------------------删除部门-------------------------------");
log.info("--------------------" + newDept.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.GROUP_DELETE.equals(event.getEventType())) {
Group group = (Group)event.getOrgObj();
log.info("--------------------------删除用户组-------------------------------");
log.info("--------------------" + group.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.POSITION_DELETE.equals(event.getEventType())) {
Position position = (Position)event.getOrgObj();
log.info("--------------------------删除岗位-------------------------------");
log.info("--------------------" + position.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.PERSON_DELETE.equals(event.getEventType())) {
Person person = (Person)event.getOrgObj();
log.info("--------------------------删除人员-------------------------------");
log.info("--------------------" + person.getName() + "---------------------");
} else if (Y9OrgEventTypeConst.GROUP_ADD_PERSON.equals(event.getEventType())) {
PersonsGroups personsGroups = (PersonsGroups)event.getOrgObj();
log.info("--------------------------用户组添加人员-------------------------------");
log.info("--------------------" + personsGroups.getPersonOrder() + "---------------------");
} else if (Y9OrgEventTypeConst.GROUP_REMOVE_PERSON.equals(event.getEventType())) {
PersonsGroups personsGroups = (PersonsGroups)event.getOrgObj();
log.info("--------------------------用户组删除人员-------------------------------");
log.info("--------------------" + personsGroups.getPersonOrder() + "---------------------");
} else if (Y9OrgEventTypeConst.GROUP_ORDER.equals(event.getEventType())) {
PersonsGroups personsGroups = (PersonsGroups)event.getOrgObj();
log.info("--------------------------用户组-人员排序-------------------------------");
log.info("--------------------" + personsGroups.getGroupOrder() + "---------------------");
log.info("--------------------" + personsGroups.getPersonOrder() + "---------------------");
} else if (Y9OrgEventTypeConst.POSITION_ADD_PERSON.equals(event.getEventType())) {
PersonsPositions personsPositions = (PersonsPositions)event.getOrgObj();
log.info("--------------------------岗位添加人员-------------------------------");
log.info("--------------------" + personsPositions.getPersonOrder() + "---------------------");
} else if (Y9OrgEventTypeConst.POSITION_REMOVE_PERSON.equals(event.getEventType())) {
PersonsPositions personsPositions = (PersonsPositions)event.getOrgObj();
log.info("--------------------------岗位删除人员-------------------------------");
log.info("--------------------" + personsPositions.getPersonOrder() + "---------------------");
} else if (Y9OrgEventTypeConst.POSITION_ORDER.equals(event.getEventType())) {
PersonsPositions personsPositions = (PersonsPositions)event.getOrgObj();
log.info("--------------------------岗位-人员排序-------------------------------");
log.info("--------------------" + personsPositions.getPositionOrder() + "---------------------");
log.info("--------------------" + personsPositions.getPersonOrder() + "---------------------");
} else if (Y9OrgEventTypeConst.SYNC.equals(event.getEventType())) {
SyncOrgUnits syncOrgUnits = (SyncOrgUnits)event.getOrgObj();
OrgTypeEnum orgType = syncOrgUnits.getOrgTypeEnum();
switch (orgType) {
case ORGANIZATION:
log.info("--------------------------同步-组织机构-------------------------------");
OrganizationUtil.syncOrganization(syncOrgUnits);
break;
case DEPARTMENT:
log.info("--------------------------同步-部门-------------------------------");
DepartmentUtil.syncDepartment(syncOrgUnits);
break;
case PERSON:
log.info("--------------------------同步-人员-------------------------------");
PersonUtil.syncPerson(syncOrgUnits.getPersons().get(0));
break;
case GROUP:
log.info("--------------------------同步-用户组-------------------------------");
GroupUtil.syncGroup(syncOrgUnits);
break;
case POSITION:
log.info("--------------------------同步-岗位-------------------------------");
PositionUtil.syncPosition(syncOrgUnits);
break;
default:
break;
}
}
}
}示例项目
示例项目地址:https://gitee.com/risesoft-y9/y9-core/tree/main/y9-digitalbase-example/risenet-y9demo-sync-kafka