Task.run
if (invokable instanceof StatefulTask) { StatefulTask op = (StatefulTask) invokable; op.setInitialState(taskStateHandles); }
// run the invokable invokable.invoke();
invokable是StreamTask
StreamTask.invoke
public final void invoke() throws Exception {
run();
}
StreamTask是抽象基类,比如,OneInputStreamTask
protected void run() throws Exception { // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessorinputProcessor = this.inputProcessor; while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }
StreamInputProcessor.processInput
StreamRecordrecord = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); }
可以看到在processElement之前,
streamOperator.setKeyContextElement1(record);
@SuppressWarnings({"unchecked", "rawtypes"}) public void setKeyContextElement1(StreamRecord record) throws Exception { setKeyContextElement(record, stateKeySelector1); } privatevoid setKeyContextElement(StreamRecord record, KeySelector selector) throws Exception { if (selector != null) { Object key = selector.getKey(record.getValue()); //通过KeySelector来生成key setCurrentKey(key); } } @SuppressWarnings({ "unchecked", "rawtypes"}) public void setCurrentKey(Object key) { if (keyedStateBackend != null) { try { // need to work around type restrictions @SuppressWarnings("unchecked,rawtypes") AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend; rawBackend.setCurrentKey(key); //调用state backend的setCurrentKey } catch (Exception e) { throw new RuntimeException("Exception occurred while setting the current key context.", e); } } } @SuppressWarnings({ "unchecked", "rawtypes"}) public Object getCurrentKey() { if (keyedStateBackend != null) { return keyedStateBackend.getCurrentKey(); //从state backend取出key } else { throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream."); } }
将key设到state backend中,
AbstractKeyedStateBackend
public void setCurrentKey(K newKey) { this.currentKey = newKey; this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups); }
OneInputStreamOperator.processElement
StreamSink实现OneInputStreamOperator接口
public class StreamSinkextends AbstractUdfStreamOperator
最终调用到SinkFunction的invoke