If your project uses cleanUp() together with a state store, you may run into an exception like the following:
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\IT\tool\kafka-state-store\mywindowcount\0_0
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:234)
    at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:950)
    at teststreams.WindowCount.main(WindowCount.java:79)
Caused by: java.nio.file.DirectoryNotEmptyException: C:\IT\tool\kafka-state-store\mywindowcount\0_0
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source)
    at sun.nio.fs.AbstractFileSystemProvider.delete(Unknown Source)
    at java.nio.file.Files.delete(Unknown Source)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:763)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:746)
    at java.nio.file.Files.walkFileTree(Unknown Source)
    at java.nio.file.Files.walkFileTree(Unknown Source)
    at org.apache.kafka.common.utils.Utils.delete(Utils.java:746)
    at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:290)
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231)
    ... 2 more
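This happens because every run of the program writes state store files to disk. Before the next start, you therefore need to delete the files left over from the previous run, which live under the configured STATE_DIR_CONFIG path in the subdirectory named after the application ID.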
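The manual cleanup described above can also be done in code before the streams application starts. The following is only a minimal sketch using plain java.nio, not part of the Kafka Streams API: the class name StateDirCleaner and the method deleteAppStateDir are hypothetical, and the demo in main runs against a temporary directory that mimics the layout from the stack trace (state dir, then application ID "mywindowcount", then task directory "0_0"). It assumes no other process still holds files in that directory open, since on Windows an open handle can make the delete fail.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka Streams.
class StateDirCleaner {

    // Recursively deletes <stateDir>/<applicationId>.
    // Returns true if the directory existed and was removed, false if there was nothing to delete.
    static boolean deleteAppStateDir(String stateDir, String applicationId) throws IOException {
        Path appDir = Paths.get(stateDir, applicationId);
        if (!Files.exists(appDir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(appDir)) {
            // Reverse order visits children before their parent directories,
            // so every directory is already empty when it is deleted.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Demo only: build a fake state directory in a temp folder.
        Path stateDir = Files.createTempDirectory("kafka-state-store");
        Path taskDir = Files.createDirectories(stateDir.resolve("mywindowcount").resolve("0_0"));
        Files.createFile(taskDir.resolve("leftover.file"));

        boolean deleted = deleteAppStateDir(stateDir.toString(), "mywindowcount");
        System.out.println("deleted: " + deleted);
    }
}
```

In a real application, the two arguments would be the same values passed to STATE_DIR_CONFIG and to the application ID setting, and the call would go before the KafkaStreams instance is created.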
Files generated while the program runs: