org.apache.kafka.streams.errors.InvalidStateStoreException

Published: 2023-12-16 01:57:52

While using Kafka Streams with a GlobalKTable, which relies on a state store, the following exceptions were thrown:

Exception in thread "mywordcount-27abf25a-4a79-47ac-8114-86694871acee-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=student-grade, partition=0, offset=0
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store student-course-statestore is currently closed.
	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:71)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:156)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:137)
	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
	... 5 more
Exception in thread "mywordcount-27abf25a-4a79-47ac-8114-86694871acee-GlobalStreamThread" org.apache.kafka.streams.errors.StreamsException: Updating global state failed. You can restart KafkaStreams to recover from this error.
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:250)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {student-course-0=5}
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
	... 1 more

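The cause was that no storage path for the state store had been configured when setting up the connection to Kafka, so the application fell back to the default state directory. Judging from the trace, the InvalidStateStoreException appears to be a consequence of the GlobalStreamThread failing with OffsetOutOfRangeException (offset 5 of student-course-0 was out of range): once that thread stopped, the store student-course-statestore was closed, and the join in the stream thread could no longer read from it.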

Solution:

prop.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\IT\\tool\\kafka-state-store"); // set the storage path of the state store
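
For context, here is a minimal sketch of where that line might sit in the configuration. It uses only java.util.Properties and the literal property keys, so it runs without the Kafka libraries; each key is the string value of the StreamsConfig constant named in the comment beside it. The class name StateDirConfigSketch and the method buildStreamsProps are hypothetical, the broker address localhost:9092 is an assumption, and the application id "mywordcount" is only inferred from the thread names in the trace.

```java
import java.util.Properties;

// Sketch only: builds the Properties that would later be handed to KafkaStreams.
// Class and method names are hypothetical, not taken from the original post.
class StateDirConfigSketch {

    static Properties buildStreamsProps(String stateDir) {
        Properties prop = new Properties();
        // StreamsConfig.APPLICATION_ID_CONFIG; value inferred from the thread name in the trace
        prop.put("application.id", "mywordcount");
        // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; the broker address is an assumption
        prop.put("bootstrap.servers", "localhost:9092");
        // StreamsConfig.STATE_DIR_CONFIG; explicit directory for local state stores
        prop.put("state.dir", stateDir);
        return prop;
    }

    public static void main(String[] args) {
        // Backslashes are escaped in the Java literal, as in the original line
        Properties prop = buildStreamsProps("C:\\IT\\tool\\kafka-state-store");
        System.out.println("state.dir = " + prop.getProperty("state.dir"));
    }
}
```

In a real application the same Properties object would be passed to the KafkaStreams constructor together with the topology, and the constants from StreamsConfig would normally be used instead of the literal keys.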