import os import subprocess import sys from atexit import register from pathlib import Path from queue import PriorityQueue from shutil import rmtree from subprocess import PIPE, Popen from time import perf_counter from typing import Dict, List, Tuple from loguru import logger from requests import Session from selectolax.lexbor import LexborHTMLParser from tqdm import tqdm temp_folder = Path("apks") session = Session() session.headers["User-Agent"] = "anything" apps = ["youtube", "youtube-music", "twitter", "reddit"] apk_mirror = "https://www.apkmirror.com" class Downloader: _CHUNK_SIZE = 2**21 * 5 _QUEUE = PriorityQueue() _QUEUE_LENGTH = 0 @classmethod def _download(cls, url: str, file_name: str) -> None: logger.debug(f"Trying to download {file_name} from {url}") cls._QUEUE_LENGTH += 1 start = perf_counter() resp = session.get(url, stream=True) total = int(resp.headers.get("content-length", 0)) with temp_folder.joinpath(file_name).open("wb") as dl_file, tqdm( desc=file_name, total=total, unit="iB", unit_scale=True, unit_divisor=1024, colour="green", ) as bar: for chunk in resp.iter_content(cls._CHUNK_SIZE): size = dl_file.write(chunk) bar.update(size) cls._QUEUE.put((perf_counter() - start, file_name)) logger.debug(f"Downloaded {file_name}") @classmethod def extract_download_link(cls, page: str, app: str): logger.debug(f"Extracting download link from {page}") parser = LexborHTMLParser(session.get(page).text) resp = session.get( apk_mirror + parser.css_first("a.accent_bg").attributes["href"] ) parser = LexborHTMLParser(resp.text) href = parser.css_first( "p.notes:nth-child(3) > span:nth-child(1) > a:nth-child(1)" ).attributes["href"] cls._download(apk_mirror + href, f"{app}.apk") logger.debug(f"Finished Extracting and download link from {page}") @classmethod def apkmirror(cls, app: str, version: str) -> None: logger.debug(f"Trying to download {app} apk from apkmirror") version = "-".join( v.zfill(2 if i else 0) for i, v in enumerate(version.split(".")) ) logger.debug(f"Version for {app} to download from apkmirror is {version}") page = f""" {apk_mirror}/apk/google-inc/ {app}/{app}-{version}-release/{app}-{version}-android-apk-download/ """ cls.extract_download_link(page, app) @classmethod def find_second_last(cls, text: str, pattern: str) -> int: return text.rfind(pattern, 0, text.rfind(pattern)) @classmethod def apkmirror_reddit_twitter(cls, app: str, version: str) -> None: logger.debug(f"Trying to download {app} apk from apkmirror in rt") if app == "reddit": page = f"{apk_mirror}/apk/redditinc/reddit/" elif app == "twitter": page = f"{apk_mirror}/apk/twitter-inc/twitter/" else: logger.debug("Invalid app") sys.exit(1) parser = LexborHTMLParser(session.get(page).text) suburl = parser.css_first(".appRowVariantTag>.accent_color").attributes["href"] last2 = cls.find_second_last(suburl, "/") version_url = suburl[last2:] minus_occurance = version_url.rfind("-") version_url = version_url[:minus_occurance] download_url = version_url + "-2-android-apk-download/" url = apk_mirror + suburl + download_url[1:] page = url cls.extract_download_link(page, app) logger.debug(f"Downloaded {app} apk from apkmirror in rt") @classmethod def repository(cls, name: str) -> None: logger.debug(f"Trying to download {name} from github") resp = session.get( f"https://github.com/revanced/revanced-{name}/releases/latest" ) parser = LexborHTMLParser(resp.text) url = parser.css("li.Box-row > div:nth-child(1) > a:nth-child(2)")[:-2][ -1 ].attributes["href"] cls._download("https://github.com" + url, Path(url).with_stem(name).name) @classmethod def report(cls) -> None: started = False while True: item = cls._QUEUE.get() logger.debug(f"{item[1]} downloaded in {item[0]:.2f} seconds.") cls._QUEUE.task_done() cls._QUEUE_LENGTH -= 1 if not started: started = True elif started and not cls._QUEUE_LENGTH: break class Patches: def __init__(self) -> None: logger.debug("fetching all patches") resp = session.get( "https://raw.githubusercontent.com/revanced/revanced-patches/main/README.md" ) available_patches = [] for app in resp.text.split("### 📦 ")[1:]: lines = app.splitlines() app_name = lines[0][1:-1] app_patches = [] for line in lines: patch = line.split("|")[1:-1] if len(patch) == 3: (n, d, v), a = [i.replace("`", "").strip() for i in patch], app_name app_patches.append((n, d, a, v)) available_patches.extend(app_patches[2:]) youtube, music, twitter, reddit = [], [], [], [] for n, d, a, v in available_patches: patch = {"name": n, "description": d, "app": a, "version": v} if "twitter" in a: twitter.append(patch) elif "reddit" in a: reddit.append(patch) elif "music" in a: music.append(patch) elif "youtube" in a: youtube.append(patch) self._yt = youtube self._ytm = music self._twitter = twitter self._reddit = reddit logger.debug(f"Total patches in youtube are {len(youtube)}") logger.debug(f"Total patches in youtube-music are {len(music)}") logger.debug(f"Total patches in twitter are {len(twitter)}") logger.debug(f"Total patches in reddit are {len(reddit)}") def get(self, app: str) -> Tuple[List[Dict[str, str]], str]: logger.debug("Getting patches for %s" % app) if "twitter" == app: patches = self._twitter elif "reddit" == app: patches = self._reddit elif "youtube-music" == app: patches = self._ytm elif "youtube" == app: patches = self._yt else: logger.debug("Invalid app name") sys.exit(-1) version = "" if app in ("youtube", "youtube-music"): version = next(i["version"] for i in patches if i["version"] != "all") logger.debug("Version for app is %s" % version) else: logger.debug("Empty version because it's not youtube or youtube-music") return patches, version class ArgParser: _PATCHES = [] @classmethod def include(cls, name: str) -> None: cls._PATCHES.extend(["-i", name]) @classmethod def exclude(cls, name: str) -> None: cls._PATCHES.extend(["-e", name]) @classmethod def run(cls, app: str) -> None: logger.debug(f"Sending request to revanced cli for building {app} revanced") args = [ "-jar", "cli.jar", "-a", app + ".apk", "-b", "patches.jar", "-m", "integrations.apk", "-o", f"{app}-output.apk", ] if app == "reddit": args.append("-r") args.remove("-m") args.remove("integrations.apk") args[1::2] = map(lambda i: temp_folder.joinpath(i), args[1::2]) if cls._PATCHES: args.extend(cls._PATCHES) start = perf_counter() process = Popen(["java", *args], stdout=PIPE) for line in process.stdout: logger.debug(line.decode(), flush=True, end="") process.wait() logger.debug( f"Patching completed for app {app} in {perf_counter() - start:.2f} " f"seconds." ) @register def close() -> None: session.close() cache = Path("revanced-cache") if cache.is_dir(): rmtree(cache) def check_java() -> None: logger.debug("Checking if java is available") jd = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT) jd = str(jd)[1:-1] if "Runtime Environment" not in jd: logger.debug("Java Must be installed") exit(-1) if "17" not in jd: logger.debug("Java 17 Must be installed") exit(-1) logger.debug("Cool!! Java is available") def pre_requisite(): check_java() patches = Patches() return patches def main() -> None: patches = pre_requisite() downloader = Downloader downloader.repository("cli") downloader.repository("integrations") downloader.repository("patches") # downloader.report() def get_patches() -> None: logger.debug(f"Excluding patches for app {app}") selected_patches = list(range(0, len(app_patches))) if app == "youtube": selected_patches.remove(9) for i, v in enumerate(app_patches): arg_parser.include( v["name"] ) if i in selected_patches else arg_parser.exclude(v["name"]) logger.debug(f"Excluded patches for app {app}") for app in apps: try: arg_parser = ArgParser logger.debug("Trying to build %s" % app) app_patches, version = patches.get(app=app) if os.getenv(f"{app}_VERSION".upper()): print("here") version = os.getenv(f"{app}_VERSION".upper()) if app == "reddit" or app == "twitter": downloader.apkmirror_reddit_twitter(app, version) else: downloader.apkmirror(app, version) get_patches() # downloader.report() logger.debug(f"Download completed {app}") arg_parser.run(app=app) except Exception as e: logger.exception(f"Failed to build {app} because of {e}") sys.exit(-1) if __name__ == "__main__": try: main() except KeyboardInterrupt: logger.error("Script halted because of keyboard interrupt.") sys.exit(-1)