I. Message System Modules
- common : shared module (usually published to a private Maven repository and loaded by both the producer and the consumer)
- producer : producer service (usually deployed on its own, depends on the common package)
- consumer : consumer service (usually deployed on its own, depends on the common package)
This article walks through integrating Spring with the Kafka client, using user registration as the example:
- User registration:
  - insert the record into the database, and similar operations
  - publish a "user registered" message (producer); follow-up tasks such as sending an email then run after registration (consumer)
II. Defining the Common Module
1. What the common module contains
- the message payload entity (shared by the producer and the consumer)
- the serializer and deserializer for the message payload
  - serialization: the producer serializes the payload before sending it to the broker
  - deserialization: the consumer pulls the message from the broker and deserializes the payload
Note: why do messages need to be serialized and deserialized at all?
Because the payload is rarely a plain string; it is usually structured data such as a list, an entity class, or a Map. Structured data has to be converted to bytes before it can be put on the wire, which is exactly what the serializer does.
2. Import the Kafka dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
3. Message payload entity
Define the payload entity according to your business needs; it usually mirrors a database table. Sticking with the user-registration example, we define a User class:
@Data
public class User implements Serializable {

    private static final long serialVersionUID = -3247930189609406661L;

    private long userId;
    private String nickname;
    private String email;
    private String account;
    private String password;
    private int status;
    private long createTime;

    public User() {
    }

    public User(String nickname, String email, String account, String password, int status, long createTime) {
        this.nickname = nickname;
        this.email = email;
        this.account = account;
        this.password = password;
        this.status = status;
        this.createTime = createTime;
    }

    public User(long userId, String nickname, String email, String account, String password, int status, long createTime) {
        this.userId = userId;
        this.nickname = nickname;
        this.email = email;
        this.account = account;
        this.password = password;
        this.status = status;
        this.createTime = createTime;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", nickname='" + nickname + '\'' +
                ", email='" + email + '\'' +
                ", account='" + account + '\'' +
                ", password='" + password + '\'' +
                ", status=" + status +
                ", createTime=" + createTime +
                '}';
    }
}
Note:
- The message entity class must implement the Serializable interface (the payload is serialized, so the class has to explicitly declare that it is serializable)
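To see why this matters: Java's ObjectOutputStream (which our converter below relies on) rejects any object whose class does not implement Serializable. A minimal sketch, with a hypothetical PlainUser class:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

class PlainUser {
    // deliberately does NOT implement Serializable
}

public class SerializableCheck {
    public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        // throws java.io.NotSerializableException: PlainUser
        out.writeObject(new PlainUser());
    }
}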
4. Define the serializer and deserializer
(1) Define a converter utility for turning objects into byte arrays and back
import java.io.*;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-1 15:02
 */
public class Converter {

    /**
     * Convert a bean into a byte array.
     * @param object
     * @return
     */
    public static byte[] bean2ByteArray(Object object) {
        byte[] bytes = null;
        ByteArrayOutputStream byteArrayOutputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            // 1. Create the byte-array output stream and the object output stream
            byteArrayOutputStream = new ByteArrayOutputStream();
            outputStream = new ObjectOutputStream(byteArrayOutputStream);
            // 2. Write the object into the stream
            outputStream.writeObject(object);
            outputStream.flush();
            // 3. Turn the byte-array output stream into a byte array
            bytes = byteArrayOutputStream.toByteArray();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // 4. Close the streams and release the resources
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (byteArrayOutputStream != null) {
                try {
                    byteArrayOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return bytes;
    }

    /**
     * Convert a byte array back into a bean.
     * @param bytes
     * @return
     */
    public static Object byteArray2Bean(byte[] bytes) {
        Object object = null;
        ByteArrayInputStream byteArrayInputStream = null;
        ObjectInputStream inputStream = null;
        try {
            // 1. Create the byte-array input stream
            byteArrayInputStream = new ByteArrayInputStream(bytes);
            // 2. Create the object input stream
            inputStream = new ObjectInputStream(byteArrayInputStream);
            // 3. Read the object back from the stream
            object = inputStream.readObject();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // 4. Close the streams and release the resources
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (byteArrayInputStream != null) {
                try {
                    byteArrayInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return object;
    }
}
(2) Define the serializer
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-1 14:56
 */
public class KafkaObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
    }

    /**
     * The actual serialization method
     * @param topic the topic
     * @param data the message payload
     * @return
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        return Converter.bean2ByteArray(data);
    }

    @Override
    public void close() {
    }
}
Note:
- The serializer implements the org.apache.kafka.common.serialization.Serializer interface; its serialize method does the actual work.
- The serializer is used by the message producer: the payload is serialized before the message is sent.
(3) Define the deserializer
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-1 17:43
 */
public class KafkaObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
    }

    /**
     * The actual deserialization method
     * @param topic the topic
     * @param data the message payload bytes
     * @return
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        return Converter.byteArray2Bean(data);
    }

    @Override
    public void close() {
    }
}
Note:
- The deserializer implements the org.apache.kafka.common.serialization.Deserializer interface; its deserialize method does the actual work.
- The deserializer is used by the message consumer: before a received message is processed, the payload the producer serialized is deserialized back into its original form.
- Serialization and deserialization are a matched pair, so the message structure must be agreed upon and both classes must live in the shared common module.
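The pairing is easy to verify with a round trip through both classes. A minimal sketch (the topic argument is ignored by our implementation, so any name works; SerdeRoundTripDemo is just a throwaway test class):

import com.shixinke.github.kafka.practise.common.data.User;
import com.shixinke.github.kafka.practise.common.util.KafkaObjectDeserializer;
import com.shixinke.github.kafka.practise.common.util.KafkaObjectSerializer;

public class SerdeRoundTripDemo {
    public static void main(String[] args) {
        KafkaObjectSerializer serializer = new KafkaObjectSerializer();
        KafkaObjectDeserializer deserializer = new KafkaObjectDeserializer();
        User user = new User("shixinke", "ishixinke@qq.com", "shixinke", "123456", 0, System.currentTimeMillis() / 1000);
        // what the producer puts on the wire
        byte[] payload = serializer.serialize("user_reg", user);
        // what the consumer gets back: an equal User instance
        User restored = (User) deserializer.deserialize("user_reg", payload);
        System.out.println(restored);
    }
}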
III. The Message Producer
1. Import the Kafka dependencies
Add to the project's Maven pom.xml:
<!-- Kafka support -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
<!-- our own common module -->
<dependency>
    <groupId>com.shixinke.github.kafka.practise.common</groupId>
    <artifactId>kafka-practise-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
2. Kafka-related application configuration
spring.application.name=kafka-practise-producer
spring.kafka.producer.batch-size=16384
spring.kafka.producer.acks=all
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.client-id=${spring.application.name}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.shixinke.github.kafka.practise.common.util.KafkaObjectSerializer
- spring.kafka.producer.batch-size : maximum size, in bytes, of a batch of messages sent in one request
- spring.kafka.producer.acks : acknowledgment mode (all waits until every in-sync replica has the record)
- spring.kafka.producer.bootstrap-servers : broker addresses to connect to
- spring.kafka.producer.buffer-memory : size, in bytes, of the send buffer
- spring.kafka.producer.client-id : identifier of this application
- spring.kafka.producer.key-serializer : serializer class for message keys
- spring.kafka.producer.value-serializer : serializer class for message payloads (points at the serializer in the common module)
3. Write the Kafka configuration classes
(1) Configuration properties class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-2 16:33
 */
@Component
@ConfigurationProperties(prefix = "spring.kafka.producer")
@Data
public class KafkaProducerProperties {
    private String batchSize;
    private String acks;
    private String bootstrapServers;
    private Long bufferMemory;
    private String clientId;
    private String keySerializer;
    private String valueSerializer;
}
- The fields map one-to-one to the keys in the application configuration above
(2) Kafka bean configuration class
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-2 16:31
 */
@Configuration
public class KafkaProducerConfiguration {

    @Resource
    private KafkaProducerProperties kafkaProducerProperties;

    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBootstrapServers());
        props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerProperties.getAcks());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerProperties.getBatchSize());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerProperties.getBufferMemory());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerProperties.getClientId());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerProperties.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerProperties.getValueSerializer());
        return props;
    }

    @Bean
    public <V> KafkaTemplate<String, V> kafkaTemplate() {
        return new KafkaTemplate<String, V>(new DefaultKafkaProducerFactory<String, V>(producerConfig()));
    }
}
- The KafkaTemplate bean (the object through which the producer is invoked) is instantiated from the configuration properties class
4. Producer service classes
(1) Producer service interface
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-1 14:05
 */
public interface KafkaMQProducerService {
    <K, V> ListenableFuture<SendResult<String, V>> send(String topic, K key, V content);
}
(2) Producer service implementation
import com.shixinke.github.kafka.practise.producer.service.KafkaMQProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-1 14:06
 */
@Service
public class KafkaMQProducerServiceImpl implements KafkaMQProducerService {

    /**
     * The KafkaTemplate bean declared in the producer configuration
     */
    @Resource
    private KafkaTemplate kafkaProducer;

    @Override
    @SuppressWarnings("unchecked")
    public <K, V> ListenableFuture<SendResult<String, V>> send(String topic, K key, V content) {
        ProducerRecord<String, V> record = new ProducerRecord<>(topic, String.valueOf(key), content);
        return kafkaProducer.send(record);
    }
}
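Since send is asynchronous, a caller that cares about the delivery outcome can register a callback on the returned ListenableFuture. A minimal sketch of how a caller (such as the UserServiceImpl in the next section) might use it, assuming producerService, user, and an Slf4j log field are in scope; ListenableFutureCallback comes from org.springframework.util.concurrent:

producerService.send("user_reg", user.getUserId(), user)
        .addCallback(new ListenableFutureCallback<SendResult<String, User>>() {
            @Override
            public void onSuccess(SendResult<String, User> result) {
                // the broker acknowledged the record
                log.info("sent to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("send failed", ex);
            }
        });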
5. Business service (the caller of the message producer)
(1) Business service interface
import com.shixinke.github.kafka.practise.common.data.Result;
import com.shixinke.github.kafka.practise.producer.dto.UserDTO;

public interface UserService {
    Result create(UserDTO userDTO);
}
(2) Business service implementation
import com.shixinke.github.kafka.practise.common.data.Result;
import com.shixinke.github.kafka.practise.common.data.User;
import com.shixinke.github.kafka.practise.producer.dto.UserDTO;
import com.shixinke.github.kafka.practise.producer.service.KafkaMQProducerService;
import com.shixinke.github.kafka.practise.producer.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Instant;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-2 16:54
 */
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private KafkaMQProducerService producerService;

    @Override
    public Result create(UserDTO userDTO) {
        User user = new User(userDTO.getNickname(), userDTO.getEmail(), userDTO.getAccount(), userDTO.getPassword(), 0, Instant.now().getEpochSecond());
        // The data operations (inserting into the database, refreshing caches) are omitted here
        user.setUserId(3000001);
        // Once the above succeeds, publish a message, e.g. to tell the mail service to send an email
        producerService.send("user_reg", user.getUserId(), user);
        return Result.success(user.getUserId());
    }
}
6. Triggering message production
(1) DTO receiving the client's parameters
@Data
public class UserDTO {
    private String nickname;
    private String account;
    private String email;
    private String password;

    @Override
    public String toString() {
        return "UserDTO{" +
                "nickname='" + nickname + '\'' +
                ", account='" + account + '\'' +
                ", email='" + email + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
(2) Receive and handle the user request
@RestController
@RequestMapping("/user")
@Slf4j
public class UserController {

    @Resource
    private UserService userService;

    @PostMapping("register")
    public Result register(UserDTO userDTO) {
        Result result = null;
        try {
            result = userService.create(userDTO);
        } catch (Exception ex) {
            log.error("registration failed:", ex);
            result = Result.error(ex.getMessage());
        }
        return result;
    }
}
(3) Trigger the event
Simulate a user registration from a browser or any HTTP tool:
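For instance, with Spring's RestTemplate. A minimal sketch (the host and port are assumptions about the local deployment; RegisterDemo is a throwaway test class):

import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class RegisterDemo {
    public static void main(String[] args) {
        MultiValueMap<String, String> form = new LinkedMultiValueMap<>();
        form.add("nickname", "shixinke");
        form.add("account", "shixinke");
        form.add("email", "ishixinke@qq.com");
        form.add("password", "123456");
        // POST the form to the producer service (port 8080 is an assumption)
        String body = new RestTemplate().postForObject("http://localhost:8080/user/register", form, String.class);
        System.out.println(body);
    }
}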
IV. The Message Consumer
1. Import the Kafka dependencies
Add to the project's Maven pom.xml:
<!-- Kafka support -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
<!-- our own common module -->
<dependency>
    <groupId>com.shixinke.github.kafka.practise.common</groupId>
    <artifactId>kafka-practise-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
2. Kafka consumer configuration in the project
spring.application.name=kafka-practise-consumer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=user
spring.kafka.consumer.client-id=${spring.application.name}
spring.kafka.consumer.subscribe-topics=user_reg,user_update
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval-ms=1000
spring.kafka.consumer.concurrency=2
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.shixinke.github.kafka.practise.common.util.KafkaObjectDeserializer
- spring.kafka.consumer.bootstrap-servers : broker addresses to connect to
- spring.kafka.consumer.group-id : consumer group name
- spring.kafka.consumer.client-id : identifier of this consumer
- spring.kafka.consumer.subscribe-topics : topics to consume (a custom key, not a standard Spring Boot property; it is bound through the properties class below)
- spring.kafka.consumer.enable-auto-commit : whether offsets are committed automatically
- spring.kafka.consumer.auto-commit-interval-ms : auto-commit interval
- spring.kafka.consumer.concurrency : number of concurrent listener threads (also a custom key)
- spring.kafka.consumer.auto-offset-reset : where to start consuming when no committed offset exists (earliest starts from the oldest records)
- spring.kafka.consumer.key-deserializer : deserializer class for message keys
- spring.kafka.consumer.value-deserializer : deserializer class for message payloads (the deserializer from the common module)
3. Application configuration classes
(1) Consumer configuration properties class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "spring.kafka.consumer")
@Data
public class KafkaConsumerProperties {
    private String bootstrapServers;
    private String groupId;
    private String clientId;
    private String enableAutoCommit;
    private String autoCommitIntervalMs;
    private String autoOffsetReset;
    private String keyDeserializer;
    private String valueDeserializer;
    private String subscribeTopics;
    private int concurrency;
}
- The fields correspond to the Kafka keys in the application configuration file
(2) Consumer bean configuration class
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author shixinke
 * @version 1.0
 * @Description
 * @Date 19-2-1 16:05
 */
@Configuration
public class KafkaConsumerConfiguration {

    @Resource
    private KafkaConsumerProperties kafkaProperties;

    /**
     * Generic listener container factory
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(kafkaProperties.getConcurrency());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * Listener container factory for the user module (batch listener, manual acknowledgments)
     * @return
     */
    @Bean("userContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> userContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(kafkaProperties.getConcurrency());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Consumer factory
     * @return
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getProperties());
    }

    /**
     * Configuration map used to initialize the consumer factory
     * @return
     */
    public Map<String, Object> getProperties() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getEnableAutoCommit());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getAutoOffsetReset());
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaProperties.getAutoCommitIntervalMs());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getValueDeserializer());
        return props;
    }
}
4. Message listener (the consumer implementation)
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
@Slf4j
public class UserMessageListener {

    @KafkaListener(id = "user_reg", topics = "user_reg", containerFactory = "userContainerFactory")
    public void subscribeCreateUser(List<ConsumerRecord<String, Object>> records, Acknowledgment ack) {
        try {
            log.info("received messages: {}", records);
            // Commit the offsets manually
            ack.acknowledge();
        } catch (Exception ex) {
            // Do not acknowledge on failure, so the batch will be redelivered
            log.error("failed to process user_reg messages:", ex);
        }
    }
}
- The @KafkaListener annotation marks this method as a message listener
- List<ConsumerRecord<String, Object>> records : the batch of received records
- Acknowledgment ack : the manual acknowledgment handle
Sample log output:
received messages: [ConsumerRecord(topic = user_reg, partition = 0, offset = 1, CreateTime = 1549155259062, serialized key size = 7, serialized value size = 250, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3000001, value = User(userId=3000001, nickname=诗心客, email=ishixinke@qq.com, account=shixinke, password=123456, status=0, createTime=1549155258))]
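Because the value deserializer already restored the original entity, each record's value can simply be cast back to User for the real business handling. A sketch of what could replace the log line in the listener above (requires importing com.shixinke.github.kafka.practise.common.data.User; the mail-service action is hypothetical):

for (ConsumerRecord<String, Object> record : records) {
    User user = (User) record.value();
    // hypothetical follow-up action: notify the mail service
    log.info("sending welcome mail to {}", user.getEmail());
}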