tests/functional: Allow asset downloading with concurrent threads

When running "make -j$(nproc) check-functional", tests that use the
same asset might be running in parallel. Improve the downloading to
detect this situation and wait for the other thread to finish the
download.

Message-ID: <20240830133841.142644-17-thuth@redhat.com>
Signed-off-by: Thomas Huth <thuth@redhat.com>
This commit is contained in:
Thomas Huth 2024-08-30 15:38:10 +02:00
parent f57213f85b
commit 34b17c0a65

View file

@ -12,6 +12,7 @@ import subprocess
import sys import sys
import unittest import unittest
import urllib.request import urllib.request
from time import sleep
from pathlib import Path from pathlib import Path
from shutil import copyfileobj from shutil import copyfileobj
@ -55,6 +56,35 @@ class Asset:
def valid(self): def valid(self):
return self.cache_file.exists() and self._check(self.cache_file) return self.cache_file.exists() and self._check(self.cache_file)
def _wait_for_other_download(self, tmp_cache_file):
# Another thread already seems to download the asset, so wait until
# it is done, while also checking the size to see whether it is stuck
try:
current_size = tmp_cache_file.stat().st_size
new_size = current_size
except:
if os.path.exists(self.cache_file):
return True
raise
waittime = lastchange = 600
while waittime > 0:
sleep(1)
waittime -= 1
try:
new_size = tmp_cache_file.stat().st_size
except:
if os.path.exists(self.cache_file):
return True
raise
if new_size != current_size:
lastchange = waittime
current_size = new_size
elif lastchange - waittime > 90:
return False
self.log.debug("Time out while waiting for %s!", tmp_cache_file)
raise
def fetch(self): def fetch(self):
if not self.cache_dir.exists(): if not self.cache_dir.exists():
self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_dir.mkdir(parents=True, exist_ok=True)
@ -70,18 +100,28 @@ class Asset:
self.log.info("Downloading %s to %s...", self.url, self.cache_file) self.log.info("Downloading %s to %s...", self.url, self.cache_file)
tmp_cache_file = self.cache_file.with_suffix(".download") tmp_cache_file = self.cache_file.with_suffix(".download")
try: for retries in range(3):
resp = urllib.request.urlopen(self.url) try:
except Exception as e: with tmp_cache_file.open("xb") as dst:
self.log.error("Unable to download %s: %s", self.url, e) with urllib.request.urlopen(self.url) as resp:
raise copyfileobj(resp, dst)
break
except FileExistsError:
self.log.debug("%s already exists, "
"waiting for other thread to finish...",
tmp_cache_file)
if self._wait_for_other_download(tmp_cache_file):
return str(self.cache_file)
self.log.debug("%s seems to be stale, "
"deleting and retrying download...",
tmp_cache_file)
tmp_cache_file.unlink()
continue
except Exception as e:
self.log.error("Unable to download %s: %s", self.url, e)
tmp_cache_file.unlink()
raise
try:
with tmp_cache_file.open("wb+") as dst:
copyfileobj(resp, dst)
except:
tmp_cache_file.unlink()
raise
try: try:
# Set these just for informational purposes # Set these just for informational purposes
os.setxattr(str(tmp_cache_file), "user.qemu-asset-url", os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",