Flink ResourceManager 的作用如下图所示:



 * <h1>Worker allocation steps</h1><br/>
 * <ol><br/>
 *     <li>The resource manager decides to request more workers. This can happen in order<br/>
 *         to fill the initial pool, or as a result of the JobManager requesting more workers.</li><br/>
 *     <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests<br/>
 *         for more containers. After that, the {@link #getNumWorkerRequestsPending()}<br/>
 *         should reflect the pending requests.</li><br/>
 *     <li>The concrete framework may acquire containers and then trigger to start TaskManagers<br/>
 *         in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li><br/>
 *     <li>At some point, the TaskManager processes will have started and send a registration<br/>
 *         message to the JobManager. The JobManager will perform<br/>
 *         a lookup with the ResourceManager to check if it really started this TaskManager.<br/>
 *         The method {@link #workerStarted(ResourceID)} will be called<br/>
 *         to inform about a registered worker.</li><br/>
 * </ol><br/>
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */<br/>
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to be notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started<br/>
     * in a container. This notification is sent by the JM when a TM tries to register at it. */<br/>
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that started successfully; the JM sends the notification once such a worker registers at it

    /** The JobManager that the framework master manages resources for */<br/>
    private ActorRef jobManager;

    /** Our JobManager's leader session */<br/>
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */<br/>
    private int designatedPoolSize; // size of the resource pool


ResourceManager 作为 actor,主要工作是处理收到的 message:

    /**
     * Actor message dispatch: checks the runtime type of the incoming message with
     * {@code instanceof} and routes it to the matching handler. The bodies of several
     * branches are omitted in this excerpt.
     *
     * @param message the message received by this actor
     */
    protected void handleMessage(Object message) {<br/>
        try {<br/>
            // --- messages about worker allocation and pool sizes

            if (message instanceof CheckAndAllocateContainers) {<br/>
            else if (message instanceof SetWorkerPoolSize) {<br/>
                SetWorkerPoolSize msg = (SetWorkerPoolSize) message;<br/>
            else if (message instanceof RemoveResource) {<br/>
                RemoveResource msg = (RemoveResource) message;<br/>

            // --- lookup of registered resources

            else if (message instanceof NotifyResourceStarted) {<br/>
                NotifyResourceStarted msg = (NotifyResourceStarted) message;<br/>
                // pass on the sender together with the resource id carried by the message
                handleResourceStarted(sender(), msg.getResourceID());<br/>

            // --- messages about JobManager leader status and registration

            else if (message instanceof NewLeaderAvailable) {<br/>
                NewLeaderAvailable msg = (NewLeaderAvailable) message;<br/>
                newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());<br/>
            else if (message instanceof TriggerRegistrationAtJobManager) {<br/>
                TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;<br/>
            else if (message instanceof RegisterResourceManagerSuccessful) {<br/>
                RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;<br/>
                jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());<br/>




 * This method causes the resource framework master to <b>synchronously</b> re-examine<br/>
 * the set of available and pending worker containers, and allocate containers<br/>
 * if needed.<br/>
 * This method does not automatically release workers, because it is not visible to<br/>
 * this resource master which workers can be released. Instead, the JobManager must<br/>
 * explicitly release individual workers.<br/>
private void checkWorkersPool() {<br/>
    int numWorkersPending = getNumWorkerRequestsPending();<br/>
    int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

    // see how many workers we want, and whether we have enough<br/>
    // (started workers + pending requests + workers pending registration)
    int allAvailableAndPending = startedWorkers.size() +<br/>
        numWorkersPending + numWorkersPendingRegistration;

    // shortfall relative to the designated pool size
    int missing = designatedPoolSize - allAvailableAndPending;

    if (missing > 0) {<br/>
        requestNewWorkers(missing); // not enough workers at the moment, so request new ones<br/>


 * Tells the ResourceManager that a TaskManager had been started in a container with the given<br/>
 * resource id.<br/>
 * @param jobManager The sender (JobManager) of the message<br/>
 * @param resourceID The resource id of the started TaskManager<br/>
private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {<br/>
    if (resourceID != null) {<br/>
        // check if resourceID is already registered (TaskManager may send duplicate register messages)<br/>
        WorkerType oldWorker = startedWorkers.get(resourceID);<br/>
        if (oldWorker != null) { // check whether this worker is already known<br/>
            LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);<br/>
        } else {<br/>
            WorkerType newWorker = workerStarted(resourceID); // obtain the worker for this resource id

            if (newWorker != null) {<br/>
                startedWorkers.put(resourceID, newWorker); // register the new worker<br/>
                LOG.info("TaskManager {} has started.", resourceID);<br/>
            } else {<br/>
                LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);<br/>

    // Acknowledge the resource registration<br/>
    jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that the registration has been handled<br/>







// Starts scheduling at the job vertices for which ejv.getJobVertex().isInputVertex() holds;
// each of those is scheduled through ejv.scheduleAll(slotProvider, allowQueuedScheduling).
// NOTE(review): the loop below is collapsed into a single comment line by <br/> markup residue.
public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
// simply take the vertices without inputs.<br/>for (ExecutionJobVertex ejv : this.tasks.values()) {<br/>   if (ejv.getJobVertex().isInputVertex()) {<br/>      ejv.scheduleAll(slotProvider, allowQueuedScheduling);<br/>   }<br/>}


// Schedules every ExecutionVertex held in this.taskVertices by calling
// ev.scheduleForExecution(slotProvider, queued) on each of them.
// NOTE(review): the loop below is collapsed into a single comment line by <br/> markup residue.
public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

   ExecutionVertex[] vertices = this.taskVertices;

   // kick off the tasks<br/>   for (ExecutionVertex ev : vertices) {<br/>      ev.scheduleForExecution(slotProvider, queued);<br/>   }<br/>}


// Delegates scheduling to this.currentExecution and returns its result.
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {<br/>   return this.currentExecution.scheduleForExecution(slotProvider, queued);<br/>}


// Moves this execution from CREATED to SCHEDULED, asks the slot provider for a slot and
// deploys to that slot once the allocation future yields one. Parts of the error handling
// are omitted in this excerpt.
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();<br/>
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        // the co-location constraint is only passed along when the vertex has one
        ScheduledUnit toSchedule = locationConstraint == null ?<br/>
            new ScheduledUnit(this, sharingGroup) :<br/>
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned<br/>
        //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that<br/>
        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the slot asynchronously

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so<br/>
        // that we directly deploy the tasks if the slot allocation future is completed. This is<br/>
        // necessary for immediate deployment.<br/>
        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {<br/>
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {<br/>
                if (simpleSlot != null) {<br/>
                    try {<br/>
                        deployToSlot(simpleSlot); // a slot was obtained, deploy to it<br/>
                    } catch (Throwable t) {<br/>
                        try {<br/>
                        } finally {<br/>
                else {<br/>
                return null;<br/>

        return true;<br/>


这里会调用 slotProvider.allocateSlot,其中 slotProvider 即 Scheduler:

// Wraps the result of scheduleTask(...) into a Future: a SimpleSlot becomes an already
// completed future, a Future is returned as is, and anything else raises a RuntimeException.
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)<br/>
        throws NoResourceAvailableException {

    final Object ret = scheduleTask(task, allowQueued);<br/>
    if (ret instanceof SimpleSlot) {<br/>
        return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means the allocation already succeeded, so the future is complete<br/>
    else if (ret instanceof Future) {<br/>
        return (Future) ret; // a Future means resources were insufficient and the request is still pending asynchronously<br/>
    else {<br/>
        throw new RuntimeException();<br/>



     * Returns either a {@link SimpleSlot}, or a {@link Future}.<br/>
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

        final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

        final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();<br/>
        // only force the preferred locations when the vertex asks for local-only scheduling
        // and at least one preferred location exists
        final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&<br/>
                                    preferredLocations != null && preferredLocations.iterator().hasNext();

        synchronized (globalLock) { // global lock

            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

            if (sharingUnit != null) { // the task belongs to a slot sharing group

                // 1)  === If the task has a slot sharing group, schedule with shared slots ===

                final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();<br/>
                final CoLocationConstraint constraint = task.getLocationConstraint();

                // get a slot from the group, if the group has one for us (and can fulfill the constraint)<br/>
                final SimpleSlot slotFromGroup;<br/>
                if (constraint == null) {<br/>
                    slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the group's existing slots<br/>
                else {<br/>
                    slotFromGroup = assignment.getSlotForTask(vertex, constraint);<br/>

                SimpleSlot newSlot = null;<br/>
                SimpleSlot toUse = null;

                // the following needs to make sure any allocated slot is released in case of an error<br/>
                try {

                    // check whether the slot from the group is already what we want.<br/>
                    // any slot that is local, or where the assignment was unconstrained is good!<br/>
                    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found<br/>
                        updateLocalityCounters(slotFromGroup, vertex);<br/>
                        return slotFromGroup; // suitable slot found, return it<br/>

                    // the group did not have a local slot for us. see if we can get one (or a better one)<br/>
                    // NOTE(review): 'locations' and 'localOnly' are not declared in this excerpt; presumably
                    // they correspond to preferredLocations / forceExternalLocation above - confirm.
                    newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to allocate a new slot

                    if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {<br/>
                        // if there is no slot from the group, or the new slot is local,<br/>
                        // then we use the new slot<br/>
                        if (slotFromGroup != null) {<br/>
                        toUse = newSlot;<br/>
                    else {<br/>
                        // both are available and usable. neither is local. in that case, we may<br/>
                        // as well use the slot from the sharing group, to minimize the number of<br/>
                        // instances that the job occupies<br/>
                        toUse = slotFromGroup;<br/>

                    // if this is the first slot for the co-location constraint, we lock<br/>
                    // the location, because we are going to use that slot<br/>
                    if (constraint != null && !constraint.isAssigned()) {<br/>

                    updateLocalityCounters(toUse, vertex);<br/>

                return toUse; // return the slot that was acquired<br/>
            else { // no slot sharing: the simpler case

                // 2) === schedule without hints and sharing ===

                SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // allocate a slot directly<br/>
                if (slot != null) {<br/>
                    updateLocalityCounters(slot, vertex);<br/>
                    return slot; // got one, return the slot<br/>
                else {<br/>
                    // no resource available now, so queue the request<br/>
                    if (queueIfNoResource) { // queueing is allowed<br/>
                        CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();<br/>
                        this.taskQueue.add(new QueuedTask(task, future)); // remember the task and hand back the future to signal an asynchronous allocation<br/>
                        return future;<br/>


其中会调用 getFreeSlotForTask:

     * Gets a suitable instance to schedule the vertex execution to.<br/>
     * <p><br/>
     * NOTE: This method is not thread-safe; it needs to be synchronized by the caller.<br/>
     * @param vertex The task to run.<br/>
     * @return The instance to run the vertex on, or {@code null} if no instance is available.<br/>
    protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,<br/>
                                            Iterable<TaskManagerLocation> requestedLocations,<br/>
                                            boolean localOnly) {<br/>
        // we need potentially to loop multiple times, because there may be false positives<br/>
        // in the set-with-available-instances<br/>
        while (true) {<br/>
            Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance that can provide a slot and satisfies the location constraint

            if (instanceLocalityPair == null){<br/>
                return null; // no suitable instance, the allocation fails<br/>

            Instance instanceToUse = instanceLocalityPair.getLeft();<br/>
            Locality locality = instanceLocalityPair.getRight();

            try {<br/>
                SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate the slot from the instance

                // if the instance has further available slots, re-add it to the set of available resources.<br/>
                if (instanceToUse.hasResourcesAvailable()) { // the instance still has resources: put it back into instancesWithAvailableResources so it can be used again next time<br/>
                    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);<br/>

                if (slot != null) {<br/>
                    return slot; // success, return the slot<br/>
            catch (InstanceDiedException e) {<br/>
                // the instance died it has not yet been propagated to this scheduler<br/>
                // remove the instance from the set of available instances<br/>

            // if we failed to get a slot, fall through the loop<br/>



     * Tries to find a requested instance. If no such instance is available it will return a non-<br/>
     * local instance. If no such instance exists (all slots occupied), then return null.<br/>
     * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p><br/>
     * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that<br/>
     *                           no locality preference exists.<br/>
     * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.<br/>
    private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

        // drain the queue of newly available instances<br/>
        while (this.newlyAvailableInstances.size() > 0) { // move the newlyAvailableInstances into instancesWithAvailableResources<br/>
            Instance queuedInstance = this.newlyAvailableInstances.poll();<br/>
            if (queuedInstance != null) {<br/>
                this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);<br/>

        // if nothing is available at all, return null<br/>
        if (this.instancesWithAvailableResources.isEmpty()) { // nothing in instancesWithAvailableResources: fail right away<br/>
            return null;<br/>

        Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

        if (locations != null && locations.hasNext()) { // try the instances in order of locality preference<br/>
            // we have a locality preference

            while (locations.hasNext()) {<br/>
                TaskManagerLocation location = locations.next();<br/>
                if (location != null) {<br/>
                    // the matching instance is taken out of the map while it is being used
                    Instance instance = instancesWithAvailableResources.remove(location.getResourceID());<br/>
                    if (instance != null) {<br/>
                        return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);<br/>

            // no local instance available<br/>
            if (localOnly) {<br/>
                return null;<br/>
            else {<br/>
                // take the first instance from the instances with resources<br/>
                // NOTE(review): unlike the LOCAL path above, this excerpt does not show the chosen
                // instance being removed from instancesWithAvailableResources - confirm against the full source.
                Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();<br/>
                Instance instanceToUse = instances.next();<br/>

                return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);<br/>
        else {<br/>
            // no location preference, so use some instance<br/>
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();<br/>
            Instance instanceToUse = instances.next();<br/>

            return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);<br/>



// Registers a newly available instance with this scheduler: subscribes to its slot
// availability notifications, indexes it by host name and adds it to the set of
// instances with available resources. The remainder of the method is omitted in this excerpt.
public void newInstanceAvailable(Instance instance) {

    // synchronize globally for instance changes<br/>
    synchronized (this.globalLock) {

        try {<br/>
            // make sure we get notifications about slots becoming available<br/>
            instance.setSlotAvailabilityListener(this); // register the Scheduler as the Instance's SlotAvailabilityListener

            // store the instance in the by-host-lookup<br/>
            String instanceHostName = instance.getTaskManagerLocation().getHostname();<br/>
            Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);<br/>
            if (instanceSet == null) {<br/>
                instanceSet = new HashSet<Instance>();<br/>
                allInstancesByHost.put(instanceHostName, instanceSet);<br/>

            // add it to the available resources and let potential waiters know<br/>
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // put it into instancesWithAvailableResources

            // add all slots as available<br/>
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {<br/>


    public void newSlotAvailable(final Instance instance) {

        // WARNING: The asynchrony here is necessary, because  we cannot guarantee the order<br/>
        // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:<br/>
        // -> The scheduler needs to grab them (1) global scheduler lock<br/>
        //                                     (2) slot/instance lock<br/>
        // -> The slot releasing grabs (1) slot/instance (for releasing) and<br/>
        //                             (2) scheduler (to check whether to take a new task item<br/>
        // that leads with a high probability to deadlocks, when scheduling fast

        this.newlyAvailableInstances.add(instance); // add the instance to newlyAvailableInstances

        Futures.future(new Callable<Object>() {<br/>
            public Object call() throws Exception {<br/>
                handleNewSlot(); // process the queued tasks asynchronously: when a new slot appears, a task from the queue gets to run<br/>
                return null;<br/>
        }, executionContext);<br/>



    // Iterates over the registered instance listeners while holding the lock on
    // this.instanceListeners; a failure of one listener is logged rather than propagated.
    // The call made on each listener is omitted in this excerpt.
    private void notifyNewInstance(Instance instance) {<br/>
        synchronized (this.instanceListeners) {<br/>
            for (InstanceListener listener : this.instanceListeners) {<br/>
                try {<br/>
                catch (Throwable t) {<br/>
                    LOG.error("Notification of new instance availability failed.", t);<br/>



case msg @ RegisterTaskManager(<br/>
      numberOfSlots) =>

  val taskManager = sender()

  currentResourceManager match {<br/>
    case Some(rm) => //如果有resourceManager<br/>
      val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) //通知ResourceMananger,某个resource已经成功启动<br/>

  // ResourceManager is told about the resource, now let's try to register TaskManager<br/>
  if (instanceManager.isRegistered(resourceId)) { //如果已经注册过<br/>
    val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

    taskManager ! decorateMessage(<br/>
  } else { //新的resource<br/>
    try {<br/>
      val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)<br/>
      val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

      val instanceID = instanceManager.registerTaskManager( //向InstanceManager注册该TaskManager<br/>

      taskManagerMap.put(taskManager, instanceID) //在jobManager里面记录该taskManager

      taskManager ! decorateMessage(<br/>
        AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) //通知taskManager完成注册

      // to be notified when the taskManager is no longer reachable<br/>