1
0
Fork 0

Catalogs: Rewrite of the craft zip creation using the Python zipfile module.

If a 'zip-excludes.lst' file is found in the craft's base directory, that will
overwrite the global catalog exclusion list.

Three new unit tests have been created to test the functionality.
This commit is contained in:
Edward d'Auvergne 2019-11-13 10:59:38 +01:00
parent fd6fae3858
commit b8c05410e3
11 changed files with 199 additions and 18 deletions

View file

@ -2,13 +2,16 @@
import argparse import argparse
import datetime import datetime
#import xml.etree.cElementTree as ET from fnmatch import fnmatch, translate
import lxml.etree as ET import lxml.etree as ET
import os import os
from os.path import exists, join, relpath
from os import walk
import re import re
import sgprops import sgprops
import sys import sys
import catalogTags import catalogTags
import zipfile
CATALOG_VERSION = 4 CATALOG_VERSION = 4
quiet = False quiet = False
@ -286,3 +289,107 @@ def make_aircraft_node(aircraftDirName, package, variants, downloadBase, mirrors
package_node.append(package['urls']._createXMLElement()) package_node.append(package['urls']._createXMLElement())
return package_node return package_node
def make_aircraft_zip(repo_path, craft_name, zip_file, global_zip_excludes, verbose=True):
"""Create a zip archive of the given aircraft."""
# Printout.
if verbose:
print("Zip file creation: %s.zip" % craft_name)
# Go to the directory of crafts to catalog.
savedir = os.getcwd()
os.chdir(repo_path)
# Clear out the old file.
if exists(zip_file):
os.remove(zip_file)
# Use the Python zipfile module to create the zip file.
zip_handle = zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED)
# Find a per-craft exclude list.
craft_path = join(repo_path, craft_name)
exclude_file = join(craft_path, 'zip-excludes.lst')
if exists(exclude_file):
if verbose:
print("Found the craft specific exclusion list '%s'" % exclude_file)
# Otherwise use the catalog default exclusion list.
else:
exclude_file = global_zip_excludes
# Process the exclusion list and find all matching file names.
blacklist = fetch_zip_exclude_list(craft_name, craft_path, exclude_file)
# Walk over all craft files.
print_format = " %-30s '%s'"
for root, dirs, files in walk(craft_path):
# Loop over the files.
for file in files:
# The directory and relative and absolute paths.
dir = relpath(root, start=repo_path)
full_path = join(root, file)
rel_path = relpath(full_path, start=repo_path)
# Skip blacklist files or directories.
skip = False
if file == 'zip-excludes.lst':
if verbose:
print(print_format % ("Skipping the file:", join(dir, 'zip-excludes.lst')))
skip = True
if dir in blacklist:
if verbose:
print(print_format % ("Skipping the file:", join(dir, file)))
skip = True
for name in blacklist:
if fnmatch(rel_path, name):
if verbose:
print(print_format % ("Skipping the file:", rel_path))
skip = True
break
if skip:
continue
# Otherwise add the file.
zip_handle.write(rel_path)
# Clean up.
os.chdir(savedir)
zip_handle.close()
def fetch_zip_exclude_list(name, path, exclude_path):
"""Use Unix style path regular expression to find all files to exclude."""
# Init.
blacklist = []
file = open(exclude_path)
exclude_list = file.readlines()
file.close()
old_path = os.getcwd()
os.chdir(path)
# Process each exclusion path or regular expression, converting to Python RE objects.
reobj_list = []
for i in range(len(exclude_list)):
reobj_list.append(re.compile(translate(exclude_list[i].strip())))
# Recursively loop over all files, finding the ones to exclude.
for root, dirs, files in walk(path):
for file in files:
full_path = join(root, file)
rel_path = join(name, relpath(full_path, start=path))
# Skip Unix shell-style wildcard matches
for i in range(len(reobj_list)):
if reobj_list[i].match(rel_path):
blacklist.append(rel_path)
break
# Return to the original path.
os.chdir(old_path)
# Return the list.
return blacklist

View file

@ -0,0 +1 @@
*.xcf*

View file

@ -0,0 +1 @@
12

View file

View file

