diff --git a/README.md b/README.md index 7d3c3a6..7932c76 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,8 @@ You can use any of the following methods to build. | [APPRISE_NOTIFICATION_TITLE](#apprise) | Apprise Notification Title . | None | | [APPRISE_NOTIFICATION_BODY](#apprise) | Apprise Notification Body . | None | | MAX_RESOURCE_WORKERS | Maximum workers for downloading resources | 3 | - +| MAX_PARALLEL_APPS | Maximum number of apps to process in parallel | 4 | +| DISABLE_CACHING | Disable download and resource caching | False | `*` - Can be overridden for individual app. ### App Level Config diff --git a/check_resource_updates.py b/check_resource_updates.py index 51aeb30..c8851be 100644 --- a/check_resource_updates.py +++ b/check_resource_updates.py @@ -1,5 +1,7 @@ """Check patching resource updates.""" +from threading import Lock + from environs import Env from loguru import logger @@ -16,12 +18,13 @@ def check_if_build_is_required() -> bool: config = RevancedConfig(env) needs_to_repatched = [] resource_cache: dict[str, tuple[str, str]] = {} + resource_lock = Lock() for app_name in env.list("PATCH_APPS", default_build): logger.info(f"Checking {app_name}") app_obj = get_app(config, app_name) old_patches_version = GitHubManager(env).get_last_version(app_obj, patches_version_key) old_patches_source = GitHubManager(env).get_last_version_source(app_obj, patches_dl_key) - app_obj.download_patch_resources(config, resource_cache) + app_obj.download_patch_resources(config, resource_cache, resource_lock) if GitHubManager(env).should_trigger_build( old_patches_version, old_patches_source, diff --git a/main.py b/main.py index 17b71b9..7f50bf7 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,9 @@ """Entry point.""" import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock +from typing import Any from environs import Env from loguru import logger @@ -21,6 +24,53 @@ def get_app(config: RevancedConfig, app_name: str) -> APP: return APP(app_name=app_name, package_name=package_name, config=config) +def process_single_app( + app_name: str, + config: RevancedConfig, + caches: tuple[ + dict[tuple[str, str], tuple[str, str]], + dict[str, tuple[str, str]], + Lock, + Lock, + ], +) -> dict[str, Any]: + """Process a single app and return its update info.""" + download_cache, resource_cache, download_lock, resource_lock = caches + logger.info(f"Trying to build {app_name}") + try: + app = get_app(config, app_name) + + # Use shared resource cache with thread safety + app.download_patch_resources(config, resource_cache, resource_lock) + + patcher = Patches(config, app) + parser = Parser(patcher, config) + app_all_patches = patcher.get_app_configs(app) + + # Use shared APK cache with thread safety + app.download_apk_for_patching(config, download_cache, download_lock) + + parser.include_exclude_patch(app, app_all_patches, patcher.patches_dict) + logger.info(app) + app_update_info = save_patch_info(app, {}) + parser.patch_app(app) + except AppNotFoundError as e: + logger.info(e) + return {} + except PatchesJsonLoadError: + logger.exception("Patches.json not found") + return {} + except PatchingFailedError as e: + logger.exception(e) + return {} + except BuilderError as e: + logger.exception(f"Failed to build {app_name} because of {e}") + return {} + else: + logger.info(f"Successfully completed {app_name}") + return app_update_info + + def main() -> None: """Entry point.""" env = Env() @@ -35,37 +85,51 @@ def main() -> None: logger.info(f"Will Patch only {len(config.apps)} apps-:\n{config.apps}") - # Caches for reuse + # Shared caches for reuse across all apps (empty if caching disabled) download_cache: dict[tuple[str, str], tuple[str, str]] = {} resource_cache: dict[str, tuple[str, str]] = {} - for possible_app in config.apps: - logger.info(f"Trying to build {possible_app}") - try: - app = get_app(config, possible_app) + # Thread-safe locks for cache access + download_lock = Lock() + resource_lock = Lock() - # Use shared resource cache - app.download_patch_resources(config, resource_cache) + # Clear caches if caching is disabled + if config.disable_caching: + download_cache.clear() + resource_cache.clear() - patcher = Patches(config, app) - parser = Parser(patcher, config) - app_all_patches = patcher.get_app_configs(app) + # Determine optimal number of workers (don't exceed number of apps or CPU cores) + max_workers = min(len(config.apps), config.max_parallel_apps) - # Use shared APK cache - app.download_apk_for_patching(config, download_cache) + if len(config.apps) == 1 or config.ci_test: + # For single app or CI testing, use sequential processing + caches = (download_cache, resource_cache, download_lock, resource_lock) + for app_name in config.apps: + app_updates = process_single_app(app_name, config, caches) + updates_info.update(app_updates) + else: + # For multiple apps, use parallel processing + logger.info(f"Processing {len(config.apps)} apps in parallel with {max_workers} workers") - parser.include_exclude_patch(app, app_all_patches, patcher.patches_dict) - logger.info(app) - updates_info = save_patch_info(app, updates_info) - parser.patch_app(app) - except AppNotFoundError as e: - logger.info(e) - except PatchesJsonLoadError: - logger.exception("Patches.json not found") - except PatchingFailedError as e: - logger.exception(e) - except BuilderError as e: - logger.exception(f"Failed to build {possible_app} because of {e}") + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all app processing tasks + caches = (download_cache, resource_cache, download_lock, resource_lock) + future_to_app = { + executor.submit(process_single_app, app_name, config, caches): app_name for app_name in config.apps + } + + # Collect results as they complete + total_apps = len(config.apps) + + for completed_count, future in enumerate(as_completed(future_to_app), 1): + app_name = future_to_app[future] + try: + app_updates = future.result() + updates_info.update(app_updates) + logger.info(f"Progress: {completed_count}/{total_apps} apps completed ({app_name})") + except (AppNotFoundError, PatchesJsonLoadError, PatchingFailedError, BuilderError) as e: + logger.exception(f"Error processing {app_name}: {e}") + logger.info(f"Progress: {completed_count}/{total_apps} apps completed ({app_name} - FAILED)") write_changelog_to_file(updates_info) diff --git a/src/app.py b/src/app.py index 74c3596..55a9e9b 100644 --- a/src/app.py +++ b/src/app.py @@ -5,6 +5,7 @@ import hashlib import pathlib from concurrent.futures import ThreadPoolExecutor from datetime import datetime +from threading import Lock from typing import Any, Self from loguru import logger @@ -61,6 +62,7 @@ class APP(object): self: Self, config: RevancedConfig, download_cache: dict[tuple[str, str], tuple[str, str]], + download_lock: Lock, ) -> None: """Download apk to be patched, skipping if already downloaded (matching source and version).""" from src.downloader.download import Downloader # noqa: PLC0415 @@ -81,16 +83,26 @@ class APP(object): cache_key = (self.download_source, self.app_version) - if cache_key in download_cache: - logger.info(f"Skipping download. Reusing APK from cache for {self.app_name} ({self.app_version})") - self.download_file_name, self.download_dl = download_cache[cache_key] - return + # Thread-safe cache check and download + with download_lock: + if cache_key in download_cache: + logger.info(f"Skipping download. Reusing APK from cache for {self.app_name} ({self.app_version})") + self.download_file_name, self.download_dl = download_cache[cache_key] + return - downloader = DownloaderFactory.create_downloader(config=config, apk_source=self.download_source) - self.download_file_name, self.download_dl = downloader.download(self.app_version, self) + # Check again after acquiring lock to handle race conditions + if cache_key in download_cache: + logger.info(f"Skipping download. Reusing APK from cache for {self.app_name} ({self.app_version})") + self.download_file_name, self.download_dl = download_cache[cache_key] + return - # Save to cache using (source, version) tuple - download_cache[cache_key] = (self.download_file_name, self.download_dl) + logger.info(f"Cache miss for {self.app_name} ({self.app_version}). Proceeding with download.") + downloader = DownloaderFactory.create_downloader(config=config, apk_source=self.download_source) + self.download_file_name, self.download_dl = downloader.download(self.app_version, self) + + # Save to cache using (source, version) tuple + download_cache[cache_key] = (self.download_file_name, self.download_dl) + logger.info(f"Added {self.app_name} ({self.app_version}) to download cache.") def get_output_file_name(self: Self) -> str: """The function returns a string representing the output file name. @@ -211,6 +223,7 @@ class APP(object): self: Self, config: RevancedConfig, resource_cache: dict[str, tuple[str, str]], + resource_lock: Lock, ) -> None: """The function `download_patch_resources` downloads various resources req. for patching. @@ -220,6 +233,8 @@ class APP(object): The `config` parameter is an instance of the `RevancedConfig` class. It is used to provide configuration settings for the resource download tasks. resource_cache: dict[str, tuple[str, str]] + resource_lock: Lock + Thread lock for safe access to resource_cache """ logger.info("Downloading resources for patching.") @@ -229,28 +244,59 @@ class APP(object): (name, url, config, filter_pattern) for name, url, _, filter_pattern in base_tasks ] - with ThreadPoolExecutor(config.max_resource_workers) as executor: # Use configurable worker count - futures: dict[str, concurrent.futures.Future[tuple[str, str]]] = {} + # Track which resources need to be downloaded (outside of lock to minimize lock time) + resources_to_download: list[tuple[str, str, RevancedConfig, str]] = [] + # Thread-safe cache check + with resource_lock: for resource_name, raw_url, cfg, assets_filter in download_tasks: url = raw_url.strip() if url in resource_cache: logger.info(f"Skipping {resource_name} download, using cached resource: {url}") tag, file_name = resource_cache[url] self._handle_cached_resource(resource_name, tag, file_name) - continue + else: + resources_to_download.append((resource_name, url, cfg, assets_filter)) - futures[resource_name] = executor.submit(self.download, url, cfg, assets_filter) + # Download resources that are not cached (outside of lock for parallel downloads) + if resources_to_download: + with ThreadPoolExecutor(config.max_resource_workers) as executor: + futures: dict[str, concurrent.futures.Future[tuple[str, str]]] = {} - concurrent.futures.wait(futures.values()) + for resource_name, url, cfg, assets_filter in resources_to_download: + futures[resource_name] = executor.submit(self.download, url, cfg, assets_filter) - for resource_name, future in futures.items(): - try: - tag, file_name = future.result() - self._handle_downloaded_resource(resource_name, tag, file_name, download_tasks, resource_cache) - except BuilderError as e: - msg = f"Failed to download {resource_name} resource." - raise PatchingFailedError(msg) from e + concurrent.futures.wait(futures.values()) + + # Thread-safe cache update + with resource_lock: + for resource_name, future in futures.items(): + try: + tag, file_name = future.result() + # Double-check cache in case another thread already added it + corresponding_url = next( + url for name, url, _, _ in resources_to_download if name == resource_name + ) + if corresponding_url not in resource_cache: + self._handle_downloaded_resource( + resource_name, + tag, + file_name, + download_tasks, + resource_cache, + ) + logger.info(f"Added {resource_name} to resource cache: {corresponding_url}") + else: + logger.info( + f"Resource {resource_name} was already cached by another thread: " + f"{corresponding_url}", + ) + # Still need to handle the resource for this app instance + cached_tag, cached_file_name = resource_cache[corresponding_url] + self._handle_cached_resource(resource_name, cached_tag, cached_file_name) + except BuilderError as e: + msg = f"Failed to download {resource_name} resource." + raise PatchingFailedError(msg) from e @staticmethod def generate_filename(url: str) -> str: diff --git a/src/config.py b/src/config.py index b10087f..139fdec 100644 --- a/src/config.py +++ b/src/config.py @@ -32,3 +32,5 @@ class RevancedConfig(object): self.global_old_key = env.bool("GLOBAL_OLD_KEY", True) self.global_space_formatted = env.bool("GLOBAL_SPACE_FORMATTED_PATCHES", True) self.max_resource_workers = env.int("MAX_RESOURCE_WORKERS", 3) + self.max_parallel_apps = env.int("MAX_PARALLEL_APPS", 4) + self.disable_caching = env.bool("DISABLE_CACHING", False)