1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package org.dragon.producer;
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;
public class KafkaProducerTransactionTest { public static void main(String[] args) throws InterruptedException, ExecutionException { HashMap<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); config.put(ProducerConfig.ACKS_CONFIG, "-1"); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); config.put(ProducerConfig.RETRIES_CONFIG, 5); config.put(ProducerConfig.BATCH_SIZE_CONFIG, 5); config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000); config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config); producer.initTransactions();
try { producer.beginTransaction(); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<String, String>( "test2", "" + i, "我是你爹" + i ); Future<RecordMetadata> send = producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("回调信息:消息发送成功"); } }); System.out.println("发送数据"); send.get(); } producer.commitTransaction(); }catch(Exception e) { producer.abortTransaction(); e.printStackTrace(); }finally{ producer.close(); } } }
|