diff --git a/deployment/core/bootstrap.py b/deployment/core/bootstrap.py index a8d3c29..19abe76 100644 --- a/deployment/core/bootstrap.py +++ b/deployment/core/bootstrap.py @@ -1,6 +1,7 @@ import os import shutil +from core.tasks import GetDeploymentConfig from lib.printer import clear_screen from lib.types import Stage from lib.task_types import SuiteTask, SuiteSubTask @@ -26,6 +27,7 @@ class EnsureBuildPaths(SuiteTask): + _deps = [GetDeploymentConfig] _can_skip = False """Task 1: Ensure build paths exist""" diff --git a/deployment/core/suite.py b/deployment/core/suite.py index 0e22a5b..0809d25 100644 --- a/deployment/core/suite.py +++ b/deployment/core/suite.py @@ -2,9 +2,51 @@ from pathlib import Path from lib.errors import SuiteError -from lib.task_types import SuiteTask +from lib.task_types import SuiteTask, Task from core.task_runner import TaskRunner from lib.types import BuildEnv +from core.bootstrap import * +from core.task_runner import * +from core.suite import * + + +def load_parser(): + parser = argparse.ArgumentParser(description="Blog Deployment Suite") + + parser.add_argument("--config", required=True) + parser.add_argument("--task", type=str, default="HealthCheck") + parser.add_argument("--hotfix", action="store_true") + parser.add_argument("--full-pipeline", action="store_true") + parser.add_argument("--branch", required=True) + parser.add_argument("--root", type=str, help="The root directory of the project") + parser.add_argument("--stage", type=str, help="Run a specific stage of the build") + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a trial run without executing tasks", + ) + print("ran parser") + + # Capture specific test names: e.g., --tests ArtifactsTest.Cleanup FullChainTest.EndToEnd + parser.add_argument( + "--tests", + nargs="+", + type=str, + default=[], + help="List of specific test names", + ) + + # Capture a regex filter: e.g., --filter Artifacts.* + parser.add_argument( + "--filter", type=str, help="Filter tests with regex (maps to ctest -R)" + ) + + # --tasks 1 2 5 + parser.add_argument("--tasks", nargs="+", type=int, help="List of task IDs to run") + + # --skip 0 3 + parser.add_argument("--skip", nargs="+", type=int, help="List of task IDs to skip") + return parser class DeploymentSuite(SuiteTask): @@ -29,6 +71,7 @@ self._owner = self self._parser() + Task.owner = self super().__init__(self, *args, owner=self, *kwargs) self._parent = self @@ -38,50 +81,8 @@ self.kwargs = kwargs def _parser(self): - parser = argparse.ArgumentParser(description="Blog Deployment Suite") - parser.add_argument("--config", required=True) - parser.add_argument("--hotfix", action="store_true") - parser.add_argument("--full-pipeline", action="store_true") - parser.add_argument("--branch", required=True) - parser.add_argument( - "--root", type=str, help="The root directory of the project" - ) - parser.add_argument( - "--stage", type=str, help="Run a specific stage of the build" - ) - parser.add_argument( - "--dry-run", - action="store_true", - help="Perform a trial run without executing tasks", - ) - print("ran parser") - - # Capture specific test names: e.g., --tests ArtifactsTest.Cleanup FullChainTest.EndToEnd - parser.add_argument( - "--tests", - nargs="+", - type=str, - default=[], - help="List of specific test names", - ) - - # Capture a regex filter: e.g., --filter Artifacts.* - parser.add_argument( - "--filter", type=str, help="Filter tests with regex (maps to ctest -R)" - ) - - # --tasks 1 2 5 - parser.add_argument( - "--tasks", nargs="+", type=int, help="List of task IDs to run" - ) - - # --skip 0 3 - parser.add_argument( - "--skip", nargs="+", type=int, help="List of task IDs to skip" - ) - - self.parser = parser + parser = load_parser() self._owner.args = vars(parser.parse_args()) def initialized(): @@ -94,4 +95,36 @@ raise SuiteError(self, *args, **kwargs) def _run(self): - TaskRunner(self, owner=self._owner).run() + from core.tests import TestRunner + + from core.tasks import ( + GetDeploymentConfig, + LoadServerConfig, + HotFix, + YarnBuild, + AtomicDeploy, + HealthCheck, + PipelineSuccess, + ) + + all_tasks = [ + CheckNix, + VerifySystemDependencies, + GetDeploymentConfig, + LoadServerConfig, + HotFix, + EnsureBuildPaths, + YarnBuild, + TestRunner, + AtomicDeploy, + HealthCheck, + ] + Task.__init__(self._owner, all_tasks) + task = self.get_arg("task") + Task.run(task) + return self + runner = TaskRunner( + self, all_tasks, base_class_name="SuiteTask", owner=self._owner + ) + runner.queue_tasks(all_tasks) + runner.run() diff --git a/deployment/core/task_runner.py b/deployment/core/task_runner.py index 5b06f4b..d7b86c7 100755 --- a/deployment/core/task_runner.py +++ b/deployment/core/task_runner.py @@ -1,13 +1,14 @@ +from os import wait from typing import List -from lib.types import Stage +from lib.types import Stage, typename from lib.errors import TaskError from lib.task_types import SuiteTask from core.bootstrap import * from core.task_runner import * from core.suite import * -from core.tests import * + from core.tasks import ( GetDeploymentConfig, @@ -26,6 +27,9 @@ tasks: list | None # Input from cli _all_tasks: List[SuiteTask] _queue: List = [] + _loaded: dict = {} + _initialized: bool = False + base_class_name: str def __init__( self, @@ -34,6 +38,7 @@ skip: list | None = None, tasks: list | None = None, stage: Stage = Stage.ANY, + base_class_name="TaskRunner", **kwargs, ): super().__init__(owner, *args, owner=owner, **kwargs) @@ -42,51 +47,136 @@ self.name = "Task Runner" self._skip = skip - self._all_tasks = [] + self._all_tasks: List = [] self.tasks = tasks self.current_stage = stage + self.base_class_name = base_class_name - all_tasks = [ - CheckNix, - VerifySystemDependencies, - GetDeploymentConfig, - LoadServerConfig, - HotFix, - EnsureBuildPaths, - YarnBuild, - TestRunner, - AtomicDeploy, - HealthCheck, - # DetermineRoot, - # VerifyEnvironment, - # EnsureBuildPaths, - # RunUnitTests, - # UpdateVersion, - # BuildServer, - # StartServer, - ] - for _task in all_tasks: + def queue_tasks(all_tasks): + return self._queue_tasks(all_tasks, *args, **kwargs) - task = _task(owner, *args, owner=owner, **kwargs) - self._all_tasks.append(task) + self.queue_tasks = queue_tasks + if TaskRunner._initialized: + return + + TaskRunner._initialized = True + + def _queue_tasks(self, all_tasks, *args, cls_name="SuiteTask", **kwargs): + selected_task = self.get_arg("tasks") + if selected_task: + self.print("Selected Task: ", type(selected_task)) + tasks = [task for task in all_tasks if task.__name__ == selected_task] + else: + tasks = all_tasks + + task_suite = TaskRunner._loaded + + for _task in tasks: + try: + task = _task(self._owner, *args, owner=self._owner, **kwargs) + self._all_tasks.append(task) + task_suite[typename(task)] = task + except Exception as e: + raise Exception("Task initialization failed: ", e) from e + return self @staticmethod - def is_loaded(deps: List[type[SuiteTask]]): + def load_deps(parent, deps: List[SuiteTask], cls_name="SuiteTask"): + from lib.task_types import typename + + parent.print(" Loading deps for ", typename(parent)) + for dep in deps: + # print("test", dep) + dep_name = dep.__name__ + parent.print(" dep: ", dep_name) + try: + if not TaskRunner.is_loaded(deps): + raise Exception("This dependency was not initialized.", deps) + elif dep.skip: + raise Exception("This dependency was marked as 'skip'.") + + dep_list = TaskRunner._get_dep_list() + # print("test", dep_list) + if dep_name not in dep_list.keys(): + + # parent.print(f"Running {type(dep)}", end="") + # parent.print(f"Running {dep}", end="") + + task = TaskRunner._get_dep(dep_name) + + task.run() + parent.print("Complete.") + + except Exception as e: + raise Exception("Dependency resolution failed: ", e) from e + + @staticmethod + def _get_dep_list(): + try: + task_suite = TaskRunner._loaded + # task_suite = TaskRunner._loaded.get(base_class_name) + except AttributeError as e: + raise Exception("Oops i messed up", e) + + if type(task_suite).__name__ != "dict": + # print(TaskRunner._loaded) + raise AttributeError(f"Expected a dict, got: {task_suite}: ") + return task_suite + + @staticmethod + def _get_dep(dep_name): + task_suite = TaskRunner._get_dep_list() + # print(TaskRunner._loaded) + if len(task_suite.keys()) == 0: + raise AttributeError( + f"Dependency '{dep_name}' not set in {task_suite_name}", cls_name + ) + task = task_suite.get(dep_name) + if typename(task) not in ["SuiteTask", "SuiteSubTask"]: + raise AttributeError(f"Dependency '{dep_name}' not found in task_suite") + return task + + @staticmethod + def is_loaded(deps: List[SuiteTask]): """ Validates if all required task types exist in the TaskRunner queue. TaskRunner.queue is expected to be a set or list of SuiteTask instances. """ # Extract the classes of the tasks currently in the queue - queued_task_types = {type(task) for task in TaskRunner._queue} + if len(deps) == 0: + return True + results = [] + for dep in deps: + # Get the base class name + base_class = dep.__bases__[0].__name__ - # Returns True if every dependency class is found in the queued types - return all(dep in queued_task_types for dep in deps) + # Lookup the task in that queue + dep_list = TaskRunner._get_dep_list().values() - def _run(self): + dep_name_list = [type(task).__name__ for task in dep_list] + + print(dep.__name__) # The dependency's name + print(dep_name_list) + + # for d in dep_base_classes: + + # Is the dep in that task list? + if dep.__name__ in dep_name_list: + results.append(True) + for result in results: + if not result: + return False + + return True + + def _sanity_check(self): if self._owner is None: raise ValueError("Owner is not set") if self._parent is None: raise ValueError("Parent is not set") + + def _run(self): + self._sanity_check() all_tasks = self._all_tasks if self.tasks is not None and len(self.tasks) > 0: @@ -108,22 +198,31 @@ self.print(f"Queue initialized with {len(queue)} tasks") + self._runner(queue) + + def _runner(self, queue): # Execute the filtered queue + selected_task = self.get_arg("task") for task in queue: - self.last_task = task.name - try: - # current_stage = self.current_stage - # task_stage = task.get_stage() - # if task_stage is not current_stage: - # continue - if task.run() is False: - self.fail(f"Pipeline stopped at task: {task.name}") - except PipelineSuccess as e: - self.print(e) - break - except ModuleNotFoundError as e: - self.print(f" [ERROR] Task {task.name} failed: {e}") - self.fail(f"Pipeline stopped at task: {self.last_task}") - except TaskError as e: - self.print(f" [ERROR] Task {task.name} failed: {e}") - self.fail(f"Pipeline stopped at task: {self.last_task}") + from lib.task_types import typename + + task_name = typename(task) + + if selected_task is None or task_name == selected_task: + self.last_task = task_name + try: + # current_stage = self.current_stage + # task_stage = task.get_stage() + # if task_stage is not current_stage: + # continue + if task.run() is False: + self.fail(f"Pipeline stopped at task: {task.name}") + except PipelineSuccess as e: + self.print(e) + break + except ModuleNotFoundError as e: + self.print(f" [ERROR] Task {task.name} failed: {e}") + self.fail(f"Pipeline stopped at task: {self.last_task}") + except TaskError as e: + self.print(f" [ERROR] Task {task.name} failed: {e}") + self.fail(f"Pipeline stopped at task: {self.last_task}") diff --git a/deployment/core/tasks.py b/deployment/core/tasks.py index 513bd32..9138633 100644 --- a/deployment/core/tasks.py +++ b/deployment/core/tasks.py @@ -141,7 +141,7 @@ # 2. Pull changes try: self.sh( - "sudo -u {self.env.user} git pull origin " + self.env.deploy_branch, + "git pull origin " + self.env.deploy_branch, cwd=live_path, handle_exception=False, ) @@ -181,7 +181,7 @@ if os.path.exists(build_git_path): try: self.sh( - f"sudo -u {self.env.user} git pull origin {self.env.deploy_branch}", + f"git pull origin {self.env.deploy_branch}", cwd=self.env.build_dir, handle_exception=False, ) @@ -219,7 +219,9 @@ def _run(self): # Determine success from the TestRunner flag - test_success = getattr(self.env, "test_success", False) + test_success = getattr(self.env, "test_success", False) or not self.get_arg( + "enforce_testing" + ) # Select appropriate Lua config table cfg = self.env.release if test_success else self.env.testing @@ -249,6 +251,7 @@ else: self.print(" [SKIP] Test failure detected. Symlink swap bypassed.") self.print(f" [INFO] Artifact preserved at: {final_release_dir}") + self.fail() return True diff --git a/deployment/core/tests.py b/deployment/core/tests.py index ea6feff..ce5eb60 100644 --- a/deployment/core/tests.py +++ b/deployment/core/tests.py @@ -123,7 +123,6 @@ runner.queue_tasks(sub_tasks).run() def _run(self): - return # 1. Check if we should even be here skip_param = self.args.get("skip_tests", False) @@ -156,11 +155,10 @@ self.fail(f" [ERROR] Critical failure during test execution: {e}") finally: - if self.do_dry_run(): - return - # 3. Forced Teardown - # If the loop broke or failed, ensure StopTestApp runs if StartTestApp was attempted - self.print(" [CLEAN] Ensuring test environment teardown...") - cleanup = StopTestApp(parent=self, owner=self._owner) - cleanup.run() - self.env.test_success = success + if not self.do_dry_run(): + # 3. Forced Teardown + # If the loop broke or failed, ensure StopTestApp runs if StartTestApp was attempted + self.print(" [CLEAN] Ensuring test environment teardown...") + cleanup = StopTestApp(parent=self, owner=self._owner) + cleanup.run() + self.env.test_success = success diff --git a/deployment/lib/errors.py b/deployment/lib/errors.py index e4e64e1..72a1bbe 100644 --- a/deployment/lib/errors.py +++ b/deployment/lib/errors.py @@ -7,14 +7,22 @@ self, parent, *args, critical: bool = False, code: int | None = None, **kwargs ): super().__init__(*args, **kwargs) - parent.dump_print_queue() - traceback.print_stack() - print(*args, **kwargs) + try: + if type(parent).__name__ == "str": + print(parent) + raise Exception("Um... this is embarrassing. Parent? ", parent) + parent.dump_print_queue() + traceback.print_stack() + print(*args, **kwargs) - if code is not None: - sys.exit(code) - if critical: - raise RuntimeError(*args, **kwargs) + if code is not None: + sys.exit(code) + if critical: + raise RuntimeError(*args, **kwargs) + except Exception as e: + raise Exception( + f"There was an error while handling an exception: {e}" + ) from e sys.exit(1) diff --git a/deployment/lib/printer.py b/deployment/lib/printer.py index b5df1b8..49294e4 100755 --- a/deployment/lib/printer.py +++ b/deployment/lib/printer.py @@ -24,7 +24,7 @@ ): self._parent = parent self._instance = instance - self._parent_id = parent._id if parent else 0 + self._parent_id = type(parent) self._instance_id = instance.get_id() def dump(self): diff --git a/deployment/lib/task_types.py b/deployment/lib/task_types.py index 5b33757..b640762 100755 --- a/deployment/lib/task_types.py +++ b/deployment/lib/task_types.py @@ -5,16 +5,117 @@ import subprocess from pathlib import Path from abc import ABC, abstractmethod -from typing import List, TYPE_CHECKING +from typing import List, TYPE_CHECKING, Optional from shlex import split as shlex_split from lib.printer import Printer from lib.errors import TaskError +from lib.types import typename if TYPE_CHECKING: from types import Stage, BuildEnv from task_types import BlogDeploySuite + from task_runner import TaskRunner + + +class Task: + _initialized: bool = False + _registry: dict = {} + _loaded: dict = {} + _completed: dict = {} + _owner = None + + # 1. The owner initializes the class + @staticmethod + def __init__(owner, task_list): + if Task._initialized: + return + Task._owner = owner + for task in task_list: + task_name = Task.get_key(task) + print(f"Registring {task_name}") + Task._registry[task_name] = task + return + + # 2. Add a dependency from the registry + @staticmethod + def add(key): + """Adds a depndency only if it exists in the registry""" + key = Task.get_key(key) + + # 1. Determine if the key exists in the registry + if not Task.exists(key): + raise ValueError(f"Dependency {key} does not exist in the registry") + + # 2. Determine if the key has been initialized + if Task.initialized(key): + print(f"Fetching task: {key}") + # Return the key if already initialized + return Task._loaded[key] + + # 3. Initialize the key + print(f"Initializing {key}") + dep = Task._registry[key] + task = dep(Task._owner, owner=Task._owner) + Task._loaded[key] = task + return task + + @staticmethod + def exists(key): + key = Task.get_key(key) + return key in Task._registry.keys() + + + @staticmethod + def initialized(key): + """Returns the initialized object""" + key = Task.get_key(key) + return key in Task._completed.keys() + + + # 3. Run the task + @staticmethod + def run(key): + """Runs a task from the registry""" + key = Task.get_key(key) + + # 1. Determine if the task has already ran + if Task.completed(key): + print(f"fetching result for {key}") + return Task._completed[key] + + # 2. Initialize the dependency + # This is harmless if already initialized + # due to internal checks + task = Task.add(key) # Also provides the object + + # 3. Run the task and store its result + print(f"Running task: {key}") + result = task.run() + Task._completed[key] = result + + return result + + @staticmethod + def get_key(key): + """Convert a raw class to a key""" + if type(key) is not str: + key = key.__name__ + return key + + @staticmethod + def completed(key): + """Returns a bool if the task has been completed or not""" + key = Task.get_key(key) + return key in Task._completed.keys() + + + + + @staticmethod + def get_owner(): + return Task._owner class SuiteTask(ABC): @@ -32,6 +133,8 @@ _initialized = False _deps = [] env: "BuildEnv" + complete: bool = False + skip_list: List = [] def __init__( self, @@ -42,15 +145,18 @@ attach_printer: bool = True, **kwargs, ): + self.add_deps() + from lib.task_types import SuiteTask if owner is None and not SuiteTask._initialized: raise ValueError("Owner is not set") if parent is None: raise ValueError("Parent is not set") - if kwargs and self.__class__.__name__ in kwargs.get("skip"): - self.skip = True - return + # print(kwargs, self.__class__.__name__) + # if kwargs and self.__class__.__name__ in self.get_arg("skip_list"): + # self.skip = True + # return SuiteTask._initialized = True if cwd is not None: @@ -77,6 +183,21 @@ if attach_printer: self.attach_printer(parent) + + def initialize_deps(): + for dep in self._deps: + dep_name = dep.__name__ + if not Task.initalized(dep_name): + Task.load(dep) + + def add_deps(self): + for dep in self._deps: + Task.add(dep) + + def run_deps(self): + for dep in self._deps: + Task.run(dep) + def get_arg(self, arg): return self._owner.args.get(arg) @@ -141,6 +262,15 @@ self.do_dry_run = func def run(self): + self.print("Running ", typename(self)) + try: + if len(self._deps) > 0: + from core.task_runner import TaskRunner + + TaskRunner.add_deps(self) + TaskRunner.run_deps(self) + except Exception as e: + raise Exception(f"Failed to load dependency for {typename(self)}: {e}") return self._run() def fail(self, *args, critical: bool = False, **kwargs): @@ -313,3 +443,17 @@ @staticmethod def get_count(): return SuiteSubTask._sub_counter + + def run(self): + self.print("Running ", typename(self)) + try: + if len(self._deps) > 0: + from core.task_runner import TaskRunner + + TaskRunner.load_deps( + self, + self._deps, + ) + except Exception as e: + raise Exception(f"Failed to load dependency for {typename(self)}: {e}") + return self._run() diff --git a/deployment/lib/types.py b/deployment/lib/types.py index bbac897..908a929 100644 --- a/deployment/lib/types.py +++ b/deployment/lib/types.py @@ -3,6 +3,10 @@ from pathlib import Path +def typename(t): + return type(t).__name__ + + class Stage(Enum): ANY = "any" BOOTSTRAP = "bootstrap" diff --git a/deployment/main.py b/deployment/main.py index 28858d4..4ff19d4 100644 --- a/deployment/main.py +++ b/deployment/main.py @@ -9,18 +9,23 @@ runner = DeploymentSuite() exit_code = 0 + import traceback + try: runner.run() print("🚀 Deployment Successful") exit_code = 0 except KeyboardInterrupt: - runner.print("\n[System] Termination signal received. Cleaning up...") runner.dump_print_queue() + traceback.print_exc() + runner.print("\n[System] Termination signal received. Cleaning up...") exit_code = 0 except Exception as e: + runner.dump_print_queue() + traceback.print_exc() print(f"❌ Deployment Failed at: {e.with_traceback(e.__traceback__)}") exit_code = 1 - runner.dump_print_queue() if exit_code != 0: print(exit_code) sys.exit(exit_code or 1) + runner.dump_print_queue()