Skip to content

消息发布订阅

简介

当前 数字底座大多数系统均基于微服务架构进行设计与开发,按照业务逻辑的不同被拆分成一组可以独立开发与部署的微服务,服务之间通过 HTTP 调用和 RPC 这两种轻量级通信方式来实现相互通信。但这两种通信方式都属于同步点对点通信的范畴,会造成服务之间存在高耦合、请求易阻塞以及请求无缓冲等缺陷。随着服务规模的不断扩张以及服务请求量的急剧增加,这些缺陷也被不断地放大从而影响着整个系统的性能和稳定性,例如组织架构的人员同步和系统操作日志的存储等,因此必须采用松耦合的、支持系统动态加入和退出的数据交换手段,而许多消息中间件的发布订阅模式正式解决这个问题的框架,如 redis、kafka、mqtt、activeMQ、rabbitMQ、Hazelcast 等都提供 publish/subscribe 通讯模式,消息发布者(producer)、消息中间件(broker)、消息订阅者(consumer)构成了三者之间的关系:

1566979135009 与 broker 连接的是客户端(client),既可以充当发布者、也可以充当消费者角色,他们通过主题(topic)建立传输通道,发布者往某个 topic 发布消息(消息格式自己定义,只要消费者能够理解即可,因此一般采用JSON格式传输消息),消费者监听同一个 topic,在监听器里接收到数据,然后在自己的系统中保存这些数据,以便后续利用。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

img

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

img

activeMQ、rabbitMQ由于支持多种协议(jms、amqp、stomp、mqtt、openwire等),性能不太好,因此没有采用。redis基于内存操作,速度最快,kafka基于文件存储消息,吞吐量大,适合传输类似日志类的大批量信息,mqtt主要用于物联网的数据传输,协议简单,便于在物联设备上实现,故数字底座主要使用kafka作为消息中间件。

功能引入

消息发布

Y9提供了消息发布模块 risenet-y9boot-starter-publish-kafka 及消息订阅模块 risenet-y9boot-starter-listener-kafka

消息发布模块主要做的工作是将 topic 消息通过 PUBLISH 命令发送至消息中间件中,其中 risenet-y9boot-starter-publish-kafka 组件主要实现 risenet-y9boot-common-util 组件中的接口类:Y9PublishService。

添加依赖

risenet-y9boot-starter-publish-kafka 添加到 pom.xml 的依赖列表里

xml
<dependency>
    <groupId>net.risesoft</groupId>
    <artifactId>risenet-y9boot-starter-publish-kafka</artifactId>
     <version>[最新正式版本]</version>
</dependency>

如需使用 SNAPSHOT 版本,且 maven 没有 POM 继承链至数字底座的 y9-digitalbase-parent 则还需将有生的私服仓库地址添加到 pom.xml 文件中。

xml
<repositories>
    <repository>
        <id>y9-internet-repo</id>
        <url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <!-- <updatePolicy>always</updatePolicy> -->
            <!-- always,daily(default),interval:XXX,never -->
        </snapshots>
    </repository>
</repositories>

修改配置文件

yaml
spring:
    kafka:
        bootstrap-servers: localhost:9092
y9:
    feature:
        publish:
            kafka:
                enabled: true
properties
spring.kafka.bootstrap-servers=localhost:9092
#启用消息发布
y9.feature.publish.kafka.enabled=true

示例代码

java
@RestController
@RequiredArgsConstructor
@Slf4j
public class PublishMessageController {

    private static final String TEST_TOPIC = "test-topic";

    private final Y9PublishService publishService;

    @RequestMapping("/publishMessage")
    public void publishMessage() {
        String message = "test-message";
        publishService.publishObject(message, TEST_TOPIC);
        LOGGER.info("往 kafka 的主题 [{}] 发布消息 [{}]", TEST_TOPIC, message);
    }
}

消息订阅

risenet-y9boot-starter-listener-kafka 引入了 spring-kafka 依赖,可以通过 @KafkaListener 注解和配置 MessageListenerContainer 来接收 kafka 的 topic 的消息。

starter 中集成了组织节点同步的监听器,监听方法中将组织身份的消息发布为 spring 的 Y9EventOrg 事件,引入工程即可直接利用 spring 的事件机制处理消息,无需编写 kafka 的相关逻辑代码了。这个监听器可通过开关打开。

添加依赖

risenet-y9boot-starter-listener-kafka 添加到 pom.xml 的依赖列表里

