InstanceManager 用于管理 JobManager 申请到的 TaskManager 及其 slot 资源。

/**
 * Simple manager that keeps track of which TaskManagers are available and alive.
 *
 * <p>It holds the TaskManagers (and their slots) registered with the JobManager, each one
 * represented as an {@link Instance}.
 */<br/>
public class InstanceManager {

    // ------------------------------------------------------------------------
    // Fields
    // ------------------------------------------------------------------------

    // Every registered Instance is indexed twice: once by InstanceID and once by ResourceID.
    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */<br/>
    private final Map<InstanceID, Instance> registeredHostsById;<br/>
    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */<br/>
    private final Map<ResourceID, Instance> registeredHostsByResource;

    /** Set of hosts that were present once and have died */<br/>
    private final Set<ResourceID> deadHosts;

    /** Listeners that want to be notified about availability and disappearance of instances */<br/>
    private final List<InstanceListener> instanceListeners = new ArrayList<>(); // who must be notified when Instance resources change, e.g. the Scheduler

    /** The total number of task slots that the system has */<br/>
    private int totalNumberOfAliveTaskSlots;

 

关键操作：registerTaskManager（注册 TaskManager）

/**<br/>
 * Registers a task manager. Registration of a task manager makes it available to be used<br/>
 * for the job execution.<br/>
 *<br/>
 * @param taskManagerGateway gateway to the task manager<br/>
 * @param taskManagerLocation Location info of the TaskManager<br/>
 * @param resources Hardware description of the TaskManager<br/>
 * @param numberOfSlots Number of available slots on the TaskManager<br/>
 * @return The assigned InstanceID of the registered task manager<br/>
 */<br/>
public InstanceID registerTaskManager(<br/>
        TaskManagerGateway taskManagerGateway,<br/>
        TaskManagerLocation taskManagerLocation,<br/>
        HardwareDescription resources,<br/>
        int numberOfSlots) {

    synchronized (this.lock) {<br/>
        InstanceID instanceID = new InstanceID();

        Instance host = new Instance( //创建新的instance<br/>
            taskManagerGateway,<br/>
            taskManagerLocation,<br/>
            instanceID,<br/>
            resources,<br/>
            numberOfSlots);

        registeredHostsById.put(instanceID, host); //register<br/>
        registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);

        totalNumberOfAliveTaskSlots += numberOfSlots;

        host.reportHeartBeat();

        // notify all listeners (for example the scheduler)<br/>
        notifyNewInstance(host);

        return instanceID;<br/>
    }<br/>
}

其中，notifyNewInstance 会逐个调用已注册 InstanceListener 的 newInstanceAvailable 方法：

private void notifyNewInstance(Instance instance) {<br/>
    synchronized (this.instanceListeners) {<br/>
        for (InstanceListener listener : this.instanceListeners) {<br/>
            try {<br/>
                listener.newInstanceAvailable(instance); //调用listener的newInstanceAvailable<br/>
            }<br/>
            catch (Throwable t) {<br/>
                LOG.error("Notification of new instance availability failed.", t);<br/>
            }<br/>
        }<br/>
    }<br/>
}

 

Instance

从注释可以看出，Instance 是一种抽象，用于描述已注册到 JobManager、并准备接收任务（work）的 TaskManager。

/**
 * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
 * registered at a JobManager and ready to receive work.
 */<br/>
public class Instance implements SlotOwner {

    /** The instance gateway to communicate with the instance */<br/>
    private final TaskManagerGateway taskManagerGateway;

    /** The instance connection information for the data transfer. */<br/>
    private final TaskManagerLocation location;

    /** A description of the resources of the task manager */<br/>
    private final HardwareDescription resources;

    /** The ID identifying the taskManager. */<br/>
    private final InstanceID instanceId;

    /** The number of task slots available on the node */<br/>
    private final int numberOfSlots;

    /** A queue of the currently free slot positions (slot numbers) */<br/>
    private final Queue<Integer> availableSlots; // holds slot positions, not Slot objects: a Slot object is only created when a slot is allocated

    /** Allocated slots on this taskManager */<br/>
    private final Set<Slot> allocatedSlots = new HashSet<Slot>();

    /** A listener to be notified upon new slot availability */<br/>
    private SlotAvailabilityListener slotAvailabilityListener;  // informed when slot state changes, e.g. when a returned slot becomes available again

    /** Time when the last heartbeat has been received from the task manager running on this taskManager. */<br/>
    private volatile long lastReceivedHeartBeat = System.currentTimeMillis();

核心操作：

申请 slot（allocateSimpleSlot）

/**<br/>
 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot<br/>
 * is available at the moment.<br/>
 *<br/>
 * @param jobID The ID of the job that the slot is allocated for.<br/>
 *<br/>
 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the<br/>
 *         TaskManager instance has no more slots available.<br/>
 *<br/>
 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the<br/>
 *                               slot is allocated.<br/>
 */<br/>
public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {

    synchronized (instanceLock) {<br/>
        Integer nextSlot = availableSlots.poll(); //看看有没有available的slot position<br/>
        if (nextSlot == null) {<br/>
            return null;<br/>
        }<br/>
        else {<br/>
            SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);<br/>
            allocatedSlots.add(slot);<br/>
            return slot;<br/>
        }<br/>
    }<br/>
}

 

归还 slot（returnAllocatedSlot）

/**<br/>
 * Returns a slot that has been allocated from this instance. The slot needs have been canceled<br/>
 * prior to calling this method.<br/>
 *<br/>
 * <p>The method will transition the slot to the "released" state. If the slot is already in state<br/>
 * "released", this method will do nothing.</p><br/>
 *<br/>
 * @param slot The slot to return.<br/>
 * @return True, if the slot was returned, false if not.<br/>
 */<br/>
@Override<br/>
public boolean returnAllocatedSlot(Slot slot) {

    if (slot.markReleased()) {<br/>
        LOG.debug("Return allocated slot {}.", slot);<br/>
        synchronized (instanceLock) {

            if (this.allocatedSlots.remove(slot)) {<br/>
                this.availableSlots.add(slot.getSlotNumber());

                if (this.slotAvailabilityListener != null) {<br/>
                    this.slotAvailabilityListener.newSlotAvailable(this); //通知有个slot可以用<br/>
                }

                return true;<br/>
            }<br/>
        }<br/>
    }<br/>
}