import re import subprocess import sys from atexit import register from concurrent.futures import ThreadPoolExecutor from pathlib import Path from queue import PriorityQueue from shutil import rmtree from subprocess import PIPE, Popen from time import perf_counter from typing import Any, Dict, List, Tuple, Type import requests from environs import Env from loguru import logger from requests import Session from selectolax.lexbor import LexborHTMLParser from tqdm import tqdm env = Env() temp_folder = Path("apks") session = Session() session.headers["User-Agent"] = "anything" supported_apps = [ "youtube", "youtube_music", "twitter", "reddit", "tiktok", "warnwetter", ] apps = env.list("PATCH_APPS", supported_apps) keystore_name = env.str("KEYSTORE_FILE_NAME", "revanced.keystore") apk_mirror = "https://www.apkmirror.com" github = "https://www.github.com" apk_mirror_urls = { "reddit": f"{apk_mirror}/apk/redditinc/reddit/", "twitter": f"{apk_mirror}/apk/twitter-inc/twitter/", "tiktok": f"{apk_mirror}/apk/tiktok-pte-ltd/tik-tok-including-musical-ly/", "warnwetter": f"{apk_mirror}/apk/deutscher-wetterdienst/warnwetter/", "youtube": f"{apk_mirror}/apk/google-inc/youtube/", "youtube_music": f"{apk_mirror}/apk/google-inc/youtube-music/", } apk_mirror_version_urls = { "reddit": f"{apk_mirror_urls.get('reddit')}reddit", "twitter": f"{apk_mirror_urls.get('twitter')}twitter", "tiktok": f"{apk_mirror_urls.get('tiktok')}tik-tok-including-musical-ly", "warnwetter": f"{apk_mirror_urls.get('warnwetter')}warnwetter", "youtube": f"{apk_mirror_urls.get('youtube')}youtube", "youtube_music": f"{apk_mirror_urls.get('youtube_music')}youtube-music", } class Downloader(object): _CHUNK_SIZE = 2**21 * 5 _QUEUE = PriorityQueue() _QUEUE_LENGTH = 0 @classmethod def _download(cls, url: str, file_name: str) -> None: logger.debug(f"Trying to download {file_name} from {url}") cls._QUEUE_LENGTH += 1 start = perf_counter() resp = session.get(url, stream=True) total = int(resp.headers.get("content-length", 0)) bar = tqdm( desc=file_name, total=total, unit="iB", unit_scale=True, unit_divisor=1024, colour="green", ) with temp_folder.joinpath(file_name).open("wb") as dl_file, bar: for chunk in resp.iter_content(cls._CHUNK_SIZE): size = dl_file.write(chunk) bar.update(size) cls._QUEUE.put((perf_counter() - start, file_name)) logger.debug(f"Downloaded {file_name}") @classmethod def extract_download_link(cls, page: str, app: str): logger.debug(f"Extracting download link from\n{page}") parser = LexborHTMLParser(session.get(page).text) resp = session.get( apk_mirror + parser.css_first("a.accent_bg").attributes["href"] ) parser = LexborHTMLParser(resp.text) href = parser.css_first( "p.notes:nth-child(3) > span:nth-child(1) > a:nth-child(1)" ).attributes["href"] cls._download(apk_mirror + href, f"{app}.apk") logger.debug("Finished Extracting link and downloading") @classmethod def get_download_page(cls, parser, main_page): apm = parser.css(".apkm-badge") sub_url = "" for is_apm in apm: if "APK" in is_apm.text(): parser = is_apm.parent sub_url = parser.css_first(".accent_color").attributes["href"] break if sub_url == "": logger.exception( f"Unable to find any apk on apkmirror_specific_version on {main_page}" ) sys.exit(-1) download_url = apk_mirror + sub_url return download_url @classmethod def apkmirror_specific_version(cls, app: str, version: str) -> str: logger.debug(f"Trying to download {app},specific version {version}") version = version.replace(".", "-") main_page = f"{apk_mirror_version_urls.get(app)}-{version}-release/" parser = LexborHTMLParser(session.get(main_page).text) download_page = cls.get_download_page(parser, main_page) cls.extract_download_link(download_page, app) logger.debug(f"Downloaded {app} apk from apkmirror_specific_version") return version @classmethod def apkmirror_latest_version(cls, app: str) -> str: logger.debug(f"Trying to download {app}'s latest version from apkmirror") page = apk_mirror_urls.get(app) if not page: logger.debug("Invalid app") sys.exit(1) parser = LexborHTMLParser(session.get(page).text) main_page = parser.css_first(".appRowVariantTag>.accent_color").attributes[ "href" ] int_version = re.search(r"\d", main_page).start() extra_release = main_page.rfind("release") - 1 version = main_page[int_version:extra_release] version = version.replace("-", ".") main_page = f"{apk_mirror}{main_page}" parser = LexborHTMLParser(session.get(main_page).text) download_page = cls.get_download_page(parser, main_page) cls.extract_download_link(download_page, app) logger.debug(f"Downloaded {app} apk from apkmirror_specific_version in rt") return version @classmethod def repository(cls, owner: str, name: str, file_name: str) -> None: logger.debug(f"Trying to download {name} from github") repo_url = f"https://api.github.com/repos/{owner}/{name}/releases/latest" r = requests.get( repo_url, headers={"Content-Type": "application/vnd.github.v3+json"} ) if name == "revanced-patches": download_url = r.json()["assets"][1]["browser_download_url"] else: download_url = r.json()["assets"][0]["browser_download_url"] cls._download(download_url, file_name=file_name) @classmethod def report(cls) -> None: started = False while True: item = cls._QUEUE.get() logger.debug(f"{item[1]} downloaded in {item[0]:.2f} seconds.") cls._QUEUE.task_done() cls._QUEUE_LENGTH -= 1 if not started: started = True elif started and not cls._QUEUE_LENGTH: break class Patches(object): def __init__(self) -> None: logger.debug("fetching all patches") resp = session.get( "https://raw.githubusercontent.com/revanced/revanced-patches/main/README.md" ) available_patches = [] for app in resp.text.split("### 📦 ")[1:]: lines = app.splitlines() app_name = lines[0][1:-1] app_patches = [] for line in lines: patch = line.split("|")[1:-1] if len(patch) == 3: (n, d, v), a = [i.replace("`", "").strip() for i in patch], app_name app_patches.append((n, d, a, v)) available_patches.extend(app_patches[2:]) youtube, music, twitter, reddit, tiktok, warnwetter = [], [], [], [], [], [] for n, d, a, v in available_patches: patch = {"name": n, "description": d, "app": a, "version": v} if "twitter" in a: twitter.append(patch) elif "reddit" in a: reddit.append(patch) elif "music" in a: music.append(patch) elif "youtube" in a: youtube.append(patch) elif "trill" in a: tiktok.append(patch) elif "warnapp" in a: warnwetter.append(patch) self._yt = youtube self._ytm = music self._twitter = twitter self._reddit = reddit self._tiktok = tiktok self._warnwetter = warnwetter logger.debug(f"Total patches in youtube are {len(youtube)}") logger.debug(f"Total patches in youtube-music are {len(music)}") logger.debug(f"Total patches in twitter are {len(twitter)}") logger.debug(f"Total patches in reddit are {len(reddit)}") logger.debug(f"Total patches in tiktok are {len(tiktok)}") logger.debug(f"Total patches in warnwetter are {len(warnwetter)}") def get(self, app: str) -> Tuple[List[Dict[str, str]], str]: logger.debug("Getting patches for %s" % app) if "twitter" == app: patches = self._twitter elif "reddit" == app: patches = self._reddit elif "youtube_music" == app: patches = self._ytm elif "youtube" == app: patches = self._yt elif "tiktok" == app: patches = self._tiktok elif "warnwetter" == app: patches = self._warnwetter else: logger.debug("Invalid app name") sys.exit(-1) version = "" if app in ("youtube", "youtube_music"): version = next(i["version"] for i in patches if i["version"] != "all") logger.debug(f"Recommended Version for patching {app} is {version}") else: logger.debug("No recommended version.") return patches, version class ArgParser(object): def __init__(self): self._PATCHES = [] self._EXCLUDED = [] def include(self, name: str) -> None: self._PATCHES.extend(["-i", name]) def exclude(self, name: str) -> None: self._PATCHES.extend(["-e", name]) self._EXCLUDED.append(name) def get_excluded_patches(self) -> List[Any]: return self._EXCLUDED def run(self, app: str, version: str, is_experimental: bool = False) -> None: logger.debug(f"Sending request to revanced cli for building {app} revanced") args = [ "-jar", "revanced-cli.jar", "-a", app + ".apk", "-b", "revanced-patches.jar", "-m", "revanced-integrations.apk", "-o", f"Re-{app}-{version}-output.apk", "--keystore", keystore_name, ] if is_experimental: logger.debug("Using experimental features") args.append("--experimental") if app in ("reddit", "tiktok"): args.remove("-m") args.remove("revanced-integrations.apk") args[1::2] = map(lambda i: temp_folder.joinpath(i), args[1::2]) if app in ("reddit", "tiktok"): args.append("-r") if self._PATCHES: args.extend(self._PATCHES) start = perf_counter() process = Popen(["java", *args], stdout=PIPE) for line in process.stdout: logger.debug(line.decode(), flush=True, end="") process.wait() logger.debug( f"Patching completed for app {app} in {perf_counter() - start:.2f} " f"seconds." ) @register def close() -> None: session.close() cache = Path("revanced-cache") if cache.is_dir(): rmtree(cache) def check_java() -> None: logger.debug("Checking if java is available") jd = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT) jd = str(jd)[1:-1] if "Runtime Environment" not in jd: logger.debug("Java Must be installed") exit(-1) if "17" not in jd: logger.debug("Java 17 Must be installed") exit(-1) logger.debug("Cool!! Java is available") def pre_requisite(): check_java() patches = Patches() return patches def download_revanced(downloader: Type[Downloader]) -> None: assets = ( ("revanced", "revanced-cli", "revanced-cli.jar"), ("revanced", "revanced-integrations", "revanced-integrations.apk"), ("revanced", "revanced-patches", "revanced-patches.jar"), ("inotia00", "VancedMicroG", "VancedMicroG.apk"), ) with ThreadPoolExecutor() as executor: executor.map(lambda repo: downloader.repository(*repo), assets) logger.info("Downloaded revanced microG ,cli, integrations and patches.") def download_from_apkmirror( version: str, app: str, downloader: Type[Downloader] ) -> str: if version and version != "latest": return downloader.apkmirror_specific_version(app, version) else: return downloader.apkmirror_latest_version(app) def main() -> None: patches = pre_requisite() downloader = Downloader download_revanced(downloader) def get_patches() -> None: logger.debug(f"Excluding patches for app {app}") excluded_patches = env.list(f"EXCLUDE_PATCH_{app}".upper(), []) for patch in app_patches: arg_parser.include(patch["name"]) if patch[ "name" ] not in excluded_patches else arg_parser.exclude(patch["name"]) excluded = arg_parser.get_excluded_patches() if excluded: logger.debug(f"Excluded patches {excluded} for {app}") else: logger.debug(f"No excluded patches for {app}") def get_patches_version() -> Any: experiment = False total_patches, recommended_version = patches.get(app=app) env_version = env.str(f"{app}_VERSION".upper(), None) if env_version: logger.debug(f"Picked {app} version {env_version} from env.") if env_version == "latest" or env_version > recommended_version: experiment = True recommended_version = env_version return total_patches, recommended_version, experiment logger.info(f"Will Patch only {apps}") for app in apps: try: arg_parser = ArgParser() logger.debug("Trying to build %s" % app) app_patches, version, is_experimental = get_patches_version() version = download_from_apkmirror(version, app, downloader) get_patches() logger.debug(f"Downloaded {app}, version {version}") arg_parser.run(app=app, version=version, is_experimental=is_experimental) except Exception as e: logger.exception(f"Failed to build {app} because of {e}") if __name__ == "__main__": try: main() except KeyboardInterrupt: logger.error("Script halted because of keyboard interrupt.") sys.exit(-1)