如何使用Spring云流和Kafka绑定一个Store?

我想使用Kafka状态存储类型为 KeyValueStore 在一个使用Spring Cloud Stream的Kafka Binder的示例应用程序中,按照 文件,应该很简单.这是我的主类。

@SpringBootApplication
public class KafkaStreamTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }

    @Bean
    public BiFunction<KStream<String, String>, KeyValueStore<String,String>, KStream<String, String>> process(){
        return (input,store) -> input.mapValues(v -> v.toUpperCase());

    }


    @Bean
    public StoreBuilder myStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"), Serdes.String(),
                Serdes.String());
    }
}

我想KeyValueStore应该作为 “process “方法的第二个参数来传递,但是应用程序无法启动,出现了下面的消息。

Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.state.KeyValueStore among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:103) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1855) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1792) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]

解决方案:

我找到了关于如何使用读取一个存储的解决方案。单元测试 在Spring Cloud Stream中。

下面的代码是我如何将该解决方案应用到我的代码中的.变换器使用Spring bean方法提供的Store”myStore

@SpringBootApplication
public class KafkaStreamTestApplication {

    public static final String MY_STORE_NAME = "my-store";

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }


    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process2(){
        return (input) -> input.
              transformValues(() -> new MyValueTransformer(), MY_STORE_NAME);

    }


    @Bean
    public StoreBuilder<?> myStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(MY_STORE_NAME), Serdes.String(),
                Serdes.String());
    }

}
public class MyValueTransformer implements ValueTransformer<String, String> {

    private KeyValueStore<String,String> store;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        store = (KeyValueStore<String, String>) this.context.getStateStore(KafkaStreamTestApplication.MY_STORE_NAME);

    }

    @Override
    public String transform(String value) {
        String tValue = store.get(value);

        if(tValue==null) {
            store.put(value, value.toUpperCase());
        }

        return tValue;
    }

    @Override
    public void close() {
        if(store!=null) {
            store.close();
        }
    }

}

本文来自投稿,不代表实战宝典立场,如若转载,请注明出处:https://www.shizhanbaodian.com/19132.html

(0)
上一篇 1天前
下一篇 1天前

相关推荐

发表评论

登录后才能评论