博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink – Stream Task执行过程
阅读量:7042 次
发布时间:2019-06-28

本文共 3185 字,大约阅读时间需要 10 分钟。

Task.run

if (invokable instanceof StatefulTask) {
StatefulTask op = (StatefulTask) invokable; op.setInitialState(taskStateHandles); }
// run the invokable invokable.invoke();
 

invokable是StreamTask

StreamTask.invoke

public final void invoke() throws Exception {
run();
}
 

StreamTask是抽象基类,比如,OneInputStreamTask

protected void run() throws Exception {        // cache processor reference on the stack, to make the code more JIT friendly        final StreamInputProcessor
inputProcessor = this.inputProcessor; while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }

 

StreamInputProcessor.processInput

StreamRecord
record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); }

可以看到在processElement之前,

streamOperator.setKeyContextElement1(record);
@SuppressWarnings({"unchecked", "rawtypes"})    public void setKeyContextElement1(StreamRecord record) throws Exception {        setKeyContextElement(record, stateKeySelector1);    }    private 
void setKeyContextElement(StreamRecord
record, KeySelector
selector) throws Exception { if (selector != null) { Object key = selector.getKey(record.getValue()); //通过KeySelector来生成key setCurrentKey(key); } } @SuppressWarnings({
"unchecked", "rawtypes"}) public void setCurrentKey(Object key) { if (keyedStateBackend != null) { try { // need to work around type restrictions @SuppressWarnings("unchecked,rawtypes") AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend; rawBackend.setCurrentKey(key); //调用state backend的setCurrentKey } catch (Exception e) { throw new RuntimeException("Exception occurred while setting the current key context.", e); } } } @SuppressWarnings({
"unchecked", "rawtypes"}) public Object getCurrentKey() { if (keyedStateBackend != null) { return keyedStateBackend.getCurrentKey(); //从state backend取出key } else { throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream."); } }

将key设到state backend中,

AbstractKeyedStateBackend
public void setCurrentKey(K newKey) {        this.currentKey = newKey;        this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);    }

 

 

OneInputStreamOperator.processElement

StreamSink实现OneInputStreamOperator接口

public class StreamSink
extends AbstractUdfStreamOperator
> implements OneInputStreamOperator
{ @Override public void processElement(StreamRecord
element) throws Exception { userFunction.invoke(element.getValue()); }

 

最终调用到SinkFunction的invoke

转载地址:http://bjxal.baihongyu.com/

你可能感兴趣的文章
使用Maven Assembly plugin将依赖打包进jar
查看>>
elasticsearch 基础性操作
查看>>
6个技巧加速你的gradle编译
查看>>
tp中使用事务
查看>>
spring mybatis配置
查看>>
UIButton 标题靠右
查看>>
【原创+亲测可用】JS如何区分微信浏览器、QQ浏览器和QQ内置浏览器
查看>>
磁盘阵列卡
查看>>
UDP打洞
查看>>
Realtime Search: Solr vs Elasticsearch
查看>>
Java开发Maven环境配置和介绍
查看>>
CentOS7 防火墙
查看>>
htmlparser实现从网页上抓取数据(收集)
查看>>
[原创]FineUI秘密花园(二十三) — 树控件概述
查看>>
motion的移植和使用
查看>>
JavaEE PayPal退款流程
查看>>
SpringBoot之logback配置文件
查看>>
css居中,vue
查看>>
Swift 网络库封装整理 MVC + MVVM 两种设计
查看>>
[译]ES6入门(第二部分)
查看>>