会话窗口 flink
Posted
技术标签:
【中文标题】会话窗口 flink【英文标题】:Session windows flink 【发布时间】:2020-06-13 08:59:33 【问题描述】:有人可以帮我了解flink中的窗口(会话)何时以及如何发生吗?或者样品是如何处理的?
例如,如果我有连续的事件流流入,事件是应用程序中的请求和应用程序提供的响应。 作为 flink 处理的一部分,我们需要了解处理请求需要多少时间。
我知道有时间翻滚窗口每 n 秒触发一次,这是配置的,一旦时间流逝,该时间窗口中的所有事件将被聚合。
例如: 假设定义的时间窗口是 30 秒,如果一个事件在 t 时间到达,另一个在 t+30 到达,那么两者都将被处理,但在 t+31 到达的事件将被忽略。
如果我说的上述说法不正确,请更正。
上面的问题是:如果说一个事件在t时间到达,另一个事件在t+3时间到达,是否还要等待整整30秒才能汇总并最终确定结果?
现在在会话窗口的情况下,这是如何工作的?如果事件是单独处理的,并且在反序列化时将代理时间戳用作单个事件的 session_id,那么将为每个事件创建会话窗口吗?如果是,那么我们是否需要区别对待请求和响应事件,因为如果我们不这样做,那么响应事件不会有自己的会话窗口吗?
我将尝试在短时间内发布我正在使用的示例(在 java 中),但以上几点的任何输入都会有所帮助!
处理函数
DTO:
public class IncomingEvent
private String id;
private String eventId;
private Date timestamp;
private String component;
//getters and setters
public class FinalOutPutEvent
private String id;
private long timeTaken;
//getters and setters
================================================ 传入事件的反序列化:
公共类 IncomingEventDeserializationScheme 实现 KafkaDeserializationSchema
private ObjectMapper mapper;
public IncomingEventDeserializationScheme(ObjectMapper mapper)
this.mapper = mapper;
@Override
public TypeInformation<IncomingEvent> getProducedType()
return TypeInformation.of(IncomingEvent.class);
@Override
public boolean isEndOfStream(IncomingEvent nextElement)
return false;
@Override
public IncomingEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception
if (record.value() == null)
return null;
try
IncomingEvent event = mapper.readValue(record.value(), IncomingEvent.class);
if(event != null)
new SessionWindow(record.timestamp());
event.setOffset(record.offset());
event.setTopic(record.topic());
event.setPartition(record.partition());
event.setBrokerTimestamp(record.timestamp());
return event;
catch (Exception e)
return null;
================================================
主要逻辑
public class MyEventJob
private static final ObjectMapper mapper = new ObjectMapper();
public static void main(String[] args) throws Exception
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MyEventJob eventJob = new MyEventJob();
InputStream inStream = eventJob.getFileFromResources("myConfig.properties");
ParameterTool parameter = ParameterTool.fromPropertiesFile(inStream);
Properties properties = parameter.getProperties();
Integer timePeriodBetweenEvents = 120;
String outWardTopicHostedOnServer = localhost:9092";
DataStreamSource<IncomingEvent> stream = env.addSource(new FlinkKafkaConsumer<>("my-input-topic", new IncomingEventDeserializationScheme(mapper), properties));
SingleOutputStreamOperator<IncomingEvent> filteredStream = stream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<IncomingEvent>()
long eventTime;
@Override
public long extractTimestamp(IncomingEvent element, long previousElementTimestamp)
return element.getTimestamp();
@Override
public Watermark getCurrentWatermark()
return new Watermark(eventTime);
)
.map(e -> e.setId(e.getEventId()); return e; );
SingleOutputStreamOperator<FinalOutPutEvent> correlatedStream = filteredStream
.keyBy(new KeySelector<IncomingEvent, String> ()
@Override
public String getKey(@Nonnull IncomingEvent input) throws Exception
return input.getId();
)
.window(GlobalWindows.create()).allowedLateness(Time.seconds(defaultSliceTimePeriod))
.trigger( new Trigger<IncomingEvent, Window> ()
private final long sessionTimeOut;
public SessionTrigger(long sessionTimeOut)
this.sessionTimeOut = sessionTimeOut;
@Override
public TriggerResult onElement(IncomingEvent element, long timestamp, Window window, TriggerContext ctx)
throws Exception
ctx.registerProcessingTimeTimer(timestamp + sessionTimeOut);
return TriggerResult.CONTINUE;
@Override
public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception
return TriggerResult.FIRE_AND_PURGE;
@Override
public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception
return TriggerResult.CONTINUE;
@Override
public void clear(Window window, TriggerContext ctx) throws Exception
//check the clear method implementation
)
.process(new ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, SessionWindow>()
@Override
public void process(String arg0,
ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, SessionWindow>.Context arg1,
Iterable<IncomingEvent> input, Collector<FinalOutPutEvent> out) throws Exception
List<IncomingEvent> eventsIn = new ArrayList<>();
input.forEach(eventsIn::add);
if(eventsIn.size() == 1)
//Logic to handle incomplete request/response events
else if (eventsIn.size() == 2)
//Logic to handle the complete request/response and how much time it took
);
FlinkKafkaProducer<FinalOutPutEvent> kafkaProducer = new FlinkKafkaProducer<>(
outWardTopicHostedOnServer, // broker list
"target-topic", // target topic
new EventSerializationScheme(mapper));
correlatedStream.addSink(kafkaProducer);
env.execute("Streaming");
谢谢 维姬
【问题讨论】:
我已经回答了理论问题,虽然我不太了解您的请求-响应窗口问题,所以如果您发布一些示例,我将编辑我的回答。 你所说的关于 Flink 的 windows 的一些内容是不正确的。但也不清楚为什么要使用窗口。您能解释一下为什么将会话窗口视为您的用例的解决方案吗?您打算如何匹配请求和响应事件? @david:感谢您在我的查询中投入时间。我认为我需要使用会话窗口的原因是我不确定请求和响应事件是否会落在同一个窗口中。例如,我的请求和响应可能相差 3 秒。但是我的请求在 t+29 秒到达,而我的响应在 t+32 秒到达。现在,我可能需要做的是计算两个事件之间的增量,但正如您所见,它们落在两个不同的窗口中,这基本上意味着我将无法处理它们。请建议现在是否有意义! @dominik:感谢您的回复!我将尽快粘贴一个示例,这可能可以帮助我进一步澄清疑问! 尚不清楚您为什么要考虑使用 Windows。这似乎是 RichFlatMap 或 ProcessFunction 可能会容易得多的情况。但真正的谜团是如何识别特定请求事件的响应事件?如果您能解释一下,我们可以向您展示如何测量它们之间的延迟。他们共享一个 sessionId 吗? 【参考方案1】:根据您的描述,我认为您想编写一个自定义的ProcessFunction,由session_id
键入。您将拥有一个ValueState
,用于存储请求事件的时间戳。当您获得相应的响应事件时,您计算增量并发出(使用session_id
)并清除状态。
您可能还想在收到请求事件时设置一个计时器,这样如果您在安全/长时间内没有收到响应事件,您可以发出失败请求的侧面输出。
【讨论】:
我添加了一个带有进程窗口的示例代码。目的是能够聚合请求和响应事件。【参考方案2】:因此,使用默认触发器,每个窗口都会在时间完全过去后完成。取决于您使用的是EventTime
还是ProcessingTime
,这可能意味着不同的事情,但一般来说,Flink 将始终等待窗口关闭,然后再完全处理。在您的情况下,t+31 的事件将简单地转到另一个窗口。
对于会话窗口,它们也是窗口,这意味着最终它们只是聚合时间戳之间的差异低于定义的间隙的样本。在内部,这比普通窗口更复杂,因为它们没有定义开始和结束。会话窗口操作员获取样本并为每个单独的样本创建一个新窗口。然后,操作员验证新创建的窗口是否可以与已经存在的窗口合并(即它们的时间戳是否比间隙更近)并合并它们。这最终导致窗口的所有元素的时间戳比定义的间隙更接近。
【讨论】:
【参考方案3】:你让这变得比它需要的更复杂。下面的示例需要进行一些调整,但希望能够传达如何使用KeyedProcessFunction
而不是会话窗口的想法。
另外,BoundedOutOfOrdernessTimestampExtractor
的构造函数需要传递一个Time maxOutOfOrderness
。不知道为什么要使用忽略 maxOutOfOrderness
的实现来覆盖其 getCurrentWatermark
方法。
public static void main(String[] args) throws Exception
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> events = ...
events
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks(OUT_OF_ORDERNESS))
.keyBy(e -> e.sessionId)
.process(new RequestReponse())
...
public static class RequestReponse extends KeyedProcessFunction<KEY, Event, Long>
private ValueState<Long> requestTimeState;
@Override
public void open(Configuration config)
ValueStateDescriptor<Event> descriptor = new ValueStateDescriptor<>(
"request time", Long.class);
requestState = getRuntimeContext().getState(descriptor);
@Override
public void processElement(Event event, Context context, Collector<Long> out) throws Exception
TimerService timerService = context.timerService();
Long requestedAt = requestTimeState.value();
if (requestedAt == null)
// haven't seen the request before; save its timestamp
requestTimeState.update(event.timestamp);
timerService.registerEventTimeTimer(event.timestamp + TIMEOUT);
else
// this event is the response
// emit the time elapsed between request and response
out.collect(event.timestamp - requestedAt);
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out) throws Exception
//handle incomplete request/response events
public static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event>
public TimestampsAndWatermarks(Time t)
super(t);
@Override
public long extractTimestamp(Event event)
return event.eventTime;
【讨论】:
感谢@david .. 很抱歉回复晚了。我被卷入了不同的事情。我很欣赏你的回应和例子。肯定会尝试一次,然后拿回我的发现..以上是关于会话窗口 flink的主要内容,如果未能解决你的问题,请参考以下文章