Improvements

This commit is contained in:
Nikhil Badyal
2025-06-19 23:07:17 +05:30
committed by Nikhil Badyal
parent c436c4bc98
commit 3fdb5b247a
5 changed files with 162 additions and 46 deletions
+2 -1
View File
@@ -131,7 +131,8 @@ You can use any of the following methods to build.
| [APPRISE_NOTIFICATION_TITLE](#apprise) | Apprise Notification Title. | None | | [APPRISE_NOTIFICATION_TITLE](#apprise) | Apprise Notification Title. | None |
| [APPRISE_NOTIFICATION_BODY](#apprise) | Apprise Notification Body. | None | | [APPRISE_NOTIFICATION_BODY](#apprise) | Apprise Notification Body. | None |
| MAX_RESOURCE_WORKERS | Maximum workers for downloading resources | 3 | | MAX_RESOURCE_WORKERS | Maximum workers for downloading resources | 3 |
| MAX_PARALLEL_APPS | Maximum number of apps to process in parallel | 4 |
| DISABLE_CACHING | Disable download and resource caching | False |
`*` - Can be overridden for individual app. `*` - Can be overridden for individual app.
### App Level Config ### App Level Config
+4 -1
View File
@@ -1,5 +1,7 @@
"""Check patching resource updates.""" """Check patching resource updates."""
from threading import Lock
from environs import Env from environs import Env
from loguru import logger from loguru import logger
@@ -16,12 +18,13 @@ def check_if_build_is_required() -> bool:
config = RevancedConfig(env) config = RevancedConfig(env)
needs_to_repatched = [] needs_to_repatched = []
resource_cache: dict[str, tuple[str, str]] = {} resource_cache: dict[str, tuple[str, str]] = {}
resource_lock = Lock()
for app_name in env.list("PATCH_APPS", default_build): for app_name in env.list("PATCH_APPS", default_build):
logger.info(f"Checking {app_name}") logger.info(f"Checking {app_name}")
app_obj = get_app(config, app_name) app_obj = get_app(config, app_name)
old_patches_version = GitHubManager(env).get_last_version(app_obj, patches_version_key) old_patches_version = GitHubManager(env).get_last_version(app_obj, patches_version_key)
old_patches_source = GitHubManager(env).get_last_version_source(app_obj, patches_dl_key) old_patches_source = GitHubManager(env).get_last_version_source(app_obj, patches_dl_key)
app_obj.download_patch_resources(config, resource_cache) app_obj.download_patch_resources(config, resource_cache, resource_lock)
if GitHubManager(env).should_trigger_build( if GitHubManager(env).should_trigger_build(
old_patches_version, old_patches_version,
old_patches_source, old_patches_source,
+88 -24
View File
@@ -1,6 +1,9 @@
"""Entry point.""" """Entry point."""
import sys import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from typing import Any
from environs import Env from environs import Env
from loguru import logger from loguru import logger
@@ -21,6 +24,53 @@ def get_app(config: RevancedConfig, app_name: str) -> APP:
return APP(app_name=app_name, package_name=package_name, config=config) return APP(app_name=app_name, package_name=package_name, config=config)
def process_single_app(
    app_name: str,
    config: RevancedConfig,
    caches: tuple[
        dict[tuple[str, str], tuple[str, str]],
        dict[str, tuple[str, str]],
        Lock,
        Lock,
    ],
) -> dict[str, Any]:
    """Process a single app and return its update info.

    Parameters
    ----------
    app_name : str
        Name of the app to build.
    config : RevancedConfig
        Global build configuration.
    caches : tuple
        Shared state as (download_cache, resource_cache, download_lock,
        resource_lock); the locks guard their respective caches across
        worker threads.

    Returns
    -------
    dict[str, Any]
        Update info for the app, or an empty dict when the build fails.
    """
    apk_cache, res_cache, apk_lock, res_lock = caches
    logger.info(f"Trying to build {app_name}")
    try:
        app_obj = get_app(config, app_name)
        # Shared resource cache, guarded by its lock for thread safety.
        app_obj.download_patch_resources(config, res_cache, res_lock)
        patch_manager = Patches(config, app_obj)
        patch_parser = Parser(patch_manager, config)
        all_app_patches = patch_manager.get_app_configs(app_obj)
        # Shared APK cache, guarded by its lock for thread safety.
        app_obj.download_apk_for_patching(config, apk_cache, apk_lock)
        patch_parser.include_exclude_patch(app_obj, all_app_patches, patch_manager.patches_dict)
        logger.info(app_obj)
        update_info = save_patch_info(app_obj, {})
        patch_parser.patch_app(app_obj)
    except AppNotFoundError as err:
        logger.info(err)
        return {}
    except PatchesJsonLoadError:
        logger.exception("Patches.json not found")
        return {}
    except PatchingFailedError as err:
        logger.exception(err)
        return {}
    except BuilderError as err:
        logger.exception(f"Failed to build {app_name} because of {err}")
        return {}
    else:
        # Only reached when every step above succeeded.
        logger.info(f"Successfully completed {app_name}")
        return update_info
def main() -> None: def main() -> None:
"""Entry point.""" """Entry point."""
env = Env() env = Env()
@@ -35,37 +85,51 @@ def main() -> None:
logger.info(f"Will Patch only {len(config.apps)} apps-:\n{config.apps}") logger.info(f"Will Patch only {len(config.apps)} apps-:\n{config.apps}")
# Caches for reuse # Shared caches for reuse across all apps (empty if caching disabled)
download_cache: dict[tuple[str, str], tuple[str, str]] = {} download_cache: dict[tuple[str, str], tuple[str, str]] = {}
resource_cache: dict[str, tuple[str, str]] = {} resource_cache: dict[str, tuple[str, str]] = {}
for possible_app in config.apps: # Thread-safe locks for cache access
logger.info(f"Trying to build {possible_app}") download_lock = Lock()
try: resource_lock = Lock()
app = get_app(config, possible_app)
# Use shared resource cache # Clear caches if caching is disabled
app.download_patch_resources(config, resource_cache) if config.disable_caching:
download_cache.clear()
resource_cache.clear()
patcher = Patches(config, app) # Determine the number of workers (don't exceed the number of apps or the configured MAX_PARALLEL_APPS limit)
parser = Parser(patcher, config) max_workers = min(len(config.apps), config.max_parallel_apps)
app_all_patches = patcher.get_app_configs(app)
# Use shared APK cache if len(config.apps) == 1 or config.ci_test:
app.download_apk_for_patching(config, download_cache) # For single app or CI testing, use sequential processing
caches = (download_cache, resource_cache, download_lock, resource_lock)
for app_name in config.apps:
app_updates = process_single_app(app_name, config, caches)
updates_info.update(app_updates)
else:
# For multiple apps, use parallel processing
logger.info(f"Processing {len(config.apps)} apps in parallel with {max_workers} workers")
parser.include_exclude_patch(app, app_all_patches, patcher.patches_dict) with ThreadPoolExecutor(max_workers=max_workers) as executor:
logger.info(app) # Submit all app processing tasks
updates_info = save_patch_info(app, updates_info) caches = (download_cache, resource_cache, download_lock, resource_lock)
parser.patch_app(app) future_to_app = {
except AppNotFoundError as e: executor.submit(process_single_app, app_name, config, caches): app_name for app_name in config.apps
logger.info(e) }
except PatchesJsonLoadError:
logger.exception("Patches.json not found") # Collect results as they complete
except PatchingFailedError as e: total_apps = len(config.apps)
logger.exception(e)
except BuilderError as e: for completed_count, future in enumerate(as_completed(future_to_app), 1):
logger.exception(f"Failed to build {possible_app} because of {e}") app_name = future_to_app[future]
try:
app_updates = future.result()
updates_info.update(app_updates)
logger.info(f"Progress: {completed_count}/{total_apps} apps completed ({app_name})")
except (AppNotFoundError, PatchesJsonLoadError, PatchingFailedError, BuilderError) as e:
logger.exception(f"Error processing {app_name}: {e}")
logger.info(f"Progress: {completed_count}/{total_apps} apps completed ({app_name} - FAILED)")
write_changelog_to_file(updates_info) write_changelog_to_file(updates_info)
+66 -20
View File
@@ -5,6 +5,7 @@ import hashlib
import pathlib import pathlib
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime from datetime import datetime
from threading import Lock
from typing import Any, Self from typing import Any, Self
from loguru import logger from loguru import logger
@@ -61,6 +62,7 @@ class APP(object):
self: Self, self: Self,
config: RevancedConfig, config: RevancedConfig,
download_cache: dict[tuple[str, str], tuple[str, str]], download_cache: dict[tuple[str, str], tuple[str, str]],
download_lock: Lock,
) -> None: ) -> None:
"""Download apk to be patched, skipping if already downloaded (matching source and version).""" """Download apk to be patched, skipping if already downloaded (matching source and version)."""
from src.downloader.download import Downloader # noqa: PLC0415 from src.downloader.download import Downloader # noqa: PLC0415
@@ -81,16 +83,26 @@ class APP(object):
cache_key = (self.download_source, self.app_version) cache_key = (self.download_source, self.app_version)
if cache_key in download_cache: # Thread-safe cache check and download
logger.info(f"Skipping download. Reusing APK from cache for {self.app_name} ({self.app_version})") with download_lock:
self.download_file_name, self.download_dl = download_cache[cache_key] if cache_key in download_cache:
return logger.info(f"Skipping download. Reusing APK from cache for {self.app_name} ({self.app_version})")
self.download_file_name, self.download_dl = download_cache[cache_key]
return
downloader = DownloaderFactory.create_downloader(config=config, apk_source=self.download_source) # Check again after acquiring lock to handle race conditions
self.download_file_name, self.download_dl = downloader.download(self.app_version, self) if cache_key in download_cache:
logger.info(f"Skipping download. Reusing APK from cache for {self.app_name} ({self.app_version})")
self.download_file_name, self.download_dl = download_cache[cache_key]
return
# Save to cache using (source, version) tuple logger.info(f"Cache miss for {self.app_name} ({self.app_version}). Proceeding with download.")
download_cache[cache_key] = (self.download_file_name, self.download_dl) downloader = DownloaderFactory.create_downloader(config=config, apk_source=self.download_source)
self.download_file_name, self.download_dl = downloader.download(self.app_version, self)
# Save to cache using (source, version) tuple
download_cache[cache_key] = (self.download_file_name, self.download_dl)
logger.info(f"Added {self.app_name} ({self.app_version}) to download cache.")
def get_output_file_name(self: Self) -> str: def get_output_file_name(self: Self) -> str:
"""The function returns a string representing the output file name. """The function returns a string representing the output file name.
@@ -211,6 +223,7 @@ class APP(object):
self: Self, self: Self,
config: RevancedConfig, config: RevancedConfig,
resource_cache: dict[str, tuple[str, str]], resource_cache: dict[str, tuple[str, str]],
resource_lock: Lock,
) -> None: ) -> None:
"""The function `download_patch_resources` downloads various resources req. for patching. """The function `download_patch_resources` downloads various resources req. for patching.
@@ -220,6 +233,8 @@ class APP(object):
The `config` parameter is an instance of the `RevancedConfig` class. It is used to provide The `config` parameter is an instance of the `RevancedConfig` class. It is used to provide
configuration settings for the resource download tasks. configuration settings for the resource download tasks.
resource_cache: dict[str, tuple[str, str]] resource_cache: dict[str, tuple[str, str]]
resource_lock: Lock
Thread lock for safe access to resource_cache
""" """
logger.info("Downloading resources for patching.") logger.info("Downloading resources for patching.")
@@ -229,28 +244,59 @@ class APP(object):
(name, url, config, filter_pattern) for name, url, _, filter_pattern in base_tasks (name, url, config, filter_pattern) for name, url, _, filter_pattern in base_tasks
] ]
with ThreadPoolExecutor(config.max_resource_workers) as executor: # Use configurable worker count # Track which resources need to be downloaded (outside of lock to minimize lock time)
futures: dict[str, concurrent.futures.Future[tuple[str, str]]] = {} resources_to_download: list[tuple[str, str, RevancedConfig, str]] = []
# Thread-safe cache check
with resource_lock:
for resource_name, raw_url, cfg, assets_filter in download_tasks: for resource_name, raw_url, cfg, assets_filter in download_tasks:
url = raw_url.strip() url = raw_url.strip()
if url in resource_cache: if url in resource_cache:
logger.info(f"Skipping {resource_name} download, using cached resource: {url}") logger.info(f"Skipping {resource_name} download, using cached resource: {url}")
tag, file_name = resource_cache[url] tag, file_name = resource_cache[url]
self._handle_cached_resource(resource_name, tag, file_name) self._handle_cached_resource(resource_name, tag, file_name)
continue else:
resources_to_download.append((resource_name, url, cfg, assets_filter))
futures[resource_name] = executor.submit(self.download, url, cfg, assets_filter) # Download resources that are not cached (outside of lock for parallel downloads)
if resources_to_download:
with ThreadPoolExecutor(config.max_resource_workers) as executor:
futures: dict[str, concurrent.futures.Future[tuple[str, str]]] = {}
concurrent.futures.wait(futures.values()) for resource_name, url, cfg, assets_filter in resources_to_download:
futures[resource_name] = executor.submit(self.download, url, cfg, assets_filter)
for resource_name, future in futures.items(): concurrent.futures.wait(futures.values())
try:
tag, file_name = future.result() # Thread-safe cache update
self._handle_downloaded_resource(resource_name, tag, file_name, download_tasks, resource_cache) with resource_lock:
except BuilderError as e: for resource_name, future in futures.items():
msg = f"Failed to download {resource_name} resource." try:
raise PatchingFailedError(msg) from e tag, file_name = future.result()
# Double-check cache in case another thread already added it
corresponding_url = next(
url for name, url, _, _ in resources_to_download if name == resource_name
)
if corresponding_url not in resource_cache:
self._handle_downloaded_resource(
resource_name,
tag,
file_name,
download_tasks,
resource_cache,
)
logger.info(f"Added {resource_name} to resource cache: {corresponding_url}")
else:
logger.info(
f"Resource {resource_name} was already cached by another thread: "
f"{corresponding_url}",
)
# Still need to handle the resource for this app instance
cached_tag, cached_file_name = resource_cache[corresponding_url]
self._handle_cached_resource(resource_name, cached_tag, cached_file_name)
except BuilderError as e:
msg = f"Failed to download {resource_name} resource."
raise PatchingFailedError(msg) from e
@staticmethod @staticmethod
def generate_filename(url: str) -> str: def generate_filename(url: str) -> str:
+2
View File
@@ -32,3 +32,5 @@ class RevancedConfig(object):
self.global_old_key = env.bool("GLOBAL_OLD_KEY", True) self.global_old_key = env.bool("GLOBAL_OLD_KEY", True)
self.global_space_formatted = env.bool("GLOBAL_SPACE_FORMATTED_PATCHES", True) self.global_space_formatted = env.bool("GLOBAL_SPACE_FORMATTED_PATCHES", True)
self.max_resource_workers = env.int("MAX_RESOURCE_WORKERS", 3) self.max_resource_workers = env.int("MAX_RESOURCE_WORKERS", 3)
self.max_parallel_apps = env.int("MAX_PARALLEL_APPS", 4)
self.disable_caching = env.bool("DISABLE_CACHING", False)