tests/functional: replace 'run_cmd' with subprocess helpers

The 'run_cmd' helper is re-implementing a convenient helper that
already exists in the form of the 'run' and 'check_call' methods
provided by 'subprocess'.

Reviewed-by: Thomas Huth <thuth@redhat.com>
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
Message-ID: <20241217155953.3950506-29-berrange@redhat.com>
Signed-off-by: Thomas Huth <thuth@redhat.com>
This commit is contained in:
Daniel P. Berrangé 2024-12-17 15:59:49 +00:00 committed by Thomas Huth
parent 3bb4c8b613
commit 37e9b19c34
6 changed files with 41 additions and 30 deletions

View file

@ -14,7 +14,6 @@ from urllib.parse import urlparse
import zipfile import zipfile
from .asset import Asset from .asset import Asset
from .cmd import run_cmd
def tar_extract(archive, dest_dir, member=None): def tar_extract(archive, dest_dir, member=None):
@ -52,9 +51,11 @@ def deb_extract(archive, dest_dir, member=None):
cwd = os.getcwd() cwd = os.getcwd()
os.chdir(dest_dir) os.chdir(dest_dir)
try: try:
(stdout, stderr, ret) = run_cmd(['ar', 't', archive]) proc = run(['ar', 't', archive],
file_path = stdout.split()[2] check=True, capture_output=True, encoding='utf8')
run_cmd(['ar', 'x', archive, file_path]) file_path = proc.stdout.split()[2]
check_call(['ar', 'x', archive, file_path],
stdout=DEVNULL, stderr=DEVNULL)
tar_extract(file_path, dest_dir, member) tar_extract(file_path, dest_dir, member)
finally: finally:
os.chdir(cwd) os.chdir(cwd)

View file

@ -6,18 +6,18 @@
# later. See the COPYING file in the top-level directory. # later. See the COPYING file in the top-level directory.
import logging import logging
from subprocess import run
from . import run_cmd
def tesseract_ocr(image_path, tesseract_args=''): def tesseract_ocr(image_path, tesseract_args=''):
console_logger = logging.getLogger('console') console_logger = logging.getLogger('console')
console_logger.debug(image_path) console_logger.debug(image_path)
(stdout, stderr, ret) = run_cmd(['tesseract', image_path, proc = run(['tesseract', image_path, 'stdout'],
'stdout']) capture_output=True, encoding='utf8')
if ret: if proc.returncode:
return None return None
lines = [] lines = []
for line in stdout.split('\n'): for line in proc.stdout.split('\n'):
sline = line.strip() sline = line.strip()
if len(sline): if len(sline):
console_logger.debug(sline) console_logger.debug(sline)

View file

@ -16,7 +16,7 @@ import os
from pathlib import Path from pathlib import Path
import pycotap import pycotap
import shutil import shutil
import subprocess from subprocess import run
import sys import sys
import tempfile import tempfile
import unittest import unittest
@ -27,7 +27,6 @@ from qemu.utils import kvm_available, tcg_available
from .archive import archive_extract from .archive import archive_extract
from .asset import Asset from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR from .config import BUILD_DIR
from .uncompress import uncompress from .uncompress import uncompress
@ -251,11 +250,11 @@ class QemuUserTest(QemuBaseTest):
self._ldpath.append(os.path.abspath(ldpath)) self._ldpath.append(os.path.abspath(ldpath))
def run_cmd(self, bin_path, args=[]): def run_cmd(self, bin_path, args=[]):
return subprocess.run([self.qemu_bin] return run([self.qemu_bin]
+ ["-L %s" % ldpath for ldpath in self._ldpath] + ["-L %s" % ldpath for ldpath in self._ldpath]
+ [bin_path] + [bin_path]
+ args, + args,
text=True, capture_output=True) text=True, capture_output=True)
class QemuSystemTest(QemuBaseTest): class QemuSystemTest(QemuBaseTest):
"""Facilitates system emulation tests.""" """Facilitates system emulation tests."""
@ -282,7 +281,9 @@ class QemuSystemTest(QemuBaseTest):
def set_machine(self, machinename): def set_machine(self, machinename):
# TODO: We should use QMP to get the list of available machines # TODO: We should use QMP to get the list of available machines
if not self._machinehelp: if not self._machinehelp:
self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; self._machinehelp = run(
[self.qemu_bin, '-M', 'help'],
capture_output=True, check=True, encoding='utf8').stdout
if self._machinehelp.find(machinename) < 0: if self._machinehelp.find(machinename) < 0:
self.skipTest('no support for machine ' + machinename) self.skipTest('no support for machine ' + machinename)
self.machine = machinename self.machine = machinename
@ -310,15 +311,17 @@ class QemuSystemTest(QemuBaseTest):
"available" % accelerator) "available" % accelerator)
def require_netdev(self, netdevname): def require_netdev(self, netdevname):
netdevhelp = run_cmd([self.qemu_bin, help = run([self.qemu_bin,
'-M', 'none', '-netdev', 'help'])[0]; '-M', 'none', '-netdev', 'help'],
if netdevhelp.find('\n' + netdevname + '\n') < 0: capture_output=True, check=True, encoding='utf8').stdout;
if help.find('\n' + netdevname + '\n') < 0:
self.skipTest('no support for " + netdevname + " networking') self.skipTest('no support for " + netdevname + " networking')
def require_device(self, devicename): def require_device(self, devicename):
devhelp = run_cmd([self.qemu_bin, help = run([self.qemu_bin,
'-M', 'none', '-device', 'help'])[0]; '-M', 'none', '-device', 'help'],
if devhelp.find(devicename) < 0: capture_output=True, check=True, encoding='utf8').stdout;
if help.find(devicename) < 0:
self.skipTest('no support for device ' + devicename) self.skipTest('no support for device ' + devicename)
def _new_vm(self, name, *args): def _new_vm(self, name, *args):

