Spring Kafka Transaction


Table of Contents

  1. Three transaction-related cases with the plain Kafka client API (1. consumer ack, 2. consumer + producer commit, 3. consumer + producer#send committed together)
    The consumer-ack mode fits the poll-and-simple-processor pattern
  2. KafkaTransactionManager in detail: which of the above operations it templates
  3. KafkaMessageListenerContainer, ListenerConsumer, and KafkaListenerContainerFactory
  4. How the @KafkaListener annotation is processed
  5. @Transactional + producer in detail: "Spring-Kafka (5): Two Ways to Use Kafka Transactions"
  6. KafkaTemplate + transactions in detail; KafkaTemplate is only used for sending messages
  7. Coordinating KafkaTransactionManager and DataSourceTransactionManager

Kafka API # Consumer ACK
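
With the plain client API, the consumer ack is simply a manual offset commit issued after the records have been processed; this mode fits the poll-and-simple-processor pattern from the outline above. A minimal sketch (broker address, topic, and group id are illustrative assumptions):

// Minimal sketch of the consumer-ack mode with the plain Kafka client API:
// poll, process, then commit the offsets manually (the "ack").
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAckConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // no auto ack
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("myTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("processing " + record.value()); // simple processor
                }
                consumer.commitSync(); // the ack: commit only after processing succeeded
            }
        }
    }
}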

Spring Kafka # Consumer ACK

@KafkaListener(id = "cat", topics = "myTopic",
        containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
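
The kafkaManualAckListenerContainerFactory referenced above has to put the container into manual ack mode; that is what makes Spring Kafka hand the listener an Acknowledgment. A minimal sketch of such a factory (inside a @Configuration class), assuming a ConsumerFactory bean; ContainerProperties.AckMode is where the enum lives from Spring Kafka 2.3 on:

// Hypothetical factory backing the listener above; MANUAL ack mode means
// offsets are committed only when the listener calls ack.acknowledge().
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}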

Spring Kafka Transaction

Kafka transactions touch both reading messages from and writing messages to Kafka.
Spring Kafka wraps reading from Kafka in KafkaMessageListenerContainer and writing to Kafka in KafkaTemplate.
The transaction-related implementation is therefore also built around KafkaMessageListenerContainer and KafkaTemplate.

KafkaMessageListenerContainer, ListenerConsumer, and KafkaListenerContainerFactory

Configuring KafkaMessageListenerContainer with a KafkaAwareTransactionManager implementation

If KafkaMessageListenerContainer is configured with a KafkaAwareTransactionManager implementation, the container initializes a TransactionTemplate object and wraps the invocation of its MessageListener (or BatchMessageListener) in TransactionTemplate#execute, so that a transaction is started before the listener runs and committed or rolled back after it returns.
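
A minimal sketch of that wiring (inside a @Configuration class), assuming a transaction-capable ProducerFactory bean; the factory bean name is illustrative, and ContainerProperties#setTransactionManager is the pre-2.7 way to hand the container a transaction manager:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaTxListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        ProducerFactory<String, String> producerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // the container now opens a Kafka transaction around every listener invocation
    factory.getContainerProperties()
            .setTransactionManager(new KafkaTransactionManager<>(producerFactory));
    return factory;
}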

If we send data with KafkaTemplate#send inside the listener, those sends automatically join the transaction that was started before the listener was invoked.

After the operations in the listener complete, and before commit is called on the KafkaAwareTransactionManager, KafkaMessageListenerContainer calls Producer#sendOffsetsToTransaction(), so that the offset commit also joins the transaction, as the container internals below show.

private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
        @SuppressWarnings(RAW_TYPES) Producer producer) throws InterruptedException {
    if (this.wantsFullRecords) {
        this.batchListener.onMessage(records,
                this.isAnyManualAck
                        ? new ConsumerBatchAcknowledgment(records)
                        : null, this.consumer);
    }
    else {
        doInvokeBatchOnMessage(records, recordList);
    }
    if (!this.isAnyManualAck && !this.autoCommit) {
        for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
            this.acks.put(record);
        }
        if (producer != null) {
            sendOffsetsToTransaction(producer);
        }
    }
}

private void invokeOnMessage(final ConsumerRecord<K, V> record,
        @SuppressWarnings(RAW_TYPES) @Nullable Producer producer) {

    if (record.value() instanceof DeserializationException) {
        throw (DeserializationException) record.value();
    }
    if (record.key() instanceof DeserializationException) {
        throw (DeserializationException) record.key();
    }
    if (record.value() == null && this.checkNullValueForExceptions) {
        checkDeser(record, ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);
    }
    if (record.key() == null && this.checkNullKeyForExceptions) {
        checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);
    }
    doInvokeOnMessage(record);
    ackCurrent(record, producer);
}

public void ackCurrent(final ConsumerRecord<K, V> record,
        @SuppressWarnings(RAW_TYPES) @Nullable Producer producer) {

    if (this.isRecordAck) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        if (producer == null) {
            this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
            if (this.containerProperties.isSyncCommits()) {
                this.consumer.commitSync(offsetsToCommit);
            }
            else {
                this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
            }
        }
        else {
            this.acks.add(record);
        }
    }
    else if (!this.isAnyManualAck && !this.autoCommit) {
        this.acks.add(record);
    }
    if (producer != null) {
        try {
            sendOffsetsToTransaction(producer);
        }
        catch (Exception e) {
            this.logger.error("Send offsets to transaction failed", e);
        }
    }
}

@SuppressWarnings({ "unchecked", RAW_TYPES })
private void sendOffsetsToTransaction(Producer producer) {
    handleAcks();
    Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
    this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);
    producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
}

Configuring KafkaMessageListenerContainer with a non-KafkaAwareTransactionManager implementation

This configuration provides the ability to synchronize the Kafka transaction with another kind of transaction.

Take DataSourceTransactionManager as the example. If KafkaMessageListenerContainer is configured with a DataSourceTransactionManager, then before invoking the listener it holds, the container starts a JDBC transaction through the DataSourceTransactionManager (again via TransactionTemplate), and once the listener returns it commits or rolls back through the DataSourceTransactionManager as appropriate.
So how does the poll's ack get back to the Kafka server? The listener sends the offsets itself:

@Bean
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,
        final KafkaTemplate template) {
    ContainerProperties props = new ContainerProperties("foo");
    props.setGroupId("group");
    props.setTransactionManager(new SomeOtherTransactionManager());
    ...
    props.setMessageListener((MessageListener<String, String>) m -> {
        template.send("foo", "bar");
        template.send("baz", "qux");
        template.sendOffsetsToTransaction( // the key point: send the offsets to the Kafka server yourself
                Collections.singletonMap(new TopicPartition(m.topic(), m.partition()),
                        new OffsetAndMetadata(m.offset() + 1)));
    });
    return new KafkaMessageListenerContainer<>(cf, props);
}

Configuring a KafkaMessageListenerContainer<K, V> that holds a DataSourceTransactionManager this way enables the following pattern: read the data, operate on the database, and commit the offset to the Kafka server only after the database work succeeds. If the database operation fails, the offset is not committed, and the same records can be polled again (until the configured retry limit is exceeded).

@KafkaListener(id = "cat", topics = "myTopic",
        containerFactory = "kafkaWithDataSourceTransactionmanagerListenerContainerFactory")
public void listen(String data) {
    // insert data into database
}
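
The kafkaWithDataSourceTransactionmanagerListenerContainerFactory above is only referenced by name; a minimal sketch of what such a factory could look like, assuming a DataSource bean:

// Hypothetical factory for the listener above: the container drives a JDBC
// transaction around each listener invocation via DataSourceTransactionManager.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaWithDataSourceTransactionmanagerListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory, DataSource dataSource) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties()
            .setTransactionManager(new DataSourceTransactionManager(dataSource));
    return factory;
}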

Configuring KafkaMessageListenerContainer with a ChainedKafkaTransactionManager

The previous approach only synchronizes the Kafka transaction with one other transaction; ChainedKafkaTransactionManager provides the ability to synchronize a Kafka transaction with any number of other, non-Kafka transactions.

If you want to control the Kafka transaction and stay synchronized with other transaction managers at the same time, ChainedKafkaTransactionManager is the more convenient choice.

ChainedKafkaTransactionManager implements the KafkaAwareTransactionManager interface and extends ChainedTransactionManager.

Implementing KafkaAwareTransactionManager means that ChainedKafkaTransactionManager can expose a reference to the ProducerFactory.

In other words, when a ChainedKafkaTransactionManager is configured on the KafkaMessageListenerContainer, the container behaves as described in the first mode: the container sends the offsets itself, and the developer no longer needs to send them in the listener implementation.

Setting a ChainedKafkaTransactionManager on the ContainerProperties used to build the KafkaMessageListenerContainer is all it takes, as the sketch below shows.
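
A minimal sketch of that wiring, assuming a transaction-capable ProducerFactory and a DataSource bean (note that ChainedKafkaTransactionManager was deprecated in Spring Kafka 2.7; it matches the versions discussed here):

@Bean
public ChainedKafkaTransactionManager<String, String> chainedTm(
        ProducerFactory<String, String> producerFactory, DataSource dataSource) {
    // exactly one KafkaAwareTransactionManager is allowed in the chain
    return new ChainedKafkaTransactionManager<>(
            new DataSourceTransactionManager(dataSource),
            new KafkaTransactionManager<>(producerFactory));
}

@Bean
KafkaMessageListenerContainer<String, String> chainedContainer(ConsumerFactory<String, String> cf,
        ChainedKafkaTransactionManager<String, String> chainedTm) {
    ContainerProperties props = new ContainerProperties("foo");
    props.setGroupId("group");
    // the container runs the chained transaction and sends the offsets itself
    props.setTransactionManager(chainedTm);
    return new KafkaMessageListenerContainer<>(cf, props);
}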

Pure Kafka Producer transactions

Configuring KafkaTemplate with a KafkaAwareTransactionManager

Configuring KafkaTemplate with a non-KafkaAwareTransactionManager

Synchronizing KafkaTemplate with other transaction managers

DataSourceTransactionManager is again the example here; the key mechanism is TransactionSynchronization.

The Kafka API's notion of "joining a transaction" is this: obtain the Producer that earlier operations used, on which initTransactions and beginTransaction have already been called, and invoke the desired operations directly on it; those operations then belong to the transaction that Producer is in.

This requires that the ProducerFactory<K, V> implementation held by the KafkaTemplate supports transactions, i.e. that ProducerFactory#transactionCapable returns true.

KafkaTemplate is the template for sending data, and Kafka's send-related APIs support transactional operation.

As long as the producerFactory passed in when the KafkaTemplate is constructed reports transactionCapable() == true, the KafkaTemplate itself is transactional; when the KafkaTemplate obtains a Producer from the producerFactory, it calls beginTransaction() at the same time.
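
A minimal sketch of constructing such a transaction-capable factory and template; setting a transactionIdPrefix is what makes DefaultKafkaProducerFactory report transactionCapable() == true (broker address and serializers are assumptions):

// Sketch: a transaction-capable ProducerFactory and the KafkaTemplate built on it.
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(config);
pf.setTransactionIdPrefix("tx-"); // pf.transactionCapable() now returns true

KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); // a transactional template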

When the Kafka Producer transaction begins

KafkaTemplate holds a producers field, defined as follows:

private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal<>();

When sending data with KafkaTemplate's family of send methods (see KafkaTemplate#doSend for the details), the template first obtains a Producer implementation via KafkaTemplate#getTheProducer. That method first tries to fetch the Producer already created for the current thread from producers; if there is none, it creates one and stores it in KafkaTemplate#producers. The creation logic is the important part:

public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
        final ProducerFactory<K, V> producerFactory) {

    Assert.notNull(producerFactory, "ProducerFactory must not be null");

    @SuppressWarnings("unchecked")
    KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
            .getResource(producerFactory);
    if (resourceHolder == null) {
        Producer<K, V> producer = producerFactory.createProducer();

        try {
            producer.beginTransaction();
        }
        catch (RuntimeException e) {
            producer.close();
            throw e;
        }

        resourceHolder = new KafkaResourceHolder<K, V>(producer);
        bindResourceToTransaction(resourceHolder, producerFactory);
    }
    return resourceHolder;
}

private static <K, V> void bindResourceToTransaction(KafkaResourceHolder<K, V> resourceHolder,
        ProducerFactory<K, V> producerFactory) {
    TransactionSynchronizationManager.bindResource(producerFactory, resourceHolder);
    resourceHolder.setSynchronizedWithTransaction(true);
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        TransactionSynchronizationManager
                .registerSynchronization(new KafkaResourceSynchronization<K, V>(resourceHolder, producerFactory));
    }
}

As the getTransactionalResourceHolder method shows, a Producer is created and producer.beginTransaction() is called to start a transaction. So, given code like the following with a transaction-capable KafkaTemplate:

kafkaTemplate.sendDefault("foo");
kafkaTemplate.sendDefault("bar");

When the string "foo" is sent, KafkaTemplate first obtains a Producer object and begins the transaction. When "bar" is sent, KafkaTemplate goes through ProducerFactoryUtils#getTransactionalResourceHolder, which asks TransactionSynchronizationManager for the Producer already bound to the current transaction and returns it directly. In other words, sending "bar" automatically joins the transaction started when "foo" was sent.

In summary, the Kafka Producer transaction begins at the moment the Producer for the current transaction's thread is first created.

When the Kafka Producer transaction commits

If KafkaTemplate is configured with another synchronization-capable transaction manager, such as DataSourceTransactionManager, rather than a KafkaAwareTransactionManager implementation, the logic in KafkaTemplate#getTheProducer indirectly calls ProducerFactoryUtils#bindResourceToTransaction, which creates a KafkaResourceSynchronization<K, V> object and binds it to the transaction the Producer is currently operating in.

KafkaResourceSynchronization is an implementation of the TransactionSynchronization interface; see earlier posts for the background.

private static final class KafkaResourceSynchronization<K, V> extends
        ResourceHolderSynchronization<KafkaResourceHolder<K, V>, Object> {

    private final KafkaResourceHolder<K, V> resourceHolder;

    KafkaResourceSynchronization(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {
        super(resourceHolder, resourceKey);
        this.resourceHolder = resourceHolder;
    }

    @Override
    protected boolean shouldReleaseBeforeCompletion() {
        return false;
    }

    @Override
    public void afterCompletion(int status) {
        try {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                this.resourceHolder.commit();
            }
            else {
                this.resourceHolder.rollback();
            }
        }
        finally {
            super.afterCompletion(status);
        }
    }

    @Override
    protected void releaseResource(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {
        ProducerFactoryUtils.releaseResources(resourceHolder);
    }

}

In its afterCompletion callback, it checks the completion status of the non-KafkaAwareTransactionManager bound to the KafkaTemplate (DataSourceTransactionManager in this example): TransactionSynchronization.STATUS_COMMITTED for a successful commit, TransactionSynchronization.STATUS_ROLLED_BACK for a rollback, or TransactionSynchronization.STATUS_UNKNOWN for an unknown outcome. If the DataSourceTransactionManager committed its transaction successfully, the logic in KafkaResourceSynchronization indirectly calls Kafka Producer#commitTransaction(); otherwise it indirectly calls Producer#abortTransaction() to abort the Kafka transaction. Finally, super.afterCompletion(status) releases the resources that earlier logic bound to the current thread.
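
An end-to-end sketch of this synchronization, assuming a transaction-capable KafkaTemplate and a JdbcTemplate are configured and that @Transactional is driven by a DataSourceTransactionManager (class and table names are hypothetical):

// Sketch: the JDBC transaction commits first; the Kafka producer transaction is
// committed (or aborted) afterwards inside KafkaResourceSynchronization#afterCompletion.
@Service
public class OrderService {

    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderService(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional // bound to the DataSourceTransactionManager
    public void placeOrder(String orderId) {
        jdbcTemplate.update("INSERT INTO orders (id) VALUES (?)", orderId);
        // joins a Kafka transaction synchronized with the JDBC one
        kafkaTemplate.send("orders", orderId);
    }
}

Note that the two commits are synchronized, not atomic: a failure between the JDBC commit and the Kafka commit can still leave the database updated without the message committed to Kafka.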

Regardless of which TransactionManager the KafkaTemplate is configured with

KafkaTemplate.executeInTransaction() does not use a transaction manager at all; it calls the Kafka Producer transaction API directly:

@Override
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
    Assert.notNull(callback, "'callback' cannot be null");
    Assert.state(this.transactional, "Producer factory does not support transactions");
    Producer<K, V> producer = this.producers.get();
    // As seen here, executeInTransaction cannot participate in another transaction;
    // it must be called on its own. Other transactional operations, such as the
    // KafkaTemplate.send family of methods, can however join the transaction that
    // executeInTransaction has already opened.
    Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
    String transactionIdSuffix;
    if (this.producerFactory.isProducerPerConsumerPartition()) {
        transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();
        TransactionSupport.clearTransactionIdSuffix();
    }
    else {
        transactionIdSuffix = null;
    }

    producer = this.producerFactory.createProducer();

    try {
        producer.beginTransaction();
    }
    catch (Exception e) {
        closeProducer(producer, false);
        throw e;
    }

    this.producers.set(producer);
    try {
        T result = callback.doInOperations(this);
        try {
            producer.commitTransaction();
        }
        catch (Exception e) {
            throw new SkipAbortException(e);
        }
        return result;
    }
    catch (SkipAbortException e) { // NOSONAR - exception flow control
        throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace
    }
    catch (Exception e) {
        producer.abortTransaction();
        throw e;
    }
    finally {
        if (transactionIdSuffix != null) {
            TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);
        }
        this.producers.remove();
        closeProducer(producer, false);
    }
}

Typical usage:

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

sendDefault is able to start a transaction on its own, but it can also participate in a transaction that executeInTransaction has already opened, as above.

template.sendDefault("thing1", "thing2");
boolean result = template.executeInTransaction(t -> {
    t.sendDefault("cat", "hat");
    return true;
});

This form, however, does not work: template.sendDefault("thing1", "thing2") has already created a Producer in the current thread and called beginTransaction to start a transaction, so the subsequent executeInTransaction call fails.


