diff --git a/deployment/__init__.py b/deployment/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/deployment/__init__.py +++ /dev/null diff --git a/deployment/__main__.py b/deployment/__main__.py deleted file mode 100644 index 0d3ea05..0000000 --- a/deployment/__main__.py +++ /dev/null @@ -1,4 +0,0 @@ -if __name__ == "__main__": - from deployment_pipeline.main import main - - main() diff --git a/deployment/core/__init__.py b/deployment/core/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/deployment/core/__init__.py +++ /dev/null diff --git a/deployment/core/bootstrap.py b/deployment/core/bootstrap.py deleted file mode 100644 index ef3ca0b..0000000 --- a/deployment/core/bootstrap.py +++ /dev/null @@ -1,26 +0,0 @@ -import shutil - -from pipeline_runner.lib.types import Stage -from pipeline_runner.lib.task_types import SuiteTask - - -class VerifySystemDependencies(SuiteTask): - _can_skip = False - _stage = Stage.BOOTSTRAP - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Verifying System Dependencies" - - def _run(self): - """Verifies non-Python dependencies required for the C++ Core.""" - if self._owner._in_nix_shell: - self.print("Skipping: in nix shell") - return True - - deps = ["yarn", "git", "rsync", "curl", "node"] - for dep in deps: - if not shutil.which(dep): - self.fail(f"Missing system dependency: {dep}") - self.printer.print(" [OK] System tools detected.") - return True diff --git a/deployment/core/suite.py b/deployment/core/suite.py deleted file mode 100644 index 046afa0..0000000 --- a/deployment/core/suite.py +++ /dev/null @@ -1,127 +0,0 @@ -import argparse -from pathlib import Path - -from pipeline_runner.lib.exceptions import SuiteError -from pipeline_runner.lib.task_types import SuiteTask, Task -from pipeline_runner.core.bootstrap import * - -from deployment_pipeline.core.task_runner import * - -from deployment_pipeline.lib.types import BuildEnv - -# from core.suite import * 
- - -def load_parser(): - parser = argparse.ArgumentParser(description="Blog Deployment Suite") - - parser.add_argument("--config", required=True) - parser.add_argument("--task", type=str, default="HealthCheck") - parser.add_argument("--hotfix", action="store_true") - parser.add_argument("--full-pipeline", action="store_true") - parser.add_argument("--branch", required=True) - parser.add_argument("--root", type=str, help="The root directory of the project") - parser.add_argument("--stage", type=str, help="Run a specific stage of the build") - parser.add_argument( - "--dry-run", - action="store_true", - help="Perform a trial run without executing tasks", - ) - print("ran parser") - - # Capture specific test names: e.g., --tests ArtifactsTest.Cleanup FullChainTest.EndToEnd - parser.add_argument( - "--tests", - nargs="+", - type=str, - default=[], - help="List of specific test names", - ) - - # Capture a regex filter: e.g., --filter Artifacts.* - parser.add_argument( - "--filter", type=str, help="Filter tests with regex (maps to ctest -R)" - ) - - # --tasks 1 2 5 - parser.add_argument("--tasks", nargs="+", type=int, help="List of task IDs to run") - - # --skip 0 3 - parser.add_argument("--skip", nargs="+", type=int, help="List of task IDs to skip") - return parser - - -class DeploymentSuite(SuiteTask): - """ - Orchestrates the Hexascript logic verification pipeline. - Replaces tdd_loop.sh with zero subprocess overhead for Python logic. 
- """ - - name = "Deployment Test Runner" - root_dir: Path | None - _in_nix_shell: bool - _owner: "DeploymentSuite" - - def __init__(self, *args, root: str | None = None, **kwargs): - self.disable_dry_run() - self.parser = None - self._in_nix_shell = False - self.engine = None - self.env = BuildEnv() - self.args: dict = dict() - self.toml: dict = dict() - - self._owner = self - self._parser() - Task.owner = self - super().__init__(self, *args, owner=self, *kwargs) - - self._parent = self - - self.root_dir = Path(root) if root else None - - self.kwargs = kwargs - - def _parser(self): - - parser = load_parser() - self._owner.args = vars(parser.parse_args()) - - def initialized(): - print("Parser already initialized") - - self._parser = initialized - - def fail(self, *args, **kwargs): - """Helper to raise the state-aware exception.""" - raise SuiteError(self, *args, **kwargs) - - def _run(self): - from core.tests import TestRunner - - from core.tasks import ( - GetDeploymentConfig, - LoadServerConfig, - HotFix, - YarnBuild, - AtomicDeploy, - HealthCheck, - PipelineSuccess, - ) - - all_tasks = [ - CheckNix, - VerifySystemDependencies, - GetDeploymentConfig, - LoadServerConfig, - HotFix, - EnsureBuildPaths, - YarnBuild, - TestRunner, - AtomicDeploy, - HealthCheck, - ] - Task.__init__(self._owner, all_tasks) - task = self.get_arg("task") - Task.run(task) - return self diff --git a/deployment/core/task_runner.py b/deployment/core/task_runner.py deleted file mode 100755 index ed0bda4..0000000 --- a/deployment/core/task_runner.py +++ /dev/null @@ -1,220 +0,0 @@ -from os import wait -from typing import List - -from pipeline_runner.lib.types import Stage, typename -from pipeline_runner.lib.exceptions import TaskError -from pipeline_runner.lib.task_types import SuiteTask - -from pipeline_runner.core.bootstrap import * -from deployment_pipeline.core.task_runner import * -from pipeline_runner.core.suite import * - - -from deployment_pipeline.core.tasks import PipelineSuccess - - 
-class TaskRunner(SuiteTask): - _stage = Stage.BOOTSTRAP - skip: list | None - tasks: list | None # Input from cli - _all_tasks: List[SuiteTask] - _queue: List = [] - _loaded: dict = {} - _initialized: bool = False - base_class_name: str - - def __init__( - self, - *args, - owner: "DeploymentSuite", - skip: list | None = None, - tasks: list | None = None, - stage: Stage = Stage.ANY, - base_class_name="TaskRunner", - **kwargs, - ): - super().__init__(owner, *args, owner=owner, **kwargs) - self.last_task = None - self.disable_dry_run() - - self.name = "Task Runner" - self._skip = skip - self._all_tasks: List = [] - self.tasks = tasks - self.current_stage = stage - self.base_class_name = base_class_name - - def queue_tasks(all_tasks): - return self._queue_tasks(all_tasks, *args, **kwargs) - - self.queue_tasks = queue_tasks - if TaskRunner._initialized: - return - - TaskRunner._initialized = True - - def _queue_tasks(self, all_tasks, *args, cls_name="SuiteTask", **kwargs): - selected_task = self.get_arg("tasks") - if selected_task: - self.print("Selected Task: ", type(selected_task)) - tasks = [task for task in all_tasks if task.__name__ == selected_task] - else: - tasks = all_tasks - - task_suite = TaskRunner._loaded - - for _task in tasks: - try: - task = _task(self._owner, *args, owner=self._owner, **kwargs) - self._all_tasks.append(task) - task_suite[typename(task)] = task - except Exception as e: - raise Exception("Task initialization failed: ", e) from e - return self - - @staticmethod - def load_deps(parent, deps: List[SuiteTask], cls_name="SuiteTask"): - from lib.task_types import typename - - parent.print(" Loading deps for ", typename(parent)) - for dep in deps: - # print("test", dep) - dep_name = dep.__name__ - parent.print(" dep: ", dep_name) - try: - if not TaskRunner.is_loaded(deps): - raise Exception("This dependency was not initialized.", deps) - elif dep.skip: - raise Exception("This dependency was marked as 'skip'.") - - dep_list = 
TaskRunner._get_dep_list() - # print("test", dep_list) - if dep_name not in dep_list.keys(): - - # parent.print(f"Running {type(dep)}", end="") - # parent.print(f"Running {dep}", end="") - - task = TaskRunner._get_dep(dep_name) - - task.run() - parent.print("Complete.") - - except Exception as e: - raise Exception("Dependency resolution failed: ", e) from e - - @staticmethod - def _get_dep_list(): - try: - task_suite = TaskRunner._loaded - # task_suite = TaskRunner._loaded.get(base_class_name) - except AttributeError as e: - raise Exception("Oops i messed up", e) - - if type(task_suite).__name__ != "dict": - # print(TaskRunner._loaded) - raise AttributeError(f"Expected a dict, got: {task_suite}: ") - return task_suite - - @staticmethod - def _get_dep(dep_name): - task_suite = TaskRunner._get_dep_list() - # print(TaskRunner._loaded) - if len(task_suite.keys()) == 0: - raise AttributeError( - f"Dependency '{dep_name}' not set in {task_suite_name}", cls_name - ) - task = task_suite.get(dep_name) - if typename(task) not in ["SuiteTask", "SuiteSubTask"]: - raise AttributeError(f"Dependency '{dep_name}' not found in task_suite") - return task - - @staticmethod - def is_loaded(deps: List[SuiteTask]): - """ - Validates if all required task types exist in the TaskRunner queue. - TaskRunner.queue is expected to be a set or list of SuiteTask instances. - """ - # Extract the classes of the tasks currently in the queue - if len(deps) == 0: - return True - results = [] - for dep in deps: - # Get the base class name - base_class = dep.__bases__[0].__name__ - - # Lookup the task in that queue - dep_list = TaskRunner._get_dep_list().values() - - dep_name_list = [type(task).__name__ for task in dep_list] - - print(dep.__name__) # The dependency's name - print(dep_name_list) - - # for d in dep_base_classes: - - # Is the dep in that task list? 
- if dep.__name__ in dep_name_list: - results.append(True) - for result in results: - if not result: - return False - - return True - - def _sanity_check(self): - if self._owner is None: - raise ValueError("Owner is not set") - if self._parent is None: - raise ValueError("Parent is not set") - - def _run(self): - self._sanity_check() - all_tasks = self._all_tasks - - if self.tasks is not None and len(self.tasks) > 0: - TaskRunner._queue = [all_tasks[i] for i in self.tasks if i < len(all_tasks)] - else: - skip_set = self._skip or set() - TaskRunner._queue = [ - task for i, task in enumerate(all_tasks) if i not in skip_set - ] - queue = TaskRunner._queue - - num_tasks = len(queue) - if num_tasks < 1: - self.print(all_tasks) - self.print(queue) - self.print(self.tasks) - self.print(self.skip_task()) - self.fail("No tasks queued.") - - self.print(f"Queue initialized with {len(queue)} tasks") - - self._runner(queue) - - def _runner(self, queue): - # Execute the filtered queue - selected_task = self.get_arg("task") - for task in queue: - from lib.task_types import typename - - task_name = typename(task) - - if selected_task is None or task_name == selected_task: - self.last_task = task_name - try: - # current_stage = self.current_stage - # task_stage = task.get_stage() - # if task_stage is not current_stage: - # continue - if task.run() is False: - self.fail(f"Pipeline stopped at task: {task.name}") - except PipelineSuccess as e: - self.print(e) - break - except ModuleNotFoundError as e: - self.print(f" [ERROR] Task {task.name} failed: {e}") - self.fail(f"Pipeline stopped at task: {self.last_task}") - except TaskError as e: - self.print(f" [ERROR] Task {task.name} failed: {e}") - self.fail(f"Pipeline stopped at task: {self.last_task}") diff --git a/deployment/core/tasks.py b/deployment/core/tasks.py deleted file mode 100644 index b31dbff..0000000 --- a/deployment/core/tasks.py +++ /dev/null @@ -1,281 +0,0 @@ -import os -import time -import tomllib -from lupa import 
LuaRuntime -from pathlib import Path -from deployment_pipeline.lib.task_types import DeploymentTask -from pipeline_runner.lib.types import Stage - - -class GetDeploymentConfig(DeploymentTask): - - _stage = Stage.BOOTSTRAP - _deps = [] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Get the deployment configuration" - - def _run(self): - # 1. Load Lua - lua = LuaRuntime(unpack_returned_tuples=True) - config_path = self.get_arg("config") - - self.print("Reading file: ", config_path) - with open(config_path, "r") as f: - try: - lua_content = f.read() - cfg = lua.execute(lua_content) - except Exception as e: - self.fail("Failed to load deployment config: ", e) - - # 4. Hydrate self.env - self.cfg = cfg # Store the lua object for functional calls later - self.env.lua_cfg = cfg # Store the lua object for functional calls later - self.env.app_name = cfg.app_name - self.env.repo = cfg.repo - self.env.timestamp_format = cfg.timestamp_format - self.env.yarn_path = cfg.yarn_path - self.env.corepack_home = cfg.corepack_home - self.env.user = cfg.user - - self.env.deploy_branch = self.get_arg("branch").split("/")[-1] - self.env.release = cfg.release - self.env.testing = cfg.testing - - self.print(f"✅ Context hydrated for {self.env.app_name}") - # self.env.build_dir = Path(config.paths.build) - return True - - -class LoadServerConfig(DeploymentTask): - """Verifies TOML existence and hydrates the environment with health check URI components""" - - _stage = Stage.BOOTSTRAP - _deps = [GetDeploymentConfig] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Verify and Hydrate Server Configuration" - - def _run(self): - # 1. 
Physical existence check - self.env.toml["release"] = self.get_config("release") - self.env.toml["testing"] = self.get_config("testing") - - def get_config(self, env_type): - config_file = getattr(self.env, env_type).config_file - - self.print(f" [CHECK] Verifying {env_type} configuration: {config_file}") - - if not os.path.exists(config_file): - self.fail(f"CRITICAL: {env_type} config not found at {config_file}.") - # 1. Physical existence check - - # 2. Parse TOML for internal deployment metadata - try: - with open(config_file, "rb") as f: - data = tomllib.load(f) - - return { - "public": self.get_server_cfg(data, "public"), - "network": self.get_server_cfg(data, "network"), - } - - except Exception as e: - self.fail(f"FAILED to parse {env_type} TOML: ", e) - - def get_server_cfg(self, data, server_type): - try: - server = data.get(server_type) - - # 3. Hydrate self.env for HealthCheck and WaitForReadiness tasks - config = { - "schema": server.get("schema"), - "domain": server.get("domain"), - "address": server.get("address"), - "port": str(server.get("port")), - } - health_path = data.get("meta").get("health_check") - - if server_type == "network": - config["loc"] = config.get("address") - elif server_type == "public": - config["loc"] = config.get("domain") - - config["health_endpoint"] = ( - f"{config['schema']}://{config['loc']}:" - f"{config['port']}{health_path}" - ) - - self.print( - f" [READY] {server_type} Health URI: {config['health_endpoint']}" - ) - return config - - except Exception as e: - self.fail(f"FAILED to parse {server_type} TOML: ", e) - - -class PipelineSuccess(Exception): - pass - - -class HotFix(DeploymentTask): - """Bypasses the full build to update the current live deployment""" - - _stage = Stage.DEPLOY - _deps = [GetDeploymentConfig, LoadServerConfig] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Hot fix" - - def _run(self): - if not self.get_arg("hotfix"): - return - - cfg = 
self.env.release - # 1. Target the current active symlink - live_path = self.env.release.deploy_link - - # 2. Pull changes - try: - self.sh( - "git pull origin " + self.env.deploy_branch, - cwd=live_path, - handle_exception=False, - ) - except: - self.sh("git fetch origin ", cwd=live_path) - self.sh("git reset --hard origin/" + self.env.deploy_branch, cwd=live_path) - - # 3. Quick Asset Rebuild (Skip yarn install unless package.json changed) - # We check for changes in package.json to decide if we need a full install - self.sh("yarn combine:css", cwd=live_path) - - # 4. Restart to pick up Node.js changes - self.sh(f"sudo systemctl restart {cfg.service_name}", shlex=True) - - raise PipelineSuccess("Hot fix applied successfully") - - -class YarnBuild(DeploymentTask): - """Executes dependency installation and asset compilation""" - - _stage = Stage.BUILD - _deps = [GetDeploymentConfig, LoadServerConfig] - skip: bool = False - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Running Yarn build process" - - def _run(self): - timestamp = time.strftime(self.env.timestamp_format) - self.env.release_dir = f"{Path(self.env.testing.deploy_link)}-{timestamp}" - - build_git_path = os.path.join(self.env.build_dir, ".git") - - self.print("Build git path:", build_git_path) - - if os.path.exists(build_git_path): - try: - self.sh( - f"git pull origin {self.env.deploy_branch}", - cwd=self.env.build_dir, - handle_exception=False, - ) - except: - self.sh("git fetch origin", cwd=self.env.build_dir) - self.sh( - f"git reset --hard origin/{self.env.deploy_branch}", - cwd=self.env.build_dir, - ) - else: - self.sh( - f"git clone --branch {self.env.deploy_branch} {self.env.repo} {self.env.build_dir}" - ) - self.sh("git submodule update --init --recursive", cwd=self.env.build_dir) - self.sh("yarn config set enableGlobalCache true", cwd=self.env.build_dir) - self.sh( - f"yarn config set globalFolder {self.env.yarn_path}", cwd=self.env.build_dir - ) - 
self.sh("yarn config set nodeLinker pnp", cwd=self.env.build_dir) - self.sh("yarn install", cwd=self.env.build_dir) - self.sh("yarn combine:css", cwd=self.env.build_dir) - return True - - -class AtomicDeploy(DeploymentTask): - """Performs rsync to release directory and updates environment symlink""" - - _stage = Stage.DEPLOY - _deps = [YarnBuild] - skip: bool = False - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Executing atomic symlink swap" - - def _run(self): - # Determine success from the TestRunner flag - test_success = getattr(self.env, "test_success", False) or not self.get_arg( - "enforce_testing" - ) - - # Select appropriate Lua config table - cfg = self.env.release if test_success else self.env.testing - - # Generate the versioned directory path using Lua function - # Note: Use the actual formatted timestamp, not the format string - timestamp = time.strftime(self.env.timestamp_format) - final_release_dir = Path(cfg.get_release_dir(timestamp)) - - # 1. Finalize the directory (Rename from -BUILDING to versioned path) - self.sh(f"mv {self.env.build_dir} {final_release_dir}", shlex=True) - - # 2. Atomic Symlink Swap - ONLY if tests passed - if test_success: - deploy_link = Path(cfg.deploy_link) - # Create a temporary symlink name in the same parent directory - temp_link = deploy_link.with_name(deploy_link.name + "_tmp") - - # Create temporary symlink pointing to the new version - self.sh(f"ln -sfn {final_release_dir} {temp_link}", shlex=True) - - # Atomic rename of the symlink itself (overwrites the old link) - self.sh(f"mv -Tf {temp_link} {deploy_link}", shlex=True) - self.sh("yarn", cwd=final_release_dir) - - # Restart service - self.sh(f"sudo systemctl restart {cfg.service_name}", shlex=True) - else: - self.print(" [SKIP] Test failure detected. 
Symlink swap bypassed.") - self.print(f" [INFO] Artifact preserved at: {final_release_dir}") - self.fail() - - return True - - -class HealthCheck(DeploymentTask): - """Polls the local production service endpoint""" - - _stage = Stage.DEPLOY - _deps = [AtomicDeploy] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Verifying service health" - - def _run(self): - # Base run handles dry_run check already - uri = self.env.toml["release"]["network"]["health_endpoint"] - - status = self.poll_health_endpoint(uri, label="Production Service") - if self.do_dry_run(): - return - if not status: - self.fail(f"Production service failed health check at {uri}") - - return True diff --git a/deployment/core/tests.py b/deployment/core/tests.py deleted file mode 100644 index 7297e6a..0000000 --- a/deployment/core/tests.py +++ /dev/null @@ -1,164 +0,0 @@ -import time -import shlex - -from pipeline_runner.lib.task_types import SuiteTask, SuiteSubTask -from pipeline_runner.lib.types import Stage -from pipeline_runner.core.tasks import YarnBuild -from pipeline_runner.core.task_runner import TaskRunner - - -class StartTestApp(SuiteSubTask): - """Spins up the application in the build directory for integration testing""" - - _stage = Stage.TEST - _deps = [YarnBuild] - - def __init__(self, *args, **kwargs): - self.name = "Start Application for Test" - super().__init__(*args, **kwargs) - - def _run(self): - if self._owner.args.get("skip_tests") and not self.get_arg("enforce_testing"): - self.print(" [SKIP] Skipping per user request.") - return True - - self.print(f" [EXEC] Starting app in {self.env.build_dir}") - - cmd = f"nohup sudo -u {self.env.user} yarn run prod --config {self.env.testing.config_file} >> '{self.env.test_log}' 2>&1 & echo $! 
> '{self.env.pidfile}'" - # This doesn't work because systemd doesnt know where it is yet - # cmd=f"sudo systemctl restart {self.env.testing.service_name}", - self.sh( - cmd, - cwd=self.env.build_dir, - # shlex=True, - ) - return True - - -class WaitForReadiness(SuiteSubTask): - """Polls the health endpoint of the TEST instance""" - - _stage = Stage.TEST - _deps = [StartTestApp] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Wait for Service Readiness" - - def _run(self): - if self._owner.args.get("skip_tests") and not self.get_arg( - "enforce_testing", True - ): - return True - - uri = self.env.toml["testing"]["network"]["health_endpoint"] - - status = self.poll_health_endpoint(uri, label="Test Service") - if self.do_dry_run(): - return - if not status: - # If the poll fails, we cat the log as requested before failing - self.sh(f"cat '{self.env.test_log}'", disabled=True) - self.fail(f"Test service at {uri} failed to start.") - - return True - - -class RunMochaTests(SuiteSubTask): - """Executes the actual test suite against the running instance""" - - _stage = Stage.TEST - _deps = [WaitForReadiness] - - def __init__(self, *args, **kwargs): - self.name = "Run Tests" - super().__init__(*args, **kwargs) - - def _run(self): - if self._owner.args.get("skip_tests") and not self.get_arg("enforce_testing"): - return True - - # Using sh_thread to ensure real-time log streaming for Jenkins - self.sh_thread("yarn test:postreceive", cwd=self.env.build_dir) - return True - - -class StopTestApp(SuiteSubTask): - """Cleans up the test process regardless of test outcome""" - - _stage = Stage.TEST - _deps = [RunMochaTests] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Stop Test App" - - def _run(self): - self.sh(f"whoami") - self.sh(f"id") - self.sh(f"kill $(cat '{self.env.pidfile}') || true", shlex=False) - # self.sh( - # f"sudo systemctl stop {self.env.testing.service_name}", - # 
shlex=False, - # ) - return True - - -class TestRunner(SuiteTask): - """ - Sub-orchestrator for the Integration Testing lifecycle. - Manages the environment setup, execution, and teardown for Mocha tests. - """ - - _stage = Stage.TEST - _deps = [] # Dependent on YarnBuild completion in the main TaskRunner - skip: bool = False - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = "Integration Test Runner" - sub_tasks = [StartTestApp, WaitForReadiness, RunMochaTests, StopTestApp] - - runner = TaskRunner(self, owner=self._owner, base_class_name="TestRunner") - runner.queue_tasks(sub_tasks).run() - - def _run(self): - - # 1. Check if we should even be here - skip_param = self.args.get("skip_tests", False) - enforced = self.get_arg("enforce_testing") - - if skip_param and not enforced: - self.print(" [SKIP] Integration tests bypassed by user flag.") - return True - - self.print(f"--- Entering Stage: {self._stage.value.upper()} ---") - - # 2. Sequential Execution - # We manually iterate to maintain control over the 'StopTestApp' cleanup - success = True - try: - for task_class in self._sub_tasks: - # Instantiate as SubTask to maintain ID hierarchy (e.g., [4.1], [4.2]) - task = task_class(parent=self, owner=self._owner) - - if task.run() is False: - success = False - self.print(f" [FAIL] Test suite halted at: {task.name}") - break - - except AttributeError as e: - success = False - self.fail(f" [ERROR] failure during test execution: {e}") - except Exception as e: - success = False - self.fail(f" [ERROR] Critical failure during test execution: {e}") - - finally: - if not self.do_dry_run(): - # 3. 
Forced Teardown - # If the loop broke or failed, ensure StopTestApp runs if StartTestApp was attempted - self.print(" [CLEAN] Ensuring test environment teardown...") - cleanup = StopTestApp(parent=self, owner=self._owner) - cleanup.run() - self.env.test_success = success diff --git a/deployment/lib/__init__.py b/deployment/lib/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/deployment/lib/__init__.py +++ /dev/null diff --git a/deployment/lib/task_types.py b/deployment/lib/task_types.py deleted file mode 100755 index 5e4ff56..0000000 --- a/deployment/lib/task_types.py +++ /dev/null @@ -1,98 +0,0 @@ -import time -from typing import TYPE_CHECKING - -from pipeline_runner.lib.types import typename -from pipeline_runner.lib.task_types import SuiteTask, Task as BaseTask, SuiteSubTask - - -if TYPE_CHECKING: - from deployment_pipeline.core.suite import DeploymentSuite - from deployment_pipeline.lib.task_types import BlogDeploySuite - - -class DeploymentTask(BaseTask): - pass - - -class DeploymentSuiteTask(SuiteTask): - _owner: "BlogDeploySuite" - _parent: "DeploymentTask" - - def skip_task(self): - if self._deps and not self.deps_loaded(): - self.print(f" [INFO] Skipping {self.name}: Dependencies not met.") - return True - if self.skip: - return True - - return False - - def run(self): - self.print("Running ", typename(self)) - try: - if len(self._deps) > 0: - - from deployment_pipeline.core.task_runner import TaskRunner - - TaskRunner.add_deps(self) - TaskRunner.run_deps(self) - except Exception as e: - raise Exception(f"Failed to load dependency for {typename(self)}: {e}") - return self._run() - - def deps_loaded(self): - if isinstance(self, SuiteSubTask): - return True - - from deployment_pipeline.core.task_runner import TaskRunner - - return TaskRunner.is_loaded(self._deps) - - def poll_health_endpoint(self, uri, retries=5, delay=5, label="Service"): - """Shared polling logic for verifying service availability""" - self.print(f" [POLL] Verifying 
{label} Health: {uri}") - - if self.do_dry_run(): - retries = 0 - - for _ in range(retries): - try: - # Use sh to maintain consistency in logs/dry-runs - # We use graceful=False but handle the boolean return in the loop - res = self.sh( - f"curl -s -I {uri} | grep '200 OK'", handle_exception=False - ) - - if res and res.returncode == 0: - self.print(f" [OK] {label} is healthy.") - return True - else: - self.print("Got result :", res) - except Exception as e: - - self.print(f" [WAIT] {label} not ready... {e}") - self.print(e.__dict__) - - time.sleep(delay) - - return False - - -class DeploymentSubTask(SuiteSubTask): - _owner: "DeploymentSuite" - _parent: DeploymentSuiteTask - - def run(self): - self.print("Running ", typename(self)) - try: - if len(self._deps) > 0: - - from deployment_pipeline.core.task_runner import TaskRunner - - TaskRunner.load_deps( - self, - self._deps, - ) - except Exception as e: - raise Exception(f"Failed to load dependency for {typename(self)}: {e}") - return self._run() diff --git a/deployment/lib/types.py b/deployment/lib/types.py deleted file mode 100644 index be5cfba..0000000 --- a/deployment/lib/types.py +++ /dev/null @@ -1,44 +0,0 @@ -import os -from pathlib import Path - - -class BuildEnv: - timestamp_format: str = "%Y%m%d-%H%M%S" - workspace: Path - timestamp: str - deploy_branch: str - deploy_path: Path - build_dir: Path - service_name: str - release_dir: Path - test_endpoint_uri: str - pidfile: Path = Path() - test_log: Path = Path() - toml: dict = {} - release: dict = {} - testing: dict = {} - yarn_path: Path - corepack_home: Path - user: str - - def __init__(self, timestamp_format: str | None = None): - self.workspace: Path = Path() - self.timestamp: str = "" - self.deploy_branch: str = "" - self.deploy_path: Path = Path() - self.service_name: str = "" - self.release_dir: Path = Path() - self.yarn_path: Path = Path() - self.corepack_home: Path = Path() - self.server_schema = "http" - self.server_address = "localhost" - 
self.pidfile = Path("/tmp/blog_test.pid") - self.user: str = "" - - self.root_dir = os.getcwd() - if timestamp_format is not None: - self.timestamp_format = timestamp_format - self.workspace = Path(os.getenv("WORKSPACE", self.root_dir)) - self.build_dir = self.workspace / "build" - - self.test_log: Path = self.build_dir / "test_log.log" diff --git a/deployment/main.py b/deployment/main.py deleted file mode 100644 index 37b1b78..0000000 --- a/deployment/main.py +++ /dev/null @@ -1,31 +0,0 @@ -import sys - - -from deployment_pipeline.core.suite import DeploymentSuite - - -def main(): - - runner = DeploymentSuite() - exit_code = 0 - - import traceback - - try: - runner.run() - print("🚀 Deployment Successful") - exit_code = 0 - except KeyboardInterrupt: - runner.dump_print_queue() - traceback.print_exc() - runner.print("\n[System] Termination signal received. Cleaning up...") - exit_code = 0 - except Exception as e: - runner.dump_print_queue() - traceback.print_exc() - print(f"❌ Deployment Failed at: {e.with_traceback(e.__traceback__)}") - exit_code = 1 - if exit_code != 0: - print(exit_code) - sys.exit(exit_code or 1) - runner.dump_print_queue() diff --git a/pyproject.toml b/pyproject.toml index 7fd93b9..16cb225 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,3 +20,15 @@ "pytest==9.0.2", "pytest-cov==7.1.0", ] + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] +include = ["deployment*"] + +[tool.pytest.ini_options] +pythonpath = ["src"] +testpaths = ["tests/deployment/tests"] +python_files = ["test_*.py", "*_test.py"] diff --git a/src/deployment/__init__.py b/src/deployment/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/deployment/__init__.py diff --git a/src/deployment/__main__.py b/src/deployment/__main__.py new file mode 100644 index 0000000..0d3ea05 --- /dev/null +++ b/src/deployment/__main__.py @@ -0,0 +1,4 @@ +if __name__ == "__main__": + from deployment_pipeline.main 
class VerifySystemDependencies(SuiteTask):
    """Bootstrap task that confirms required command-line tools exist on PATH.

    The checked tools (yarn, git, rsync, curl, node) are the JS build/deploy
    toolchain. The check is skipped entirely inside a nix shell, where the
    shell environment itself guarantees the dependencies.
    """

    _can_skip = False
    _stage = Stage.BOOTSTRAP

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Verifying System Dependencies"

    def _run(self):
        """Verify non-Python system dependencies; fail listing ALL missing tools.

        Returns:
            True when every tool is present (or the check is skipped).
        """
        if self._owner._in_nix_shell:
            self.print("Skipping: in nix shell")
            return True

        deps = ["yarn", "git", "rsync", "curl", "node"]
        # Collect every missing tool first so the operator can fix them in one
        # pass, instead of discovering them one failed run at a time.
        missing = [dep for dep in deps if not shutil.which(dep)]
        if missing:
            self.fail(f"Missing system dependency: {', '.join(missing)}")
        self.printer.print(" [OK] System tools detected.")
        return True
def load_parser():
    """Build the CLI argument parser for the deployment suite.

    Required: --config (path to the Lua deployment config) and --branch (the
    git ref to deploy). Everything else is optional task/test selection.

    Returns:
        argparse.ArgumentParser: fully configured parser, not yet parsed.
    """
    parser = argparse.ArgumentParser(description="Blog Deployment Suite")

    parser.add_argument("--config", required=True)
    parser.add_argument("--task", type=str, default="HealthCheck")
    parser.add_argument("--hotfix", action="store_true")
    parser.add_argument("--full-pipeline", action="store_true")
    parser.add_argument("--branch", required=True)
    parser.add_argument("--root", type=str, help="The root directory of the project")
    parser.add_argument("--stage", type=str, help="Run a specific stage of the build")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Perform a trial run without executing tasks",
    )
    # BUG FIX: removed leftover debug output `print("ran parser")`.

    # Capture specific test names: e.g., --tests ArtifactsTest.Cleanup FullChainTest.EndToEnd
    parser.add_argument(
        "--tests",
        nargs="+",
        type=str,
        default=[],
        help="List of specific test names",
    )

    # Capture a regex filter: e.g., --filter Artifacts.*
    parser.add_argument(
        "--filter", type=str, help="Filter tests with regex (maps to ctest -R)"
    )

    # --tasks 1 2 5
    parser.add_argument("--tasks", nargs="+", type=int, help="List of task IDs to run")

    # --skip 0 3
    parser.add_argument("--skip", nargs="+", type=int, help="List of task IDs to skip")
    return parser
+ """ + + name = "Deployment Test Runner" + root_dir: Path | None + _in_nix_shell: bool + _owner: "DeploymentSuite" + + def __init__(self, *args, root: str | None = None, **kwargs): + self.disable_dry_run() + self.parser = None + self._in_nix_shell = False + self.engine = None + self.env = BuildEnv() + self.args: dict = dict() + self.toml: dict = dict() + + self._owner = self + self._parser() + Task.owner = self + super().__init__(self, *args, owner=self, *kwargs) + + self._parent = self + + self.root_dir = Path(root) if root else None + + self.kwargs = kwargs + + def _parser(self): + + parser = load_parser() + self._owner.args = vars(parser.parse_args()) + + def initialized(): + print("Parser already initialized") + + self._parser = initialized + + def fail(self, *args, **kwargs): + """Helper to raise the state-aware exception.""" + raise SuiteError(self, *args, **kwargs) + + def _run(self): + from core.tests import TestRunner + + from core.tasks import ( + GetDeploymentConfig, + LoadServerConfig, + HotFix, + YarnBuild, + AtomicDeploy, + HealthCheck, + PipelineSuccess, + ) + + all_tasks = [ + CheckNix, + VerifySystemDependencies, + GetDeploymentConfig, + LoadServerConfig, + HotFix, + EnsureBuildPaths, + YarnBuild, + TestRunner, + AtomicDeploy, + HealthCheck, + ] + Task.__init__(self._owner, all_tasks) + task = self.get_arg("task") + Task.run(task) + return self diff --git a/src/deployment/core/task_runner.py b/src/deployment/core/task_runner.py new file mode 100755 index 0000000..ed0bda4 --- /dev/null +++ b/src/deployment/core/task_runner.py @@ -0,0 +1,220 @@ +from os import wait +from typing import List + +from pipeline_runner.lib.types import Stage, typename +from pipeline_runner.lib.exceptions import TaskError +from pipeline_runner.lib.task_types import SuiteTask + +from pipeline_runner.core.bootstrap import * +from deployment_pipeline.core.task_runner import * +from pipeline_runner.core.suite import * + + +from deployment_pipeline.core.tasks import 
class TaskRunner(SuiteTask):
    """Queues, dependency-resolves, and executes SuiteTask instances.

    Class-level state (`_queue`, `_loaded`) is deliberately shared: every
    TaskRunner (main pipeline and nested sub-runners) registers its task
    instances in the same registry so `load_deps` can resolve dependencies
    across runners.
    """

    _stage = Stage.BOOTSTRAP
    skip: list | None
    tasks: list | None  # Task indices selected on the CLI (--tasks)
    _all_tasks: List[SuiteTask]
    _queue: List = []  # shared execution queue (class-level by design)
    _loaded: dict = {}  # shared registry: task type name -> instance
    _initialized: bool = False
    base_class_name: str

    def __init__(
        self,
        *args,
        owner: "DeploymentSuite",
        skip: list | None = None,
        tasks: list | None = None,
        stage: Stage = Stage.ANY,
        base_class_name="TaskRunner",
        **kwargs,
    ):
        # NOTE(review): `owner` is passed both positionally and by keyword;
        # the base signature must tolerate that — confirm against SuiteTask.
        super().__init__(owner, *args, owner=owner, **kwargs)
        self.last_task = None
        self.disable_dry_run()

        self.name = "Task Runner"
        self._skip = skip
        self._all_tasks: List = []
        self.tasks = tasks
        self.current_stage = stage
        self.base_class_name = base_class_name

        # Close over the construction args so callers can enqueue later
        # without re-supplying them.
        def queue_tasks(all_tasks):
            return self._queue_tasks(all_tasks, *args, **kwargs)

        self.queue_tasks = queue_tasks
        if TaskRunner._initialized:
            return

        TaskRunner._initialized = True

    def _queue_tasks(self, all_tasks, *args, cls_name="SuiteTask", **kwargs):
        """Instantiate the (optionally filtered) task classes and register them.

        Returns self so callers can chain `.run()`.
        """
        selected_task = self.get_arg("tasks")
        if selected_task:
            self.print("Selected Task: ", type(selected_task))
            tasks = [task for task in all_tasks if task.__name__ == selected_task]
        else:
            tasks = all_tasks

        task_suite = TaskRunner._loaded

        for _task in tasks:
            try:
                task = _task(self._owner, *args, owner=self._owner, **kwargs)
                self._all_tasks.append(task)
                # Register by type name so load_deps can find it later.
                task_suite[typename(task)] = task
            except Exception as e:
                raise Exception("Task initialization failed: ", e) from e
        return self

    @staticmethod
    def load_deps(parent, deps: List[SuiteTask], cls_name="SuiteTask"):
        """Run every not-yet-run dependency of `parent`, in declaration order.

        Raises if a dependency was never registered or is marked skip.
        """
        parent.print("    Loading deps for ", typename(parent))
        # Validate the whole dependency list once, up front (the original
        # re-validated it on every loop iteration).
        if not TaskRunner.is_loaded(deps):
            raise Exception("This dependency was not initialized.", deps)
        for dep in deps:
            dep_name = dep.__name__
            parent.print("    dep: ", dep_name)
            try:
                if dep.skip:
                    raise Exception("This dependency was marked as 'skip'.")

                dep_list = TaskRunner._get_dep_list()
                if dep_name not in dep_list.keys():
                    task = TaskRunner._get_dep(dep_name)
                    task.run()
                    parent.print("Complete.")

            except Exception as e:
                raise Exception("Dependency resolution failed: ", e) from e

    @staticmethod
    def _get_dep_list():
        """Return the shared name->instance registry, guarding its type."""
        try:
            task_suite = TaskRunner._loaded
        except AttributeError as e:
            raise Exception("Oops i messed up", e)

        if type(task_suite).__name__ != "dict":
            raise AttributeError(f"Expected a dict, got: {task_suite}: ")
        return task_suite

    @staticmethod
    def _get_dep(dep_name):
        """Look up a registered dependency instance by its type name."""
        task_suite = TaskRunner._get_dep_list()
        if len(task_suite.keys()) == 0:
            # BUG FIX: the original message referenced undefined names
            # (`task_suite_name`, `cls_name`), raising NameError instead of
            # the intended AttributeError.
            raise AttributeError(
                f"Dependency '{dep_name}' not set: task registry is empty"
            )
        task = task_suite.get(dep_name)
        # NOTE(review): assumes typename() reports the *base* class name for
        # task instances — confirm against pipeline_runner.lib.types.
        if typename(task) not in ["SuiteTask", "SuiteSubTask"]:
            raise AttributeError(f"Dependency '{dep_name}' not found in task_suite")
        return task

    @staticmethod
    def is_loaded(deps: List[SuiteTask]):
        """
        Validates that every required task type exists in the shared registry.

        Returns True when all deps are registered (vacuously for no deps).
        """
        if len(deps) == 0:
            return True
        # Snapshot the registered instance type names once.
        dep_name_list = [
            type(task).__name__ for task in TaskRunner._get_dep_list().values()
        ]
        for dep in deps:
            # BUG FIX: the original only appended True when a dep WAS found,
            # so a missing dependency could never produce a False result.
            if dep.__name__ not in dep_name_list:
                return False

        return True

    def _sanity_check(self):
        """Fail fast if ownership wiring was never completed."""
        if self._owner is None:
            raise ValueError("Owner is not set")
        if self._parent is None:
            raise ValueError("Parent is not set")

    def _run(self):
        """Build the execution queue from CLI selection/skips, then run it."""
        self._sanity_check()
        all_tasks = self._all_tasks

        if self.tasks is not None and len(self.tasks) > 0:
            # Explicit --tasks indices win; out-of-range indices are ignored.
            TaskRunner._queue = [all_tasks[i] for i in self.tasks if i < len(all_tasks)]
        else:
            skip_set = self._skip or set()
            TaskRunner._queue = [
                task for i, task in enumerate(all_tasks) if i not in skip_set
            ]
        queue = TaskRunner._queue

        num_tasks = len(queue)
        if num_tasks < 1:
            self.print(all_tasks)
            self.print(queue)
            self.print(self.tasks)
            self.print(self.skip_task())
            self.fail("No tasks queued.")

        self.print(f"Queue initialized with {len(queue)} tasks")

        self._runner(queue)

    def _runner(self, queue):
        """Execute the queued tasks, honoring --task single-task selection.

        PipelineSuccess ends the run early and successfully; task errors fail
        the pipeline at the offending task.
        """
        selected_task = self.get_arg("task")
        for task in queue:
            task_name = typename(task)

            if selected_task is None or task_name == selected_task:
                self.last_task = task_name
                try:
                    if task.run() is False:
                        self.fail(f"Pipeline stopped at task: {task.name}")
                except PipelineSuccess as e:
                    # Deliberate early, successful termination (e.g. HotFix).
                    self.print(e)
                    break
                except ModuleNotFoundError as e:
                    self.print(f"  [ERROR] Task {task.name} failed: {e}")
                    self.fail(f"Pipeline stopped at task: {self.last_task}")
                except TaskError as e:
                    self.print(f"  [ERROR] Task {task.name} failed: {e}")
                    self.fail(f"Pipeline stopped at task: {self.last_task}")
class GetDeploymentConfig(DeploymentTask):
    """Evaluates the Lua deployment config and hydrates the shared BuildEnv."""

    _stage = Stage.BOOTSTRAP
    _deps = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Get the deployment configuration"

    def _run(self):
        # Spin up an embedded Lua interpreter and evaluate the config file.
        runtime = LuaRuntime(unpack_returned_tuples=True)
        config_path = self.get_arg("config")

        self.print("Reading file: ", config_path)
        with open(config_path, "r") as handle:
            try:
                cfg = runtime.execute(handle.read())
            except Exception as e:
                self.fail("Failed to load deployment config: ", e)

        # Keep the raw Lua table around for functional calls later.
        self.cfg = cfg
        self.env.lua_cfg = cfg

        # Copy scalar settings from the Lua table onto the environment.
        env = self.env
        env.app_name = cfg.app_name
        env.repo = cfg.repo
        env.timestamp_format = cfg.timestamp_format
        env.yarn_path = cfg.yarn_path
        env.corepack_home = cfg.corepack_home
        env.user = cfg.user

        # Only the short branch name matters (e.g. "origin/main" -> "main").
        env.deploy_branch = self.get_arg("branch").split("/")[-1]
        env.release = cfg.release
        env.testing = cfg.testing

        self.print(f"✅ Context hydrated for {self.env.app_name}")
        return True
class LoadServerConfig(DeploymentTask):
    """Verifies TOML existence and hydrates the environment with health check URI components"""

    _stage = Stage.BOOTSTRAP
    _deps = [GetDeploymentConfig]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Verify and Hydrate Server Configuration"

    def _run(self):
        """Load both server configs into env.toml.

        Returns True on success (BUG FIX: the original returned None, unlike
        every sibling task).
        """
        self.env.toml["release"] = self.get_config("release")
        self.env.toml["testing"] = self.get_config("testing")
        return True

    def get_config(self, env_type):
        """Verify and parse the TOML config for `env_type` ("release"/"testing").

        Returns a dict with "public" and "network" server config sections.
        """
        config_file = getattr(self.env, env_type).config_file

        self.print(f"  [CHECK] Verifying {env_type} configuration: {config_file}")

        # 1. Physical existence check
        if not os.path.exists(config_file):
            self.fail(f"CRITICAL: {env_type} config not found at {config_file}.")

        # 2. Parse TOML for internal deployment metadata
        try:
            # tomllib requires binary mode.
            with open(config_file, "rb") as f:
                data = tomllib.load(f)

            return {
                "public": self.get_server_cfg(data, "public"),
                "network": self.get_server_cfg(data, "network"),
            }

        except Exception as e:
            self.fail(f"FAILED to parse {env_type} TOML: ", e)

    def get_server_cfg(self, data, server_type):
        """Extract one server section and derive its health-check endpoint URI."""
        try:
            server = data.get(server_type)

            # 3. Hydrate self.env for HealthCheck and WaitForReadiness tasks
            config = {
                "schema": server.get("schema"),
                "domain": server.get("domain"),
                "address": server.get("address"),
                "port": str(server.get("port")),
            }
            health_path = data.get("meta").get("health_check")

            # "network" polls by raw address; "public" polls by domain name.
            if server_type == "network":
                config["loc"] = config.get("address")
            elif server_type == "public":
                config["loc"] = config.get("domain")

            config["health_endpoint"] = (
                f"{config['schema']}://{config['loc']}:"
                f"{config['port']}{health_path}"
            )

            self.print(
                f"  [READY] {server_type} Health URI: {config['health_endpoint']}"
            )
            return config

        except Exception as e:
            self.fail(f"FAILED to parse {server_type} TOML: ", e)
class HotFix(DeploymentTask):
    """Bypasses the full build to update the current live deployment in place."""

    _stage = Stage.DEPLOY
    _deps = [GetDeploymentConfig, LoadServerConfig]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Hot fix"

    def _run(self):
        # Only runs when explicitly requested via --hotfix.
        if not self.get_arg("hotfix"):
            return

        cfg = self.env.release
        # 1. Target the current active symlink
        live_path = self.env.release.deploy_link

        # 2. Pull changes; on a dirty/diverged checkout fall back to hard reset.
        try:
            self.sh(
                "git pull origin " + self.env.deploy_branch,
                cwd=live_path,
                handle_exception=False,
            )
        except Exception:
            # BUG FIX: was a bare `except:`, which also swallowed
            # SystemExit / KeyboardInterrupt.
            self.sh("git fetch origin ", cwd=live_path)
            self.sh("git reset --hard origin/" + self.env.deploy_branch, cwd=live_path)

        # 3. Quick Asset Rebuild (Skip yarn install unless package.json changed)
        self.sh("yarn combine:css", cwd=live_path)

        # 4. Restart to pick up Node.js changes
        self.sh(f"sudo systemctl restart {cfg.service_name}", shlex=True)

        # Raising PipelineSuccess ends the whole pipeline successfully
        # (caught and reported by the TaskRunner).
        raise PipelineSuccess("Hot fix applied successfully")
self.sh("yarn config set nodeLinker pnp", cwd=self.env.build_dir) + self.sh("yarn install", cwd=self.env.build_dir) + self.sh("yarn combine:css", cwd=self.env.build_dir) + return True + + +class AtomicDeploy(DeploymentTask): + """Performs rsync to release directory and updates environment symlink""" + + _stage = Stage.DEPLOY + _deps = [YarnBuild] + skip: bool = False + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.name = "Executing atomic symlink swap" + + def _run(self): + # Determine success from the TestRunner flag + test_success = getattr(self.env, "test_success", False) or not self.get_arg( + "enforce_testing" + ) + + # Select appropriate Lua config table + cfg = self.env.release if test_success else self.env.testing + + # Generate the versioned directory path using Lua function + # Note: Use the actual formatted timestamp, not the format string + timestamp = time.strftime(self.env.timestamp_format) + final_release_dir = Path(cfg.get_release_dir(timestamp)) + + # 1. Finalize the directory (Rename from -BUILDING to versioned path) + self.sh(f"mv {self.env.build_dir} {final_release_dir}", shlex=True) + + # 2. Atomic Symlink Swap - ONLY if tests passed + if test_success: + deploy_link = Path(cfg.deploy_link) + # Create a temporary symlink name in the same parent directory + temp_link = deploy_link.with_name(deploy_link.name + "_tmp") + + # Create temporary symlink pointing to the new version + self.sh(f"ln -sfn {final_release_dir} {temp_link}", shlex=True) + + # Atomic rename of the symlink itself (overwrites the old link) + self.sh(f"mv -Tf {temp_link} {deploy_link}", shlex=True) + self.sh("yarn", cwd=final_release_dir) + + # Restart service + self.sh(f"sudo systemctl restart {cfg.service_name}", shlex=True) + else: + self.print(" [SKIP] Test failure detected. 
class HealthCheck(DeploymentTask):
    """Polls the local production service endpoint after deploy."""

    _stage = Stage.DEPLOY
    _deps = [AtomicDeploy]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Verifying service health"

    def _run(self):
        # poll_health_endpoint already short-circuits its retries in dry-run
        # mode, so polling before the dry-run check is safe.
        uri = self.env.toml["release"]["network"]["health_endpoint"]

        status = self.poll_health_endpoint(uri, label="Production Service")
        if self.do_dry_run():
            # BUG FIX: return True (not None) so dry runs report success
            # consistently with every other task.
            return True
        if not status:
            self.fail(f"Production service failed health check at {uri}")

        return True
