Flink 1.1 – ResourceManager
The role of the Flink ResourceManager is shown in the figure below.

FlinkResourceManager
/**
 *
 * <h1>Worker allocation steps</h1>
 *
 * <ol>
 * <li>The resource manager decides to request more workers. This can happen in order
 * to fill the initial pool, or as a result of the JobManager requesting more workers.</li>
 *
 * <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
 * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
 * should reflect the pending requests.</li>
 *
 * <li>The concrete framework may acquire containers and then trigger to start TaskManagers
 * in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
 *
 * <li>At some point, the TaskManager processes will have started and send a registration
 * message to the JobManager. The JobManager will perform
 * a lookup with the ResourceManager to check if it really started this TaskManager.
 * The method {@link #workerStarted(ResourceID)} will be called
 * to inform about a registered worker.</li>
 * </ol>
 *
 */
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to be notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started
     * in a container. This notification is sent by the JM when a TM tries to register at it. */
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that have started successfully; the JM notifies us when a TM registers at it

    /** The JobManager that the framework master manages resources for */
    private ActorRef jobManager;

    /** Our JobManager's leader session */
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */
    private int designatedPoolSize; // target size of the resource pool
The Javadoc above describes the worker allocation process quite clearly.
The ResourceManager is an actor, so its main job is handling messages:
    @Override
    protected void handleMessage(Object message) {
        try {
            // --- messages about worker allocation and pool sizes

            if (message instanceof CheckAndAllocateContainers) {
                checkWorkersPool();
            }
            else if (message instanceof SetWorkerPoolSize) {
                SetWorkerPoolSize msg = (SetWorkerPoolSize) message;
                adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
            }
            else if (message instanceof RemoveResource) {
                RemoveResource msg = (RemoveResource) message;
                removeRegisteredResource(msg.resourceId());
            }

            // --- lookup of registered resources

            else if (message instanceof NotifyResourceStarted) {
                NotifyResourceStarted msg = (NotifyResourceStarted) message;
                handleResourceStarted(sender(), msg.getResourceID());
            }

            // --- messages about JobManager leader status and registration

            else if (message instanceof NewLeaderAvailable) {
                NewLeaderAvailable msg = (NewLeaderAvailable) message;
                newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
            }
            else if (message instanceof TriggerRegistrationAtJobManager) {
                TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;
                triggerConnectingToJobManager(msg.jobManagerAddress());
            }
            else if (message instanceof RegisterResourceManagerSuccessful) {
                RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;
                jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
            }
            // ... (remaining message handlers elided)
The key handler here is
checkWorkersPool
    /**
     * This method causes the resource framework master to <b>synchronously</b> re-examine
     * the set of available and pending workers containers, and allocate containers
     * if needed.
     *
     * This method does not automatically release workers, because it is not visible to
     * this resource master which workers can be released. Instead, the JobManager must
     * explicitly release individual workers.
     */
    private void checkWorkersPool() {
        int numWorkersPending = getNumWorkerRequestsPending();
        int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

        // see how many workers we want, and whether we have enough
        int allAvailableAndPending = startedWorkers.size() +
            numWorkersPending + numWorkersPendingRegistration;

        int missing = designatedPoolSize - allAvailableAndPending;

        if (missing > 0) {
            requestNewWorkers(missing); // if the existing workers are not enough, request new ones
        }
    }
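To make the bookkeeping concrete, here is a minimal standalone sketch (not Flink code; the class and field names are illustrative only) of the arithmetic checkWorkersPool() performs: the target pool size minus everything already started, requested, or pending registration is what gets requested from the framework.

import java.util.HashMap;
import java.util.Map;

public class WorkerPoolSketch {
    private final Map<String, Object> startedWorkers = new HashMap<>(); // registered workers
    private final int designatedPoolSize;       // target pool size
    private int numWorkerRequestsPending;       // containers requested but not yet acquired
    private int numWorkersPendingRegistration;  // containers acquired, TaskManager not yet registered

    WorkerPoolSketch(int designatedPoolSize) {
        this.designatedPoolSize = designatedPoolSize;
    }

    /** Mirrors checkWorkersPool(): request only the difference between target and known workers. */
    void checkWorkersPool() {
        int allAvailableAndPending = startedWorkers.size()
                + numWorkerRequestsPending + numWorkersPendingRegistration;
        int missing = designatedPoolSize - allAvailableAndPending;
        if (missing > 0) {
            requestNewWorkers(missing);
        }
    }

    void requestNewWorkers(int n) {
        // a real framework (YARN, Mesos, ...) would ask for n containers here
        numWorkerRequestsPending += n;
        System.out.println("requesting " + n + " new workers");
    }

    public static void main(String[] args) {
        WorkerPoolSketch pool = new WorkerPoolSketch(5);
        pool.numWorkersPendingRegistration = 2; // 2 containers already starting
        pool.checkWorkersPool();                // prints: requesting 3 new workers
    }
}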
When the JobManager receives a TaskManager's registration message, it notifies the ResourceManager, which ends up in handleResourceStarted:
    /**
     * Tells the ResourceManager that a TaskManager had been started in a container with the given
     * resource id.
     *
     * @param jobManager The sender (JobManager) of the message
     * @param resourceID The resource id of the started TaskManager
     */
    private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
        if (resourceID != null) {
            // check if resourceID is already registered (TaskManager may send duplicate register messages)
            WorkerType oldWorker = startedWorkers.get(resourceID);
            if (oldWorker != null) { // is this worker already known?
                LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
            } else {
                WorkerType newWorker = workerStarted(resourceID); // look up the worker

                if (newWorker != null) {
                    startedWorkers.put(resourceID, newWorker); // record the new worker
                    LOG.info("TaskManager {} has started.", resourceID);
                } else {
                    LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
                }
            }
        }

        // Acknowledge the resource registration
        jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that the registration has been processed
    }
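The workerStarted(ResourceID) call above is an abstract hook that each framework-specific ResourceManager implements. Purely as an illustration (this is not real Flink code, and all names here are made up), such a hook could move the worker from a pending-registration map to the started set, or return null when this manager never launched that container:

import java.util.HashMap;
import java.util.Map;

public class WorkerStartedSketch {

    static class Worker {
        final String resourceId;
        Worker(String resourceId) { this.resourceId = resourceId; }
    }

    private final Map<String, Worker> workersPendingRegistration = new HashMap<>();

    WorkerStartedSketch() {
        workersPendingRegistration.put("container-1", new Worker("container-1"));
    }

    /** Returns the worker for this resource id, or null if it was not started by this manager. */
    Worker workerStarted(String resourceId) {
        return workersPendingRegistration.remove(resourceId);
    }

    public static void main(String[] args) {
        WorkerStartedSketch rm = new WorkerStartedSketch();
        System.out.println(rm.workerStarted("container-1") != null); // true: known container
        System.out.println(rm.workerStarted("container-9") != null); // false: unknown container
    }
}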
Now for the job's resource allocation process.
In submitJob, an ExecutionGraph is built,
which eventually calls
executionGraph.scheduleForExecution(scheduler)
Then, in ExecutionGraph:
    public void scheduleForExecution(SlotProvider slotProvider) throws JobException {

        // simply take the vertices without inputs.
        for (ExecutionJobVertex ejv : this.tasks.values()) {
            if (ejv.getJobVertex().isInputVertex()) {
                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
            }
        }
    }
Next, ExecutionJobVertex:
    public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

        ExecutionVertex[] vertices = this.taskVertices;

        // kick off the tasks
        for (ExecutionVertex ev : vertices) {
            ev.scheduleForExecution(slotProvider, queued);
        }
    }
Then, ExecutionVertex:
    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        return this.currentExecution.scheduleForExecution(slotProvider, queued);
    }
And finally, Execution:
    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

        final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
        final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

        if (transitionState(CREATED, SCHEDULED)) {

            ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, sharingGroup) :
                new ScheduledUnit(this, sharingGroup, locationConstraint);

            // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
            //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
            final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the slot asynchronously

            // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
            // that we directly deploy the tasks if the slot allocation future is completed. This is
            // necessary for immediate deployment.
            final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
                @Override
                public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                    if (simpleSlot != null) {
                        try {
                            deployToSlot(simpleSlot); // the slot was allocated, so deploy to it
                        } catch (Throwable t) {
                            try {
                                simpleSlot.releaseSlot();
                            } finally {
                                markFailed(t);
                            }
                        }
                    }
                    else {
                        markFailed(throwable);
                    }
                    return null;
                }
            });

            return true;
        }
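The pattern here is: allocate the slot asynchronously, then, when the future completes, either deploy or release the slot and mark the execution as failed. A minimal standalone sketch of that pattern, written against java.util.concurrent.CompletableFuture purely for illustration (Flink 1.1 uses its own Future/BiFunction types, and the Slot class below is a stand-in):

import java.util.concurrent.CompletableFuture;

public class AllocateAndDeploySketch {

    static class Slot {
        void release() { System.out.println("slot released"); }
    }

    static CompletableFuture<Slot> allocateSlot() {
        // stands in for slotProvider.allocateSlot(toSchedule, queued)
        return CompletableFuture.completedFuture(new Slot());
    }

    static void deployToSlot(Slot slot) {
        System.out.println("deploying task to slot");
    }

    static void markFailed(Throwable t) {
        System.out.println("scheduling failed: " + t);
    }

    public static void main(String[] args) {
        allocateSlot().handle((slot, failure) -> {
            if (slot != null) {
                try {
                    deployToSlot(slot);        // deploy if allocation succeeded
                } catch (Throwable t) {
                    try {
                        slot.release();        // never leak the slot on a failed deployment
                    } finally {
                        markFailed(t);
                    }
                }
            } else {
                markFailed(failure);           // the allocation itself failed
            }
            return null;
        });
    }
}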
This calls slotProvider.allocateSlot; here the slotProvider is the Scheduler:
    @Override
    public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
            throws NoResourceAvailableException {

        final Object ret = scheduleTask(task, allowQueued);
        if (ret instanceof SimpleSlot) {
            return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means the allocation already succeeded, so return a completed future
        }
        else if (ret instanceof Future) {
            return (Future) ret; // a Future means there was not enough resource; the request stays pending asynchronously
        }
        else {
            throw new RuntimeException();
        }
    }
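So scheduleTask() returns either a finished slot or a future, and allocateSlot() normalizes both into a future for the caller. A small self-contained sketch of that convention (illustrative names only, using CompletableFuture instead of Flink's FlinkCompletableFuture):

import java.util.concurrent.CompletableFuture;

public class EitherSlotOrFutureSketch {

    static class Slot {}

    /** Returns a Slot when a resource is free right now, otherwise a CompletableFuture<Slot>. */
    static Object scheduleTask(boolean resourceFreeNow) {
        return resourceFreeNow ? new Slot() : new CompletableFuture<Slot>();
    }

    @SuppressWarnings("unchecked")
    static CompletableFuture<Slot> allocateSlot(boolean resourceFreeNow) {
        Object ret = scheduleTask(resourceFreeNow);
        if (ret instanceof Slot) {
            return CompletableFuture.completedFuture((Slot) ret); // already allocated
        } else if (ret instanceof CompletableFuture) {
            return (CompletableFuture<Slot>) ret;                 // allocation still pending
        } else {
            throw new IllegalStateException("unexpected return type");
        }
    }

    public static void main(String[] args) {
        System.out.println(allocateSlot(true).isDone());  // true: a slot was available immediately
        System.out.println(allocateSlot(false).isDone()); // false: the request was queued
    }
}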
scheduleTask
    /**
     * Returns either a {@link SimpleSlot}, or a {@link Future}.
     */
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

        final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

        final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
        final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
                preferredLocations != null && preferredLocations.iterator().hasNext();

        synchronized (globalLock) { // global lock

            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

            if (sharingUnit != null) { // the task belongs to a slot sharing group

                // 1) === If the task has a slot sharing group, schedule with shared slots ===

                final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
                final CoLocationConstraint constraint = task.getLocationConstraint();

                // get a slot from the group, if the group has one for us (and can fulfill the constraint)
                final SimpleSlot slotFromGroup;
                if (constraint == null) {
                    slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the existing ones
                }
                else {
                    slotFromGroup = assignment.getSlotForTask(vertex, constraint);
                }

                SimpleSlot newSlot = null;
                SimpleSlot toUse = null;

                // the following needs to make sure any allocated slot is released in case of an error
                try {

                    // check whether the slot from the group is already what we want.
                    // any slot that is local, or where the assignment was unconstrained is good!
                    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found
                        updateLocalityCounters(slotFromGroup, vertex);
                        return slotFromGroup; // return the suitable slot
                    }

                    // the group did not have a local slot for us. see if we can get one (or a better one)
                    newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to allocate a new slot

                    if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                        // if there is no slot from the group, or the new slot is local,
                        // then we use the new slot
                        if (slotFromGroup != null) {
                            slotFromGroup.releaseSlot();
                        }
                        toUse = newSlot;
                    }
                    else {
                        // both are available and usable. neither is local. in that case, we may
                        // as well use the slot from the sharing group, to minimize the number of
                        // instances that the job occupies
                        newSlot.releaseSlot();
                        toUse = slotFromGroup;
                    }

                    // if this is the first slot for the co-location constraint, we lock
                    // the location, because we are going to use that slot
                    if (constraint != null && !constraint.isAssigned()) {
                        constraint.lockLocation();
                    }

                    updateLocalityCounters(toUse, vertex);
                }
                // (error handling that releases the slots is elided)

                return toUse; // return the allocated slot
            }
            else { // no slot sharing: the simpler case

                // 2) === schedule without hints and sharing ===

                SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // request a slot directly
                if (slot != null) {
                    updateLocalityCounters(slot, vertex);
                    return slot; // return the slot if one was obtained
                }
                else {
                    // no resource available now, so queue the request
                    if (queueIfNoResource) { // queueing is allowed
                        CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                        this.taskQueue.add(new QueuedTask(task, future)); // queue the task and return the future, i.e. the request continues asynchronously
                        return future;
                    }
                }
            }
        }
    }
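The interesting decision in the shared-slot branch is which slot to keep once both a group slot and a newly allocated slot exist: keep the new slot if the group had nothing usable or the new slot is local, otherwise keep the group slot so the job occupies fewer instances. A minimal standalone sketch of just that decision (illustrative Slot/Locality types, not Flink code):

public class SharedSlotChoiceSketch {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    static class Slot {
        final Locality locality;
        boolean alive = true;
        Slot(Locality locality) { this.locality = locality; }
        void release() { alive = false; }
    }

    /** Keep the new slot if the group has nothing usable or the new slot is local; otherwise
     *  keep the group slot to minimize the number of instances the job occupies. */
    static Slot choose(Slot slotFromGroup, Slot newSlot) {
        if (slotFromGroup == null || !slotFromGroup.alive || newSlot.locality == Locality.LOCAL) {
            if (slotFromGroup != null) {
                slotFromGroup.release();
            }
            return newSlot;
        } else {
            newSlot.release();
            return slotFromGroup;
        }
    }

    public static void main(String[] args) {
        Slot groupSlot = new Slot(Locality.NON_LOCAL);
        Slot newLocal = new Slot(Locality.LOCAL);
        System.out.println(choose(groupSlot, newLocal) == newLocal); // true: the local slot wins
    }
}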
Let's look directly at the non-shared-slot case,
which calls getFreeSlotForTask:
    /**
     * Gets a suitable instance to schedule the vertex execution to.
     * <p>
     * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
     *
     * @param vertex The task to run.
     * @return The instance to run the vertex on, or {@code null}, if no instance is available.
     */
    protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                            Iterable<TaskManagerLocation> requestedLocations,
                                            boolean localOnly) {
        // we need potentially to loop multiple times, because there may be false positives
        // in the set-with-available-instances
        while (true) {
            Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance that has a free slot and satisfies the locality constraint

            if (instanceLocalityPair == null){
                return null; // no suitable instance, allocation failed
            }

            Instance instanceToUse = instanceLocalityPair.getLeft();
            Locality locality = instanceLocalityPair.getRight();

            try {
                SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate a slot from the instance

                // if the instance has further available slots, re-add it to the set of available resources.
                if (instanceToUse.hasResourcesAvailable()) { // if the instance still has resources, put it back into instancesWithAvailableResources so it can be used again
                    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
                }

                if (slot != null) {
                    slot.setLocality(locality);
                    return slot; // success: return the slot
                }
            }
            catch (InstanceDiedException e) {
                // the instance died it has not yet been propagated to this scheduler
                // remove the instance from the set of available instances
                removeInstance(instanceToUse);
            }

            // if we failed to get a slot, fall through the loop
        }
    }
findInstance
    /**
     * Tries to find a requested instance. If no such instance is available it will return a non-
     * local instance. If no such instance exists (all slots occupied), then return null.
     *
     * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
     *
     * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
     *                           no locality preference exists.
     * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.
     */
    private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

        // drain the queue of newly available instances
        while (this.newlyAvailableInstances.size() > 0) { // move newlyAvailableInstances into instancesWithAvailableResources
            Instance queuedInstance = this.newlyAvailableInstances.poll();
            if (queuedInstance != null) {
                this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
            }
        }

        // if nothing is available at all, return null
        if (this.instancesWithAvailableResources.isEmpty()) { // nothing in instancesWithAvailableResources, fail immediately
            return null;
        }

        Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

        if (locations != null && locations.hasNext()) { // walk through the instances in locality-preference order
            // we have a locality preference

            while (locations.hasNext()) {
                TaskManagerLocation location = locations.next();
                if (location != null) {
                    Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
                    if (instance != null) {
                        return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                    }
                }
            }

            // no local instance available
            if (localOnly) {
                return null;
            }
            else {
                // take the first instance from the instances with resources
                Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
                Instance instanceToUse = instances.next();
                instances.remove();

                return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
            }
        }
        else {
            // no location preference, so use some instance
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
            Instance instanceToUse = instances.next();
            instances.remove();

            return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
        }
    }
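In short, findInstance() drains the newly-available queue, then tries the preferred locations first (LOCAL), falls back to any remaining instance (NON_LOCAL) unless localOnly is set, and takes an arbitrary instance (UNCONSTRAINED) when there is no preference at all. A minimal standalone sketch of that lookup over a plain map (illustrative names, not Flink code; requires Java 9+ for Map.entry and List.of):

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FindInstanceSketch {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    /** Pick an instance id: a preferred location first, otherwise any remaining one. */
    static Map.Entry<String, Locality> findInstance(
            Map<String, String> availableById,   // resourceId -> hostname
            List<String> preferredIds,           // may be empty: no locality preference
            boolean localOnly) {

        if (availableById.isEmpty()) {
            return null;                         // nothing available at all
        }
        if (!preferredIds.isEmpty()) {
            for (String id : preferredIds) {
                if (availableById.remove(id) != null) {
                    return Map.entry(id, Locality.LOCAL);
                }
            }
            if (localOnly) {
                return null;                     // the caller insisted on a local instance
            }
            // no local instance: fall back to any available one
            Iterator<String> it = availableById.keySet().iterator();
            String id = it.next();
            it.remove();
            return Map.entry(id, Locality.NON_LOCAL);
        }
        // no preference at all
        Iterator<String> it = availableById.keySet().iterator();
        String id = it.next();
        it.remove();
        return Map.entry(id, Locality.UNCONSTRAINED);
    }

    public static void main(String[] args) {
        Map<String, String> available = new LinkedHashMap<>();
        available.put("tm-1", "host-a");
        available.put("tm-2", "host-b");
        System.out.println(findInstance(available, List.of("tm-2"), false)); // tm-2=LOCAL
        System.out.println(findInstance(available, List.of("tm-9"), false)); // tm-1=NON_LOCAL
    }
}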
Next question: where do the newlyAvailableInstances come from?
    @Override
    public void newInstanceAvailable(Instance instance) {

        // synchronize globally for instance changes
        synchronized (this.globalLock) {

            try {
                // make sure we get notifications about slots becoming available
                instance.setSlotAvailabilityListener(this); // register the Scheduler as the instance's SlotAvailabilityListener

                // store the instance in the by-host-lookup
                String instanceHostName = instance.getTaskManagerLocation().getHostname();
                Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
                if (instanceSet == null) {
                    instanceSet = new HashSet<Instance>();
                    allInstancesByHost.put(instanceHostName, instanceSet);
                }
                instanceSet.add(instance);

                // add it to the available resources and let potential waiters know
                this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // put it into instancesWithAvailableResources

                // add all slots as available
                for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                    newSlotAvailable(instance);
                }
            }
        }
    }
    @Override
    public void newSlotAvailable(final Instance instance) {

        // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
        // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
        //
        // -> The scheduler needs to grab them (1) global scheduler lock
        //                                     (2) slot/instance lock
        // -> The slot releasing grabs (1) slot/instance (for releasing) and
        //                             (2) scheduler (to check whether to take a new task item
        //
        // that leads with a high probability to deadlocks, when scheduling fast

        this.newlyAvailableInstances.add(instance); // add to newlyAvailableInstances

        Futures.future(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                handleNewSlot(); // asynchronously work off the queue: when a new slot appears, the queued tasks get scheduled
                return null;
            }
        }, executionContext);
    }
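The WARNING comment is about lock ordering: the scheduling path takes the global scheduler lock before the instance lock, while the slot-release path starts from the instance lock, so calling back into the scheduler inline could deadlock. Handing the scheduler work to an executor breaks that cycle. A minimal standalone sketch of the idea (illustrative locks and names, not Flink code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LockOrderSketch {
    private static final Object globalLock = new Object();
    private static final Object instanceLock = new Object();
    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    // scheduling path: global lock first, then instance lock
    static void scheduleTask() {
        synchronized (globalLock) {
            synchronized (instanceLock) {
                System.out.println("task scheduled");
            }
        }
    }

    // slot-release path: instance lock first; the global-lock work is handed to an executor
    // instead of being done inline, so the two lock orders never interleave on one thread
    static void releaseSlot() {
        synchronized (instanceLock) {
            executor.submit(() -> {
                synchronized (globalLock) {
                    System.out.println("queued task re-scheduled");
                }
            });
        }
    }

    public static void main(String[] args) {
        scheduleTask();
        releaseSlot();
        executor.shutdown();
    }
}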
newInstanceAvailable, in turn, is called inside the InstanceManager:
    private void notifyNewInstance(Instance instance) {
        synchronized (this.instanceListeners) {
            for (InstanceListener listener : this.instanceListeners) {
                try {
                    listener.newInstanceAvailable(instance);
                }
                catch (Throwable t) {
                    LOG.error("Notification of new instance availability failed.", t);
                }
            }
        }
    }
notifyNewInstance is called from registerTaskManager,
and registerTaskManager is invoked in the JobManager when a TaskManager registers:
    case msg @ RegisterTaskManager(
        resourceId,
        connectionInfo,
        hardwareInformation,
        numberOfSlots) =>

      val taskManager = sender()

      currentResourceManager match {
        case Some(rm) => // a ResourceManager is present
          val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) // tell the ResourceManager that this resource has started successfully
      }

      // ResourceManager is told about the resource, now let's try to register TaskManager
      if (instanceManager.isRegistered(resourceId)) { // already registered
        val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

        taskManager ! decorateMessage(
          AlreadyRegistered(
            instanceID,
            libraryCacheManager.getBlobServerPort))
      } else { // a new resource
        try {
          val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
          val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

          val instanceID = instanceManager.registerTaskManager( // register this TaskManager with the InstanceManager
            taskManagerGateway,
            connectionInfo,
            hardwareInformation,
            numberOfSlots)

          taskManagerMap.put(taskManager, instanceID) // the JobManager keeps track of this TaskManager

          taskManager ! decorateMessage(
            AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) // acknowledge the registration to the TaskManager

          // to be notified when the taskManager is no longer reachable
          context.watch(taskManager)
        }
      }
This version does not yet implement the architecture shown in the figure.
The TaskManager still registers with the JobManager, and the JobManager then notifies the ResourceManager.
For now the ResourceManager only plays a bookkeeping role;
it has not been split out of the JobManager as an independent component.
So the architecture remains the old one.