KAFKA KSTREAM - 使用带窗口的抽象处理器[英] Kafka KStream - using AbstractProcessor with a Window

问题描述

IM希望将窗口批次从KStre流组合在一起,并将其写入辅助商店.

我期待看到.punctuate()大约每30秒调用.改为我被保存了这里.

(原始文件是几千行长)

摘要 - .punctuate()似乎随机地被称为随机然后重复.它似乎匿名到通过 processOrcontext.schedule().


编辑:

每个相同的代码运行,每四分钟大约每四分钟呼叫到.punctuate().这次我没有看到疯狂的重复值.源没有变化 - 只是不同的结果.

使用以下代码:

main

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

处理器

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

       values = new ArrayList<>();

    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

处理器upplier

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}

编辑:

要确保我的调试器并没有放慢这一点,我建立了它并在与我的Kafka进程相同的盒子上运行它.这次它甚至没有尝试滞后4分钟或更长时间 - 在几秒钟内输出伪呼叫到.punctuate().许多(大多数)这些没有干预呼叫.process().

推荐答案

更新:这部分答案适用于Kafka版本0.11或更早版本(对于Kafka 1.0及以后见下文)

在Kafka流中,点击基于流时间和 not 系统时间(aka处理时间).

每个默认值流时间是事件时间,即嵌入在Kafka中的时间戳记录自己.由于您未设置非默认TimestampExtractor(请参阅 http://docs.confluent.io/current/streams/developer-guide.html#optional-configurations-parameters ),对punctuate的调用仅取决于该过程关于您流程的记录的事件时间.因此,如果您需要多分钟处理记录的"30秒"(事件时间),则punctuate将被称为30秒(挂钟时间)...

这也可以解释您的不规则调用模式(即突发和长延迟).如果您的数据事件时间为"跳转",并且您的数据在主题中已完全可用,则Kafka Stream也在内部维护<强>流时间中"跳转".

我会假设,您可以使用WallclockTimestampExtractor来解决您的问题(请参阅 http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor

还有一个提到的东西:流时间如果处理数据,则只有高级提前 - 如果您的应用程序到达输入主题的末尾并等待数据,则不会调用punctuate .即使您使用WallclockTimestampExtractor,也适用.

btw:目前有关于Streams的标点符号行为的讨论: https://github .com/apache/kafka/pull/1689

Kafka 1.0及更高版本

的答案 自从Kafka 1.0以来,可以基于挂钟时间或事件时间注册标点: https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2

其他推荐答案

刚完成读取答案这个问题我认为也是如此.它的主旨是:

  1. 流消费者执行记录的民意调查
  2. 所有返回的记录都完全处理.
  3. 然后按配置的延迟安排点标回调.

点是标点不存在的是固定的时间间隔事件,#2的时间变化将导致标点的执行期间的等效变化.

....但是阅读那个链接,他比我更好.

其他推荐答案

确定 - 我认为这是Kafka中的一个错误.

这就是:

在我的原始测试中,我使用单个机器来运行生产者和 consumer .我会运行生产者几分钟才能生成一些测试数据,然后运行我的测试.这将为我最初发布的奇怪输出.

然后我决定将制作人推到后台和离开它运行.现在,我在呼叫到.punctuate()之间看到100%完美的30秒间隔.没有更多的问题.

换句话说 - 如果kafka服务器没有处理任何入站数据,那么它似乎与运行 kstreams 流程似乎一致.

本文地址:https://www.itbaoku.cn/post/978422.html