Flink – state
public class StreamTaskState implements Serializable, Closeable {
    private static final long serialVersionUID = 1L;
    private StateHandle<?> operatorState;
    private StateHandle<Serializable> functionState;
    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
    // ...
}
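State in Flink comes in three kinds. As the fields above show, StreamTaskState wraps all three: kvStates (key/value state), functionState, and operatorState.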
1. KVState
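This is the most basic kind of state. Its abstraction is a pair of interfaces, KvState and KvStateSnapshot, which convert into each other through two methods: KvState.snapshot() produces a KvStateSnapshot, and KvStateSnapshot.restoreState() turns it back into a KvState.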
/**
 * Key/Value state implementation for user-defined state. The state is backed by a state
 * backend, which typically follows one of the following patterns: Either the state is stored
 * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
 * state backend into some store (during checkpoints), or the key/value state is in fact backed
 * by an external key/value store as the state backend, and checkpoints merely record the
 * metadata of what is considered part of the checkpoint.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State} this {@code KvState} holds.
 * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
 * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
 */
public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {
    /**
     * Sets the current key, which will be used when using the state access methods.
     *
     * @param key The key.
     */
    void setCurrentKey(K key);
    /**
     * Sets the current namespace, which will be used when using the state access methods.
     *
     * @param namespace The namespace.
     */
    void setCurrentNamespace(N namespace);
    /**
     * Creates a snapshot of this state.
     *
     * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
     * @param timestamp The timestamp of the checkpoint.
     * @return A snapshot handle for this key/value state.
     *
     * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
     *                   can react to failed snapshots.
     */
    KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;
    /**
     * Disposes the key/value state, releasing all occupied resources.
     */
    void dispose();
}
The definition is quite simple. The key method is snapshot(), which produces a KvStateSnapshot:
public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        extends StateObject {
    /**
     * Loads the key/value state back from this snapshot.
     *
     * @param stateBackend The state backend that created this snapshot and can restore the key/value state
     *                     from this snapshot.
     * @param keySerializer The serializer for the keys.
     * @param classLoader The class loader for user-defined types.
     *
     * @return An instance of the key/value state loaded from this snapshot.
     *
     * @throws Exception Exceptions can occur during the state loading and are forwarded.
     */
    KvState<K, N, S, SD, Backend> restoreState(
            Backend stateBackend,
            TypeSerializer<K> keySerializer,
            ClassLoader classLoader) throws Exception;
}
KvStateSnapshot is the counterpart of KvState. Its key method is restoreState(), which rebuilds a KvState from the snapshot.
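As a minimal sketch of how the two interfaces pair up (plain Java, not Flink code; SimpleKvState, SimpleSnapshot and HeapKvState are hypothetical names), snapshot() freezes a copy of the current map, and restoreState() rebuilds a live state object from that copy:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for KvState: a live, mutable key/value state.
interface SimpleKvState<K, V> {
    void setCurrentKey(K key);
    V value();
    void update(V value);
    // Mirrors KvState.snapshot(checkpointId, timestamp).
    SimpleSnapshot<K, V> snapshot(long checkpointId, long timestamp);
}

// Hypothetical stand-in for KvStateSnapshot: an immutable picture of the state.
interface SimpleSnapshot<K, V> {
    // Mirrors KvStateSnapshot.restoreState(...): turns the snapshot back into live state.
    SimpleKvState<K, V> restoreState();
}

class HeapKvState<K, V> implements SimpleKvState<K, V> {
    private final Map<K, V> map;
    private K currentKey;

    HeapKvState(Map<K, V> initial) { this.map = new HashMap<>(initial); }

    public void setCurrentKey(K key) { this.currentKey = key; }
    public V value() { return map.get(currentKey); }
    public void update(V value) { map.put(currentKey, value); }

    public SimpleSnapshot<K, V> snapshot(long checkpointId, long timestamp) {
        // Copy the map so later updates do not leak into the snapshot.
        final Map<K, V> copy = new HashMap<>(map);
        return () -> new HeapKvState<>(copy);
    }
}
```

Later updates to the live state do not affect the snapshot, which is the property a checkpoint needs. The file-system backend discussed next gets the same isolation by serializing the map to a file instead of copying it in memory.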
Take the concrete FsState as an example:
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
    // ...
}
AbstractFsState extends AbstractHeapState because FsState also keeps its state cached on the JVM heap; the file system is only written when a snapshot is taken.
So let's look at AbstractHeapState first:
/**
 * Base class for partitioned {@link ListState} implementations that are backed by a regular
 * heap hash map. The concrete implementations define how the state is checkpointed.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <SV> The type of the values in the state.
 * @param <S> The type of State
 * @param <SD> The type of StateDescriptor for the State S
 * @param <Backend> The type of the backend that snapshots this key/value state.
 */
public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        implements KvState<K, N, S, SD, Backend>, State {
    /** Map containing the actual key/value pairs */
    protected final HashMap<N, Map<K, SV>> state; // note the extra namespace level: state is grouped by namespace first, then by key, so equal keys in different namespaces do not collide
    /** Serializer for the state value. The state value could be a List<V>, for example. */
    protected final TypeSerializer<SV> stateSerializer;
    /** The serializer for the keys */
    protected final TypeSerializer<K> keySerializer;
    /** The serializer for the namespace */
    protected final TypeSerializer<N> namespaceSerializer;
    /** This holds the name of the state and can create an initial default value for the state. */
    protected final SD stateDesc; // the StateDescriptor carries metadata about the state, such as its default value
    /** The current key, which the next value methods will refer to */
    protected K currentKey;
    /** The current namespace, which the access methods will refer to. */
    protected N currentNamespace = null;
    /** Cache the state map for the current key. */
    protected Map<K, SV> currentNSState;
    /**
     * Creates a new empty key/value state.
     *
     * @param keySerializer The serializer for the keys.
     * @param namespaceSerializer The serializer for the namespace.
     * @param stateDesc The state identifier for the state. This contains name
     *                  and can create a default state value.
     */
    protected AbstractHeapState(TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<SV> stateSerializer,
            SD stateDesc) {
        this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>());
    }
    // ...
}
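A minimal sketch of the namespace-then-key layout (the class NamespacedHeapState is hypothetical and uses only the JDK). It shows why the extra namespace level matters: the same key can hold independent values in different namespaces, for example two windows:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of AbstractHeapState's layout: namespace -> (key -> value).
class NamespacedHeapState<K, N, V> {
    private final HashMap<N, Map<K, V>> state = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    void setCurrentKey(K key) { this.currentKey = key; }
    void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    V get() {
        Map<K, V> nsState = state.get(currentNamespace);
        return nsState == null ? null : nsState.get(currentKey);
    }

    void put(V value) {
        // Create the inner map for this namespace on first write.
        state.computeIfAbsent(currentNamespace, ns -> new HashMap<>()).put(currentKey, value);
    }

    int namespaceCount() { return state.size(); }
}
```

Flink additionally caches the inner map of the current namespace in currentNSState; the sketch looks it up on every access for simplicity.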
AbstractFsState
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
    /** The file system state backend backing snapshots of this state */
    private final FsStateBackend backend;
    public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);
    @Override
    public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
        try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
            // serialize the state to the output stream
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
            outView.writeInt(state.size());
            for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
                N namespace = namespaceState.getKey();
                namespaceSerializer.serialize(namespace, outView);
                outView.writeInt(namespaceState.getValue().size());
                for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                    keySerializer.serialize(entry.getKey(), outView);
                    stateSerializer.serialize(entry.getValue(), outView);
                }
            }
            outView.flush(); // the actual contents are flushed to the file
            // create a handle to the state
            return createHeapSnapshot(out.closeAndGetPath()); // the snapshot only needs the file path
        }
    }
}
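The byte layout written by snapshot() is: the number of namespaces, then for each namespace its serialized value, its entry count, and its key/value pairs. The sketch below reproduces that layout with plain java.io streams and reads it back in the same order that AbstractFsStateSnapshot.restoreState() uses. As simplifying assumptions, namespaces and keys are Strings written with writeUTF and values are ints, in place of Flink's TypeSerializers; HeapStateCodec is a hypothetical name:

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Hypothetical codec mirroring the layout used by AbstractFsState.snapshot():
// [numNamespaces] then per namespace: [namespace][numEntries]([key][value])*
final class HeapStateCodec {
    static byte[] write(Map<String, Map<String, Integer>> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Map<String, Integer>> ns : state.entrySet()) {
                out.writeUTF(ns.getKey());              // namespace
                out.writeInt(ns.getValue().size());     // number of entries in this namespace
                for (Map.Entry<String, Integer> e : ns.getValue().entrySet()) {
                    out.writeUTF(e.getKey());           // key
                    out.writeInt(e.getValue());         // value
                }
            }
        }
        return bytes.toByteArray();
    }

    static Map<String, Map<String, Integer>> read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Integer>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numEntries = in.readInt();
                Map<String, Integer> nsMap = new HashMap<>(numEntries);
                state.put(namespace, nsMap);
                for (int j = 0; j < numEntries; j++) {
                    // Arguments are evaluated left to right: key first, then value.
                    nsMap.put(in.readUTF(), in.readInt());
                }
            }
            return state;
        }
    }
}
```

Because each count precedes its data, the reader knows how many items to expect and can presize its maps, as restoreState() does with new HashMap<>(numKeys).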
Key/value state itself comes in several kinds: ValueState, ListState, ReducingState, and FoldingState.
For simplicity, let's start with ValueState:
public class FsValueState<K, N, V>
        extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
        implements ValueState<V> {
    @Override
    public V value() {
        if (currentNSState == null) {
            currentNSState = state.get(currentNamespace); // first, look up the kv map of the current namespace
        }
        if (currentNSState != null) {
            V value = currentNSState.get(currentKey);
            return value != null ? value : stateDesc.getDefaultValue(); // return the value; if it is null, fall back to the default from stateDesc
        }
        return stateDesc.getDefaultValue();
    }
    @Override
    public void update(V value) {
        if (currentKey == null) {
            throw new RuntimeException("No key available.");
        }
        if (value == null) {
            clear();
            return;
        }
        if (currentNSState == null) {
            currentNSState = new HashMap<>();
            state.put(currentNamespace, currentNSState);
        }
        currentNSState.put(currentKey, value); // update
    }
    @Override
    public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
        return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); // create the snapshot from the file path
    }
    // ...
}
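The contract of value() and update() boils down to two rules: a missing entry reads as the descriptor's default value, and update(null) behaves like clear(). A minimal sketch of that contract for a single namespace, where SketchValueState and its defaultValue constructor parameter are hypothetical stand-ins for FsValueState and ValueStateDescriptor.getDefaultValue() (it throws IllegalStateException where Flink throws a plain RuntimeException):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of FsValueState's value()/update() semantics for one namespace.
class SketchValueState<K, V> {
    private final Map<K, V> nsState = new HashMap<>();
    private final V defaultValue;   // stands in for stateDesc.getDefaultValue()
    private K currentKey;

    SketchValueState(V defaultValue) { this.defaultValue = defaultValue; }

    void setCurrentKey(K key) { this.currentKey = key; }

    V value() {
        V value = nsState.get(currentKey);
        // A missing (or cleared) entry falls back to the default value.
        return value != null ? value : defaultValue;
    }

    void update(V value) {
        if (currentKey == null) {
            throw new IllegalStateException("No key available.");
        }
        if (value == null) {
            clear();   // writing null is the same as clearing the entry
            return;
        }
        nsState.put(currentKey, value);
    }

    void clear() { nsState.remove(currentKey); }
}
```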
Next, the snapshot side, AbstractFsStateSnapshot:
public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
    public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap);
    @Override
    public KvState<K, N, S, SD, FsStateBackend> restoreState(
            FsStateBackend stateBackend,
            final TypeSerializer<K> keySerializer,
            ClassLoader classLoader) throws Exception {
        // state restore
        ensureNotClosed();
        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
            // make sure the in-progress restore from the handle can be closed
            registerCloseable(inStream);
            DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
            final int numKeys = inView.readInt();
            HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
            for (int i = 0; i < numKeys; i++) {
                N namespace = namespaceSerializer.deserialize(inView);
                final int numValues = inView.readInt();
                Map<K, SV> namespaceMap = new HashMap<>(numValues);
                stateMap.put(namespace, namespaceMap);
                for (int j = 0; j < numValues; j++) {
                    K key = keySerializer.deserialize(inView);
                    SV value = stateSerializer.deserialize(inView);
                    namespaceMap.put(key, value);
                }
            }
            return createFsState(stateBackend, stateMap);
        }
        catch (Exception e) {
            throw new Exception("Failed to restore state from file system", e);
        }
    }
}
The Snapshot class that FsValueState implements internally (as a nested static class):
public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
    private static final long serialVersionUID = 1L;
    public Snapshot(TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<V> stateSerializer,
            ValueStateDescriptor<V> stateDescs,
            Path filePath) {
        super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
    }
    @Override
    public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
        return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
    }
}
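restoreState() in AbstractFsStateSnapshot and createFsState() in each per-type Snapshot form a template method: the base class owns the reading and rebuilds a generic HashMap<N, Map<K, SV>>, and each subclass only decides which concrete state object wraps that map. A stripped-down sketch of the pattern, with hypothetical names (BaseSnapshot, ValueSnapshot, RestoredValueState) and the file read replaced by copying an in-memory map:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: owns the generic restore logic.
abstract class BaseSnapshot<N, K, SV, S> {
    private final Map<N, Map<K, SV>> stored;   // stands in for the snapshot file

    BaseSnapshot(Map<N, Map<K, SV>> stored) { this.stored = stored; }

    // Subclass hook, analogous to createFsState(backend, stateMap).
    protected abstract S createState(HashMap<N, Map<K, SV>> stateMap);

    // Analogous to restoreState(): rebuild the generic map, then delegate.
    final S restore() {
        HashMap<N, Map<K, SV>> stateMap = new HashMap<>();
        for (Map.Entry<N, Map<K, SV>> e : stored.entrySet()) {
            stateMap.put(e.getKey(), new HashMap<>(e.getValue()));
        }
        return createState(stateMap);
    }
}

// Hypothetical concrete state type produced by the hook.
class RestoredValueState<N, K, V> {
    final HashMap<N, Map<K, V>> state;
    RestoredValueState(HashMap<N, Map<K, V>> state) { this.state = state; }
}

// Hypothetical per-type snapshot: only decides which state class to build.
class ValueSnapshot<N, K, V> extends BaseSnapshot<N, K, V, RestoredValueState<N, K, V>> {
    ValueSnapshot(Map<N, Map<K, V>> stored) { super(stored); }

    @Override
    protected RestoredValueState<N, K, V> createState(HashMap<N, Map<K, V>> stateMap) {
        return new RestoredValueState<>(stateMap);
    }
}
```

With this split, supporting another state type (list, reducing, folding) only needs another small subclass, while the deserialization code stays in one place.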
2. FunctionState
Function state is represented by StateHandle, which is a more general abstraction than the KvState/KvStateSnapshot pair:
/**
 * StateHandle is a general handle interface meant to abstract operator state fetching.
 * A StateHandle implementation can for example include the state itself in cases where the state
 * is lightweight or fetching it lazily from some external storage when the state is too large.
 */
public interface StateHandle<T> extends StateObject {
    /**
     * This retrieves and return the state represented by the handle.
     *
     * @param userCodeClassLoader Class loader for deserializing user code specific classes
     *
     * @return The state represented by the handle.
     * @throws java.lang.Exception Thrown, if the state cannot be fetched.
     */
    T getState(ClassLoader userCodeClassLoader) throws Exception;
}
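The Javadoc notes that a handle may simply carry the state itself when it is lightweight. A minimal sketch of such a handle, assuming plain Java serialization; SimpleStateHandle and BytesStateHandle are hypothetical and only mirror the shape of StateHandle.getState(ClassLoader). The user-code class loader is honored by overriding ObjectInputStream.resolveClass:

```java
import java.io.*;

// Hypothetical mirror of StateHandle<T>.
interface SimpleStateHandle<T> {
    T getState(ClassLoader userCodeClassLoader) throws Exception;
}

// Hypothetical handle that keeps the serialized state in memory and deserializes on demand.
class BytesStateHandle<T extends Serializable> implements SimpleStateHandle<T>, Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] serialized;

    BytesStateHandle(T state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        this.serialized = bytes.toByteArray();
    }

    @Override
    @SuppressWarnings("unchecked")
    public T getState(ClassLoader userCodeClassLoader) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
                try {
                    // Resolve classes through the user-code class loader first.
                    return Class.forName(desc.getName(), false, userCodeClassLoader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc);   // e.g. primitive type descriptors
                }
            }
        }) {
            return (T) in.readObject();
        }
    }
}
```

A handle for large state would instead store only a location (such as a file path) and read the bytes lazily inside getState().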
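3. OperatorState, typically the state of WindowOperator
OperatorState also uses StateHandle as its snapshot abstraction.
Now let's look at how these three kinds of state are snapshotted.
First, the checkpoint-related methods of AbstractStreamOperator. Note that it only snapshots KvState: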
@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots
    StreamTaskState state = new StreamTaskState();
    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
                stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }
    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    if (stateBackend != null) {
        stateBackend.notifyOfCompletedCheckpoint(checkpointId);
    }
}
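AbstractStreamOperator stores the kv snapshots by state name, and restoreState() only hands them to the backend; as the code comment explains, the real restore is deferred until the function requests the state again, because only then is the information from the user function (such as the serializers) available. A minimal sketch of that lazy pattern, with hypothetical names (LazyBackend, injectSnapshots, getState) and a java.util.function.Supplier standing in for a KvStateSnapshot:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical backend that restores named states lazily.
class LazyBackend<S> {
    private final Map<String, Supplier<S>> pendingSnapshots = new HashMap<>();
    private final Map<String, S> liveStates = new HashMap<>();
    int restoreCount = 0;   // how many snapshots were actually turned back into state

    // Analogous to injectKeyValueStateSnapshots(): only remember the snapshots.
    void injectSnapshots(Map<String, Supplier<S>> snapshots) {
        pendingSnapshots.putAll(snapshots);
    }

    // Analogous to the first state access from the user function.
    S getState(String name, Supplier<S> fresh) {
        return liveStates.computeIfAbsent(name, n -> {
            Supplier<S> snapshot = pendingSnapshots.remove(n);
            if (snapshot != null) {
                restoreCount++;
                return snapshot.get();   // restore from the injected snapshot
            }
            return fresh.get();          // no snapshot: start with new, empty state
        });
    }
}
```

Snapshots that the function never asks for are never deserialized.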
AbstractUdfStreamOperator
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT>
It extends AbstractStreamOperator. Here are its checkpoint-related methods:
@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp); // first call super.snapshotOperatorState, i.e. snapshot the kv state
    if (userFunction instanceof Checkpointed) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
        Serializable udfState;
        try {
            udfState = chkFunction.snapshotState(checkpointId, timestamp); // snapshot the function's state
        }
        catch (Exception e) {
            throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
        }
        if (udfState != null) {
            try {
                AbstractStateBackend stateBackend = getStateBackend();
                StateHandle<Serializable> handle =
                        stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp); // let the stateBackend store the state and return a handle to it
                state.setFunctionState(handle);
            }
            catch (Exception e) {
                throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                        + e.getMessage(), e);
            }
        }
    }
    return state;
}

@Override
public void restoreState(StreamTaskState state) throws Exception {
    super.restoreState(state);
    StateHandle<Serializable> stateHandle = state.getFunctionState();
    if (userFunction instanceof Checkpointed && stateHandle != null) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
        Serializable functionState = stateHandle.getState(getUserCodeClassloader());
        if (functionState != null) {
            try {
                chkFunction.restoreState(functionState);
            }
            catch (Exception e) {
                throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
            }
        }
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    super.notifyOfCompletedCheckpoint(checkpointId);
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
So this operator snapshots both the key/value state and the state of the user-defined function (UDF).
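The snapshot methods chain through super, so each subclass adds one kind of state to the same StreamTaskState: AbstractStreamOperator fills kvStates, AbstractUdfStreamOperator adds functionState when the user function implements Checkpointed, and WindowOperator (below) adds operatorState. A sketch of that layering with hypothetical, heavily simplified classes (TaskState, BaseOperator, UdfOperator, WindowLikeOperator, CountingFunction) and a hypothetical CheckpointedFn interface shaped like Flink 1.x's Checkpointed:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified StreamTaskState with the same three slots.
class TaskState {
    Map<String, Object> kvStates;
    Serializable functionState;
    Object operatorState;
}

// Hypothetical interface shaped like Checkpointed<T>.
interface CheckpointedFn<T extends Serializable> {
    T snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

class BaseOperator {
    final Map<String, Object> kv = new HashMap<>();

    TaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        TaskState state = new TaskState();
        state.kvStates = new HashMap<>(kv);            // layer 1: key/value state
        return state;
    }
}

class UdfOperator extends BaseOperator {
    final Object userFunction;
    UdfOperator(Object userFunction) { this.userFunction = userFunction; }

    @Override
    TaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        TaskState state = super.snapshotOperatorState(checkpointId, timestamp);
        if (userFunction instanceof CheckpointedFn) {   // layer 2: function state
            state.functionState = ((CheckpointedFn<?>) userFunction).snapshotState(checkpointId, timestamp);
        }
        return state;
    }
}

class WindowLikeOperator extends UdfOperator {
    final Map<String, Long> timers = new HashMap<>();
    WindowLikeOperator(Object userFunction) { super(userFunction); }

    @Override
    TaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        TaskState state = super.snapshotOperatorState(checkpointId, timestamp);
        state.operatorState = new HashMap<>(timers);    // layer 3: operator state
        return state;
    }
}

// Example user function that counts elements and checkpoints the count.
class CountingFunction implements CheckpointedFn<Long> {
    long count;
    public Long snapshotState(long checkpointId, long timestamp) { return count; }
    public void restoreState(Long state) { count = state; }
}
```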
WindowOperator, a typical example of operator state:
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    if (mergingWindowsByKey != null) {
        TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
        ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
        for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
            setKeyContext(key.getKey());
            ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
            mergeState.clear();
            key.getValue().persist(mergeState);
        }
    }
    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
    AbstractStateBackend.CheckpointStateOutputView out =
            getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
    snapshotTimers(out);
    taskState.setOperatorState(out.closeAndGetHandle());
    return taskState;
}

@Override
public void restoreState(StreamTaskState taskState) throws Exception {
    super.restoreState(taskState);
    final ClassLoader userClassloader = getUserCodeClassloader();
    @SuppressWarnings("unchecked")
    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
    DataInputView in = inputState.getState(userClassloader);
    restoreTimers(in);
}
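Unlike the kv and function state, WindowOperator's operator state is written straight into a checkpoint output view by snapshotTimers(out) and read back from a DataInputView by restoreTimers(in), so writer and reader must agree on the byte layout field by field. The exact layout Flink uses is not shown above; the sketch below assumes a hypothetical one (a count followed by (key, timestamp) pairs) and uses the JDK's DataOutput/DataInput interfaces, which Flink's DataOutputView/DataInputView extend. WindowTimer and TimerCodec are hypothetical names:

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

// Hypothetical timer record: fire at `timestamp` for `key`.
final class WindowTimer {
    final String key;
    final long timestamp;
    WindowTimer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
}

final class TimerCodec {
    // Written against java.io.DataOutput, so any DataOutput-compatible view works.
    static void snapshotTimers(List<WindowTimer> timers, DataOutput out) throws IOException {
        out.writeInt(timers.size());
        for (WindowTimer t : timers) {
            out.writeUTF(t.key);
            out.writeLong(t.timestamp);
        }
    }

    // Must read fields in exactly the order snapshotTimers wrote them.
    static List<WindowTimer> restoreTimers(DataInput in) throws IOException {
        int n = in.readInt();
        List<WindowTimer> timers = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            String key = in.readUTF();
            long timestamp = in.readLong();
            timers.add(new WindowTimer(key, timestamp));
        }
        return timers;
    }
}
```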