View file

@ -11,11 +11,12 @@
import os import os
import stat import stat
from subprocess import check_call, DEVNULL
from qemu_test import QemuSystemTest from qemu_test import QemuSystemTest
from qemu_test import exec_command_and_wait_for_pattern from qemu_test import exec_command_and_wait_for_pattern
from qemu_test import wait_for_console_pattern from qemu_test import wait_for_console_pattern
from qemu_test import which, run_cmd, get_qemu_img from qemu_test import which, get_qemu_img
class TuxRunBaselineTest(QemuSystemTest): class TuxRunBaselineTest(QemuSystemTest):
@ -76,8 +77,9 @@ class TuxRunBaselineTest(QemuSystemTest):
disk_image = self.scratch_file("rootfs.ext4") disk_image = self.scratch_file("rootfs.ext4")
run_cmd(['zstd', "-f", "-d", disk_image_zst, check_call(['zstd', "-f", "-d", disk_image_zst,
"-o", disk_image]) "-o", disk_image],
stdout=DEVNULL, stderr=DEVNULL)
# zstd copies source archive permissions for the output # zstd copies source archive permissions for the output
# file, so must make this writable for QEMU # file, so must make this writable for QEMU
os.chmod(disk_image, stat.S_IRUSR | stat.S_IWUSR) os.chmod(disk_image, stat.S_IRUSR | stat.S_IWUSR)

View file

@ -12,10 +12,11 @@
import time import time
import logging import logging
from subprocess import check_call, DEVNULL
from qemu_test import QemuSystemTest, Asset from qemu_test import QemuSystemTest, Asset
from qemu_test import exec_command, wait_for_console_pattern from qemu_test import exec_command, wait_for_console_pattern
from qemu_test import get_qemu_img, run_cmd from qemu_test import get_qemu_img
class Aarch64VirtMachine(QemuSystemTest): class Aarch64VirtMachine(QemuSystemTest):
@ -96,7 +97,8 @@ class Aarch64VirtMachine(QemuSystemTest):
logger.info('creating scratch qcow2 image') logger.info('creating scratch qcow2 image')
image_path = self.scratch_file('scratch.qcow2') image_path = self.scratch_file('scratch.qcow2')
qemu_img = get_qemu_img(self) qemu_img = get_qemu_img(self)
run_cmd([qemu_img, 'create', '-f', 'qcow2', image_path, '8M']) check_call([qemu_img, 'create', '-f', 'qcow2', image_path, '8M'],
stdout=DEVNULL, stderr=DEVNULL)
# Add the device # Add the device
self.vm.add_args('-blockdev', self.vm.add_args('-blockdev',

View file

@ -11,9 +11,10 @@
# #
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
from subprocess import check_call, DEVNULL
import tempfile import tempfile
from qemu_test import run_cmd, Asset from qemu_test import Asset
from qemu_test.tuxruntest import TuxRunBaselineTest from qemu_test.tuxruntest import TuxRunBaselineTest
class TuxRunPPC64Test(TuxRunBaselineTest): class TuxRunPPC64Test(TuxRunBaselineTest):
@ -70,7 +71,9 @@ class TuxRunPPC64Test(TuxRunBaselineTest):
# Create a temporary qcow2 and launch the test-case # Create a temporary qcow2 and launch the test-case
with tempfile.NamedTemporaryFile(prefix=prefix, with tempfile.NamedTemporaryFile(prefix=prefix,
suffix='.qcow2') as qcow2: suffix='.qcow2') as qcow2:
run_cmd([self.qemu_img, 'create', '-f', 'qcow2', qcow2.name, ' 1G']) check_call([self.qemu_img, 'create', '-f', 'qcow2',
qcow2.name, ' 1G'],
stdout=DEVNULL, stderr=DEVNULL)
self.vm.add_args('-drive', 'file=' + qcow2.name + self.vm.add_args('-drive', 'file=' + qcow2.name +
',format=qcow2,if=none,id=' ',format=qcow2,if=none,id='