xml
<dependency>
    <groupId>net.risesoft</groupId>
    <artifactId>risenet-y9boot-starter-listener-kafka</artifactId>
     <version>[最新正式版本]</version>
</dependency>

如需使用 SNAPSHOT 版本,且 maven 没有 POM 继承链至数字底座的 y9-digitalbase-parent 则还需将有生的私服仓库地址添加到 pom.xml 文件中。

xml
<repositories>
    <repository>
        <id>y9-internet-repo</id>
        <url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <!-- <updatePolicy>always</updatePolicy> -->
            <!-- always,daily(default),interval:XXX,never -->
        </snapshots>
    </repository>
</repositories>

修改配置文件

yaml
spring:
    kafka:
        bootstrap-servers: localhost:9092
        consumer:
            group-id: y9demo-sync
y9:
    systemName: y9demo-sync
    feature:
        listener:
            kafka:
                enabled: true
                message-org-enabled: true # 启用组织节点同步
properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=y9demo-sync
y9.systemName=y9demo-sync
y9.feature.listener.kafka.enabled=true
y9.feature.listener.kafka.message-org-enabled=true # 启用组织节点同步

示例代码

  • 订阅自定义主题消息
java
@RestController
@RequiredArgsConstructor
@Slf4j
public class SubscribeMessageController {

    private static final String TEST_TOPIC = "test-topic";

    @KafkaListener(topics = TEST_TOPIC)
    public void listenMessage(String message) {
        LOGGER.info("收到订阅的 kafka 主题 [{}] 的消息 [{}]", TEST_TOPIC, message);
    }
}
  • 处理组织节点同步消息
java

@Component
public class Y9OrgListener implements ApplicationListener<Y9EventOrg> {
    protected final Logger log = LoggerFactory.getLogger(Y9OrgListener.class);

    public Y9OrgListener() {
        log.info("init SpringEventListener...");
    }

