Skip to content

spring-kafka

简介

kafka 是一个消息队列产品,基于 Topic partitions 的设计,能达到非常高的消息发送处理性能。Spring 创建了一个项目 Spring-kafka,封装了 Apache 的 Kafka-client,用于在 Spring 项目里快速集成 kafka。除了简单的收发消息外,Spring-kafka 还提供了很多高级功能。

Y9 是在 spring-kafka 的基础上进行定义的,加上了启用开关,主要配置参照 spring 的来设置即可。详见:https://docs.spring.io/spring-boot/reference/messaging/kafka.html#page-title

功能引用

引用与配置

(1)修改 pom.xml

将本公司的私服仓库地址添加到 pom.xml 文件中。

xml
<repositories>
    <repository>
        <id>y9-internet-repo</id>
        <url>https://svn.youshengyun.com:9900/nexus/repository/maven-public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <!-- <updatePolicy>always</updatePolicy> -->
            <!-- always,daily(default),interval:XXX,never -->
        </snapshots>
    </repository>
</repositories>

(2)将 risenet-y9boot-starter-jpa-public 添加到 pom.xml 的依赖列表里.

xml
<dependency>
    <groupId>net.risesoft</groupId>
    <artifactId>risenet-y9boot-starter-kafka</artifactId>
     <version>[最新版本]</version>
</dependency>

(3)修改属性文件 application.properties

properties
#是否启用kafka
y9.common.kafkaEnabled=true

#Kafka配置
spring.kafka.producer.bootstrap-servers=localhost:9092

#Kafka消费者配置
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.group-id=y9
yaml
spring:
    kafka:
        producer:
            bootstrap-servers: localhost:9092
        #Kafka消费者配置
        consumer: 
            bootstrap-servers: localhost:9092
            auto-commit-interval: 1000
            enable-auto-commit: true
            group-id: y9demo-sync
y9:
    common:
        kafkaEnabled: true

使用说明

(1)定义了 KafkaTemplate :y9KafkaTemplate。

使用示例:

java
@Autowired
private KafkaTemplate<String, Object> y9KafkaTemplate;

@RequestMapping("/test")
public Boolean test() {
    try {
		HashMap<String, Object> map = new HashMap<String, Object>();
		map.put("systemName", "riseplatform");
		map.put("personName", "systemadmin");
		String jsonString = Y9JacksonUtil.writeValueAsString(map);
		y9KafkaTemplate.send("y9_demo_event", jsonString);
		return true;
	} catch (Exception e) {
		e.printStackTrace();
	}
    return false;
}

Released under the GPL-3.0 License.