Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,15 @@ public void done(ErrorCodeList errorCodeList) {
judger.getClass(), self.getUuid(), self.getName()));
changeVmStateInDb(VmInstanceStateEvent.stopped, null, HaStartVmInstanceMsg.class.getName());

if (!VmSystemTags.HA_PRE_FENCE_PENDING.hasTag(self.getUuid())) {
SystemTagCreator creator = VmSystemTags.HA_PRE_FENCE_PENDING.newSystemTagCreator(self.getUuid());
creator.inherent = true;
creator.recreate = false;
creator.ignoreIfExisting = true;
creator.tag = VmSystemTags.HA_PRE_FENCE_PENDING.getTagFormat();
creator.create();
}

startVm(msg, new Completion(msg, chain) {
@Override
public void success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.componentloader.PluginRegistry;
import org.zstack.core.workflow.FlowChainBuilder;
import org.zstack.header.core.Completion;
import org.zstack.header.core.workflow.Flow;
import org.zstack.header.core.workflow.FlowChain;
import org.zstack.header.core.workflow.FlowDoneHandler;
import org.zstack.header.core.workflow.FlowErrorHandler;
import org.zstack.header.core.workflow.FlowRollback;
import org.zstack.header.core.workflow.FlowTrigger;
import org.zstack.header.core.workflow.NoRollbackFlow;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.host.HostConstant;
import org.zstack.header.message.MessageReply;
import org.zstack.header.vm.*;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
Expand All @@ -26,34 +34,67 @@ public class VmStartOnHypervisorFlow implements Flow {
@Autowired
private PluginRegistry pluginRgty;

private final List<VmBeforeStartOnHypervisorExtensionPoint> exts = pluginRgty.getExtensionList(VmBeforeStartOnHypervisorExtensionPoint.class);;

private void fireExtensions(VmInstanceSpec spec) {
for (VmBeforeStartOnHypervisorExtensionPoint ext : exts) {
ext.beforeStartVmOnHypervisor(spec);
}
}

@Override
public void run(final FlowTrigger chain, final Map data) {
final VmInstanceSpec spec = (VmInstanceSpec) data.get(VmInstanceConstant.Params.VmInstanceSpec.toString());

fireExtensions(spec);

StartVmOnHypervisorMsg msg = new StartVmOnHypervisorMsg();
msg.setVmSpec(spec);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, spec.getDestHost().getUuid());
bus.send(msg, new CloudBusCallBack(chain) {
final List<VmBeforeStartOnHypervisorExtensionPoint> exts =
pluginRgty.getExtensionList(VmBeforeStartOnHypervisorExtensionPoint.class);
FlowChain fchain = FlowChainBuilder.newSimpleFlowChain();
fchain.setName(String.format("vm-start-on-hypervisor-vm-%s", spec.getVmInventory().getUuid()));
fchain.then(new NoRollbackFlow() {
@Override
public void run(MessageReply reply) {
if (reply.isSuccess()) {
data.put(VmStartOnHypervisorFlow.class.getName(), true);
chain.next();
} else {
chain.fail(reply.getError());
}
public void run(FlowTrigger trigger, Map d) {
Iterator<VmBeforeStartOnHypervisorExtensionPoint> it = exts.iterator();
Runnable[] step = new Runnable[1];
step[0] = () -> {
if (!it.hasNext()) {
trigger.next();
return;
}
it.next().beforeStartVmOnHypervisor(spec, new Completion(trigger) {
@Override
public void success() {
step[0].run();
}
@Override
public void fail(ErrorCode err) {
trigger.fail(err);
}
});
};
step[0].run();
}
});
fchain.then(new NoRollbackFlow() {
@Override
public void run(FlowTrigger trigger, Map d) {
StartVmOnHypervisorMsg msg = new StartVmOnHypervisorMsg();
msg.setVmSpec(spec);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, spec.getDestHost().getUuid());
bus.send(msg, new CloudBusCallBack(trigger) {
@Override
public void run(MessageReply reply) {
if (reply.isSuccess()) {
data.put(VmStartOnHypervisorFlow.class.getName(), true);
trigger.next();
} else {
trigger.fail(reply.getError());
}
}
});
}
});
fchain.done(new FlowDoneHandler(chain) {
@Override
public void handle(Map d) {
chain.next();
}
}).error(new FlowErrorHandler(chain) {
@Override
public void handle(ErrorCode errCode, Map d) {
chain.fail(errCode);
}
}).start();
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions compute/src/main/java/org/zstack/compute/vm/VmSystemTags.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,8 @@ public String desensitizeTag(SystemTag systemTag, String tag) {

public static PatternedSystemTag VM_STATE_PAUSED_AFTER_MIGRATE = new PatternedSystemTag(("vmPausedAfterMigrate"), VmInstanceVO.class);

// Plain (non-patterned) system tag with the literal value "haPreFencePending", declared for VmInstanceVO resources.
// NOTE(review): presumably marks a VM whose HA start is still waiting for the pre-fence step on the suspect host --
// confirm against the code that creates and removes this tag.
public static SystemTag HA_PRE_FENCE_PENDING =
new SystemTag("haPreFencePending", VmInstanceVO.class);

public static PatternedSystemTag VM_MEMORY_ACCESS_MODE_SHARED = new PatternedSystemTag(("vmMemoryAccessModeShared"), VmInstanceVO.class);
}
6 changes: 6 additions & 0 deletions conf/springConfigXml/Kvm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@
</zstack:plugin>
</bean>

<!-- Declares the KvmHaPreFenceVmExtension bean and registers it as a plugin extension of
     org.zstack.header.vm.VmBeforeStartOnHypervisorExtensionPoint. -->
<bean id="KvmHaPreFenceVmExtension" class="org.zstack.kvm.KvmHaPreFenceVmExtension">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.vm.VmBeforeStartOnHypervisorExtensionPoint" />
</zstack:plugin>
</bean>

<bean id="KvmResourceConfigExtension" class="org.zstack.kvm.KvmResourceConfigExtension">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.vm.ResourceConfigMemorySnapshotExtensionPoint" />
Expand Down
35 changes: 35 additions & 0 deletions header/src/main/java/org/zstack/header/vm/FenceVmOnHostMsg.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.zstack.header.vm;

import org.zstack.header.host.HostMessage;
import org.zstack.header.message.NeedReplyMessage;

/**
 * Request message asking a host to fence (kill) a VM that may still be running
 * on another, suspect host. A reply is expected because the class extends
 * {@link NeedReplyMessage}.
 *
 * <p>All three identifiers are plain mutable properties; this class performs no
 * validation of its own.
 */
public class FenceVmOnHostMsg extends NeedReplyMessage implements HostMessage {
    /** UUID exposed through {@link HostMessage#getHostUuid()}. */
    private String hostUuid;
    /** UUID of the host on which the VM is suspected to still be alive. */
    private String suspectHostUuid;
    /** UUID of the VM to be fenced. */
    private String vmUuid;

    // ---- readers ----

    /** @return the host UUID carried by this message, or {@code null} if never set */
    @Override
    public String getHostUuid() {
        return hostUuid;
    }

    /** @return the UUID of the suspect host, or {@code null} if never set */
    public String getSuspectHostUuid() {
        return suspectHostUuid;
    }

    /** @return the UUID of the VM to fence, or {@code null} if never set */
    public String getVmUuid() {
        return vmUuid;
    }

    // ---- writers ----

    /** @param hostUuid the host UUID to carry in this message */
    public void setHostUuid(String hostUuid) {
        this.hostUuid = hostUuid;
    }

    /** @param suspectHostUuid the UUID of the host suspected to still run the VM */
    public void setSuspectHostUuid(String suspectHostUuid) {
        this.suspectHostUuid = suspectHostUuid;
    }

    /** @param vmUuid the UUID of the VM to fence */
    public void setVmUuid(String vmUuid) {
        this.vmUuid = vmUuid;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.zstack.header.vm;

import org.zstack.header.message.MessageReply;

/**
 * Reply to {@link FenceVmOnHostMsg}. It declares no fields of its own; the outcome
 * is conveyed solely through what {@link MessageReply} already provides
 * (for example, an error being set or not).
 */
public class FenceVmOnHostReply extends MessageReply {
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.zstack.header.vm;

import org.zstack.header.core.Completion;

/**
*/
public interface VmBeforeStartOnHypervisorExtensionPoint {
void beforeStartVmOnHypervisor(VmInstanceSpec spec);
void beforeStartVmOnHypervisor(VmInstanceSpec spec, Completion completion);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.zstack.core.CoreGlobalProperty;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.core.errorcode.ErrorFacade;
import org.zstack.header.core.Completion;
import org.zstack.header.errorcode.OperationFailureException;
import org.zstack.header.vm.VmBeforeCreateOnHypervisorExtensionPoint;
import org.zstack.header.vm.VmBeforeStartOnHypervisorExtensionPoint;
Expand Down Expand Up @@ -65,7 +66,8 @@ public void beforeCreateVmOnHypervisor(VmInstanceSpec spec) {
}

@Override
public void beforeStartVmOnHypervisor(VmInstanceSpec spec) {
public void beforeStartVmOnHypervisor(VmInstanceSpec spec, Completion completion) {
checkManagementIp(spec, false);
completion.success();
}
}
41 changes: 41 additions & 0 deletions plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -5040,4 +5040,45 @@ public void setMemoryUsage(long memoryUsage) {
}
}

/**
 * Agent command carrying what is needed to fence a VM on a suspect host:
 * the VM, the suspect ("target") host's identity, and the SSH coordinates and
 * credentials for reaching that host.
 */
public static class FenceVmOnSuspectHostCmd extends AgentCommand implements java.io.Serializable {
// UUID of the VM to fence.
@GrayVersion(value = "5.4.8")
public String vmUuid;

// UUID of the suspect host the VM may still be running on.
@GrayVersion(value = "5.4.8")
public String targetHostUuid;

// IP address used to reach the suspect host.
@GrayVersion(value = "5.4.8")
public String targetHostIp;

// Login user name for the suspect host.
@GrayVersion(value = "5.4.8")
public String targetHostUsername;

// Login password for the suspect host; annotated @NoLogging.
@GrayVersion(value = "5.4.8")
@NoLogging
public String targetHostPassword;

// SSH port of the suspect host.
@GrayVersion(value = "5.4.8")
public Integer targetHostSshPort;

// SSH timeout; the field name indicates the unit is seconds.
@GrayVersion(value = "5.4.8")
public Integer sshTimeoutSec;
}

/**
 * Agent response to {@link FenceVmOnSuspectHostCmd}. The three Boolean fields are
 * verdict flags; each is a nullable {@code Boolean}, so callers must handle
 * {@code null} as well as {@code true}/{@code false}.
 */
public static class FenceVmOnSuspectHostRsp extends AgentResponse {
// Verdict flag: the qemu process of the VM was confirmed dead on the suspect host.
@GrayVersion(value = "5.4.8")
public Boolean qemuConfirmedDead;

// Verdict flag: the qemu process of the VM is still alive on the suspect host.
@GrayVersion(value = "5.4.8")
public Boolean qemuStillAlive;

// Verdict flag: the suspect host could not be reached.
@GrayVersion(value = "5.4.8")
public Boolean targetHostUnreachable;

// NOTE(review): presumably the captured standard output of the agent's kill attempt -- confirm with the agent side.
@GrayVersion(value = "5.4.8")
public String stdout;

// NOTE(review): presumably the captured standard error of the agent's kill attempt -- confirm with the agent side.
@GrayVersion(value = "5.4.8")
public String stderr;
}

}
1 change: 1 addition & 0 deletions plugin/kvm/src/main/java/org/zstack/kvm/KVMConstant.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public interface KVMConstant {
String KVM_LOGOUT_ISCSI_PATH = "/iscsi/target/logout";
String KVM_LOGIN_ISCSI_PATH = "/iscsi/target/login";
String KVM_HARDEN_CONSOLE_PATH = "/vm/console/harden";
String KVM_HA_FENCE_VM_ON_SUSPECT_HOST_PATH = "/ha/vm/fenceonsuspecthost";
String KVM_DELETE_CONSOLE_FIREWALL_PATH = "/vm/console/deletefirewall";
String KVM_UPDATE_HOST_OS_PATH = "/host/updateos";
String KVM_HOST_UPDATE_DEPENDENCY_PATH = "/host/updatedependency";
Expand Down
91 changes: 91 additions & 0 deletions plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,8 @@ protected void handleLocalMessage(Message msg) {
handle((RebootVmOnHypervisorMsg) msg);
} else if (msg instanceof DestroyVmOnHypervisorMsg) {
handle((DestroyVmOnHypervisorMsg) msg);
} else if (msg instanceof FenceVmOnHostMsg) {
handle((FenceVmOnHostMsg) msg);
} else if (msg instanceof AttachVolumeToVmOnHypervisorMsg) {
handle((AttachVolumeToVmOnHypervisorMsg) msg);
} else if (msg instanceof DetachVolumeFromVmOnHypervisorMsg) {
Expand Down Expand Up @@ -982,6 +984,94 @@ public void run(MessageReply reply) {
});
}

/**
 * Handles a {@link FenceVmOnHostMsg}: this host acts as the fencing peer and asks
 * its own agent (via a {@link KVMHostAsyncHttpCallMsg} on
 * {@link KVMConstant#KVM_HA_FENCE_VM_ON_SUSPECT_HOST_PATH}) to kill the VM on the
 * suspect host named in the message.
 *
 * <p>The reply carries an error when the suspect host UUID is missing or equals
 * this host, when the suspect host record cannot be found, when the agent call
 * fails, when the agent reports qemu still alive, or when the agent response is
 * unsuccessful or does not carry exactly one verdict flag. Otherwise the reply is
 * sent without an error.
 *
 * @param msg the fence request; replied to exactly once on every handled path
 */
private void handle(final FenceVmOnHostMsg msg) {
    final FenceVmOnHostReply fenceReply = new FenceVmOnHostReply();
    final String peerHostUuid = self.getUuid();
    final String suspectHostUuid = msg.getSuspectHostUuid();

    // The suspect must be named and must not be the very host doing the fencing.
    final boolean routingValid = suspectHostUuid != null && !suspectHostUuid.equals(peerHostUuid);
    if (!routingValid) {
        fenceReply.setError(operr("HA-start vm[%s]: invalid pre-fence routing -- suspectHostUuid=[%s], peer=[%s].",
                msg.getVmUuid(), suspectHostUuid, peerHostUuid));
        bus.reply(msg, fenceReply);
        return;
    }

    final KVMHostVO suspectHost = dbf.findByUuid(suspectHostUuid, KVMHostVO.class);
    if (suspectHost == null) {
        fenceReply.setError(operr("HA-start vm[%s]: suspect KVM host[%s] no longer exists; refuse to start to prevent split-brain.",
                msg.getVmUuid(), suspectHostUuid));
        bus.reply(msg, fenceReply);
        return;
    }

    // Describe the suspect host to the agent: identity plus SSH coordinates.
    final FenceVmOnSuspectHostCmd fenceCmd = new FenceVmOnSuspectHostCmd();
    fenceCmd.vmUuid = msg.getVmUuid();
    fenceCmd.targetHostUuid = suspectHostUuid;
    fenceCmd.targetHostIp = suspectHost.getManagementIp();
    fenceCmd.targetHostUsername = suspectHost.getUsername();
    fenceCmd.targetHostPassword = suspectHost.getPassword();
    if (suspectHost.getPort() != null) {
        fenceCmd.targetHostSshPort = suspectHost.getPort();
    } else {
        // No port recorded for the suspect host: fall back to 22.
        fenceCmd.targetHostSshPort = 22;
    }
    fenceCmd.sshTimeoutSec = 20;

    // The HTTP call is addressed to this (peer) host, not to the suspect.
    final KVMHostAsyncHttpCallMsg httpCall = new KVMHostAsyncHttpCallMsg();
    httpCall.setHostUuid(peerHostUuid);
    httpCall.setPath(KVMConstant.KVM_HA_FENCE_VM_ON_SUSPECT_HOST_PATH);
    httpCall.setCommand(fenceCmd);
    bus.makeTargetServiceIdByResourceUuid(httpCall, HostConstant.SERVICE_ID, peerHostUuid);

    logger.info(String.format("[HA pre-fence] vm[%s] peer[%s] killing on suspect host[%s ip=%s]",
            msg.getVmUuid(), peerHostUuid, suspectHostUuid, suspectHost.getManagementIp()));

    bus.send(httpCall, new CloudBusCallBack(msg) {
        @Override
        public void run(MessageReply httpReply) {
            if (!httpReply.isSuccess()) {
                logger.warn(String.format("[HA pre-fence] vm[%s] transport error to sibling[%s] killing on suspect host[%s]: %s",
                        msg.getVmUuid(), peerHostUuid, suspectHostUuid, httpReply.getError()));
                fenceReply.setError(operr("HA-start vm[%s]: transport error asking sibling[%s] to kill vm on suspect host[%s]. " +
                                "Refuse to start to prevent split-brain. See management log for details.",
                        msg.getVmUuid(), peerHostUuid, suspectHostUuid));
            } else {
                final FenceVmOnSuspectHostRsp rsp =
                        ((KVMHostAsyncHttpCallReply) httpReply).toResponse(FenceVmOnSuspectHostRsp.class);
                // Null flags count as "not set".
                final boolean confirmedDead = Boolean.TRUE.equals(rsp.qemuConfirmedDead);
                final boolean stillAlive = Boolean.TRUE.equals(rsp.qemuStillAlive);
                final boolean unreachable = Boolean.TRUE.equals(rsp.targetHostUnreachable);

                // A trustworthy response raises exactly one of the three verdict flags.
                int verdicts = 0;
                for (boolean flag : new boolean[]{confirmedDead, stillAlive, unreachable}) {
                    if (flag) {
                        verdicts++;
                    }
                }

                if (stillAlive) {
                    // Only the lengths of the agent output are logged, not the content.
                    logger.warn(String.format("[HA pre-fence] vm[%s] qemu still alive on suspect[%s] via sibling[%s]; " +
                                    "agent-error=[%s] stdout-len=%d stderr-len=%d",
                            msg.getVmUuid(), suspectHostUuid, peerHostUuid,
                            rsp.getError(),
                            rsp.stdout == null ? 0 : rsp.stdout.length(),
                            rsp.stderr == null ? 0 : rsp.stderr.length()));
                    fenceReply.setError(operr("HA-start vm[%s]: qemu is still alive on suspect host[%s] after sibling[%s] " +
                                    "force-destroy attempt. Refuse to start to prevent split-brain. " +
                                    "See management log for kill agent output.",
                            msg.getVmUuid(), suspectHostUuid, peerHostUuid));
                } else if (!rsp.isSuccess() || verdicts != 1) {
                    logger.warn(String.format("[HA pre-fence] vm[%s] ambiguous kill verdict from sibling[%s] for suspect[%s]: " +
                                    "success=%s confirmedDead=%s stillAlive=%s unreachable=%s agent-error=[%s]",
                            msg.getVmUuid(), peerHostUuid, suspectHostUuid,
                            rsp.isSuccess(), rsp.qemuConfirmedDead, rsp.qemuStillAlive, rsp.targetHostUnreachable,
                            rsp.getError()));
                    fenceReply.setError(operr("HA-start vm[%s]: sibling[%s] returned an ambiguous kill verdict for suspect host[%s]. " +
                                    "Refuse to start to prevent split-brain. See management log for details.",
                            msg.getVmUuid(), peerHostUuid, suspectHostUuid));
                } else {
                    logger.info(String.format("[HA pre-fence] vm[%s] cleared by sibling[%s]: confirmedDead=%s, targetUnreachable=%s",
                            msg.getVmUuid(), peerHostUuid, confirmedDead, unreachable));
                }
            }

            // Single exit: every branch above ends by answering the original request.
            bus.reply(msg, fenceReply);
        }
    });
}

private void handle(CommitVolumeSnapshotOnHypervisorMsg msg) {
inQueue().name(String.format("commit-volume-snapshot-on-kvm-%s", self.getUuid()))
.asyncBackup(msg)
Expand Down Expand Up @@ -6893,4 +6983,5 @@ public void fail(ErrorCode errorCode) {
}
});
}

}
Loading