    @Override
    public void onApplicationEvent(Y9EventOrg event) {
        log.info(event.getEventType());

        if (Y9OrgEventTypeConst.ORGANIZATION_ADD.equals(event.getEventType())) {
            Organization org = (Organization)event.getOrgObj();
            log.info("--------------------------添加组织机构-------------------------------");
            log.info("--------------------" + org.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.DEPARTMENT_ADD.equals(event.getEventType())) {
            Department newDept = (Department)event.getOrgObj();
            log.info("--------------------------添加部门-------------------------------");
            log.info("--------------------" + newDept.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.GROUP_ADD.equals(event.getEventType())) {
            Group group = (Group)event.getOrgObj();
            log.info("--------------------------添加用户组-------------------------------");
            log.info("--------------------" + group.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.POSITION_ADD.equals(event.getEventType())) {
            Position position = (Position)event.getOrgObj();
            log.info("--------------------------添加岗位-------------------------------");
            log.info("--------------------" + position.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.PERSON_ADD.equals(event.getEventType())) {
            Person person = (Person)event.getOrgObj();
            log.info("--------------------------添加人员-------------------------------");
            log.info("--------------------" + person.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.ORGANIZATION_UPDATE.equals(event.getEventType())) {
            Organization org = (Organization)event.getOrgObj();
            log.info("--------------------------修改组织机构-------------------------------");
            log.info("--------------------" + org.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.DEPARTMENT_UPDATE.equals(event.getEventType())) {
            Department newDept = (Department)event.getOrgObj();
            log.info("--------------------------修改部门-------------------------------");
            log.info("--------------------" + newDept.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.GROUP_UPDATE.equals(event.getEventType())) {
            Group group = (Group)event.getOrgObj();
            log.info("--------------------------修改用户组-------------------------------");
            log.info("--------------------" + group.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.POSITION_UPDATE.equals(event.getEventType())) {
            Position position = (Position)event.getOrgObj();
            log.info("--------------------------修改岗位-------------------------------");
            log.info("--------------------" + position.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.PERSON_UPDATE.equals(event.getEventType())) {
            Person person = (Person)event.getOrgObj();
            log.info("--------------------------修改人员-------------------------------");
            log.info("--------------------" + person.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.ORGANIZATION_DELETE.equals(event.getEventType())) {
            Organization org = (Organization)event.getOrgObj();
            log.info("--------------------------删除组织机构-------------------------------");
            log.info("--------------------" + org.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.DEPARTMENT_DELETE.equals(event.getEventType())) {
            Department newDept = (Department)event.getOrgObj();
            log.info("--------------------------删除部门-------------------------------");
            log.info("--------------------" + newDept.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.GROUP_DELETE.equals(event.getEventType())) {
            Group group = (Group)event.getOrgObj();
            log.info("--------------------------删除用户组-------------------------------");
            log.info("--------------------" + group.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.POSITION_DELETE.equals(event.getEventType())) {
            Position position = (Position)event.getOrgObj();
            log.info("--------------------------删除岗位-------------------------------");
            log.info("--------------------" + position.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.PERSON_DELETE.equals(event.getEventType())) {
            Person person = (Person)event.getOrgObj();
            log.info("--------------------------删除人员-------------------------------");
            log.info("--------------------" + person.getName() + "---------------------");
        } else if (Y9OrgEventTypeConst.GROUP_ADD_PERSON.equals(event.getEventType())) {
            PersonsGroups personsGroups = (PersonsGroups)event.getOrgObj();
            log.info("--------------------------用户组添加人员-------------------------------");
            log.info("--------------------" + personsGroups.getPersonOrder() + "---------------------");
        } else if (Y9OrgEventTypeConst.GROUP_REMOVE_PERSON.equals(event.getEventType())) {
            PersonsGroups personsGroups = (PersonsGroups)event.getOrgObj();
            log.info("--------------------------用户组删除人员-------------------------------");
            log.info("--------------------" + personsGroups.getPersonOrder() + "---------------------");
        } else if (Y9OrgEventTypeConst.GROUP_ORDER.equals(event.getEventType())) {
            PersonsGroups personsGroups = (PersonsGroups)event.getOrgObj();
            log.info("--------------------------用户组-人员排序-------------------------------");
            log.info("--------------------" + personsGroups.getGroupOrder() + "---------------------");
            log.info("--------------------" + personsGroups.getPersonOrder() + "---------------------");
        } else if (Y9OrgEventTypeConst.POSITION_ADD_PERSON.equals(event.getEventType())) {
            PersonsPositions personsPositions = (PersonsPositions)event.getOrgObj();
            log.info("--------------------------岗位添加人员-------------------------------");
            log.info("--------------------" + personsPositions.getPersonOrder() + "---------------------");
        } else if (Y9OrgEventTypeConst.POSITION_REMOVE_PERSON.equals(event.getEventType())) {
            PersonsPositions personsPositions = (PersonsPositions)event.getOrgObj();
            log.info("--------------------------岗位删除人员-------------------------------");
            log.info("--------------------" + personsPositions.getPersonOrder() + "---------------------");
        } else if (Y9OrgEventTypeConst.POSITION_ORDER.equals(event.getEventType())) {
            PersonsPositions personsPositions = (PersonsPositions)event.getOrgObj();
            log.info("--------------------------岗位-人员排序-------------------------------");
            log.info("--------------------" + personsPositions.getPositionOrder() + "---------------------");
            log.info("--------------------" + personsPositions.getPersonOrder() + "---------------------");
        } else if (Y9OrgEventTypeConst.SYNC.equals(event.getEventType())) {
            SyncOrgUnits syncOrgUnits = (SyncOrgUnits)event.getOrgObj();
            OrgTypeEnum orgType = syncOrgUnits.getOrgTypeEnum();
            switch (orgType) {
                case ORGANIZATION:
                    log.info("--------------------------同步-组织机构-------------------------------");
                    OrganizationUtil.syncOrganization(syncOrgUnits);
                    break;
                case DEPARTMENT:
                    log.info("--------------------------同步-部门-------------------------------");
                    DepartmentUtil.syncDepartment(syncOrgUnits);
                    break;
                case PERSON:
                    log.info("--------------------------同步-人员-------------------------------");
                    PersonUtil.syncPerson(syncOrgUnits.getPersons().get(0));
                    break;
                case GROUP:
                    log.info("--------------------------同步-用户组-------------------------------");
                    GroupUtil.syncGroup(syncOrgUnits);
                    break;
                case POSITION:
                    log.info("--------------------------同步-岗位-------------------------------");
                    PositionUtil.syncPosition(syncOrgUnits);
                    break;
                default:
                    break;
            }
        }
    }
}

示例项目

示例项目地址:https://gitee.com/risesoft-y9/y9-core/tree/main/y9-digitalbase-example/risenet-y9demo-sync-kafka

Released under the GPL-3.0 License.