Flink – RocksDBStateBackend
如果要考虑易用性和效率,使用rocksDB来替代普通内存的kv是有必要的
有了rocksdb,可以range查询,可以支持columnfamily,可以各种压缩
但是rocksdb本身是一个库,是跑在RocksDBStateBackend中的
所以taskmanager挂掉后,数据还是没了,
所以RocksDBStateBackend仍然需要类似HDFS这样的分布式存储来存储snapshot
kv state需要由rockdb来管理,这是和内存或file backend最大的不同
AbstractRocksDBState
/**<br/>
* Base class for {@link State} implementations that store state in a RocksDB database.<br/>
*<br/>
* <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that<br/>
* the {@link RocksDBStateBackend} manages and checkpoints.<br/>
*<br/>
* @param <K> The type of the key.<br/>
* @param <N> The type of the namespace.<br/>
* @param <S> The type of {@link State}.<br/>
* @param <SD> The type of {@link StateDescriptor}.<br/>
*/<br/>
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>><br/>
implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
/** Serializer for the namespace */<br/>
private final TypeSerializer<N> namespaceSerializer;
/** The current namespace, which the next value methods will refer to */<br/>
private N currentNamespace;
/** Backend that holds the actual RocksDB instance where we store state */<br/>
protected RocksDBStateBackend backend;
/** The column family of this particular instance of state */<br/>
protected ColumnFamilyHandle columnFamily;
/**<br/>
* We disable writes to the write-ahead-log here.<br/>
*/<br/>
private final WriteOptions writeOptions;
/**<br/>
* Creates a new RocksDB backed state.<br/>
*<br/>
* @param namespaceSerializer The serializer for the namespace.<br/>
*/<br/>
protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,<br/>
TypeSerializer<N> namespaceSerializer,<br/>
RocksDBStateBackend backend) {
this.namespaceSerializer = namespaceSerializer;<br/>
this.backend = backend;
this.columnFamily = columnFamily;
writeOptions = new WriteOptions();<br/>
writeOptions.setDisableWAL(true);<br/>
}
@Override<br/>
public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,<br/>
long timestamp) throws Exception {<br/>
throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");<br/>
}<br/>
}
RocksDBValueState
/**<br/>
* {@link ValueState} implementation that stores state in RocksDB.<br/>
*<br/>
* @param <K> The type of the key.<br/>
* @param <N> The type of the namespace.<br/>
* @param <V> The type of value that the state state stores.<br/>
*/<br/>
public class RocksDBValueState<K, N, V><br/>
extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>><br/>
implements ValueState<V> {
@Override<br/>
public V value() {<br/>
ByteArrayOutputStream baos = new ByteArrayOutputStream();<br/>
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);<br/>
try {<br/>
writeKeyAndNamespace(out);<br/>
byte[] key = baos.toByteArray();<br/>
byte[] valueBytes = backend.db.get(columnFamily, key); //从db读出value<br/>
if (valueBytes == null) {<br/>
return stateDesc.getDefaultValue();<br/>
}<br/>
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));<br/>
} catch (IOException|RocksDBException e) {<br/>
throw new RuntimeException("Error while retrieving data from RocksDB.", e);<br/>
}<br/>
}
@Override<br/>
public void update(V value) throws IOException {<br/>
if (value == null) {<br/>
clear();<br/>
return;<br/>
}<br/>
ByteArrayOutputStream baos = new ByteArrayOutputStream();<br/>
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);<br/>
try {<br/>
writeKeyAndNamespace(out);<br/>
byte[] key = baos.toByteArray();<br/>
baos.reset();<br/>
valueSerializer.serialize(value, out);<br/>
backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); //将kv写入db<br/>
} catch (Exception e) {<br/>
throw new RuntimeException("Error while adding data to RocksDB", e);<br/>
}<br/>
}<br/>
}
因为对于kv state,key就是当前收到数据的key,所以key是直接从backend.currentKey()中读到;参考,Flink – Working with State
RocksDBStateBackend
初始化过程,
/**<br/>
* A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can<br/>
* store very large state that exceeds memory and spills to disk.<br/>
*<br/>
* <p>All key/value state (including windows) is stored in the key/value index of RocksDB.<br/>
* For persistence against loss of machines, checkpoints take a snapshot of the<br/>
* RocksDB database, and persist that snapshot in a file system (by default) or<br/>
* another configurable state backend.<br/>
*<br/>
* <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options<br/>
* using the methods {@link #setPredefinedOptions(PredefinedOptions)} and<br/>
* {@link #setOptions(OptionsFactory)}.<br/>
*/<br/>
public class RocksDBStateBackend extends AbstractStateBackend {
// ------------------------------------------------------------------------<br/>
// Static configuration values<br/>
// ------------------------------------------------------------------------
/** The checkpoint directory that we copy the RocksDB backups to. */<br/>
private final Path checkpointDirectory;
/** The state backend that stores the non-partitioned state */<br/>
private final AbstractStateBackend nonPartitionedStateBackend;
/**<br/>
* Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}<br/>
* to store state. The different k/v states that we have don't each have their own RocksDB<br/>
* instance. They all write to this instance but to their own column family.<br/>
*/<br/>
protected volatile transient RocksDB db; //RocksDB实例
/**<br/>
* Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the<br/>
* file system and location defined by the given URI.<br/>
*<br/>
* <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system<br/>
* host and port in the URI, or have the Hadoop configuration that describes the file system<br/>
* (host / high-availability group / possibly credentials) either referenced from the Flink<br/>
* config, or included in the classpath.<br/>
*<br/>
* @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.<br/>
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.<br/>
*/<br/>
public RocksDBStateBackend(String checkpointDataUri) throws IOException {<br/>
this(new Path(checkpointDataUri).toUri());<br/>
}
/**<br/>
* Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the<br/>
* file system and location defined by the given URI.<br/>
*<br/>
* <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system<br/>
* host and port in the URI, or have the Hadoop configuration that describes the file system<br/>
* (host / high-availability group / possibly credentials) either referenced from the Flink<br/>
* config, or included in the classpath.<br/>
*<br/>
* @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.<br/>
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.<br/>
*/<br/>
public RocksDBStateBackend(URI checkpointDataUri) throws IOException {<br/>
// creating the FsStateBackend automatically sanity checks the URI<br/>
FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); //仍然使用FsStateBackend来存snapshot
this.nonPartitionedStateBackend = fsStateBackend;<br/>
this.checkpointDirectory = fsStateBackend.getBasePath();<br/>
}
// ------------------------------------------------------------------------<br/>
// State backend methods<br/>
// ------------------------------------------------------------------------
@Override<br/>
public void initializeForJob(<br/>
Environment env,<br/>
String operatorIdentifier,<br/>
TypeSerializer<?> keySerializer) throws Exception {
super.initializeForJob(env, operatorIdentifier, keySerializer);
this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
RocksDB.loadLibrary(); //初始化rockdb
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); //columnFamily的概念和HBase相同,放在独立的文件<br/>
// RocksDB seems to need this...<br/>
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));<br/>
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);<br/>
try {<br/>
db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); //真正的open rocksDB<br/>
} catch (RocksDBException e) {<br/>
throw new RuntimeException("Error while opening RocksDB instance.", e);<br/>
}<br/>
}
snapshotPartitionedState
@Override<br/>
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {<br/>
if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {<br/>
return new HashMap<>();<br/>
}
if (fullyAsyncBackup) {<br/>
return performFullyAsyncSnapshot(checkpointId, timestamp);<br/>
} else {<br/>
return performSemiAsyncSnapshot(checkpointId, timestamp);<br/>
}<br/>
}
snapshot分为全异步和半异步两种,
半异步,
/**<br/>
* Performs a checkpoint by using the RocksDB backup feature to backup to a directory.<br/>
* This backup is the asynchronously copied to the final checkpoint location.<br/>
*/<br/>
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {<br/>
// We don't snapshot individual k/v states since everything is stored in a central<br/>
// RocksDB data base. Create a dummy KvStateSnapshot that holds the information about<br/>
// that checkpoint. We use the in injectKeyValueStateSnapshots to restore.
final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);<br/>
final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
long startTime = System.currentTimeMillis();
BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());<br/>
// we disabled the WAL<br/>
backupOptions.setBackupLogFiles(false);<br/>
// no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot<br/>
backupOptions.setSync(false); //设为异步
try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {<br/>
// wait before flush with "true"<br/>
backupEngine.createNewBackup(db, true); //利用rocksDB自己的backupEngine生成新的backup,存在本地磁盘<br/>
}
long endTime = System.currentTimeMillis(); //这部分是同步做的,需要计时看延时<br/>
LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
// draw a copy in case it get's changed while performing the async snapshot<br/>
List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();<br/>
for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {<br/>
kvStateInformationCopy.add(state.f1);<br/>
}<br/>
SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath, //<br/>
backupUri,<br/>
kvStateInformationCopy,<br/>
checkpointId);
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();<br/>
result.put("dummy_state", dummySnapshot);<br/>
return result;<br/>
}
SemiAsyncSnapshot.materialize
@Override<br/>
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {<br/>
try {<br/>
long startTime = System.currentTimeMillis();<br/>
HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); //从本地磁盘copy到hdfs<br/>
long endTime = System.currentTimeMillis();<br/>
LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");<br/>
return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);<br/>
} catch (Exception e) {<br/>
FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());<br/>
fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);<br/>
throw e;<br/>
} finally {<br/>
FileUtils.deleteQuietly(localBackupPath);<br/>
}<br/>
}
全异步
/**<br/>
* Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then<br/>
* iterating over all key/value pairs in RocksDB to store them in the final checkpoint<br/>
* location. The only synchronous part is the drawing of the {@code Snapshot} which<br/>
* is essentially free.<br/>
*/<br/>
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {<br/>
// we draw a snapshot from RocksDB then iterate over all keys at that point<br/>
// and store them in the backup location
final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
long startTime = System.currentTimeMillis();
org.rocksdb.Snapshot snapshot = db.getSnapshot(); //生成snapshot,但不用落盘
long endTime = System.currentTimeMillis();<br/>
LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
// draw a copy in case it get's changed while performing the async snapshot<br/>
Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();<br/>
columnFamiliesCopy.putAll(kvStateInformation);<br/>
FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, //直接把snapshot传入<br/>
this,<br/>
backupUri,<br/>
columnFamiliesCopy,<br/>
checkpointId);
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();<br/>
result.put("dummy_state", dummySnapshot);<br/>
return result;<br/>
}
FullyAsyncSnapshot.materialize
可以看到需要自己去做db内容的序列化到文件的过程
@Override<br/>
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {<br/>
try {<br/>
long startTime = System.currentTimeMillis();
CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
outputView.writeInt(columnFamilies.size());
// we don't know how many key/value pairs there are in each column family.<br/>
// We prefix every written element with a byte that signifies to which<br/>
// column family it belongs, this way we can restore the column families<br/>
byte count = 0;<br/>
Map<String, Byte> columnFamilyMapping = new HashMap<>();<br/>
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {<br/>
columnFamilyMapping.put(column.getKey(), count);
outputView.writeByte(count);
ObjectOutputStream ooOut = new ObjectOutputStream(outputView);<br/>
ooOut.writeObject(column.getValue().f1);<br/>
ooOut.flush();
count++;<br/>
}
ReadOptions readOptions = new ReadOptions();<br/>
readOptions.setSnapshot(snapshot);
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {<br/>
byte columnByte = columnFamilyMapping.get(column.getKey());
synchronized (dbCleanupLock) {<br/>
if (db == null) {<br/>
throw new RuntimeException("RocksDB instance was disposed. This happens " +<br/>
"when we are in the middle of a checkpoint and the job fails.");<br/>
}<br/>
RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);<br/>
iterator.seekToFirst();<br/>
while (iterator.isValid()) {<br/>
outputView.writeByte(columnByte);<br/>
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),<br/>
outputView);<br/>
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),<br/>
outputView);<br/>
iterator.next();<br/>
}<br/>
}<br/>
}
StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();
long endTime = System.currentTimeMillis();<br/>
LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");<br/>
return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);<br/>
} finally {<br/>
synchronized (dbCleanupLock) {<br/>
if (db != null) {<br/>
db.releaseSnapshot(snapshot);<br/>
}<br/>
}<br/>
snapshot = null;<br/>
}<br/>
}
CheckpointStateOutputView
backend.createCheckpointStateOutputView
public CheckpointStateOutputView createCheckpointStateOutputView(<br/>
long checkpointID, long timestamp) throws Exception {<br/>
return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));<br/>
}
关键createCheckpointStateOutputStream
RocksDBStateBackend
@Override<br/>
public CheckpointStateOutputStream createCheckpointStateOutputStream(<br/>
long checkpointID, long timestamp) throws Exception {
return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);<br/>
}
看看nonPartitionedStateBackend是什么?
public RocksDBStateBackend(URI checkpointDataUri) throws IOException {<br/>
// creating the FsStateBackend automatically sanity checks the URI<br/>
FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
this.nonPartitionedStateBackend = fsStateBackend;<br/>
this.checkpointDirectory = fsStateBackend.getBasePath();<br/>
}
其实就是FsStateBackend,最终rocksDB还是要用FsStateBackend来存储snapshot
restoreState
@Override<br/>
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {<br/>
if (keyValueStateSnapshots.size() == 0) {<br/>
return;<br/>
}
KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");<br/>
if (dummyState instanceof FinalSemiAsyncSnapshot) {<br/>
restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);<br/>
} else if (dummyState instanceof FinalFullyAsyncSnapshot) {<br/>
restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);<br/>
} else {<br/>
throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);<br/>
}<br/>
}
转发申明:
本文转自互联网,由小站整理并发布,在于分享相关技术和知识。版权归原作者所有,如有侵权,请联系本站 top8488@163.com,将在24小时内删除。谢谢
