1. Background

The VM life cycle in CloudStack is currently represented through a number of lifecycle VM states. The complete list of these states is:

 Starting, Running, Stopping, Stopped, Destroyed, Migrating, Expunging, Error, Unknown

Compared with the VM states defined in the underlying hypervisors, CloudStack lifecycle VM states carry more information, because they also reflect the VM's cloud environment. When we say a CloudStack VM is running, it usually means that

  • VM's storage volumes are ready
  • VM's network environment in Cloud is ready
  • VM is powered on at hypervisor host

To manage a CloudStack VM properly, CloudStack currently has the hypervisor resource-agent participate in VM lifecycle state management and periodically sync back with the CloudStack management server. Therefore, in addition to being aware of the hypervisor-specific VM power state, the hypervisor resource-agent also needs to know about the states introduced by CloudStack, especially transitional CloudStack VM states such as Starting and Migrating.

Upon a hypervisor host-connect event, the hypervisor resource-agent first reports all VMs on the host to the management server. This triggers a "full-sync" process with the management server to build an initial sync start point; the host is not considered to be in the UP state until this "full-sync" process has completed. After the host is connected, the hypervisor resource-agent periodically performs a "delta-sync" with the CloudStack management server.

"Full-sync" and "delta-sync" currently form the foundation of the VMSync process in CloudStack. Although this works nicely most of the time for VMs that are operated solely by CloudStack, once external VM managers such as Citrix XenCenter or VMware vCenter are introduced, it becomes hard for the state sync to handle out-of-band changes made through those managers. The following use cases can cause problems during normal operation of CloudStack.

1) Takes a long time to bring up all hypervisor hosts in a large setup

During a management server restart, if things fall out of sync, "full-sync" in the host connect phase can trigger a chain of actions (actions to bring state back in sync) that takes a long time to finish.

2) Activities from users, from the HA process and from the VMSync process can collide, and it is hard for conflict resolution to cover all scenarios.

3) Having the hypervisor resource-agent participate in CloudStack VM state management increases the complexity of writing support for a new hypervisor.

This improvement effort addresses these issues. It will help CloudStack integrate better with third-party virtualization managers like VMware vCenter, so that HA, DRS, FT, etc. can work better and more reliably through CloudStack.

2. Design

2.1 High-level principles

At a very high level, we try to attack the problem in the following areas

1) Hypervisor resource-agent to report raw VM power state only

This de-couples the resource-agent from CloudStack VM lifecycle state management. Letting the hypervisor resource-agent only carry out hypervisor-specific actions and report the raw hypervisor VM state greatly simplifies the coding of a hypervisor resource-agent.

In theory, most of the currently defined CloudStack transitional VM states actually represent the states of the corresponding transition jobs. For example, the CloudStack Starting VM state merely means that there is a pending job in the system that is working to bring the VM from the Stopped state to the Running state. From the end user's perspective, stationary states like Stopped and Running are the more meaningful states of a VM.

An important fact is that the stationary VM states (Stopped, Running) are universal across hypervisors and CloudStack. Technically, the stationary VM state together with the status of the job that is currently operating on the VM gives the user a clear and detailed view of the VM. To help us move in this direction, a VM power state is introduced; it currently reflects the hypervisor VM state directly. In the current refactoring work we still keep the original VM state to avoid massive code changes and API updates. In the future, VM state and VM power state will ultimately converge into one.

With VM power state, the hypervisor resource-agent no longer needs to know anything about CloudStack-specific transition job status. All it needs to care about is how to carry out a hypervisor-specific action and how to report VM power state periodically. There is no need to set up a sync start point, so the "full-sync" process can be eliminated altogether.

The following code snippet shows the old way in which a hypervisor resource-agent had to produce a sync report

protected HashMap<String, State> sync() {
        HashMap<String, State> changes = new HashMap<String, State>();
        HashMap<String, State> oldStates = null;

        try {
            synchronized (_vms) {
                HashMap<String, State> newStates = getVmStates();
                oldStates = new HashMap<String, State>(_vms.size());
                oldStates.putAll(_vms);

                for (final Map.Entry<String, State> entry : newStates.entrySet()) {
                    final String vm = entry.getKey();

                    State newState = entry.getValue();
                    final State oldState = oldStates.remove(vm);

                    if (s_logger.isTraceEnabled()) {
                        s_logger.trace("VM " + vm + ": vSphere has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null"));
                    }

                    if (vm.startsWith("migrating")) {
                        s_logger.debug("Migrating detected.  Skipping");
                        continue;
                    }

                    if (oldState == null) {
                        _vms.put(vm, newState);
                        s_logger.debug("Detecting a new state but couldn't find a old state so adding it to the changes: " + vm);
                        changes.put(vm, newState);
                    } else if (oldState == State.Starting) {
                        if (newState == State.Running) {
                            _vms.put(vm, newState);
                        } else if (newState == State.Stopped) {
                            s_logger.debug("Ignoring vm " + vm + " because of a lag in starting the vm.");
                        }
                    } else if (oldState == State.Migrating) {
                        if (newState == State.Running) {
                            s_logger.debug("Detected that an migrating VM is now running: " + vm);
                            _vms.put(vm, newState);
                        }
                    } else if (oldState == State.Stopping) {
                        if (newState == State.Stopped) {
                            _vms.put(vm, newState);
                        } else if (newState == State.Running) {
                            s_logger.debug("Ignoring vm " + vm + " because of a lag in stopping the vm. ");
                        }
                    } else if (oldState != newState) {
                        _vms.put(vm, newState);
                        if (newState == State.Stopped) {
                        }
                        changes.put(vm, newState);
                    }
                }

                for (final Map.Entry<String, State> entry : oldStates.entrySet()) {
                    final String vm = entry.getKey();
                    final State oldState = entry.getValue();

                    if (isVmInCluster(vm)) {
                        if (s_logger.isDebugEnabled()) {
                            s_logger.debug("VM " + vm + " is now missing from host report but we detected that it might be migrated to other host by vCenter");
                        }

                        if(oldState != State.Starting && oldState != State.Migrating) {
                            s_logger.debug("VM " + vm + " is now missing from host report and VM is not at starting/migrating state, remove it from host VM-sync map, oldState: " + oldState);
                            _vms.remove(vm);
                        } else {
                            s_logger.debug("VM " + vm + " is missing from host report, but we will ignore VM " + vm + " in transition state " + oldState);
                        }
                        continue;
                    }

                    if (s_logger.isDebugEnabled()) {
                        s_logger.debug("VM " + vm + " is now missing from host report");
                    }

                    if (oldState == State.Stopping) {
                        s_logger.debug("Ignoring VM " + vm + " in transition state stopping.");
                        _vms.remove(vm);
                    } else if (oldState == State.Starting) {
                        s_logger.debug("Ignoring VM " + vm + " in transition state starting.");
                    } else if (oldState == State.Stopped) {
                        _vms.remove(vm);
                    } else if (oldState == State.Migrating) {
                        s_logger.debug("Ignoring VM " + vm + " in migrating state.");
                    } else {
                        State state = State.Stopped;
                        changes.put(entry.getKey(), state);
                    }
                }
            }
        } catch (Throwable e) {
	    // ...
        }
        return changes;
    }

As you can see, if CloudStack ever needs to define a new transitional VM state, it will be very hard for a hypervisor resource-agent developer to follow. Most sync-related problems happen when the developer fails to correctly manage the state cache maintained on the resource side.

Since the resource-agent is now only required to report the raw VM power state, the above logic can become as simple as the following

    protected HashMap<String, PowerState> sync() {
    	return getVmStates();
    }

The schema change to add VM power state support is shown below.

ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_state` VARCHAR(64) DEFAULT 'PowerUnknown';
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_state_update_time` DATETIME;
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_state_update_count` INT DEFAULT 0;
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_host` bigint unsigned;
ALTER TABLE `cloud`.`vm_instance` ADD CONSTRAINT `fk_vm_instance__power_host` FOREIGN KEY (`power_host`) REFERENCES `cloud`.`host`(`id`);

One thing is worth noting: since a VM stays in a particular stationary state for a long time most of the time, we reduce the number of DB writes by persisting consecutive reports of the same state only a limited number of times. power_state_update_count is designed for this purpose.
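The write-throttling rule can be sketched roughly as follows. This is a minimal sketch, not the actual DAO code: PowerStateTracker, its Row type and the limit MAX_REPEATED_UPDATES are hypothetical names introduced for illustration, and an in-memory map stands in for the vm_instance table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the power_state* columns of vm_instance.
class PowerStateTracker {
    // Assumed limit for illustration; the document does not give the real value.
    static final int MAX_REPEATED_UPDATES = 3;

    static final class Row {
        String powerState = "PowerUnknown"; // column default from the schema above
        int updateCount = 0;                // power_state_update_count
        long updateTime = 0L;               // power_state_update_time
    }

    private final Map<Long, Row> rows = new HashMap<>();

    /** Applies one host report; returns true if it would cause a DB write. */
    boolean report(long vmId, String reportedState, long now) {
        Row row = rows.computeIfAbsent(vmId, id -> new Row());
        if (!row.powerState.equals(reportedState)) {
            // State changed: always persist and restart the counter.
            row.powerState = reportedState;
            row.updateCount = 1;
            row.updateTime = now;
            return true;
        }
        if (row.updateCount < MAX_REPEATED_UPDATES) {
            // Same state reported again: persist only a limited number of times.
            row.updateCount++;
            row.updateTime = now;
            return true;
        }
        return false; // repeated report beyond the limit, skip the write
    }

    public static void main(String[] args) {
        PowerStateTracker tracker = new PowerStateTracker();
        for (int i = 0; i < 5; i++) {
            System.out.println("report " + i + " written: " + tracker.report(1L, "PowerOn", i));
        }
    }
}
```

With this rule, a VM that stays powered on for hours costs only a handful of writes after each real state change, while any change of state is still recorded immediately.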

2) Serialize VM operations

Currently, state transitions are always handled in place, in the calling context. For example, when the management server receives a hypervisor VM state report, the report is processed within the reporting context, even if another thread may be handling a user request on the same VM. We try to coordinate by checking the state of the VM and simply failing the operation with a concurrent-access exception.

The following code snippet shows an example of this in-place handling style.

    
    protected Command compareState(long hostId, VMInstanceVO vm, final AgentVmInfo info, final boolean fullSync, boolean trackExternalChange) {
        State agentState = info.state;
        final State serverState = vm.getState();
        final String serverName = vm.getInstanceName();

        Command command = null;
        s_logger.debug("VM " + serverName + ": cs state = " + serverState + " and realState = " + agentState);
        if (s_logger.isDebugEnabled()) {
            s_logger.debug("VM " + serverName + ": cs state = " + serverState + " and realState = " + agentState);
        }

        if (agentState == State.Error) {
            agentState = State.Stopped;

            short alertType = AlertManager.ALERT_TYPE_USERVM;
            if (VirtualMachine.Type.DomainRouter.equals(vm.getType())) {
                alertType = AlertManager.ALERT_TYPE_DOMAIN_ROUTER;
            } else if (VirtualMachine.Type.ConsoleProxy.equals(vm.getType())) {
                alertType = AlertManager.ALERT_TYPE_CONSOLE_PROXY;
            } else if (VirtualMachine.Type.SecondaryStorageVm.equals(vm.getType())) {
                alertType = AlertManager.ALERT_TYPE_SSVM;
            }

            HostPodVO podVO = _podDao.findById(vm.getPodIdToDeployIn());
            DataCenterVO dcVO = _dcDao.findById(vm.getDataCenterId());
            HostVO hostVO = _hostDao.findById(vm.getHostId());

            String hostDesc = "name: " + hostVO.getName() + " (id:" + hostVO.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
            _alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodIdToDeployIn(), "VM (name: " + vm.getInstanceName() + ", id: " + vm.getId() + ") stopped on host " + hostDesc
                    + " due to storage failure", "Virtual Machine " + vm.getInstanceName() + " (id: " + vm.getId() + ") running on host [" + vm.getHostId() + "] stopped due to storage failure.");
        }

        if (trackExternalChange) {
            if (serverState == State.Starting) {
                if (vm.getHostId() != null && vm.getHostId() != hostId) {
                    s_logger.info("CloudStack is starting VM on host " + vm.getHostId() + ", but status report comes from a different host " + hostId + ", skip status sync for vm: "
                            + vm.getInstanceName());
                    return null;
                }
            }
            if (vm.getHostId() == null || hostId != vm.getHostId()) {
                try {
                    ItWorkVO workItem = _workDao.findByOutstandingWork(vm.getId(), State.Migrating);
                    if(workItem == null){
                        stateTransitTo(vm, VirtualMachine.Event.AgentReportMigrated, hostId);
                    }
                } catch (NoTransitionException e) {
                }
            }
        }

        // during VM migration time, don't sync state will agent status update
        if (serverState == State.Migrating) {
            s_logger.debug("Skipping vm in migrating state: " + vm);
            return null;
        }

        if (trackExternalChange) {
            if (serverState == State.Starting) {
                if (vm.getHostId() != null && vm.getHostId() != hostId) {
                    s_logger.info("CloudStack is starting VM on host " + vm.getHostId() + ", but status report comes from a different host " + hostId + ", skip status sync for vm: "
                            + vm.getInstanceName());
                    return null;
                }
            }

            if (serverState == State.Running) {
                try {
                    //
                    // we had a bug that sometimes VM may be at Running State
                    // but host_id is null, we will cover it here.
                    // means that when CloudStack DB lost of host information,
                    // we will heal it with the info reported from host
                    //
                    if (vm.getHostId() == null || hostId != vm.getHostId()) {
                        if (s_logger.isDebugEnabled()) {
                            s_logger.debug("detected host change when VM " + vm + " is at running state, VM could be live-migrated externally from host " + vm.getHostId() + " to host " + hostId);
                        }

                        stateTransitTo(vm, VirtualMachine.Event.AgentReportMigrated, hostId);
                    }
                } catch (NoTransitionException e) {
                    s_logger.warn(e.getMessage());
                }
            }
        }

        if (agentState == serverState) {
            if (s_logger.isDebugEnabled()) {
                s_logger.debug("Both states are " + agentState + " for " + vm);
            }
            assert (agentState == State.Stopped || agentState == State.Running) : "If the states we send up is changed, this must be changed.";
            if (agentState == State.Running) {
                try {
                    stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, hostId);
                } catch (NoTransitionException e) {
                    s_logger.warn(e.getMessage());
                }
                // FIXME: What if someone comes in and sets it to stopping? Then
                // what?
                return null;
            }

            s_logger.debug("State matches but the agent said stopped so let's send a cleanup command anyways.");
            return cleanup(vm);
        }

        if (agentState == State.Shutdowned) {
            if (serverState == State.Running || serverState == State.Starting || serverState == State.Stopping) {
                try {
                    advanceStop(vm, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount());
                } catch (AgentUnavailableException e) {
                    assert (false) : "How do we hit this with forced on?";
                    return null;
                } catch (OperationTimedoutException e) {
                    assert (false) : "How do we hit this with forced on?";
                    return null;
                } catch (ConcurrentOperationException e) {
                    assert (false) : "How do we hit this with forced on?";
                    return null;
                }
            } else {
                s_logger.debug("Sending cleanup to a shutdowned vm: " + vm.getInstanceName());
                command = cleanup(vm);
            }
        } else if (agentState == State.Stopped) {
            // This state means the VM on the agent was detected previously
            // and now is gone. This is slightly different than if the VM
            // was never completed but we still send down a Stop Command
            // to ensure there's cleanup.
            if (serverState == State.Running) {
                // Our records showed that it should be running so let's restart
                // it.
                _haMgr.scheduleRestart(vm, false);
            } else if (serverState == State.Stopping) {
                _haMgr.scheduleStop(vm, hostId, WorkType.ForceStop);
                s_logger.debug("Scheduling a check stop for VM in stopping mode: " + vm);
            } else if (serverState == State.Starting) {
                s_logger.debug("Ignoring VM in starting mode: " + vm.getInstanceName());
                _haMgr.scheduleRestart(vm, false);
            }
            command = cleanup(vm);
        } else if (agentState == State.Running) {
            if (serverState == State.Starting) {
                if (fullSync) {
                    try {
                        ensureVmRunningContext(hostId, vm, Event.AgentReportRunning);
                    } catch (OperationTimedoutException e) {
                        s_logger.error("Exception during update for running vm: " + vm, e);
                        return null;
                    } catch (ResourceUnavailableException e) {
                        s_logger.error("Exception during update for running vm: " + vm, e);
                        return null;
                    }catch (InsufficientAddressCapacityException e) {
                        s_logger.error("Exception during update for running vm: " + vm, e);
                        return null;
                    }catch (NoTransitionException e) {
                        s_logger.warn(e.getMessage());
                    }
                }
            } else if (serverState == State.Stopping) {
                s_logger.debug("Scheduling a stop command for " + vm);
                _haMgr.scheduleStop(vm, hostId, WorkType.Stop);
            } else {
                s_logger.debug("server VM state " + serverState + " does not meet expectation of a running VM report from agent");

                // just be careful not to stop VM for things we don't handle
                // command = cleanup(vm);
            }
        }
        return command;
    }

compareState is called upon receipt of a host state report. It runs within the host reporting thread, and it can run at any time, regardless of whether an active job is operating on the VM from another thread. It therefore has to handle every scenario that a VM could possibly be in, for example a VM that is currently in the Migrating, Starting or Stopping state. If there is ever a need to introduce another transitional VM state, it will again be very hard for developers to follow. In the VM migration case, two hosts may report the state of the same VM in an unexpected order, which further increases the complexity of handling the situation correctly.

Together with the requirement that the resource-agent side be aware of CloudStack-defined VM states, the split of the VM state sync logic between resource-agent and management server has made it extremely hard for an ordinary developer to follow. Things are tightly coupled everywhere: the resource-agent has to understand how to cache state while in the Starting state, for example, and compareState has to exhaust all possible state-transition scenarios. Making every case work reliably seems to be an impossible task under the current framework.

In this refactoring proposal, a fundamental change is made to the VM sync process: we now rely on job scheduling and serialized job execution to make the whole process loosely coupled and easier to handle. A change between two stationary states (i.e., from Stopped to Running, or from Running to Stopped) will always be associated with a transition job, and all jobs operating on the same VM will be executed in order; at any given time, at most one job can be executing on a VM. From the job handling perspective, a VM state report is only a trigger event. This means that there will be no central place like compareState that needs to exhaust all handling scenarios.
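The idea of executing all jobs on the same VM in order can be illustrated with a small sketch. It is not the CloudStack job facility: VmJobSerializer and its methods are hypothetical names, and the sketch chains CompletableFuture instances per VM id instead of using a persistent job queue.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: jobs for the same VM run one at a time, in submission order.
class VmJobSerializer {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // Tail of the job chain for each VM id (never pruned in this sketch).
    private final Map<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    /** Queues a job behind all earlier jobs submitted for the same VM. */
    CompletableFuture<Void> submit(long vmId, Runnable job) {
        // compute() is atomic per key, so concurrent submitters cannot interleave.
        return tails.compute(vmId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.completedFuture(null) : tail;
            // handle() absorbs a failure of the previous job so the queue keeps moving.
            return previous.handle((result, error) -> null).thenRunAsync(job, pool);
        });
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        VmJobSerializer serializer = new VmJobSerializer();
        serializer.submit(1L, () -> System.out.println("stop VM 1"));
        serializer.submit(1L, () -> System.out.println("start VM 1")).join();
        serializer.shutdown();
    }
}
```

Jobs for different VMs can still run in parallel on the shared pool; only jobs that target the same VM id are ordered.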

The following code snippet gives an example of the synchronous handling logic that is supported in the new model.

    @Override
    public <T extends VMInstanceVO> boolean advanceStop(final T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
    	VmWorkJobVO workJob = null;
    	Transaction txn = Transaction.currentTxn();
    	try {
        	txn.start();

        	_vmDao.lockRow(vm.getId(), true);

        	List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
        		VirtualMachine.Type.Instance, vm.getId(), VmWorkConstants.VM_WORK_STOP);

        	if(pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
        		assert(pendingWorkJobs.size() == 1);
        		workJob = pendingWorkJobs.get(0);
        	} else {
        		workJob = new VmWorkJobVO();

        		workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
        		workJob.setCmd(VmWorkConstants.VM_WORK_STOP);

        		workJob.setAccountId(account.getId());
        		workJob.setUserId(user.getId());
        		workJob.setStep(VmWorkJobVO.Step.Prepare);
        		workJob.setVmType(vm.getType());
        		workJob.setVmInstanceId(vm.getId());

        		// save work context info (there are some duplications)
        		VmWorkStop workInfo = new VmWorkStop();
        		workInfo.setAccountId(account.getId());
        		workInfo.setUserId(user.getId());
        		workInfo.setVmId(vm.getId());
        		workInfo.setForceStop(forced);
        		workJob.setCmdInfo(ApiSerializerHelper.toSerializedString(workInfo));

        		_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
        	}

        	txn.commit();
    	} catch(Throwable e) {
    		s_logger.error("Unexpected exception", e);
    		txn.rollback();
    		throw new ConcurrentOperationException("Unhandled exception, converted to ConcurrentOperationException");
    	}

    	final long jobId = workJob.getId();
    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);

    	//
    	// TODO : this will be replaced with fully-asynchronous way later so that we don't need
    	// to wait here. The reason we do it synchronous here is that callers of advanceStart is expecting
    	// synchronous semantics
    	//
    	//
    	_jobMgr.waitAndCheck(
    		new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE },
    		3000L, 600000L, new Predicate() {

				@Override
				public boolean checkCondition() {
					VMInstanceVO instance = _vmDao.findById(vm.getId());
					if(instance.getPowerState() == VirtualMachine.PowerState.PowerOff)
						return true;

					VmWorkJobVO workJob = _workJobDao.findById(jobId);
					if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
						return true;

					return false;
				}
    		});

    	try {
    		AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId);
    	} catch(Exception e) {
    		s_logger.error("Unexpected exception", e);
    		return false;
    	}
    	return true;
    }

Every job on a VM starts from a known stationary VM state, and since execution is serialized, every job only needs to handle the scenarios that are expected in its specific flow; any unexpected change should fail the job as a whole. There is actually no traditional sync concept any more: VM state changes are notified to interested receivers through the messaging facility. In the above example, after carrying out the stop command, the stop job just waits for the expected VM power-off state (PowerState.PowerOff in the code above) to complete successfully; otherwise, it simply fails the job.

3) Message bus to coordinate with activities

We will try to use a message bus to coordinate different activities within the management server. This facility is different from the existing "Event Bus" feature; the latter is mainly used to integrate with external systems through persistent message-queue servers.

A message bus is a loosely coupled implementation of the publish/subscribe pattern. We currently use the publish/subscribe pattern a lot, but in a strongly typed way, for example through the Listener interface

public interface Listener {
    boolean processAnswers(long agentId, long seq, Answer[] answers);
    boolean processCommands(long agentId, long seq, Command[] commands);
    AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd);
    void processConnect(HostVO host, StartupCommand cmd, boolean forRebalance) throws ConnectionException;
    boolean processDisconnect(long agentId, Status state);
    boolean isRecurring();
    int getTimeout();
    boolean processTimeout(long agentId, long seq);
}

All subscribers to agent-related topics (i.e., VirtualMachineManagerImpl, ConsoleProxyManagerImpl, etc.) have to be wired explicitly to the publisher (AgentManagerImpl). As the need for different topics grows, these explicit wirings make the components of the whole system tightly coupled.

2.2 Code changes

2.2.1 Message Bus facility

public interface MessageBus {

void setMessageSerializer(MessageSerializer messageSerializer);

MessageSerializer getMessageSerializer();


void subscribe(String topic, MessageSubscriber subscriber);

void unsubscribe(String topic, MessageSubscriber subscriber);

void clearAll();

void prune();


void publish(String senderAddress, String topic, PublishScope scope, Object args);

}

 
MessageBus defines the interface of the message bus facility. It implements a simple publish/subscribe pattern: publishers and subscribers are linked by sharing a common topic. Topics can be hierarchical, and a subscriber at a higher level of the hierarchy receives messages from all topics below it.
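Hierarchical topic delivery can be sketched as below. This is an illustration only: TopicTreeBus and its nested Subscriber interface are hypothetical, and the sketch assumes that "." separates hierarchy levels, which matches topic names such as "vm.powerstate" but is not stated explicitly in this document.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of hierarchical topic delivery; "." is an assumed separator.
class TopicTreeBus {
    interface Subscriber {
        void onMessage(String sender, String topic, Object args);
    }

    private final Map<String, List<Subscriber>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String topic, Subscriber subscriber) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    /** Delivers to subscribers of the topic itself and of every ancestor topic. */
    void publish(String sender, String topic, Object args) {
        String current = topic;
        while (current != null) {
            List<Subscriber> targets =
                    subscribers.getOrDefault(current, Collections.<Subscriber>emptyList());
            for (Subscriber s : targets) {
                s.onMessage(sender, topic, args);
            }
            // Move one level up: "vm.powerstate" -> "vm" -> stop.
            int dot = current.lastIndexOf('.');
            current = (dot < 0) ? null : current.substring(0, dot);
        }
    }

    public static void main(String[] args) {
        TopicTreeBus bus = new TopicTreeBus();
        bus.subscribe("vm", (sender, topic, payload) ->
                System.out.println("vm subscriber got " + topic + ": " + payload));
        bus.publish("host-1", "vm.powerstate", "PowerOff");
    }
}
```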

MessageBusBase

A simple in-memory message bus implementation. PublishScope defines two available scope values: PublishScope.LOCAL broadcasts a message to all subscribed components running within the local management server, and PublishScope.GLOBAL publishes messages to all management servers across the cluster.

Since most accesses to the internal message bus data structure are READs, MessageBusBase uses a special MessageBusBase.Gate to implement read/write lock behavior, for the sake of performance in a multi-threaded environment.
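The read-mostly locking idea can be sketched with the standard java.util.concurrent.locks.ReentrantReadWriteLock. This is not the MessageBusBase.Gate class, whose implementation is not shown in this document; GuardedSubscriberTable is a hypothetical stand-in that only demonstrates the locking pattern.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: many concurrent readers, exclusive writers.
class GuardedSubscriberTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, List<String>> table = new HashMap<>();

    /** Rare operation: takes the exclusive write lock. */
    void add(String topic, String subscriberName) {
        lock.writeLock().lock();
        try {
            table.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscriberName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Frequent operation: takes the shared read lock and returns a copy. */
    List<String> lookup(String topic) {
        lock.readLock().lock();
        try {
            List<String> found = table.get(topic);
            return (found == null) ? new ArrayList<String>() : new ArrayList<String>(found);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedSubscriberTable table = new GuardedSubscriberTable();
        table.add("vm.powerstate", "VirtualMachineManagerImpl");
        System.out.println(table.lookup("vm.powerstate"));
    }
}
```

Publishing only needs the read path, so publishers running on different threads do not block one another; only subscribe and unsubscribe style operations take the exclusive lock.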

MessageSerializer

The message bus can be extended remotely, across management servers within the cluster, or even beyond that boundary to remote agents. This can be achieved by plugging in different MessageBus implementations. To keep the whole system loosely coupled and adaptable to implementations in non-Java environments, we need to define a common ground for the on-wire format. MessageSerializer serves this purpose.

MessageHandler

A Java annotation with which a subscriber specifies a message handler. It is used together with MessageDispatcher. Message subscribers can declare a message handler using the following pattern.

class FooSubscriber {

    @MessageHandler(topic="Host")
    public void processHostMessage(String sender, String topic, Object args) {
        ...
    }
}

Subscribers using this pattern can be written in POJO (Plain Old Java Object) style, which eliminates the need to implement the MessageSubscriber interface. To register such an object with the message bus, we wrap it in a decorator object as follows

MessageDispatcher decoratedSubscriber = new MessageDispatcher(new FooSubscriber());

_messageBus.subscribe("Host", decoratedSubscriber);

MessageDispatcher

Used by a message subscriber to dispatch received messages to annotated message handlers. The above example shows the use case from the design point of view.
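One way such annotation-based dispatch could work is sketched below using reflection. The names SketchHandler, SketchDispatcher and FooSubscriberSketch are hypothetical stand-ins for MessageHandler, MessageDispatcher and FooSubscriber; the real classes may match topics and invoke handlers differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the MessageHandler annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchHandler {
    String topic();
}

// Hypothetical stand-in for MessageDispatcher: wraps a POJO and finds its handlers.
class SketchDispatcher {
    private final Object target;

    SketchDispatcher(Object target) {
        this.target = target;
    }

    /** Invokes each handler annotated with exactly this topic; returns how many ran. */
    int onPublish(String sender, String topic, Object args) {
        int invoked = 0;
        for (Method method : target.getClass().getDeclaredMethods()) {
            SketchHandler handler = method.getAnnotation(SketchHandler.class);
            if (handler == null || !handler.topic().equals(topic)) {
                continue;
            }
            try {
                method.setAccessible(true);
                method.invoke(target, sender, topic, args);
                invoked++;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("handler failed: " + method.getName(), e);
            }
        }
        return invoked;
    }
}

// POJO subscriber: no interface to implement, only an annotated method.
class FooSubscriberSketch {
    final List<Object> seen = new ArrayList<>();

    @SketchHandler(topic = "Host")
    public void processHostMessage(String sender, String topic, Object args) {
        seen.add(args);
    }

    public static void main(String[] args) {
        FooSubscriberSketch foo = new FooSubscriberSketch();
        new SketchDispatcher(foo).onPublish("agent", "Host", "host connected");
        System.out.println(foo.seen);
    }
}
```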

MessageDetector

The message bus pattern by nature conforms to the event-driven programming model. However, because we have a large code base that follows the synchronous programming model, we frequently need a mechanism to feed asynchronous event notifications into a synchronous flow. The following is a typical usage pattern for such a scenario.

    	//
    	// TODO : this will be replaced with fully-asynchronous way later so that we don't need
    	// to wait here. The reason we do it synchronous here is that callers of advanceStart is expecting
    	// synchronous semantics
    	//
    	//
    	_jobMgr.waitAndCheck(
    		new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE },
    		3000L, 600000L, new Predicate() {

				@Override
				public boolean checkCondition() {
					VMInstanceVO instance = _vmDao.findById(vm.getId());
					if(instance.getPowerState() == VirtualMachine.PowerState.PowerOff)
						return true;

					VmWorkJobVO workJob = _workJobDao.findById(jobId);
					if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
						return true;

					return false;
				}
    		});


_jobMgr.waitAndCheck relies on the service provided by MessageDetector. To fit into the synchronous programming model, MessageDetector blocks execution until an event of interest appears on the message bus.
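The blocking behavior can be sketched with a plain Java monitor. This is an assumption about the general technique, not the MessageDetector implementation: BlockingDetectorSketch is a hypothetical class, and onMessage() stands for whatever callback the message bus invokes when a subscribed topic fires.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of blocking a synchronous caller until a bus event arrives.
class BlockingDetectorSketch {
    private final Object monitor = new Object();

    /** Called from the bus side whenever a message of interest arrives. */
    void onMessage() {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /** Blocks until the condition holds or the timeout elapses; returns whether it held. */
    boolean waitAndCheck(long checkIntervalMs, long timeoutMs, BooleanSupplier condition) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (monitor) {
            while (!condition.getAsBoolean()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // timed out
                }
                try {
                    // Wake up on a message, or re-check after the interval at the latest.
                    monitor.wait(Math.max(1L, Math.min(checkIntervalMs, remaining)));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingDetectorSketch detector = new BlockingDetectorSketch();
        AtomicBoolean poweredOff = new AtomicBoolean(false);
        new Thread(() -> {
            poweredOff.set(true);  // state changes first ...
            detector.onMessage();  // ... then the waiter is notified
        }).start();
        System.out.println("condition met: " + detector.waitAndCheck(3000L, 600000L, poweredOff::get));
    }
}
```

The periodic re-check mirrors the two numeric arguments of the waitAndCheck call above, which this sketch assumes to be a check interval and an overall timeout in milliseconds.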

TopicConstants

public interface TopicConstants {
	// VM power state messages on message bus
	public static final String VM_POWER_STATE = "vm.powerstate";

	// job messages on message bus
	public static final String JOB_HEARTBEAT = "job.heartbeat";
	public static final String JOB_STATE = "job.state";


	...
}

In a loosely coupled model, topics on the message bus become the glue between components. To keep the coupling from becoming too loose, all topics on the message bus are managed centrally: topics and their related parameters need to be defined in TopicConstants.

2.2.2 Job facility

Job types

A job in CloudStack actually represents an orchestration work flow. Due to historic reason, CloudStack has taken considerable efforts trying to make the concept of job implicit to programmers, this is done through the API Command pattern, for API command that is executed asynchronously, the request will first be posted to an internal job facility but real execution/processing will be called back into the command object from within the job thread context. The whole job facility has been made implicit intentionally, in most of cases, job facility is used as a context switcher to just provide the execution thread context. This implicit use of job facility actually treats job as secondary class, since explicit job control is discouraged, it leads to the programming model to handle things in-place within the calling context, synchronization is then usually done through locking. 

In this refactoring proposal, we will promote jobs to first-class objects and encourage their use in a more explicit way. We will use ordered execution to help reduce the use of locking across the code base and to manage orchestration processes explicitly. This will give us better control over system load.

However, due to the legacy baggage we have, moving in this direction will be a long journey. To make the existing model work without too much change, instead of having one ideal abstract job facility that does general job scheduling, execution and ordering, we will currently have 3 major job types.

1) API job

An API job gives a running context for an asynchronous API request; it usually starts an orchestration process.

2) Work job

A work job in the new model carries the real orchestration process; its execution is serialized on the target VM object.

3) Pseudo Job

Inside CloudStack, there are a few manager components that use their own threads to manage service activities. When these components need to use the newly introduced work jobs for orchestration, they sometimes need a job context, and a pseudo job provides just that. The difference between a pseudo job and a high-level API job is that a pseudo job runs in its own thread context, while an API job runs in a thread from the job thread pool.
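The idea that work jobs are serialized on the target VM can be illustrated with a small sketch. It chains each new job after the previous job submitted for the same VM, while jobs for different VMs may still run in parallel. PerVmSerializer is a hypothetical name, and the CompletableFuture chaining is just one possible technique chosen for this example, not necessarily how the job facility does it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical helper: runs jobs for the same VM one after another,
// in submission order, on a shared executor.
class PerVmSerializer {
    // Tail of the job chain for each VM id. Entries are never removed here;
    // a real implementation would clean up idle chains.
    private final Map<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Executor executor;

    PerVmSerializer(Executor executor) {
        this.executor = executor;
    }

    // Queues the job behind the last job submitted for the same VM and
    // returns a future that completes when this job has finished.
    CompletableFuture<Void> submit(long vmId, Runnable job) {
        return tails.compute(vmId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // handle() swallows a failure of the previous job so the chain keeps going.
            return previous.handle((result, error) -> null).thenRunAsync(job, executor);
        });
    }
}
```

With this arrangement no per-VM lock is needed inside the jobs themselves, because two jobs for the same VM never overlap.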

Job joining

Like a process in an operating system, a job can be in one of several execution states: it could be blocked, currently running, and so on. Joining another job means waiting for the completion of that job, whether it completes successfully or fails. A blocked job may be rescheduled to run when events are triggered. Currently, a job that has joined another job can only be scheduled to run upon wakeup events.

Job wakeup

When a job joins another job, there are two ways to wait for the completion status of the joined job. The first way was shown in _jobMgr.waitAndCheck: block the executing thread until the condition is satisfied. The problem with this approach is that it holds a real executing thread. If the caller already has a persistent thread, this is not a problem. However, most API-initiated orchestration jobs share a global job thread pool, so blocking the executing thread does not scale well. The new job facility supports a second approach: put the job into a blocking state, release the executing thread, and then reschedule job execution based on event-triggered wakeup calls.

The message bus provides best-effort delivery to CloudStack components. For locally broadcast messages (within one management server), it is reliable while the management server is running. For messages that cross management server boundaries, however, it is not reliable enough on its own to build a reliable orchestration process. Therefore, when a job is joining and waiting for another job to complete, the job is also woken up periodically at a specified interval so that the check-up process keeps going. The message bus is used for efficient event notification, and if the message bus service fails, this periodic wakeup helps ensure the reliability of the whole orchestration process.
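The combination of event-triggered wakeup and periodic fallback wakeup can be sketched as below. The class and field names (JoinRecord, WakeupScheduler, nextWakeupMs and so on) are hypothetical and only loosely inspired by the join bookkeeping described in this document; time is passed in explicitly so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical record of one job waiting on (joined to) another job.
class JoinRecord {
    final long jobId;            // the waiting (blocked) job
    final long joinJobId;        // the job being waited on
    final long wakeupIntervalMs; // fallback wakeup period
    long nextWakeupMs;           // next time the fallback wakeup is due

    JoinRecord(long jobId, long joinJobId, long wakeupIntervalMs, long nowMs) {
        this.jobId = jobId;
        this.joinJobId = joinJobId;
        this.wakeupIntervalMs = wakeupIntervalMs;
        this.nextWakeupMs = nowMs + wakeupIntervalMs;
    }
}

class WakeupScheduler {
    private final List<JoinRecord> joins = new ArrayList<>();

    void join(JoinRecord record) {
        joins.add(record);
    }

    // Event path: a completion message for joinJobId arrived on the bus.
    // Returns the waiting jobs to reschedule and drops their join records.
    List<Long> onJobCompleted(long joinJobId) {
        List<Long> toWake = new ArrayList<>();
        for (Iterator<JoinRecord> it = joins.iterator(); it.hasNext();) {
            JoinRecord record = it.next();
            if (record.joinJobId == joinJobId) {
                toWake.add(record.jobId);
                it.remove();
            }
        }
        return toWake;
    }

    // Timer path: wakes every waiting job whose interval has elapsed, so a
    // lost bus message only delays the check instead of stalling the job.
    List<Long> onTimer(long nowMs) {
        List<Long> toWake = new ArrayList<>();
        for (JoinRecord record : joins) {
            if (record.nextWakeupMs <= nowMs) {
                toWake.add(record.jobId);
                record.nextWakeupMs = nowMs + record.wakeupIntervalMs;
            }
        }
        return toWake;
    }
}
```

A job woken by the timer path simply re-checks the state of the joined job and goes back to the blocking state if nothing has changed.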

Job heartbeat and Job monitoring

While a job is executing, to help monitor system health and load, we've added a job monitoring facility that tracks all active job threads and job heartbeats. All cooperative jobs are required to periodically report job heartbeats on the message bus with the topic TopicConstants.JOB_HEARTBEAT. Stalled jobs can be identified through this monitoring service, and in the future we will add graceful job cancellation support for jobs whose current activity can be interrupted and cancelled.
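As a rough sketch of how stalled jobs could be identified from heartbeats, consider the following. JobHeartbeatMonitor and its stall threshold are assumptions for illustration; the document does not specify how the monitoring facility decides that a job has stalled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical monitor fed by messages on the job heartbeat topic.
class JobHeartbeatMonitor {
    private final Map<Long, Long> lastHeartbeatMs = new ConcurrentHashMap<>();
    private final long stallThresholdMs; // assumed tunable, not from the source

    JobHeartbeatMonitor(long stallThresholdMs) {
        this.stallThresholdMs = stallThresholdMs;
    }

    // Records the time of the latest heartbeat for a job.
    void onHeartbeat(long jobId, long nowMs) {
        lastHeartbeatMs.put(jobId, nowMs);
    }

    // Stops tracking a job once it has completed.
    void onJobFinished(long jobId) {
        lastHeartbeatMs.remove(jobId);
    }

    // Returns, in ascending id order, the jobs whose last heartbeat is
    // older than the stall threshold.
    List<Long> findStalledJobs(long nowMs) {
        List<Long> stalled = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() > stallThresholdMs) {
                stalled.add(entry.getKey());
            }
        }
        Collections.sort(stalled);
        return stalled;
    }
}
```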

AsyncJobManagerImpl

Refactored to decouple it from its previous tight linkage with API jobs, making it generic so that it executes not only async API request jobs but also internal VM operation jobs

ApiAsyncJobDispatcher

Dispatch async API request jobs

VmWorkJobDispatcher

Dispatch internal async VM operation jobs

VmWorkWakeupJobDispatcher

Dispatch wakeup events to blocked jobs

VmWorkJobVO/VmWorkJobDao/VmWorkJobDaoImpl

Persistence classes for internal VM operation jobs

AsyncJobJournalVO/AsyncJobJournalDao/AsyncJobJournalDaoImpl 

Implements the job journal facility; all jobs can now have a persistent job journal

2.2.3 VM Power state sync

VirtualMachinePowerStateSync/VirtualMachinePowerStateSyncImpl

Strictly speaking, there will be no "Sync" fix in the new model at all, since the whole previous "sync" process has become unnecessary. When a VM power state report is received by the management server, all it does can be abstracted in the following workflow

if( there is a pending job on the subject VM ) {
	Raise wakeup call to the job
} else {
	Save the report info
	Trigger and schedule responding job if necessary
}

When there is no pending job working on the VM, the VM should always stay in a stationary state (i.e., PoweredOn or PoweredOff). An out-of-sync situation between what the CloudStack DB has recorded and what a host has reported will be resolved with a new job flow. Depending on the HA configuration, we can either try to eventually bring the VM state back in sync with the CloudStack DB or let CloudStack honor what is reported.

Every job flow starts from one stationary VM state and ends in another (or the same) stationary state. The transition process is conducted and monitored solely by the job itself, so there is no need for a traditional "Sync" process to get involved.
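The report-handling workflow above can be written out as a small Java sketch. Everything here is illustrative: PowerStateReportHandler, its Action enum and the in-memory maps are hypothetical, and the rule used for "if necessary" (reported state differs from the state CloudStack has recorded) is an assumption. The PowerState values are also assumed, apart from PowerOff and PowerUnknown, which appear elsewhere in this document.

```java
import java.util.HashMap;
import java.util.Map;

// Assumed power state values for this sketch.
enum PowerState { PowerOn, PowerOff, PowerUnknown }

// Hypothetical handler for VM power state reports.
class PowerStateReportHandler {
    enum Action { WAKEUP_PENDING_JOB, SCHEDULE_RECONCILE_JOB, NONE }

    private final Map<Long, Long> pendingJobByVm = new HashMap<>();
    private final Map<Long, PowerState> expectedState = new HashMap<>();
    private final Map<Long, PowerState> lastReport = new HashMap<>();

    // State that CloudStack has on record for the VM.
    void setExpectedState(long vmId, PowerState state) {
        expectedState.put(vmId, state);
    }

    void setPendingJob(long vmId, long jobId) {
        pendingJobByVm.put(vmId, jobId);
    }

    void clearPendingJob(long vmId) {
        pendingJobByVm.remove(vmId);
    }

    PowerState getLastReport(long vmId) {
        return lastReport.get(vmId);
    }

    Action onReport(long vmId, PowerState reported) {
        if (pendingJobByVm.containsKey(vmId)) {
            // A job owns the transition; it decides what the report means.
            return Action.WAKEUP_PENDING_JOB;
        }
        // No job is acting on the VM: save the report, then reconcile
        // only when it disagrees with the recorded state (assumed rule).
        lastReport.put(vmId, reported);
        PowerState expected = expectedState.getOrDefault(vmId, PowerState.PowerUnknown);
        return reported == expected ? Action.NONE : Action.SCHEDULE_RECONCILE_JOB;
    }
}
```

What the reconcile job then does (bring the VM back to the recorded state, or accept the reported one) would depend on the HA configuration, as described above.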

2.2.4 Other refactored classes

VirtualMachineManagerImpl

HighAvailabilityManagerImpl

VirtualMachineGuru

ReservationContext

Hypervisor resource classes

etc.

3. Use cases

To limit the scope of the required refactoring work, we still keep the legacy VM states as before. The difference is that operations that could cause state transitions are always managed solely from a job, and as long as the management server is running, any transition process should either succeed or fail within a bounded period of time. The following guidelines determine the expected system behavior.

1) As long as the management server is running, no VM should be stuck in a transitional state for too long. The transitional states include Starting, Stopping, Migrating and Expunging.

2) For any transitional process, e.g., the transition from Stopped -> Running through the Starting state, external changes made to the VM during that time (e.g., stopping a just-started VM from vCenter) can only affect the result of the job. If the external change goes in the same direction as what the job is expecting, it has no effect on the final result; otherwise, it can eventually cause the job to fail. In either case, upon completion of the job, the VM will be returned to a stationary state.

3) Upon management server restart, any pending uncompleted jobs will be failed, related resources in CloudStack should be released, and related VMs should be put into their last known stationary state.

4) If a host reports an out-of-sync power state while there is no pending job acting on the VM, then for the non-HA case CloudStack will always update the VM to be in sync with what is reported from the hypervisor. However, since this state transition has not gone through a normal transitional job, if the VM is put into the Running state, its network environment will most likely not be ready, so we will raise a system alert about this VM for manual attention from the administrator or end-user. Since the VM has been put into a stationary state that is in sync with what the hypervisor has reported, CloudStack or the end-user can always issue a corrective command to either stop or start the VM through the normal process.

5) To support external VM live migration, a host change in a host report will always be honored by CloudStack, regardless of whether there is a pending job acting on the VM. To honor the host change, CloudStack updates the VM/host relationship accordingly. Consider the following complex migration case:

A VM-migrate transition job is trying to live-migrate a VM from host 1 to host 2. Meanwhile, an external manager like vCenter interrupts it and successfully migrates the VM from host 1 to host 3. The VM-migrate job in CloudStack should fail, because the VM ends up at an unexpected host. Nevertheless, upon completion of this failed job, the VM's host change will still be honored, so the user can perform the migration again; this time, the command will be issued from host 3 to host 2.
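The migration case can be reduced to a tiny decision rule, sketched here with hypothetical names (MigrationResult, MigrationResolver): the job succeeds only if the VM is reported on the intended target host, but the reported host is recorded in either case.

```java
// Hypothetical outcome of a VM-migrate job after the host report is evaluated.
class MigrationResult {
    final boolean jobSucceeded;
    final long recordedHostId; // host that CloudStack records for the VM

    MigrationResult(boolean jobSucceeded, long recordedHostId) {
        this.jobSucceeded = jobSucceeded;
        this.recordedHostId = recordedHostId;
    }
}

class MigrationResolver {
    // targetHostId: where the CloudStack job intended to move the VM.
    // reportedHostId: where the host report says the VM is actually running.
    static MigrationResult resolve(long targetHostId, long reportedHostId) {
        boolean succeeded = (reportedHostId == targetHostId);
        // The reported host is always honored, even when the job fails.
        return new MigrationResult(succeeded, reportedHostId);
    }
}
```

For the scenario above, resolve(2, 3) yields a failed job with the VM recorded on host 3, so a retry would migrate from host 3 to host 2.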

4. Schema changes

ALTER TABLE `cloud`.`async_job` DROP COLUMN `session_key`;

ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`;

ALTER TABLE `cloud`.`async_job` DROP COLUMN `callback_type`;

ALTER TABLE `cloud`.`async_job` DROP COLUMN `callback_address`;




ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_type` VARCHAR(32);

ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64);

ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_executing_msid` bigint;

ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_pending_signals` int(10) NOT NULL DEFAULT 0;




ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_state` VARCHAR(64) DEFAULT 'PowerUnknown';

ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_state_update_time` DATETIME;

ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_state_update_count` INT DEFAULT 0;

ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `power_host` bigint unsigned;

ALTER TABLE `cloud`.`vm_instance` ADD CONSTRAINT `fk_vm_instance__power_host` FOREIGN KEY (`power_host`) REFERENCES `cloud`.`host`(`id`);




CREATE TABLE `cloud`.`vm_work_job` (
  `id` bigint unsigned UNIQUE NOT NULL,
  `step` char(32) NOT NULL COMMENT 'state',
  `vm_type` char(32) NOT NULL COMMENT 'type of vm',
  `vm_instance_id` bigint unsigned NOT NULL COMMENT 'vm instance',
  PRIMARY KEY (`id`),
  CONSTRAINT `fk_vm_work_job__instance_id` FOREIGN KEY (`vm_instance_id`) REFERENCES `vm_instance`(`id`) ON DELETE CASCADE,
  INDEX `i_vm_work_job__vm`(`vm_type`, `vm_instance_id`),
  INDEX `i_vm_work_job__step`(`step`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;




CREATE TABLE `cloud`.`async_job_journal` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `job_id` bigint unsigned NOT NULL,
  `journal_type` varchar(32),
  `journal_text` varchar(1024) COMMENT 'journal descriptive information',
  `journal_obj` varchar(1024) COMMENT 'journal structural information, JSON encoded object',
  `created` datetime NOT NULL COMMENT 'date created',
  PRIMARY KEY (`id`),
  CONSTRAINT `fk_async_job_journal__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;




CREATE TABLE `cloud`.`async_job_join_map` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `job_id` bigint unsigned NOT NULL,
  `join_job_id` bigint unsigned NOT NULL,
  `join_status` int NOT NULL,
  `join_result` varchar(1024),
  `join_msid` bigint,
  `complete_msid` bigint,
  `sync_source_id` bigint COMMENT 'upper-level job sync source info before join',
  `wakeup_handler` varchar(64),
  `wakeup_dispatcher` varchar(64),
  `wakeup_interval` bigint NOT NULL DEFAULT 3000 COMMENT 'wakeup interval in seconds',
  `created` datetime NOT NULL,
  `last_updated` datetime,
  `next_wakeup` datetime,
  `expiration` datetime,
  PRIMARY KEY (`id`),
  CONSTRAINT `fk_async_job_join_map__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE,
  CONSTRAINT `fk_async_job_join_map__join_job_id` FOREIGN KEY (`join_job_id`) REFERENCES `async_job`(`id`),
  CONSTRAINT `fk_async_job_join_map__join` UNIQUE (`job_id`, `join_job_id`),
  INDEX `i_async_job_join_map__join_job_id`(`join_job_id`),
  INDEX `i_async_job_join_map__created`(`created`),
  INDEX `i_async_job_join_map__last_updated`(`last_updated`),
  INDEX `i_async_job_join_map__next_wakeup`(`next_wakeup`),
  INDEX `i_async_job_join_map__expiration`(`expiration`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

5. API/UI

This is a low-level change that should keep the API compatible. A UI change is also not mandatory; the UI can be changed in the future to take advantage of better job management (e.g., the job journal for more descriptive error messages).


