qemu/tests/functional/qemu_test/uncompress.py
Daniel P. Berrangé 9813958892 tests/functional: stop output from zstd command when uncompressing
The zstd command will print incremental decompression progress to stderr
when running. Fortunately it is not on stdout as that would confuse the
TAP parsing, but we should still not have this printed. By switching
from 'check_call' to 'run' with the check=True and capture_output=True
we'll get the desired silence on success, and on failure the raised
exception will automatically include stdout/stderr data for diagnosis
purposes.

Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
Reviewed-by: Thomas Huth <thuth@redhat.com>
Message-ID: <20250228102738.3064045-8-berrange@redhat.com>
Signed-off-by: Thomas Huth <thuth@redhat.com>
2025-03-07 07:50:19 +01:00

107 lines
3 KiB
Python

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Utilities for python-based QEMU tests
#
# Copyright 2024 Red Hat, Inc.
#
# Authors:
# Thomas Huth <thuth@redhat.com>
import gzip
import lzma
import os
import stat
import shutil
from urllib.parse import urlparse
from subprocess import run, CalledProcessError, DEVNULL
from .asset import Asset
def gzip_uncompress(gz_path, output_path):
    '''
    @params gz_path: filename of the gzip compressed input
    @params output_path: filename to write the uncompressed data into

    Uncompresses @gz_path into @output_path. If @output_path already
    exists it is left untouched and nothing is uncompressed.

    If uncompressing fails, any partially written @output_path is
    removed and the original exception is re-raised.
    '''
    if os.path.exists(output_path):
        return
    with gzip.open(gz_path, 'rb') as gz_in:
        try:
            with open(output_path, 'wb') as raw_out:
                shutil.copyfileobj(gz_in, raw_out)
        except BaseException:
            # Deliberately catch everything (including KeyboardInterrupt)
            # so a truncated output file is never left behind, since an
            # existing file short-circuits the next call.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # The output was never created (open() itself failed);
                # do not let the cleanup hide the real error.
                pass
            raise
def lzma_uncompress(xz_path, output_path):
    '''
    @params xz_path: filename of the xz/lzma compressed input
    @params output_path: filename to write the uncompressed data into

    Uncompresses @xz_path into @output_path. If @output_path already
    exists it is left untouched and nothing is uncompressed.

    If uncompressing fails, any partially written @output_path is
    removed and the original exception is re-raised.
    '''
    if os.path.exists(output_path):
        return
    with lzma.open(xz_path, 'rb') as lzma_in:
        try:
            with open(output_path, 'wb') as raw_out:
                shutil.copyfileobj(lzma_in, raw_out)
        except BaseException:
            # Deliberately catch everything (including KeyboardInterrupt)
            # so a truncated output file is never left behind, since an
            # existing file short-circuits the next call.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # The output was never created (open() itself failed);
                # do not let the cleanup hide the real error.
                pass
            raise
def zstd_uncompress(zstd_path, output_path):
    '''
    @params zstd_path: filename of the zstd compressed input
    @params output_path: filename to write the uncompressed data into

    Uncompresses @zstd_path into @output_path by running the external
    'zstd' command with its output captured. If @output_path already
    exists it is left untouched and the command is not run.

    If the command exits with a failure status, any @output_path it
    left behind is removed and an Exception is raised, chained from
    the CalledProcessError and including the captured stderr text.
    '''
    if os.path.exists(output_path):
        return

    try:
        run(['zstd', "-f", "-d", zstd_path,
             "-o", output_path], capture_output=True, check=True)
    except CalledProcessError as e:
        try:
            os.remove(output_path)
        except FileNotFoundError:
            # zstd failed before creating any output; do not let the
            # cleanup replace the diagnostic raised below.
            pass

        msg = f"Unable to decompress zstd file {zstd_path} with {e}"
        # str(CalledProcessError) only reports the exit status, so add
        # the captured stderr to make the failure diagnosable.
        detail = e.stderr.decode(errors="replace").strip() if e.stderr else ""
        if detail:
            msg = f"{msg}: {detail}"
        raise Exception(msg) from e

    # zstd copies source archive permissions for the output
    # file, so must make this writable for QEMU
    os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
def uncompress(compressed, uncompressed, format=None):
    '''
    @params compressed: filename or Asset to uncompress; it is converted
                        with str() to get the path that is read
    @params uncompressed: filename to uncompress into
    @params format: optional compression format ("xz", "gz" or "zstd")

    Uncompresses @compressed into @uncompressed

    If @format is None, guess_uncompress_format() is used to guess the
    format from the filename or Asset URL.

    Raises an Exception if @format is not one of the known formats.
    Nothing is returned; the result is the file at @uncompressed.
    '''
    if format is None:
        format = guess_uncompress_format(compressed)

    # NOTE(review): str(compressed) is assumed to yield a local file
    # path when @compressed is an Asset - confirm against Asset.__str__
    if format == "xz":
        lzma_uncompress(str(compressed), uncompressed)
    elif format == "gz":
        gzip_uncompress(str(compressed), uncompressed)
    elif format == "zstd":
        zstd_uncompress(str(compressed), uncompressed)
    else:
        raise Exception(f"Unknown compression format {format}")
def guess_uncompress_format(compressed):
    '''
    @params compressed: filename (str or os.PathLike) or Asset to guess

    Guess the format of @compressed from its file extension (for an
    Asset, the extension of the path component of its URL), raising
    an exception if no format can be determined

    Returns one of "xz", "gz" or "zstd"
    '''
    if isinstance(compressed, str):
        path = compressed
    elif isinstance(compressed, Asset):
        # Only the path component matters, so that any query string
        # or fragment in the URL cannot hide the extension
        path = urlparse(compressed.url).path
    elif isinstance(compressed, os.PathLike):
        path = os.fspath(compressed)
    else:
        raise Exception(f"Unable to guess compression format for {compressed}")

    ext = os.path.splitext(path)[1]
    if ext == ".xz":
        return "xz"
    if ext == ".gz":
        return "gz"
    if ext in (".zstd", ".zst"):
        return "zstd"
    raise Exception(f"Unknown compression format for {path}")