Skip to content

消息发布订阅

简介

当前 Y9 大多数系统均基于微服务架构进行设计与开发,按照业务逻辑的不同被拆分成一组可以独立开发与部署的微服务,服务之间通过 HTTP 调用和 RPC 这两种轻量级通信方式来实现相互通信。但这两种通信方式都属于同步点对点通信的范畴,会造成服务之间存在高耦合、请求易阻塞以及请求无缓冲等缺陷。随着服务规模的不断扩张以及服务请求量的急剧增加,这些缺陷也被不断地放大从而影响着整个系统的性能和稳定性,例如组织架构的人员同步和系统操作日志的存储等,因此必须采用松耦合的、支持系统动态加入和退出的数据交换手段,而许多消息中间件的发布订阅模式正式解决这个问题的框架,如 redis、kafka、mqtt、activeMQ、rabbitMQ、Hazelcast 等都提供 publish/subscribe 通讯模式,消息发布者(producer)、消息中间件(broker)、消息订阅者(consumer)构成了三者之间的关系:

1566979135009 与 broker 连接的是客户端(client),既可以充当发布者、也可以充当消费者角色,他们通过主题(topic)建立传输通道,发布者往某个 topic 发布消息(消息格式自己定义,只要消费者能够理解即可,因此一般采用JSON格式传输消息),消费者监听同一个 topic,在监听器里接收到数据,然后在自己的系统中保存这些数据,以便后续利用。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

img

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

img

activeMQ、rabbitMQ由于支持多种协议(jms、amqp、stomp、mqtt、openwire等),性能不太好,因此没有采用。redis基于内存操作,速度最快,kafka基于文件存储消息,吞吐量大,适合传输类似日志类的大批量信息,mqtt主要用于物联网的数据传输,协议简单,便于在物联设备上实现,故Y9平台主要使用kafka作为消息中间件。

功能引入

Y9提供了消息发布模块:

risenet-y9boot-starter-publish-kafka

及消息订阅模块:

risenet-y9boot-starter-listener-kafka

消息发布模块主要做的工作是将 topic 消息通过 PUBLISH 命令发送至消息中间件中,其中 risenet-y9boot-starter-publish-kafka 组件主要实现 risenet-y9boot-common-util 组件中的接口类:Y9PublishService。

统一封装了四种消息类型:

1、Y9MessageOrg:组织身份消息,其topic标题为:y9_org_event,该消息类型主要为组织身份相关的消息类,例如:新增人员、修改人员等消息。

2、Y9MessageCommon:公共类消息,其topic标题为:y9_common_event,该消息类型主要为公共类型的消息,例如:租户数据源更新消息、application 属性值刷新。

3、Y9MessageTask:办件类消息,其topic标题为:y9_task_event,该消息类型主要为工作流组件发布的消息,例如新增办件、办结件等。

maven pom.xml 修改

(1)修改pom.xml

xml
<repositories>
    <repository>
        <id>y9-internet-repo</id>
        <url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <!-- <updatePolicy>always</updatePolicy> -->
            <!-- always,daily(default),interval:XXX,never -->
        </snapshots>
    </repository>
</repositories>

(2)将 risenet-y9boot-starter-publish-kafka 添加到 pom.xml 的依赖列表里.

xml
<dependency>
    <groupId>net.risesoft</groupId>
    <artifactId>risenet-y9boot-starter-publish-kafka</artifactId>
     <version>[最新版本]</version>
</dependency>

修改属性文件 application.properties

properties
spring.kafka.producer.bootstrap-servers=localhost:9092
#启用消息发布
y9.feature.publish.kafka.enabled=true
yaml
spring:
    kafka:
        producer:
            bootstrap-servers: localhost:9092
y9:
    feature:
        publish:
            kafka:
                enabled: true

使用示例

java
package com.example;

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import net.risesoft.enums.platform.OrgTypeEnum;
import net.risesoft.model.platform.OrgUnit;
import net.risesoft.model.platform.Organization;
import net.risesoft.y9.Y9LoginUserHolder;
import net.risesoft.y9.pubsub.Y9PublishService;
import net.risesoft.y9.pubsub.constant.Y9OrgEventTypeConst;
import net.risesoft.y9.pubsub.message.Y9MessageOrg;

@RestController
@RequestMapping("/")
public class MainController {

    @Resource(name = "y9PublishService")
    private Y9PublishService y9PublishService;

    /**
     * 发布新增事件
     */
    @GetMapping(value = "/publish")
    public String publishSyncDataSourceEvent() {
        Organization org = new Organization();
        org.setId("1");
        org.setName("1");
        org.setOrgType(OrgTypeEnum.ORGANIZATION);
        org.setTabIndex(1);
        Y9MessageOrg<OrgUnit> msg =
            new Y9MessageOrg<>(org, Y9OrgEventTypeConst.ORGANIZATION_ADD, Y9LoginUserHolder.getTenantId());
        y9PublishService.publishMessageOrg(msg);
        return "publish success";
    }
}

示例代码

码云地址:https://gitee.com/risesoft-y9/y9-core

该示例代码路径地址:https://gitee.com/risesoft-y9/y9-core/tree/main/y9-digitalbase-example/risenet-y9demo-sync-kafka

Released under the GPL-3.0 License.