class WaitForReadiness(SuiteSubTask):
    """Polls the health endpoint of the TEST instance until it responds."""

    _stage = Stage.TEST
    _deps = [StartTestApp]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Wait for Service Readiness"

    def _run(self):
        # NOTE(review): this task defaults enforce_testing to True while the
        # sibling test tasks pass no default — confirm which is intended.
        if self._owner.args.get("skip_tests") and not self.get_arg(
            "enforce_testing", True
        ):
            return True

        uri = self.env.toml["testing"]["network"]["health_endpoint"]

        status = self.poll_health_endpoint(uri, label="Test Service")
        if self.do_dry_run():
            # BUG FIX: return True (not None) for consistency with the other
            # tasks' dry-run success paths.
            return True
        if not status:
            # Dump the app log for post-mortem before failing.
            # NOTE(review): `disabled=True` presumably suppresses execution —
            # confirm against the sh() implementation.
            self.sh(f"cat '{self.env.test_log}'", disabled=True)
            self.fail(f"Test service at {uri} failed to start.")

        return True
class TestRunner(SuiteTask):
    """
    Sub-orchestrator for the Integration Testing lifecycle.
    Manages the environment setup, execution, and teardown for Mocha tests.
    """

    _stage = Stage.TEST
    _deps = []  # Dependent on YarnBuild completion in the main TaskRunner
    skip: bool = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Integration Test Runner"
        sub_tasks = [StartTestApp, WaitForReadiness, RunMochaTests, StopTestApp]
        # BUG FIX: _run() iterates self._sub_tasks, but the original only kept
        # the list in a local variable, guaranteeing an AttributeError at run
        # time (masked by the broad AttributeError handler below).
        self._sub_tasks = sub_tasks

        # Register the sub-task chain with a nested TaskRunner so dependency
        # lookups resolve across runners.
        # NOTE(review): `.run()` executes at construction time — confirm this
        # is intended and not a leftover.
        runner = TaskRunner(self, owner=self._owner, base_class_name="TestRunner")
        runner.queue_tasks(sub_tasks).run()

    def _run(self):

        # 1. Check if we should even be here
        skip_param = self.args.get("skip_tests", False)
        enforced = self.get_arg("enforce_testing")

        if skip_param and not enforced:
            self.print(" [SKIP] Integration tests bypassed by user flag.")
            return True

        self.print(f"--- Entering Stage: {self._stage.value.upper()} ---")

        # 2. Sequential Execution
        # We manually iterate to maintain control over the 'StopTestApp' cleanup
        success = True
        try:
            for task_class in self._sub_tasks:
                # Instantiate as SubTask to maintain ID hierarchy (e.g., [4.1], [4.2])
                task = task_class(parent=self, owner=self._owner)

                if task.run() is False:
                    success = False
                    self.print(f" [FAIL] Test suite halted at: {task.name}")
                    break

        except AttributeError as e:
            success = False
            self.fail(f" [ERROR] failure during test execution: {e}")
        except Exception as e:
            success = False
            self.fail(f" [ERROR] Critical failure during test execution: {e}")

        finally:
            if not self.do_dry_run():
                # 3. Forced Teardown
                # If the loop broke or failed, ensure StopTestApp runs if StartTestApp was attempted
                self.print(" [CLEAN] Ensuring test environment teardown...")
                cleanup = StopTestApp(parent=self, owner=self._owner)
                cleanup.run()
            # AtomicDeploy reads this flag to decide whether to swap symlinks.
            self.env.test_success = success