@ -3,8 +3,13 @@
import unittest import unittest
import sgprops import sgprops
import os import os
from os.path import join
import catalog import catalog
import lxml.etree as ET import lxml.etree as ET
from shutil import rmtree
from tempfile import mkdtemp
import zipfile
catalog.quiet = True catalog.quiet = True
@ -234,5 +239,85 @@ class UpdateCatalogTests(unittest.TestCase):
self.assertFalse(parsedPkgNode.hasChild('minimum-fg-version')); self.assertFalse(parsedPkgNode.hasChild('minimum-fg-version'));
self.assertFalse(parsedPkgNode.hasChild('variant')); self.assertFalse(parsedPkgNode.hasChild('variant'));
class ZipTests(unittest.TestCase):
"""Specific craft zip file creation tests."""
def check_zip(self, file_name, expected_content=None):
"""General checks for the zip file."""
# Check for file existence.
self.assert_(os.access(file_name, os.F_OK))
# Check the contents.
file = zipfile.ZipFile(file_name)
zip_contents = file.namelist()
if len(zip_contents) != len(expected_content):
print("Zip contents:\n %s" % zip_contents)
print("Expected contents:\n %s" % expected_content)
self.assertEqual(len(zip_contents), len(expected_content))
for i in range(len(zip_contents)):
self.assertEqual(zip_contents[i], expected_content[i])
def setUp(self):
"""Common set up for these system tests."""
# Store the current directory.
self._cwd = os.getcwd()
# Create a temporary directory for dumping files.
self.tmpdir = mkdtemp()
def tearDown(self):
"""Delete temp files."""
# Force return to the correct directory.
os.chdir(self._cwd)
# Remove temporary file (if there is a deletion failure, continue to allow the test suite to survive).
try:
rmtree(self.tmpdir)
except:
pass
# Remove the variable.
del self.tmpdir
def test_zip_creation(self):
"""Test the creation of a basic craft zip archive."""
# Create a basic zip file.
name = "c172"
catalog.make_aircraft_zip(join(os.getcwd(), "testData/Aircraft"), name, join(self.tmpdir, name+'.zip'), join(os.getcwd(), 'fgaddon-catalog/zip-excludes.lst'), verbose=False)
# Checks.
self.check_zip(join(self.tmpdir, name+'.zip'), expected_content=['c172/c172-set.xml'])
def test_zip_exclusion_global(self):
"""Test file exclusion in a craft zip archive using the global catalog exclusion list."""
# Create a basic zip file.
name = "dc3"
catalog.make_aircraft_zip(join(os.getcwd(), "testData/Aircraft"), name, join(self.tmpdir, name+'.zip'), join(os.getcwd(), 'fgaddon-catalog/zip-excludes.lst'), verbose=False)
# Checks.
self.check_zip(join(self.tmpdir, name+'.zip'), expected_content=['dc3/dc3-set.xml'])
def test_zip_exclusion_local(self):
"""Test file exclusion in a craft zip archive using a local catalog exclusion list."""
# Create a basic zip file.
name = "c150"
catalog.make_aircraft_zip(join(os.getcwd(), "testData/Aircraft"), name, join(self.tmpdir, name+'.zip'), join(os.getcwd(), 'testData/Aircraft/c150/zip-excludes.lst'), verbose=False)
# Checks.
self.check_zip(join(self.tmpdir, name+'.zip'), expected_content=['c150/c150-set.xml', 'c150/Resources/crazy_20Gb_file'])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -13,6 +13,8 @@ import sgprops
import sys import sys
import catalogTags import catalogTags
import catalog import catalog
from catalog import make_aircraft_node, make_aircraft_zip
CATALOG_VERSION = 4 CATALOG_VERSION = 4
@ -66,21 +68,6 @@ def scan_dir_for_change_date_mtime(path):
maxsec = mtime maxsec = mtime
return maxsec return maxsec
def make_aircraft_zip(repo_path, name, zip_file):
if (not args.quiet):
print "Updating:", name + '.zip'
savedir = os.getcwd()
os.chdir(repo_path)
if os.path.exists(zip_file):
os.remove(zip_file)
command = ['zip', '-rq', '-9']
if os.path.exists(zip_excludes):
command += ['-x@' + zip_excludes]
else:
print "warning: no zip-excludes.lst file provided", zip_excludes
command += [zip_file, name]
subprocess.call(command)
os.chdir(savedir)
def get_md5sum(file): def get_md5sum(file):
f = open(file, 'r') f = open(file, 'r')
@ -148,7 +135,7 @@ def process_aircraft_dir(name, repo_path):
if not args.quiet: if not args.quiet:
print "%s:" % name, print "%s:" % name,
package_node = catalog.make_aircraft_node(name, package, variants, download_base, mirrors) package_node = make_aircraft_node(name, package, variants, download_base, mirrors)
download_url = download_base + name + '.zip' download_url = download_base + name + '.zip'
if 'thumbnail' in package: if 'thumbnail' in package:
@ -177,7 +164,7 @@ def process_aircraft_dir(name, repo_path):
# rebuild zip file # rebuild zip file
if not args.quiet: if not args.quiet:
print "updating:", zipfile print "updating:", zipfile
make_aircraft_zip(repo_path, name, zipfile) make_aircraft_zip(repo_path, name, zipfile, zip_excludes, verbose=not args.quiet)
md5sum = get_md5sum(zipfile) md5sum = get_md5sum(zipfile)
else: else:
if not args.quiet: if not args.quiet: