在事务流api Alpakka中,事务Id要传递什么值?

我想用Alpakka使用Transactional.Source Api消耗记录,然后用Transactional.flow将记录生成到另一个主题,但是文档中说需要传递TransactionId。

我应该如何创建TransactionId,比如下面的代码。

```via(Transactional.flow(producerSettings, transactionalId))```

是按制作人还是按制作人在阿尔帕卡的记录?

解决方案:

事务性.id在抵御僵尸方面起着重要作用。但是维护一个标识符在不同的生产者会话中是一致的,同时也能正确地围出僵尸,这就有点棘手了。

正确围堵僵尸的关键是确保对于给定的transactional.id来说,读-进程-写循环中的输入主题和分区总是相同的。如果这一点不正确,那么一些消息就有可能通过事务提供的栅栏泄漏。

例如,在一个分布式流处理应用中,假设topic-partition tp0最初是由transactional.id T0处理的。 如果在后来的某个时刻,它可以被映射到另一个具有transactional.id T1的生产者身上,那么T0和T1之间就没有围栏了。所以,tp0的消息有可能被重新处理,违反了精确一次处理的保证。

实际操作中,要么必须将输入分区和transactional.ids之间的映射存储在外部存储中,要么对其进行一些静态编码。Kafka Streams选择了后一种方式来解决这个问题。事务如何执行,以及如何调整它们

Apache Kafka中的交易

给TA打赏
共{{data.count}}人
人已打赏
未分类

分析服务查询副本。在主副本上只加载和处理单个数据模型而不从副本中删除?

2022-9-8 2:12:36

未分类

sed或awk:用之前出现的次数来代替。

2022-9-8 2:12:38

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索