Extensions: add support for installing extensions from local files

This commit is contained in:
Campbell Barton
2024-02-04 18:04:14 +11:00
parent 8fecb4da2c
commit be647aa561
4 changed files with 428 additions and 64 deletions

View File

@ -389,6 +389,7 @@ def pkg_manifest_archive_from_dict_and_validate(
if isinstance(manifest, str):
return manifest
assert isinstance(manifest, PkgManifest)
return PkgManifest_Archive(
manifest=manifest,
# TODO: check these ID's exist.
@ -444,6 +445,14 @@ def pkg_manifest_from_tarfile_and_validate(
return pkg_manifest_from_dict_and_validate(pkg_idname, manifest_dict, from_repo=False)
def pkg_manifest_from_archive_and_validate(
filepath: str,
) -> Union[PkgManifest, str]:
# TODO: handle errors.
with tarfile.open(filepath, mode="r:xz") as tar_fh:
return pkg_manifest_from_tarfile_and_validate(tar_fh)
def remote_url_from_repo_url(url: str) -> str:
if REMOTE_REPO_HAS_JSON_IMPLIED:
return url
@ -864,6 +873,7 @@ def toml_from_filepath(filepath: str) -> Optional[Dict[str, Any]]:
def extract_metadata_from_archive(filepath: str) -> Optional[Dict[str, Any]]:
# TODO: error handling, corrupt archive or invalid TOML.
with tarfile.open(filepath, "r:xz") as tar_fh:
try:
file_content = tar_fh.extractfile(PKG_MANIFEST_FILENAME_TOML)
@ -1053,6 +1063,17 @@ def generic_arg_package_list_positional(subparse: argparse.ArgumentParser) -> No
)
def generic_arg_file_list_positional(subparse: argparse.ArgumentParser) -> None:
subparse.add_argument(
dest="files",
type=str,
nargs="+",
help=(
"The files to operate on (one or more arguments)."
),
)
def generic_arg_repo_dir(subparse: argparse.ArgumentParser) -> None:
subparse.add_argument(
"--repo-dir",
@ -1322,6 +1343,116 @@ class subcmd_client:
)
return success
@staticmethod
def _install_package_from_file_impl(
msg_fn: MessageFn,
*,
local_dir: str,
filepath_archive: str,
manifest_compare: Optional[PkgManifest],
) -> bool:
# Implement installing a package to a repository.
# Used for installing from local cache as well as installing a local package from a file.
# Remove `filepath_local_pkg_temp` if this block exits.
directories_to_clean: List[str] = []
with CleanupPathsContext(files=(), directories=directories_to_clean):
with tarfile.open(filepath_archive, mode="r:xz") as tar_fh:
manifest = pkg_manifest_from_tarfile_and_validate(tar_fh)
if isinstance(manifest, str):
message_warn(
msg_fn,
"Error loading manifest from: {:s}".format(manifest),
)
return False
if manifest_compare is not None:
# The archive ID name must match the server name,
# otherwise the package will install but not be able to collate
# the installed package with the remote ID.
if manifest_compare.id != manifest.id:
message_warn(
msg_fn,
"Package ID mismatch (remote: \"{:s}\", archive: \"{:s}\")".format(
manifest_compare.id,
manifest.id,
)
)
return False
if manifest_compare.version != manifest.version:
message_warn(
msg_fn,
"Package version mismatch (remote: \"{:s}\", archive: \"{:s}\")".format(
manifest_compare.version,
manifest.version,
)
)
return False
# We have the cache, extract it to a directory.
# This will be a directory.
filepath_local_pkg = os.path.join(local_dir, manifest.id)
# First extract into a temporary directory, validate the package is not corrupt,
# then move the package to it's expected location.
filepath_local_pkg_temp = filepath_local_pkg + "@"
# It's unlikely this exist, nevertheless if it does - it must be removed.
if os.path.isdir(filepath_local_pkg_temp):
shutil.rmtree(filepath_local_pkg_temp)
directories_to_clean.append(filepath_local_pkg_temp)
try:
tar_fh.extractall(filepath_local_pkg_temp)
except BaseException as ex:
message_warn(
msg_fn,
"Failed to extract files for \"{:s}\": {:s}".format(manifest.id, str(ex)),
)
return False
is_reinstall = False
if os.path.isdir(filepath_local_pkg):
shutil.rmtree(filepath_local_pkg)
is_reinstall = True
os.rename(filepath_local_pkg_temp, filepath_local_pkg)
directories_to_clean.remove(filepath_local_pkg_temp)
if is_reinstall:
message_status(msg_fn, "Re-Installed \"{:s}\"".format(manifest.id))
else:
message_status(msg_fn, "Installed \"{:s}\"".format(manifest.id))
return True
@staticmethod
def install_packages_from_files(
msg_fn: MessageFn,
*,
local_dir: str,
package_files: Sequence[str],
) -> bool:
if not os.path.exists(local_dir):
message_error(msg_fn, "destination directory \"{:s}\" does not exist".format(local_dir))
return False
# This is a simple file extraction, the main difference is that it validates the manifest before installing.
directories_to_clean: List[str] = []
with CleanupPathsContext(files=(), directories=directories_to_clean):
for filepath_archive in package_files:
if not subcmd_client._install_package_from_file_impl(
msg_fn,
local_dir=local_dir,
filepath_archive=filepath_archive,
# There is no manifest from the repository, leave this unset.
manifest_compare=None,
):
# The package failed to install.
continue
return True
@staticmethod
def install_packages(
msg_fn: MessageFn,
@ -1490,69 +1621,16 @@ class subcmd_client:
# All packages have been downloaded, install them.
for manifest_archive in packages_info:
pkg_idname = manifest_archive.manifest.id
pkg_version = manifest_archive.manifest.version
filepath_local_cache_archive = os.path.join(local_cache_dir, manifest_archive.manifest.id + PKG_EXT)
# We have the cache, extract it to a directory.
# This will be a directory.
filepath_local_pkg = os.path.join(local_dir, pkg_idname)
filepath_local_cache_archive = os.path.join(local_cache_dir, pkg_idname + PKG_EXT)
# First extract into a temporary directory, validate the package is not corrupt,
# then move the package to it's expected location.
filepath_local_pkg_temp = filepath_local_pkg + "@"
# It's unlikely this exist, nevertheless if it does - it must be removed.
if os.path.isdir(filepath_local_pkg_temp):
shutil.rmtree(filepath_local_pkg_temp)
# Remove `filepath_local_pkg_temp` if this block exits or continues.
with CleanupPathsContext(files=(), directories=(filepath_local_pkg_temp,)):
with tarfile.open(filepath_local_cache_archive, mode="r:xz") as tar_fh:
manifest_from_archive = pkg_manifest_from_tarfile_and_validate(tar_fh)
if isinstance(manifest_from_archive, str):
message_warn(
msg_fn,
"Error loading manifest for {:s}: {:s}".format(pkg_idname, manifest_from_archive),
)
continue
# The archive ID name must match the server name,
# otherwise the package will install but not be able to collate
# the installed package with the remote ID.
if pkg_idname != manifest_from_archive.id:
message_warn(
msg_fn,
"Package ID mismatch (remote: \"{:s}\", archive: \"{:s}\")".format(
pkg_idname,
manifest_from_archive.id,
)
)
continue
if pkg_version != manifest_from_archive.version:
message_warn(
msg_fn,
"Package version mismatch (remote: \"{:s}\", archive: \"{:s}\")".format(
pkg_version,
manifest_from_archive.version,
)
)
continue
tar_fh.extractall(filepath_local_pkg_temp)
is_reinstall = False
if os.path.isdir(filepath_local_pkg):
shutil.rmtree(filepath_local_pkg)
is_reinstall = True
os.rename(filepath_local_pkg_temp, filepath_local_pkg)
# TODO: think of how to handle messages & progress.
if is_reinstall:
message_status(msg_fn, "Re-Installed \"{:s}\"".format(pkg_idname))
else:
message_status(msg_fn, "Installed \"{:s}\"".format(pkg_idname))
if not subcmd_client._install_package_from_file_impl(
msg_fn,
local_dir=local_dir,
filepath_archive=filepath_local_cache_archive,
manifest_compare=manifest_archive.manifest,
):
# The package failed to install.
continue
return True
@ -1889,6 +1967,27 @@ def argparse_create_client_sync(subparsers: "argparse._SubParsersAction[argparse
)
def argparse_create_client_install_files(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
subparse = subparsers.add_parser(
"install-files",
help="Install package from the file-system.",
description="Install packages from the file-system.",
formatter_class=argparse.RawTextHelpFormatter,
)
generic_arg_file_list_positional(subparse)
generic_arg_local_dir(subparse)
generic_arg_output_type(subparse)
subparse.set_defaults(
func=lambda args: subcmd_client.install_packages_from_files(
msg_fn_from_args(args),
local_dir=args.local_dir,
package_files=args.files,
),
)
def argparse_create_client_install(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
subparse = subparsers.add_parser(
"install",
@ -2062,6 +2161,7 @@ def argparse_create() -> argparse.ArgumentParser:
# Manipulating Actions.
argparse_create_client_sync(subparsers)
argparse_create_client_install_files(subparsers)
argparse_create_client_install(subparsers)
argparse_create_client_uninstall(subparsers)