FAQs for Stream Operation

This topic lists the advanced questions and answers for stream processing job operation.

问:在启动流数据处理任务之前,为什么需要先启动系统流任务?

答:系统流任务分为 Data Reader 和 Data Writer,其作用如下:

Data Reader

主要作用是过滤出流数据处理任务需要的数据输入点,数据从 Kafka Origin Topic 进入到 Internal Topic 时,例如 Origin Topic 里一共有100个测点,但流数据处理任务的计算逻辑只需要用到其中10个测点,通过 Data Reader 能大大减少不必要的数据计算,减少不必要的资源开销。

Data Writer

主要作用是过滤出流数据处理任务的输出点。在流数据处理过程中,原始数据输入点和经流数据处理任务计算后的输出点,都会进入到 Kafka Internal Topic 中(可用于后续流数据处理任务的数据输入点),所以需要 Data Writer 过滤出流数据处理任务的数据输出点。

问:如何估算通过模板配置的流数据处理任务需要的计算资源?

答:通过模板配置的流数据处理任务的性能,平台会单独给出。这里以系统流任务的 Data Reader 为例。通常情况下,Data Reader 在 1Core/2GB 的资源配置时,每秒能处理12000条记录。通过估算流数据处理任务每秒需要处理的数据量,就可估算出流数据处理任务需要的计算资源。


理论上,对于一个流数据处理任务,可通过 设备数 * 测点数 * 测点采样频率 计算出每秒需处理的数据量。但是,实际操作有一定难度,因为需要统计各个流数据处理任务用到的模型测点,并且不同测点的数据采样频率不同。


EnOS支持监控流数据处理任务输入的数据量。在数据接入比较稳定的情况下,可通过监控流数据处理任务对应的输入数据量在过去一段时间(例如7天)的平均每秒值,将其作为参考标准,同时基于业务的实际情况做调整。


如出现以下特殊情况,可基于实际情况做调整:

  • 在光伏领域,白天设备接入的数据比较多,晚上接入的数据很少。如果以平均值来评估输入记录数,可能导致白天的数据无法及时被处理。在这种情况下,需要对参考值做调整。例如白天的记录数为100条每秒,晚上的记录数为0条每秒,参考值就可能是50条每秒。但为了白天的数据能及时被处理,估算的记录数还是应以100条数据作为每秒输入的记录数。

  • 如果发生断网后恢复的情况,接入的数据量会大幅增长。为了不对关键流数据处理任务造成影响,可以针对这种情况预留一些缓冲量(对应估算出来的计算资源也预留了缓冲量)。例如光伏领域正常的配置为每秒输入100条数据,但为了应对断网恢复后造成的数据量增长,假设接入数据量变为了每秒500条,可以将500作为每秒输入记录数,来缓冲这种情况对系统带来影响,当然对应的资源使用就比较多。

问:如何估算自定义的流数据处理任务需要的计算资源?

答:通过算子自定义编排的流数据处理任务,因为每个算子的数据处理能力不同,需要通过以下测试估算出自定义的流数据处理任务需要的计算资源:

  1. 完成自定义流数据处理任务的配置(包括各种输入测点,方便后续 Data Reader 读取)。

  2. 启动 Data Reader(Data Reader会把自定义流数据处理任务中需要的测点写到 Kafka Internal Topic)。运行一段时间后,保证 Internal Topic 里有足够多的数据给自定义流数据处理任务使用,例如 Offset 达到 100万(需要有足够的数据量,保证测试效果的准确性)。

  3. 以 Standalone 模式(例如配置 2Core/4GB 的计算资源)启动自定义流数据处理任务,观察任务运行稳定后的消费速率。假设是每秒2000条记录,则基本可以认为该自定义流数据处理任务在 1Core/2GB 的资源配置下,数据处理的速率是每秒1000条。如果实际接入的输入数据是每秒4000条,那至少需要为流数据处理任务配置 4Core/8GB 的计算资源。

问:如何估算系统流任务运行需要的计算资源?

答:系统流任务(Data Reader 和 Data Writer)是以模版的方式提供的,用户流数据处理任务发布上线并完成运行配置后,系统已默认创建所需的系统流任务。平台已提供系统流任务在 Standalone 模式和集群模式不同资源规格下的性能数据。


