1. Producer Flow
After constructing a ProducerRecord with the topic, key, value, and other fields, the record still goes through several stages before it is sent to the Kafka cluster:

1. The interceptor chain, which can filter or modify the record; an exception thrown here does not abort the send.
2. Fetching the Kafka cluster metadata.
3. Serializing the key and value.
4. Choosing the partition and broker based on the metadata.
5. Validating the record.
6. Appending the record to the send buffer, where records are grouped into batches.
7. Sending the batches.
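To make the partition-selection step concrete, here is a simplified, standalone sketch of key-based partitioning. Note this is an illustration only: Kafka's real default partitioner hashes the serialized key with murmur2 and uses sticky partitioning for keyless records; the class and method names below are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified illustration of key-based partition selection.
// Kafka's actual default partitioner uses a murmur2 hash of the
// serialized key bytes; this sketch uses Arrays.hashCode instead.
public class PartitionSketch {

    static int choosePartition(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask off the sign bit so the result is non-negative, then take modulo.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (int i = 0; i < 5; i++) {
            String key = "key" + i;
            System.out.println(key + " -> partition " + choosePartition(key, numPartitions));
        }
    }
}
```

The key property this preserves from the real partitioner: records with the same key always land in the same partition, which is what guarantees per-key ordering.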
2. Code Test
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

public class KafkaProducerInterceptorTest {
    public static void main(String[] args) throws InterruptedException {
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom interceptor
        config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test1", "key" + i, "我是你爹" + i);
            producer.send(record);
            Thread.sleep(500);
        }

        producer.close();
    }
}
```
The custom interceptor class:
```java
package org.dragon.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Interceptor that doubles the value of every record before it is sent.
 */
public class ValueInterceptorTest implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // Build a new record whose value is the original value repeated twice.
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.key(),
                producerRecord.value() + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // Called when the broker acknowledges the record (or the send fails); unused here.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> map) {
        // No extra configuration needed.
    }
}
```
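Because onSend involves no broker interaction, its transformation can be checked in isolation without a running Kafka cluster. The sketch below (plain Java, no Kafka dependency; `Rec` is a hypothetical stand-in for ProducerRecord) mimics how the producer applies configured interceptors' onSend methods in order, feeding each one's output into the next:

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Standalone illustration of how a chain of onSend() transformations
// is applied in configuration order. "Rec" is a minimal stand-in
// for ProducerRecord.
public class InterceptorChainSketch {

    record Rec(String topic, String key, String value) {}

    // Apply each transformation in order, piping each output into the next.
    static Rec applyChain(Rec rec, List<UnaryOperator<Rec>> chain) {
        for (UnaryOperator<Rec> onSend : chain) {
            rec = onSend.apply(rec);
        }
        return rec;
    }

    public static void main(String[] args) {
        // Same transformation as ValueInterceptorTest.onSend: double the value.
        UnaryOperator<Rec> doubler =
                r -> new Rec(r.topic(), r.key(), r.value() + r.value());

        Rec out = applyChain(new Rec("test1", "key0", "hello0"), List.of(doubler));
        System.out.println(out.value()); // prints "hello0hello0"
    }
}
```

This also shows why interceptor order in `interceptor.classes` matters: with two or more interceptors configured, each one sees the record already transformed by its predecessors.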
3. Test Results
As the consumed output shows, each value appears doubled, so the interceptor works as intended.