class DeploymentSubTask(SuiteSubTask):
    """Base class for deployment sub-tasks that resolve declared deps first."""

    _owner: "DeploymentSuite"
    _parent: DeploymentSuiteTask

    def run(self):
        """Load this task's declared dependencies, then execute _run()."""
        self.print("Running ", typename(self))
        try:
            if len(self._deps) > 0:
                # Imported lazily to avoid a circular import at module load.
                from deployment_pipeline.core.task_runner import TaskRunner

                TaskRunner.load_deps(
                    self,
                    self._deps,
                )
        except Exception as e:
            # BUG FIX: chain explicitly (`from e`) so the original traceback
            # is preserved for debugging dependency failures.
            raise Exception(
                f"Failed to load dependency for {typename(self)}: {e}"
            ) from e
        return self._run()
"localhost" + self.pidfile = Path("/tmp/blog_test.pid") + self.user: str = "" + + self.root_dir = os.getcwd() + if timestamp_format is not None: + self.timestamp_format = timestamp_format + self.workspace = Path(os.getenv("WORKSPACE", self.root_dir)) + self.build_dir = self.workspace / "build" + + self.test_log: Path = self.build_dir / "test_log.log" diff --git a/src/deployment/main.py b/src/deployment/main.py new file mode 100644 index 0000000..37b1b78 --- /dev/null +++ b/src/deployment/main.py @@ -0,0 +1,31 @@ +import sys + + +from deployment_pipeline.core.suite import DeploymentSuite + + +def main(): + + runner = DeploymentSuite() + exit_code = 0 + + import traceback + + try: + runner.run() + print("🚀 Deployment Successful") + exit_code = 0 + except KeyboardInterrupt: + runner.dump_print_queue() + traceback.print_exc() + runner.print("\n[System] Termination signal received. Cleaning up...") + exit_code = 0 + except Exception as e: + runner.dump_print_queue() + traceback.print_exc() + print(f"❌ Deployment Failed at: {e.with_traceback(e.__traceback__)}") + exit_code = 1 + if exit_code != 0: + print(exit_code) + sys.exit(exit_code or 1) + runner.dump_print_queue()