对于系统流任务运行需要的计算资源,可参考估算通过模板配置的流数据处理任务需要的计算资源。

问:为流数据处理任务配置告警服务时,如何设置Lag告警阈值?

答:Lag是用于衡量流数据处理任务来不及处理的数据量,可以通过流数据处理任务的运行监控查询,并设置告警阈值。正常情况下,如果计算资源分配合理,输入数据会被及时处理,所以Lag一般是等于0或者接近0。


为流数据处理任务配置Lag告警阈值,主要是为了当任务运行出现问题时,能及早发现和处理问题,尽量减少对业务的影响。例如当流数据处理任务发生 Run Error 的情况,任务运行中止,接入数据就会积压。


建议将Lag阈值设置为:输入速率 * T(T为业务能接受的实时计算数据延迟时间,单位为秒)。


例如,输入速率 = 1000/s,T = 15min,则 Lag = 1000 * 15 * 60 = 900000。


对于业务能接受的实时计算数据延迟时间,可依据 SLA 中的约定或结合实际业务场景来判断,一般需要考虑以下影响因素:

  • 解决问题需要的时间

  • 重启任务需要的时间

  • 基于实时计算的结果可能还有后续的其他计算

  • 其他业务上的原因


延迟时间设得越小,越能够及时发现问题,但也容易发生误报,因为实时数据可能会有流量波动。从系统角度考虑,建议设置在 1 - 15min 的范围,但最终决定权还是由用户从业务角度来判断。

如何应对流量突涨和数据积压的情况?

答:实时流数据处理任务的流量一般是比较稳定的,因为接入IoT设备的测点一般都是按固定频率采样的。正常情况下,如果按照实际接入的数据流量来评估流数据处理任务运行所需的计算资源,在不预留缓冲量的情况下,如果数据流量突涨,就会导致数据积压。


以下两种情况可能导致数据流量突涨:

  • 新接入更多的设备:这种情况一般是可以预期的,建议提前做好流量规划和资源扩容,以应对新增的流量。

  • 非预期的流量突涨:例如断网后重连,这种情况下,因为断网导致一些数据都积压在Edge。网络恢复后积压的数据重新上送,就会对云端的实时计算造成流量突涨。


为了应对非预期的流量突涨,可以结合实际的业务场景和需求,从以下几个角度考虑解决方案:


将积压数据和实时数据分开上送

积压数据走离线通道,通过离线通道单独处理,这样就不影响实时通道的计算。


对Edge上送的数据做限流

积压数据和实时数据一起上送,如果云端实时计算能力有缓冲量,可以对Edge上送的数据做限流。这种情况下,积压的数据先上送,实时产生的数据再继续上送,但整体做了限流。例如云端实时计算能力是每秒能处理2000条记录,正常流量是每秒1000条记录,相当于预留了一倍的计算资源。如果积压的数据是10万条记录,则需要 100000/(2000-1000)=100s,才能将积压的数据处理完。


跳过积压数据,直接处理实时数据

如果不限流,积压数据和实时数据一起上送,则会在云端造成数据积压。如果云端实时计算能力没有缓冲量,可以选择跳过积压的数据,直接处理最新的数据。操作上需要先停止流数据处理任务,调用接口跳过中间数据,重启流数据处理任务,从最新的数据开始消费。这样操作的优点是不影响实时结果输出,缺点是因为跳过中间的数据,会导致部分计算结果不准确,用户需通过其他方式修正这部分数据。


紧急扩容

在计算资源有剩余和成本能接受的情况下,可以选择紧急扩容。操作上需要先停止流数据处理任务,扩容流数据处理任务所需的计算资源,重启流数据处理任务,继续消费数据。待积压数据处理完成后可以再做缩容处理。这样操作的优点是计算结果准确,缺点是资源和成本可能比较大,特别是积压的数据量很大时。


预留计算资源逐步处理积压数据

如果计算资源是有缓冲量的,且积压数据发生在云端,可通过预留的缓冲计算资源,逐渐处理积压的数据。这样操作的优点是计算准确,不需要额外操作;缺点是消化积压数据需要一段时间,可视积压数据量大小而定,也需要考虑预留的资源成本。