diff --git a/qemu/qmp.go b/qemu/qmp.go index bc2596823..fc09543f8 100644 --- a/qemu/qmp.go +++ b/qemu/qmp.go @@ -254,6 +254,15 @@ func (q *QMP) processQMPInput(line []byte, cmdQueue *list.List) { } } +func currentCommandDoneCh(cmdQueue *list.List) <-chan struct{} { + cmdEl := cmdQueue.Front() + if cmdEl == nil { + return nil + } + cmd := cmdEl.Value.(*qmpCommand) + return cmd.ctx.Done() +} + func (q *QMP) writeNextQMPCommand(cmdQueue *list.List) { cmdEl := cmdQueue.Front() cmd := cmdEl.Value.(*qmpCommand) @@ -293,6 +302,16 @@ func failOutstandingCommands(cmdQueue *list.List) { } } +func (q *QMP) cancelCurrentCommand(cmdQueue *list.List) { + cmdEl := cmdQueue.Front() + cmd := cmdEl.Value.(*qmpCommand) + if cmd.resultReceived { + q.finaliseCommand(cmdEl, cmdQueue, false) + } else { + cmd.filter = nil + } +} + func (q *QMP) parseVersion(version []byte) *QMPVersion { var qmp map[string]interface{} err := json.Unmarshal(version, &qmp) @@ -327,6 +346,39 @@ func (q *QMP) parseVersion(version []byte) *QMPVersion { } } +// The qemu package allows multiple QMP commands to be submitted concurrently +// from different Go routines. Unfortunately, QMP doesn't really support parallel +// commands as there is no way reliable way to associate a command response +// with a request. For this reason we need to submit our commands to +// QMP serially. The qemu package performs this serialisation using a +// queue (cmdQueue owned by mainLoop). We use a queue rather than a simple +// mutex so we can support cancelling of commands (see below) and ordered +// execution of commands, i.e., if command B is issued before command C, +// it should be executed before command C even if both commands are initially +// blocked waiting for command A to finish. This would be hard to achieve with +// a simple mutex. +// +// Cancelling is a little tricky. Commands such as ExecuteQMPCapabilities +// can be cancelled by cancelling or timing out their contexts. When a +// command is cancelled the calling function, e.g., ExecuteQMPCapabilities, +// will return but we may not be able to remove the command's entry from +// the command queue or issue the next command. There are two scenarios +// here. +// +// 1. The command has been processed by QMP, i.e., we have received a +// return or an error, but is still blocking as it is waiting for +// an event. For example, the ExecuteDeviceDel blocks until a DEVICE_DELETED +// event is received. When such a command is cancelled we can remove it +// from the queue and start issuing the next command. When the DEVICE_DELETED +// event eventually arrives it will just be ignored. +// +// 2. The command has not been processed by QMP. In this case the command +// needs to remain on the cmdQueue until the response to this command is +// received from QMP. During this time no new commands can be issued. When the +// response is received, it is discarded (as no one is interested in the result +// any more), the entry is removed from the cmdQueue and we can proceed to +// execute the next command. + func (q *QMP) mainLoop() { cmdQueue := list.New().Init() fromVMCh := make(chan []byte) @@ -343,6 +395,7 @@ func (q *QMP) mainLoop() { }() version := []byte{} + var cmdDoneCh <-chan struct{} DONE: for { @@ -359,6 +412,7 @@ DONE: } if cmdQueue.Len() >= 1 { q.writeNextQMPCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } break DONE } @@ -373,14 +427,25 @@ DONE: return } _ = cmdQueue.PushBack(&cmd) - if cmdQueue.Len() >= 1 { + + // We only want to execute the new cmd if there + // are no other commands pending. If there are + // commands pending our new command will get + // run when the pending commands complete. + + if cmdQueue.Len() == 1 { q.writeNextQMPCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } case line, ok := <-fromVMCh: if !ok { return } q.processQMPInput(line, cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) + case <-cmdDoneCh: + q.cancelCurrentCommand(cmdQueue) + cmdDoneCh = currentCommandDoneCh(cmdQueue) } } } diff --git a/qemu/qmp_test.go b/qemu/qmp_test.go index 4d60c0099..54ec792cd 100644 --- a/qemu/qmp_test.go +++ b/qemu/qmp_test.go @@ -133,7 +133,7 @@ func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) { }() } -func (b *qmpTestCommandBuffer) AddCommmand(name string, args map[string]interface{}, +func (b *qmpTestCommandBuffer) AddCommand(name string, args map[string]interface{}, result string, data map[string]interface{}) { b.cmds = append(b.cmds, qmpTestCommand{name, args}) if data == nil { @@ -186,7 +186,9 @@ func (b *qmpTestCommandBuffer) Read(p []byte) (n int, err error) { func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) { var cmdJSON map[string]interface{} - if b.currentCmd >= len(b.cmds) { + currentCmd := b.currentCmd + b.currentCmd++ + if currentCmd >= len(b.cmds) { b.t.Fatalf("Unexpected command") } err := json.Unmarshal(p, &cmdJSON) @@ -195,14 +197,14 @@ func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) { } cmdName := cmdJSON["execute"] gotCmdName := cmdName.(string) - result := b.results[b.currentCmd].result - if gotCmdName != b.cmds[b.currentCmd].name { + result := b.results[currentCmd].result + if gotCmdName != b.cmds[currentCmd].name { b.t.Errorf("Unexpected command. Expected %s found %s", - b.cmds[b.currentCmd].name, gotCmdName) + b.cmds[currentCmd].name, gotCmdName) result = "error" } resultMap := make(map[string]interface{}) - resultMap[result] = b.results[b.currentCmd].data + resultMap[result] = b.results[currentCmd].data encodedRes, err := json.Marshal(&resultMap) if err != nil { b.t.Errorf("Unable to encode result: %v", err) @@ -263,7 +265,7 @@ func TestQMPCapabilities(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("qmp_capabilities", nil, "return", nil) + buf.AddCommand("qmp_capabilities", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -286,7 +288,7 @@ func TestQMPStop(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("stop", nil, "return", nil) + buf.AddCommand("stop", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -309,7 +311,7 @@ func TestQMPCont(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("cont", nil, "return", nil) + buf.AddCommand("cont", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -331,7 +333,7 @@ func TestQMPQuit(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("quit", nil, "return", nil) + buf.AddCommand("quit", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -353,7 +355,7 @@ func TestQMPBlockdevAdd(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("blockdev-add", nil, "return", nil) + buf.AddCommand("blockdev-add", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -376,7 +378,7 @@ func TestQMPDeviceAdd(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("device_add", nil, "return", nil) + buf.AddCommand("device_add", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -401,7 +403,7 @@ func TestQMPXBlockdevDel(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("x-blockdev-del", nil, "return", nil) + buf.AddCommand("x-blockdev-del", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -435,7 +437,7 @@ func TestQMPDeviceDel(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("device_del", nil, "return", nil) + buf.AddCommand("device_del", nil, "return", nil) buf.AddEvent("DEVICE_DELETED", time.Millisecond*200, map[string]interface{}{ "path": path, @@ -500,7 +502,7 @@ func TestQMPDeviceDelTimeout(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("device_del", nil, "return", nil) + buf.AddCommand("device_del", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -527,8 +529,8 @@ func TestQMPCancel(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("qmp_capabilities", nil, "return", nil) - buf.AddCommmand("qmp_capabilities", nil, "return", nil) + buf.AddCommand("qmp_capabilities", nil, "return", nil) + buf.AddCommand("qmp_capabilities", nil, "return", nil) cfg := QMPConfig{Logger: qmpTestLogger{}} q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) @@ -562,7 +564,7 @@ func TestQMPSystemPowerdown(t *testing.T) { connectedCh := make(chan *QMPVersion) disconnectedCh := make(chan struct{}) buf := newQMPTestCommandBuffer(t) - buf.AddCommmand("system_powerdown", nil, "return", nil) + buf.AddCommand("system_powerdown", nil, "return", nil) buf.AddEvent("SHUTDOWN", time.Millisecond*100, nil, map[string]interface{}{ @@ -582,6 +584,101 @@ func TestQMPSystemPowerdown(t *testing.T) { wg.Wait() } +// Checks that event commands can be cancelled. +// +// We start a QMPLoop, send the system_powerdown command. This command +// will time out after 1 second as the SHUTDOWN event never arrives. +// We then send a quit command to terminate the session. +// +// The system_powerdown command should be correctly sent but should block +// waiting for the SHUTDOWN event and should be successfully cancelled. +// The quit command should be successfully received and the QMP loop should +// exit gracefully. +func TestQMPEventedCommandCancel(t *testing.T) { + var wg sync.WaitGroup + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBuffer(t) + buf.AddCommand("system_powerdown", nil, "return", nil) + buf.AddCommand("quit", nil, "return", nil) + cfg := QMPConfig{Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + checkVersion(t, connectedCh) + buf.startEventLoop(&wg) + ctx, cancelFN := context.WithTimeout(context.Background(), time.Second) + err := q.ExecuteSystemPowerdown(ctx) + cancelFN() + if err == nil { + t.Fatalf("Expected SystemPowerdown to fail") + } + err = q.ExecuteQuit(context.Background()) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + q.Shutdown() + <-disconnectedCh + wg.Wait() +} + +// Checks that queued commands execute after an evented command is cancelled. +// +// This test is similar to the previous test with the exception that it +// tries to ensure that a second command is placed on the QMP structure's +// command queue before the evented command is cancelled. This allows us +// to test a slightly different use case. We start a QMPLoop, send the +// system_powerdown command. We do this by sending the command directly +// down the QMP.cmdCh rather than calling a higher level function as this +// allows us to ensure that we have another command queued before we +// timeout the first command. We then send a qmp_capabilities command and +// then we shutdown. +// +// The system_powerdown command should be correctly sent but should block +// waiting for the SHUTDOWN event and should be successfully cancelled. +// The query_capabilities command should be successfully received and the +// QMP loop should exit gracefully. +func TestQMPEventedCommandCancelConcurrent(t *testing.T) { + var wg sync.WaitGroup + connectedCh := make(chan *QMPVersion) + disconnectedCh := make(chan struct{}) + buf := newQMPTestCommandBuffer(t) + + buf.AddCommand("system_powerdown", nil, "error", nil) + buf.AddCommand("qmp_capabilities", nil, "return", nil) + + cfg := QMPConfig{Logger: qmpTestLogger{}} + q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) + checkVersion(t, connectedCh) + buf.startEventLoop(&wg) + + resCh := make(chan qmpResult) + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + q.cmdCh <- qmpCommand{ + ctx: ctx, + res: resCh, + name: "system_powerdown", + filter: &qmpEventFilter{ + eventName: "SHUTDOWN", + }, + } + + var cmdWg sync.WaitGroup + cmdWg.Add(1) + go func() { + err := q.ExecuteQMPCapabilities(context.Background()) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + cmdWg.Done() + }() + + <-resCh + cancelFn() + cmdWg.Wait() + q.Shutdown() + <-disconnectedCh + wg.Wait() +} + // Checks that events can be received and parsed. // // Two events are provisioned and the QMPLoop is started with an valid eventCh. @@ -683,13 +780,13 @@ func TestQMPLostLoop(t *testing.T) { q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh) checkVersion(t, connectedCh) close(buf.forceFail) - buf.AddCommmand("qmp_capabilities", nil, "return", nil) + buf.AddCommand("qmp_capabilities", nil, "return", nil) err := q.ExecuteQMPCapabilities(context.Background()) if err == nil { t.Error("Expected executeQMPCapabilities to fail") } <-disconnectedCh - buf.AddCommmand("qmp_capabilities", nil, "return", nil) + buf.AddCommand("qmp_capabilities", nil, "return", nil) err = q.ExecuteQMPCapabilities(context.Background()) if err == nil { t.Error("Expected executeQMPCapabilities to fail")