diff --git a/deployment/__init__.py b/deployment/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/deployment/__init__.py
diff --git a/deployment/__main__.py b/deployment/__main__.py
new file mode 100644
index 0000000..421d58f
--- /dev/null
+++ b/deployment/__main__.py
@@ -0,0 +1,4 @@
+if __name__ == "__main__":
+    from main import main
+
+    main()
diff --git a/deployment/core/__init__.py b/deployment/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/deployment/core/__init__.py
diff --git a/deployment/core/bootstrap.py b/deployment/core/bootstrap.py
new file mode 100644
index 0000000..a8d3c29
--- /dev/null
+++ b/deployment/core/bootstrap.py
@@ -0,0 +1,62 @@
+"""Bootstrap-stage tasks: environment sanity checks run before any build work."""
+import os
+import shutil
+
+from lib.printer import clear_screen
+from lib.types import Stage
+from lib.task_types import SuiteTask, SuiteSubTask
+
+
+class CheckNix(SuiteTask):
+    """Detects whether the pipeline is already running inside a nix shell."""
+
+    _stage = Stage.BOOTSTRAP
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Checking if we are in a nix shell..."
+
+    def _run(self):
+        # No nix on PATH at all: nothing to detect. Returning None (not False)
+        # lets the TaskRunner continue with the remaining tasks.
+        if not shutil.which("nix"):
+            self.print("⬡ Nix tools not found in PATH.")
+            return
+
+        # 2. Check if already in a shell
+        shell_type = os.environ.get("IN_NIX_SHELL")
+        if shell_type:
+            # BUGFIX: record the result on the suite (owner), not on this task
+            # instance — VerifySystemDependencies reads
+            # `self._owner._in_nix_shell`, so storing it locally meant the
+            # flag was never observed.
+            self._owner._in_nix_shell = shell_type
+            return True
+
+
+class EnsureBuildPaths(SuiteTask):
+    """Task 1: Ensure build paths exist"""
+
+    _can_skip = False
+    _stage = Stage.BOOTSTRAP
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Setting up build path"  # BUGFIX: typo "bulid"
+
+    def _run(self):
+        """Ensure build directory exists"""
+        self.env.build_dir.mkdir(parents=True, exist_ok=True)
+
+
+class VerifySystemDependencies(SuiteTask):
+    """Verifies non-Python dependencies required for the C++ Core."""
+
+    _can_skip = False
+    _stage = Stage.BOOTSTRAP
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Verifying System Dependencies"
+
+    def _run(self):
+        # Inside a nix shell the tools are provided by the shell itself.
+        if self._owner._in_nix_shell:
+            self.print("Skipping: in nix shell")
+            return True
+
+        deps = ["yarn", "git", "rsync", "curl", "node"]
+        for dep in deps:
+            if not shutil.which(dep):
+                self.fail(f"Missing system dependency: {dep}")
+        self.printer.print(" [OK] System tools detected.")
+        return True
diff --git a/deployment/core/suite.py b/deployment/core/suite.py
new file mode 100644
index 0000000..8c8b5eb
--- /dev/null
+++ b/deployment/core/suite.py
@@ -0,0 +1,95 @@
+import argparse
+from pathlib import Path
+
+from lib.errors import SuiteError
+from lib.task_types import SuiteTask
+from core.task_runner import TaskRunner
+from lib.types import BuildEnv
+
+
+class DeploymentSuite(SuiteTask):
+    """
+    Orchestrates the Hexascript logic verification pipeline.
+    Replaces tdd_loop.sh with zero subprocess overhead for Python logic.
+ """ + + name = "Hexa Core Test Script Runner" + root_dir: Path | None + _in_nix_shell: bool + _owner: "DeploymentSuite" + + def __init__(self, *args, root: str | None = None, **kwargs): + self.disable_dry_run() + self.parser = None + self._in_nix_shell = False + self.paths = None + self.engine = None + self.env = BuildEnv() + self.args: dict = dict() + + self._owner = self + self._parser() + super().__init__(self, *args, owner=self, *kwargs) + + self._parent = self + + self.root_dir = Path(root) if root else None + + self.kwargs = kwargs + + def _parser(self): + parser = argparse.ArgumentParser(description="Blog Deployment Suite") + + parser.add_argument("--config", required=True) + parser.add_argument("--branch", required=True) + parser.add_argument( + "--root", type=str, help="The root directory of the project" + ) + parser.add_argument( + "--stage", type=str, help="Run a specific stage of the build" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a trial run without executing tasks", + ) + print("ran parser") + + # Capture specific test names: e.g., --tests ArtifactsTest.Cleanup FullChainTest.EndToEnd + parser.add_argument( + "--tests", + nargs="+", + type=str, + default=[], + help="List of specific test names", + ) + + # Capture a regex filter: e.g., --filter Artifacts.* + parser.add_argument( + "--filter", type=str, help="Filter tests with regex (maps to ctest -R)" + ) + + # --tasks 1 2 5 + parser.add_argument( + "--tasks", nargs="+", type=int, help="List of task IDs to run" + ) + + # --skip 0 3 + parser.add_argument( + "--skip", nargs="+", type=int, help="List of task IDs to skip" + ) + + self.parser = parser + self._owner.args = vars(parser.parse_args()) + + def initialized(): + print("Parser already initialized") + + self._parser = initialized + + def fail(self, *args, **kwargs): + """Helper to raise the state-aware exception.""" + raise SuiteError(self, *args, **kwargs) + + def _run(self): + TaskRunner(self, 
owner=self._owner).run() diff --git a/deployment/core/task_runner.py b/deployment/core/task_runner.py new file mode 100755 index 0000000..09a77ce --- /dev/null +++ b/deployment/core/task_runner.py @@ -0,0 +1,123 @@ +from typing import List + +from lib.types import Stage +from lib.errors import TaskError +from lib.task_types import SuiteTask + +from core.bootstrap import * +from core.task_runner import * +from core.suite import * +from core.tests import * + +from core.tasks import ( + GetDeploymentConfig, + VerifyConfigExists, + YarnBuild, + AtomicDeploy, + HealthCheck, +) + + +class TaskRunner(SuiteTask): + _stage = Stage.BOOTSTRAP + skip: list | None + tasks: list | None # Input from cli + _all_tasks: List[SuiteTask] + _queue: List = [] + + def __init__( + self, + *args, + owner: "DeploymentSuite", + skip: list | None = None, + tasks: list | None = None, + stage: Stage = Stage.ANY, + **kwargs, + ): + super().__init__(owner, *args, owner=owner, **kwargs) + self.last_task = None + self.disable_dry_run() + + self.name = "Task Runner" + self._skip = skip + self._all_tasks = [] + self.tasks = tasks + self.current_stage = stage + + all_tasks = [ + CheckNix, + VerifySystemDependencies, + GetDeploymentConfig, + VerifyConfigExists, + EnsureBuildPaths, + YarnBuild, + TestRunner, + AtomicDeploy, + HealthCheck, + # DetermineRoot, + # VerifyEnvironment, + # EnsureBuildPaths, + # RunUnitTests, + # UpdateVersion, + # BuildServer, + # StartServer, + ] + for _task in all_tasks: + + task = _task(owner, *args, owner=owner, **kwargs) + self._all_tasks.append(task) + + @staticmethod + def is_loaded(deps: List[type[SuiteTask]]): + """ + Validates if all required task types exist in the TaskRunner queue. + TaskRunner.queue is expected to be a set or list of SuiteTask instances. 
+ """ + # Extract the classes of the tasks currently in the queue + queued_task_types = {type(task) for task in TaskRunner._queue} + + # Returns True if every dependency class is found in the queued types + return all(dep in queued_task_types for dep in deps) + + def _run(self): + if self._owner is None: + raise ValueError("Owner is not set") + if self._parent is None: + raise ValueError("Parent is not set") + all_tasks = self._all_tasks + + if self.tasks is not None and len(self.tasks) > 0: + TaskRunner._queue = [all_tasks[i] for i in self.tasks if i < len(all_tasks)] + else: + skip_set = self._skip or set() + TaskRunner._queue = [ + task for i, task in enumerate(all_tasks) if i not in skip_set + ] + queue = TaskRunner._queue + + num_tasks = len(queue) + if num_tasks < 1: + self.print(all_tasks) + self.print(queue) + self.print(self.tasks) + self.print(self.skip_task()) + self.fail("No tasks queued.") + + self.print(f"Queue initialized with {len(queue)} tasks") + + # Execute the filtered queue + for task in queue: + self.last_task = task.name + try: + # current_stage = self.current_stage + # task_stage = task.get_stage() + # if task_stage is not current_stage: + # continue + if task.run() is False: + self.fail(f"Pipeline stopped at task: {task.name}") + except ModuleNotFoundError as e: + self.print(f" [ERROR] Task {task.name} failed: {e}") + self.fail(f"Pipeline stopped at task: {self.last_task}") + except TaskError as e: + self.print(f" [ERROR] Task {task.name} failed: {e}") + self.fail(f"Pipeline stopped at task: {self.last_task}") diff --git a/deployment/core/tasks.py b/deployment/core/tasks.py new file mode 100644 index 0000000..2b5c963 --- /dev/null +++ b/deployment/core/tasks.py @@ -0,0 +1,169 @@ +import os +import subprocess +import time +import tomllib +import socket +from lupa import LuaRuntime +from pathlib import Path +from lib.task_types import SuiteTask +from lib.types import Stage + + +class GetDeploymentConfig(SuiteTask): + + _stage = 
Stage.BOOTSTRAP
+    _deps = []
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Get the deployment configuration"
+
+    def _run(self):
+        """Load the Lua config, select the branch environment, hydrate self.env."""
+        # 1. Load Lua
+        lua = LuaRuntime(unpack_returned_tuples=True)
+        with open(self.get_arg("config"), "r") as f:
+            module = lua.execute(f.read())
+
+        # 2. Call the factory
+        target_env = self.get_arg("branch").split("/")[-1]  # e.g., 'main' or 'testing'
+        self.print(target_env)
+
+        # 3. Hydrate self.env
+        config = module.get_config(target_env)
+
+        self.env.build_dir = Path(config.paths.build)
+        self.env.release_dir = Path(config.paths.release_dir)
+        self.env.deploy_path = Path(config.paths.deploy_link)
+        # BUGFIX: was `self.env.service`, which nothing reads — AtomicDeploy and
+        # StartTestApp both consume `env.service_name` (the BuildEnv field).
+        self.env.service_name = config.systemd.service_name
+        # BUGFIX: deploy_branch was never hydrated, but YarnBuild clones with it.
+        self.env.deploy_branch = target_env
+        self.env.config_file_source = config.paths.config_file
+        self.env.meta = config.meta
+
+        self.print(f"✅ Context hydrated for {config.meta.app_name}:{target_env}")
+        return True
+
+
+class LoadServerConfig(SuiteTask):
+    """Fails the pipeline if the required TOML config is missing from the host"""
+
+    _stage = Stage.BOOTSTRAP
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Verify the server's toml configuration exists"
+
+    def _run(self):
+        # BUGFIX: SuiteTask._run is abstract; without a concrete override this
+        # class could not even be instantiated. The real check lives in
+        # VerifyConfigExists, so this is a deliberate no-op.
+        return True
+
+
+class VerifyConfigExists(SuiteTask):
+    """Verifies TOML existence and hydrates the environment with health check URI components"""
+
+    _stage = Stage.BOOTSTRAP
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Verify and Hydrate Server Configuration"
+
+    def _run(self):
+        # 1. Physical existence check
+        config_path = self.env.config_file_source
+        self.print(f" [CHECK] Verifying configuration: {config_path}")
+
+        if not os.path.exists(config_path):
+            self.fail(
+                f"CRITICAL: Configuration file not found at {config_path}. "
+                "Pipeline terminated to prevent application misbehavior."
+            )
+
+        # 2. Parse TOML for internal deployment metadata
+        try:
+            with open(config_path, "rb") as f:
+                data = tomllib.load(f)
+
+            server = data.get("server", {})
+
+            # 3. Hydrate self.env for HealthCheck and WaitForReadiness tasks
+            # NOTE(review): missing keys yield None here ("None" for port) and
+            # produce a malformed URI — assumes the TOML always defines
+            # schema/address/port/health_check; confirm against the server config.
+            self.env.server_schema = server.get("schema")
+            self.env.server_address = server.get("address")
+            self.env.server_port = str(server.get("port"))
+            self.env.server_health_path = server.get("health_check")
+
+            # Construct the dynamic URI used by curl in later stages
+            self.env.test_endpoint_uri = (
+                f"{self.env.server_schema}://{self.env.server_address}:"
+                f"{self.env.server_port}{self.env.server_health_path}"
+            )
+
+            self.print(
+                f" [READY] Health check URI constructed: {self.env.test_endpoint_uri}"
+            )
+
+        except Exception as e:
+            self.fail(f"FAILED to parse TOML at {config_path}: {e}")
+
+        self.print(" [OK] Configuration verified and environment hydrated.")
+        return True
+
+
+class YarnBuild(SuiteTask):
+    """Executes dependency installation and asset compilation"""
+
+    _stage = Stage.BUILD
+    _deps = [GetDeploymentConfig, VerifyConfigExists]
+    skip: bool = False
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Running Yarn build process"
+
+    def _run(self):
+        build_dir = self.env.build_dir
+        # NOTE(review): requires --repo on the CLI; env.deploy_branch is
+        # hydrated by GetDeploymentConfig.
+        self.sh(
+            f"git clone --branch {self.env.deploy_branch} {self.get_arg('repo')} {build_dir}"
+        )
+        self.sh("git submodule update --init --recursive", cwd=build_dir)
+        self.sh("yarn install", cwd=build_dir)
+        self.sh("yarn combine:css", cwd=build_dir)
+
+
+class AtomicDeploy(SuiteTask):
+    """Performs rsync to release directory and updates environment symlink"""
+
+    _stage = Stage.DEPLOY
+    _deps = [YarnBuild]
+    skip: bool = False
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Executing atomic symlink swap"
+
+    def _run(self):
+        env = self.env
+        self.sh(f"mkdir -p {env.release_dir}")
+        self.sh(f"rsync -a --delete {env.build_dir}/ {env.release_dir}/")
+        # ln -sfn overwrites the existing link atomically; restart picks it up.
+        self.sh(f"ln -sfn {env.release_dir} {env.deploy_path}")
+        self.sh(f"sudo systemctl restart {env.service_name}")
+
+
+class HealthCheck(SuiteTask):
+    """Polls the local service endpoint to verify readiness"""
+
+    _stage = Stage.DEPLOY
+    _deps = [AtomicDeploy]
+    skip: bool = False
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Verifying service health"
+
+    def _run(self):
+        self.msg(self.name)
+        # BUGFIX: was `if self.do_dry_run:` — the bound method is always
+        # truthy, so the health check NEVER actually ran.
+        if self.do_dry_run():
+            return
+        # 15 attempts x 2s sleep = the 30s window referenced below.
+        for i in range(15):
+            res = subprocess.run(
+                ["curl", "-s", "-I", self.env.test_endpoint_uri],
+                capture_output=True,
+            )
+            if res.returncode == 0:
+                self.print("✅ Service is healthy")
+                return True
+            time.sleep(2)
+        self.fail("Service failed health check after 30 seconds")
diff --git a/deployment/core/tests.py b/deployment/core/tests.py
new file mode 100644
index 0000000..547b855
--- /dev/null
+++ b/deployment/core/tests.py
@@ -0,0 +1,161 @@
+import time
+
+from lib.task_types import SuiteTask, SuiteSubTask
+from lib.types import Stage
+from core.tasks import YarnBuild
+
+
+class StartTestApp(SuiteSubTask):
+    """Spins up the application in the build directory for integration testing"""
+
+    _stage = Stage.TEST
+    _deps = [YarnBuild]
+
+    def __init__(self, *args, **kwargs):
+        self.name = "Start Application for Test"
+        super().__init__(*args, **kwargs)
+
+    def _run(self):
+        if self._owner.args.get("skip_tests") and not self.env.meta.get(
+            "enforce_testing"
+        ):
+            self.print(" [SKIP] Skipping per user request.")
+            return True
+
+        self.print(f" [EXEC] Starting app in {self.env.build_dir}")
+        # Stop existing service if it's hogging the port
+        self.sh(f"sudo systemctl stop {self.env.service_name} || true")
+
+        # Start background process and record PID
+        cmd = f"nohup yarn run prod >> '{self.env.meta.server_log_file}' 2>&1 & echo $! 
> '{self.env.pidfile}'"
+        self.sh(cmd, cwd=self.env.build_dir)
+        return True
+
+
+class WaitForReadiness(SuiteSubTask):
+    """Polls the health endpoint until the test server is responsive"""
+
+    _stage = Stage.TEST
+    _deps = [StartTestApp]
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Wait for Service Readiness"
+
+    def _run(self):
+        if self._owner.args.get("skip_tests") and not self.env.meta.get(
+            "enforce_testing"
+        ):
+            return True
+
+        uri = self.env.test_endpoint_uri
+        self.print(f" [POLL] Waiting for {uri}...")
+
+        # 15 attempts x 2s = the 30s budget mentioned in the failure message.
+        for _ in range(15):
+            # sh() uses check=True, so a failed grep raises and we retry.
+            try:
+                res = self.sh(f"curl -s -I {uri} | grep '200 OK'")
+                if res:
+                    self.print(" [OK] Service is UP.")
+                    return True
+            except Exception:
+                time.sleep(2)
+
+        self.sh(f"cat '{self.env.meta.server_log_file}'")
+        self.fail(f"Service at {uri} failed to start within 30s.")
+
+
+class RunMochaTests(SuiteSubTask):
+    """Executes the actual test suite against the running instance"""
+
+    _stage = Stage.TEST
+    _deps = [WaitForReadiness]
+
+    def __init__(self, *args, **kwargs):
+        self.name = "Run Tests"
+        super().__init__(*args, **kwargs)
+
+    def _run(self):
+        if self._owner.args.get("skip_tests") and not self.env.meta.get(
+            "enforce_testing"
+        ):
+            return True
+
+        self.print(" [RUN] npm run test:postreceive")
+        # Using sh_thread to ensure real-time log streaming for Jenkins
+        self.sh_thread("npm run test:postreceive", cwd=self.env.build_dir)
+        return True
+
+
+class StopTestApp(SuiteSubTask):
+    """Cleans up the test process regardless of test outcome"""
+
+    _stage = Stage.TEST
+    _deps = [RunMochaTests]
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Stop Test App"
+
+    def _run(self):
+        # We try to stop even if SKIP_TESTS was true to be safe
+        self.print(f" [KILL] Terminating process in {self.env.pidfile}")
+        self.sh(f"kill $(cat '{self.env.pidfile}') || true")
+        return True
+
+
+class TestRunner(SuiteTask):
+    """
+    Sub-orchestrator for the Integration Testing lifecycle.
+    Manages the environment setup, execution, and teardown for Mocha tests.
+    """
+
+    _stage = Stage.TEST
+    _deps = []  # Dependent on YarnBuild completion in the main TaskRunner
+    skip: bool = False
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.name = "Integration Test Runner"
+        self._sub_tasks = [StartTestApp, WaitForReadiness, RunMochaTests, StopTestApp]
+
+    def _run(self):
+        # 1. Check if we should even be here
+        skip_param = self.args.get("skip_tests", False)
+        # BUGFIX: a stray trailing comma made `enforced` a one-element tuple,
+        # which is ALWAYS truthy — so the user's skip flag was always ignored.
+        enforced = self.env.meta.enforce_testing
+
+        if skip_param and not enforced:
+            self.print(" [SKIP] Integration tests bypassed by user flag.")
+            return True
+
+        self.print(f"--- Entering Stage: {self._stage.value.upper()} ---")
+
+        # 2. Sequential Execution
+        # We manually iterate to maintain control over the 'StopTestApp' cleanup
+        success = True
+        try:
+            for task_class in self._sub_tasks:
+                # Instantiate as SubTask to maintain ID hierarchy (e.g., [4.1], [4.2])
+                task = task_class(parent=self, owner=self._owner)
+
+                if task.run() is False:
+                    success = False
+                    self.print(f" [FAIL] Test suite halted at: {task.name}")
+                    break
+
+        except Exception as e:
+            success = False
+            self.print(f" [ERROR] Critical failure during test execution: {e}")
+
+        finally:
+            # 3. Forced Teardown (skipped on dry runs).
+            # BUGFIX: the old bare `return` inside `finally` discarded
+            # `success` (and a return in finally also swallows in-flight
+            # exceptions); only the teardown itself is conditional now.
+            if not self.do_dry_run():
+                self.print(" [CLEAN] Ensuring test environment teardown...")
+                cleanup = StopTestApp(parent=self, owner=self._owner)
+                cleanup.run()
+
+        return success
diff --git a/deployment/lib/__init__.py b/deployment/lib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/deployment/lib/__init__.py
diff --git a/deployment/lib/errors.py b/deployment/lib/errors.py
new file mode 100644
index 0000000..bbe22c1
--- /dev/null
+++ b/deployment/lib/errors.py
@@ -0,0 +1,28 @@
+import sys
+
+
+class SuiteError(Exception):
+    """Pipeline failure raised by SuiteTask.fail / DeploymentSuite.fail.
+
+    BUGFIX: __init__ used to call sys.exit() directly, terminating the process
+    while the exception was still being *constructed* — so `raise TaskError(...)`
+    never completed and the `except TaskError` handlers in TaskRunner._run were
+    unreachable dead code. It also forwarded **kwargs to Exception.__init__,
+    which rejects keyword arguments. The error is now a plain exception
+    carrying a suggested exit code; top-level handlers decide what to do.
+    """
+
+    def __init__(
+        self, parent, *args, critical: bool = False, code: int | None = None, **kwargs
+    ):
+        super().__init__(*args)
+        # Flush any buffered printer output so the failure context is visible.
+        parent.dump_print_queue()
+        print(*args, **kwargs)
+        self.critical = critical
+        # Suggested process exit status for whoever catches this at top level.
+        self.code = code if code is not None else 1
+
+
+class TaskError(SuiteError):
+    """Failure scoped to a single task; TaskRunner catches this and halts the queue."""
diff --git a/deployment/lib/printer.py b/deployment/lib/printer.py
new file mode 100755
index 0000000..1ca254e
--- /dev/null
+++ b/deployment/lib/printer.py
@@ -0,0 +1,79 @@
+import os
+import json
+from pathlib import Path
+
+
+def clear_screen():
+    """Clear the screen on both NT and *nix systems."""
+    os.system("cls" if os.name == "nt" else "clear")
+
+
+class Printer:
+    # Class-level state: every Printer instance shares one queue and cache.
+    _queue: list = []
+    _cache: list = []
+    _use_queue = False
+    _parent = None
+    _instance = None
+
+    def __init__(
+        self,
+        parent,
+        instance,
+        *args,
+        **kwargs,
+    ):
+        self._parent = parent
+        self._instance = instance
+        self._parent_id = parent._id if parent else 0
+        self._instance_id = instance.get_id()
+
+    def dump(self):
+        """Flush all queued messages to stdout and clear the queue."""
+        for args, kwargs in 
Printer._queue:
+            print(*args, **kwargs)
+        Printer._queue = []
+
+    def print(self, *args, **kwargs):
+        """Print immediately, or enqueue when queueing is enabled; always cache."""
+        payload = [args, kwargs]
+        Printer._cache.append(payload)
+        if Printer._use_queue:
+            Printer._queue.append(payload)
+
+        else:
+            print(*args, **kwargs)
+
+    def flush(self):
+        for args, kwargs in Printer._queue:
+            print(*args, **kwargs)
+        Printer._queue = []
+
+    def save_stdout(self, _file_path: Path | str):
+        """Persist the cached output; each entry is an [args, kwargs] payload."""
+        file_path = Path(_file_path).resolve()
+        with open(file_path, "w") as f:
+            try:
+                for line in Printer._cache:
+                    # BUGFIX: cached entries are [args, kwargs] lists, never
+                    # strings, so the old bare `f.write(line)` always raised
+                    # into a bare `except:`. Serialize explicitly instead.
+                    f.write(json.dumps(line))
+            except Exception as e:
+                # BUGFIX: was `print(e.with_traceback)` — printed the bound
+                # method object instead of the error.
+                print(e)
+
+    def _msg_prefix(self):
+        # Format: [ID] for main tasks, [ID.Sub] for subtasks
+        from lib.task_types import SuiteSubTask
+
+        if isinstance(self._instance, SuiteSubTask):
+            # BUGFIX: SuiteSubTask has no `parent_id`/`sub_id` attributes;
+            # its `_id` is a (parent_id, sub_id) tuple.
+            parent_id, sub_id = self._instance._id
+            return f"\n[{parent_id}.{sub_id}] "
+        return f"\n[{self._instance._id}] "
+
+    def msg(self, *args, **kwargs):
+        """Standardized message logger."""
+
+        self.print(self._msg_prefix(), *args, **kwargs)
+
+    def enable_queue(self):
+        Printer._use_queue = True
+
+    def disable_queue(self):
+        Printer._use_queue = False
+        self.dump()
diff --git a/deployment/lib/task_types.py b/deployment/lib/task_types.py
new file mode 100755
index 0000000..ba8128e
--- /dev/null
+++ b/deployment/lib/task_types.py
@@ -0,0 +1,262 @@
+import threading
+import os
+import sys
+import subprocess
+from pathlib import Path
+from abc import ABC, abstractmethod
+from typing import List, TYPE_CHECKING
+
+from lib.printer import Printer
+from lib.errors import TaskError
+
+
+if TYPE_CHECKING:
+    from types import Stage, BuildEnv
+    from task_types import BlogDeploySuite
+
+
+class SuiteTask(ABC):
+    """Base class for every pipeline task: ID allocation, printing, shell helpers."""
+
+    _owner: "BlogDeploySuite"
+    _parent: "SuiteTask"
+    _global_counter: int = 0
+    _id: int
+    _cwd: Path | None
+    message: str
+    name: str
+    printer: Printer
+    skip: bool = False
+    _can_skip: bool = True
+    _stage: "Stage"
+    _initialized = False
+    _deps = []
+    env: "BuildEnv"
+
+    def __init__(
+        self,
+        parent,
+        *args,
+        owner: "TDDSuite",
+        cwd: Path | str | None = None,
+        attach_printer: bool = True,
+        **kwargs,
+    ):
+        if owner is None and not SuiteTask._initialized:
+            raise ValueError("Owner is not set")
+        if parent is None:
+            raise ValueError("Parent is not set")
+        # BUGFIX: `kwargs.get("skip")` can be None, which made the `in` test
+        # raise TypeError whenever any other keyword argument was passed.
+        if self.__class__.__name__ in (kwargs.get("skip") or []):
+            self.skip = True
+            return
+        SuiteTask._initialized = True
+
+        # Resolve a working directory: explicit arg > parent's cwd > process cwd.
+        if cwd is not None:
+            cwd = Path(cwd)
+        if cwd is None and parent is not None:
+            try:
+                cwd = parent.get_cwd()
+            except Exception:
+                pass
+        if cwd is None:
+            try:
+                cwd = os.getcwd()
+            except Exception:
+                pass
+        self._cwd = cwd
+
+        self._owner = owner
+        self._parent = parent
+        self.env = owner.env
+        self.args = self._owner.args
+
+        # Subtasks get composite IDs; only top-level tasks consume the counter.
+        if not isinstance(self, SuiteSubTask):
+            self._id = SuiteTask._global_counter
+            SuiteTask._global_counter += 1
+        if attach_printer:
+            self.attach_printer(parent)
+
+    def get_arg(self, arg):
+        return self._owner.args.get(arg)
+
+    def skip_task(self):
+        """True when this task should not execute (unmet deps or skip flag)."""
+        # NOTE(review): `deps_loaded` is not defined anywhere on SuiteTask —
+        # TaskRunner.is_loaded looks like the intended hook; confirm. Any task
+        # with _deps set will raise AttributeError here.
+        if self._deps and not self.deps_loaded():
+            self.print(f" [INFO] Skipping {self.name}: Dependencies not met.")
+            return True
+        if self.skip:
+            return True
+
+        return False
+
+    def get_path(self, component: str, path: Path | str | None = None) -> Path:
+        # NOTE(review): assumes owner.paths is a mapping; DeploymentSuite
+        # initialises paths to None — confirm before relying on this helper.
+        if path is not None:
+            return self._owner.paths.get(component) / Path(path)
+        return self._owner.paths.get(component)
+
+    def do_dry_run(self):
+        """Dry-run if --dry-run was passed or this task is being skipped."""
+        do_dry_run = self.args.get("dry_run", False) or self.skip_task()
+        return do_dry_run
+
+    def attach_printer(self, parent):
+        self.printer = Printer(parent, self)
+
+    @staticmethod
+    def inc_count():
+        # BUGFIX: assigning to SuiteSubTask._global_counter created a NEW class
+        # attribute shadowing SuiteTask's counter; bump the shared one instead.
+        SuiteTask._global_counter += 1
+
+    @staticmethod
+    def get_count():
+        return SuiteTask._global_counter
+
+    def dump_print_queue(self):
+        """Flush any queued printer output."""
+        self.printer.dump()
+
+    def print(self, *args, **kwargs):
+        """Standardized message logger."""
+        self.printer.print(*args, **kwargs)
+
+    def msg(self, *args, **kwargs):
+        """Standardized message logger."""
+        self.printer.msg(*args, **kwargs)
+
+    @abstractmethod
+    def _run(self):
+        pass
+
+    def dry_run(self):
+        self.msg(self.name)
+        if self.skip_task():
+            self.print("Skipping")
+            return True
+        return self.do_dry_run()
+
+    def disable_dry_run(self):
+        """Monkey-patch do_dry_run on this instance so it always reports False."""
+        def func():
+            print("Dry run disabled")
+            return False
+
+        print("Disabling dry run")
+        self.do_dry_run = func
+
+    def run(self):
+        return self._run()
+
+    def fail(self, *args, critical: bool = False, **kwargs):
+        """Helper to raise the state-aware exception."""
+
+        raise TaskError(self, *args, critical=critical, **kwargs)
+
+    def sh(self, cmd: str, cwd: Path | None = None, graceful=False, dry_run=None):
+        """Helper to run shell commands within the project context."""
+        self.msg(f" [EXEC] {cmd}")
+        # BUGFIX: was `if self.do_dry_run and ...` — the bound method (or the
+        # disable_dry_run stub) is always truthy, so EVERY command was skipped.
+        if self.do_dry_run() and dry_run is not False:
+            return
+
+        try:
+            return subprocess.run(
+                cmd, shell=True, check=True, cwd=str(cwd or os.getcwd())
+            )
+        except Exception as e:
+            # graceful=True converts the error into a managed TaskError that
+            # TaskRunner handles; otherwise re-raise for the caller.
+            if graceful:
+                self.fail(e)
+            raise Exception(e)
+
+    def sh_thread(self, cmd: str, cwd: Path | None = None):
+        """
+        Runs shell commands, streams output to CLI in real-time,
+        and captures it for later analysis.
+        """
+        self.msg(f" [EXEC] {cmd}")
+        # BUGFIX: missing call parens — the bound method is always truthy.
+        if self.do_dry_run():
+            return
+
+        # Store captured output
+        self.last_stdout = []
+        self.last_stderr = []
+
+        # Start the process with piped outputs
+        process = subprocess.Popen(
+            cmd,
+            shell=True,
+            # BUGFIX: was get_path("root"), which crashes when owner.paths is
+            # None (DeploymentSuite never populates it); fall back like sh().
+            cwd=str(cwd or self.get_cwd() or os.getcwd()),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True,
+            bufsize=1,  # Line buffered
+        )
+
+        def stream_pipe(pipe, relay, accumulator):
+            """Reads from pipe, writes to relay (stdout/err), and saves to list."""
+            for line in iter(pipe.readline, ""):
+                if line:
+                    accumulator.append(line)
+                    relay.write(line)
+                    relay.flush()
+            pipe.close()
+
+        # Use threads to prevent the pipes from clogging (which causes deadlocks)
+        t1 = threading.Thread(
+            target=stream_pipe, args=(process.stdout, sys.stdout, self.last_stdout)
+        )
+        t2 = threading.Thread(
+            target=stream_pipe, args=(process.stderr, sys.stderr, self.last_stderr)
+        )
+
+        t1.start()
+        t2.start()
+
+        # Wait for completion
+        exit_code = process.wait()
+        t1.join()
+        t2.join()
+
+        if exit_code != 0:
+            self.fail(f"\n[ERROR] Command failed with code {exit_code}", code=exit_code)
+
+        # BUGFIX: the second element joined last_stdout again; return stderr.
+        return ["".join(self.last_stdout), "".join(self.last_stderr)]
+
+    def get_cwd(self):
+        return self._cwd
+
+    def get_id(self):
+        return self._id
+
+    def get_stage(self):
+        return self._stage
+
+
+class SuiteSubTask(SuiteTask):
+    """Task nested under a parent task; its ID is a (parent_id, sub_id) tuple."""
+
+    _owner: "TDDSuite"
+    _parent: SuiteTask
+
+    _sub_counter: dict[int, int] = {}
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, attach_printer=False, **kwargs)
+
+        if SuiteTask._global_counter not in SuiteSubTask._sub_counter:
+            SuiteSubTask._sub_counter[SuiteTask._global_counter] = 0
+
+        # BUGFIX: was (counter, _sub_counter) — i.e. the whole counter dict,
+        # not this subtask's number. Printer._msg_prefix unpacks this as
+        # (parent_id, sub_id).
+        self._id = (
+            SuiteTask._global_counter,
+            SuiteSubTask._sub_counter[SuiteTask._global_counter],
+        )
+
+        self.attach_printer(self._owner)
+
+        self.paths = self._owner.paths
+
+    def msg(self, *args, **kwargs):
+        """Standardized message logger."""
+        SuiteSubTask.inc_count()
+
+        self._parent.msg(*args, **kwargs)
+
+    @staticmethod
+    def inc_count():
# BUGFIX: guard against KeyError when no subtask has registered yet.
+        SuiteSubTask._sub_counter.setdefault(SuiteTask._global_counter, 0)
+        SuiteSubTask._sub_counter[SuiteTask._global_counter] += 1
+
+    @staticmethod
+    def get_count():
+        return SuiteSubTask._sub_counter
diff --git a/deployment/lib/types.py b/deployment/lib/types.py
new file mode 100644
index 0000000..1fe95b9
--- /dev/null
+++ b/deployment/lib/types.py
@@ -0,0 +1,44 @@
+import os
+import time
+from enum import Enum
+from pathlib import Path
+
+
+class Stage(Enum):
+    """Pipeline phases a task can belong to."""
+
+    ANY = "any"
+    BOOTSTRAP = "bootstrap"
+    BUILD = "build"
+    TEST = "test"
+    DEPLOY = "deploy"
+
+
+class BuildEnv:
+    """Mutable bag of deployment state, hydrated by GetDeploymentConfig."""
+
+    timestamp_format: str = "%Y%m%d-%H%M%S"
+    workspace: Path
+    timestamp: str
+    deploy_branch: str
+    deploy_path: Path
+    build_dir: Path
+    service_name: str
+    release_dir: Path
+    test_endpoint_uri: str
+    # NOTE(review): class-level mutable defaults are shared across instances;
+    # harmless while only one BuildEnv exists, but worth confirming.
+    meta: dict = dict()
+    pidfile: Path = Path()
+
+    def __init__(self, timestamp_format: str | None = None):
+        self.workspace: Path = Path()
+        self.timestamp: str = ""
+        self.deploy_branch: str = ""
+        self.deploy_path: Path = Path()
+        self.build_dir: Path = Path()
+        self.service_name: str = ""
+        self.release_dir: Path = Path()
+        self.server_schema: str = "http"
+        self.server_domain: str = "localhost"
+
+        self.root_dir = os.getcwd()
+        if timestamp_format is not None:
+            self.timestamp_format = timestamp_format
+        self.timestamp = time.strftime(self.timestamp_format)
+        # Jenkins exports WORKSPACE; fall back to the cwd for local runs.
+        self.workspace = Path(os.getenv("WORKSPACE", self.root_dir))
+        self.build_dir = self.workspace / "build"
diff --git a/deployment/main.py b/deployment/main.py
new file mode 100644
index 0000000..cf9fe99
--- /dev/null
+++ b/deployment/main.py
@@ -0,0 +1,25 @@
+import sys
+
+
+from core.suite import DeploymentSuite
+
+
+def main():
+    """CLI entry point: run the suite and exit with a meaningful status code."""
+    runner = DeploymentSuite()
+    exit_code = None
+
+    try:
+        runner.run()
+        print("🚀 Deployment Successful")
+        exit_code = 0
+    except KeyboardInterrupt:
+        runner.print("\n[System] Termination signal received. Cleaning up...")
+        runner.dump_print_queue()
+        exit_code = 0
+    except Exception as e:
+        print(f"❌ Deployment Failed at: {e.with_traceback(e.__traceback__)}")
+        # BUGFIX: `exit_code = 1` used to sit AFTER a bare `raise` and was
+        # unreachable, and the flush below never ran on failure.
+        exit_code = 1
+    runner.dump_print_queue()
+    # BUGFIX: `exit_code or 1` mapped the success code 0 to 1, so even a
+    # SUCCESSFUL deployment exited non-zero.
+    sys.exit(exit_code if exit_code is not None else 1)
diff --git a/deployment/prototype.py b/deployment/prototype.py
new file mode 100755
index 0000000..b591d4a
--- /dev/null
+++ b/deployment/prototype.py
@@ -0,0 +1,177 @@
+# NOTE(review): scratch translation of the old Jenkinsfile — Groovy/Python
+# pseudo-code, NOT valid Python. Kept verbatim for reference only.
+GIT_REPO = 'ssh://git@git.jasonpoage.vpn:29418/jason/express-blog.git'
+DEPLOY_BASE = '/srv/jasonpoage.com'
+YARN_ENABLE_GLOBAL_CACHE = 'false'
+YARN_CACHE_FOLDER = '/var/cache/jenkins/yarn'
+CREDENTIALS_ID = '08a57452-477d-4aa6-86c6-242553660b3f'
+
+
+
+options {
+    timestamps()
+}
+
+
+parameters {
+    string(name: 'branch', defaultValue: 'refs/heads/main', description: 'Branch ref from webhook')
+    string(name: 'oldrev', defaultValue: '', description: 'old rev')
+    string(name: 'newrev', defaultValue: '', description: 'new rev')
+
+    booleanParam(name: 'SKIP_TESTS', defaultValue: true, description: 'Skip all testing')
+}
+class test_runner:
+    def init():
+        print('Init')
+        if params.branch?.startsWith("refs/heads/"):
+            DEPLOY_BRANCH = params.branch.replaceFirst(/^refs\/heads\//, '')
+        else:
+            print(f "Invalid branch ref: '{params.branch}'")
+
+
+        print( "==== DEBUG: Branch Param ====")
+        print ("params.branch: '{params.branch}'")
+        print ("DEPLOY_BRANCH: '{DEPLOY_BRANCH}'")
+
+        TIMESTAMP = sh(script: "date +%Y%m%d-%H%M%S", returnStdout: true).trim()
+        LOG_DIR = f"{DEPLOY_BASE}/deployments/logs"
+        SERVER_LOG_FILE = f"{LOG_DIR}/server/server-{TIMESTAMP}.log"
+        TEST_LOGS_FILE = f"{LOG_DIR}/test-results/test-"
+        BUILD_DIR = f"{WORKSPACE}/build"
+        PIDFILE = f"{BUILD_DIR}/test.pid"
+        ENV_FILE = f"{DEPLOY_BASE}/env/{DEPLOY_BRANCH}.env"
+        SERVICE_NAME = f"express-blog@{DEPLOY_BRANCH}.service"
+        DEPLOY_PATH = f"{DEPLOY_BASE}/deployments/blog-{DEPLOY_BRANCH}"
+
+
+        if params.oldrev?.trim() && params.newrev?.trim():
+            OLD_REV = params.oldrev
+            NEW_REV = params.newrev
+        else :
OLD_REV = sh(script: 'git rev-parse HEAD~1', returnStdout: true).trim() + NEW_REV = sh(script: 'git rev-parse HEAD', returnStdout: true).trim() + + print (f"==== DEBUG: Revisions ====") + print (f"params.oldrev: '{params.oldrev}'") + print (f"params.newrev: '{params.newrev}'") + print (f"Old revision: {OLD_REV}") + print (f"New revision: {NEW_REV}") + sh (f"mkdir -p '{LOG_DIR}/server' '{LOG_DIR}/test-results'") + def checkout () : + print('Checkout') + checkout([$class: 'GitSCM', + branches: [[name: "*/{DEPLOY_BRANCH}"]], + userRemoteConfigs: [[ + url: GIT_REPO, + credentialsId: CREDENTIALS_ID + ]] + ]) + + def validate_branch(): + print('Validate Branch') + steps { + script { + def allowed = ['testing', 'staging', 'main', 'production'] + if (!allowed.contains(DEPLOY_BRANCH)) { + fail "Branch '{DEPLOY_BRANCH}' is not allowed for deployment." + } + } + } + + def clone_build_dir(): + print('Clone to Build Dir') + sh (f"git clone --branch '{DEPLOY_BRANCH}' '{GIT_REPO}' '{BUILD_DIR}'") + + def build(): + print('Build') + dir(f"{BUILD_DIR}") { + sh """ + git submodule update --init --recursive + yarn + yarn combine:css + """ + + def start_app(): + print('Start Application for Test') { + if not params.SKIP_TESTS : + dir(BUILD_DIR) { + sh """ + sudo systemctl stop {SERVICE_NAME} || true + corepack enable + nohup yarn run prod >> '{SERVER_LOG_FILE}' 2>&1 & + echo \$! > '{PIDFILE}' + """ + + def wait_for_service(): + print('Wait for Service Readiness') + if not params.SKIP_TESTS : + def timeout = 30 + def elapsed = 0 + def success = false + while elapsed < timeout : + def result = sh(script: f"curl --max-time 2 --silent --fail '\{SERVER_SCHEMA}://\{SERVER_DOMAIN}/health -I' > /dev/null || true", returnStatus: true) + if result == 0: + success = true + break + sleep 1 + elapsed += 1 + if not success : + sh f"cat '{SERVER_LOG_FILE}'" + fail f"Service did not become available within {timeout}s." 
+ } + } + } + + def run_tests(): + print('Run Tests') + if not params.SKIP_TESTS : + def testStatus = sh(script: f"cd '{BUILD_DIR}' && npm run test:postreceive", returnStatus: true) + archiveArtifacts artifacts: f"{TEST_LOGS_FILE}*", onlyIfSuccessful: false + if testStatus != 0: + sh f""" + kill \$(cat '{PIDFILE}') || true + cat '{SERVER_LOG_FILE}' + """ + fail( "Tests failed for branch {DEPLOY_BRANCH}") + } + } + + def kill_test_server(): + """systemctl stop test server""" + print('Stop Test App') + if not params.SKIP_TESTS : + sh ("kill \$(cat '{PIDFILE}') || true") + def deploy(): + print('Deploy') + # 1. Create the new release directory + releaseDir = f"{DEPLOY_BASE}/releases/blog-{DEPLOY_BRANCH}-{TIMESTAMP}" + sh( f"mkdir -p {releaseDir}") + + # 2. Sync the finished build to the release directory + print(f"Deploying build to {releaseDir}") + sh( f"rsync -a --delete '{BUILD_DIR}/' '{releaseDir}/'") + + # 3. Atomically flip the symlink + # We use 'ln -sfn' to overwrite the existing link to the new path + sh (f"ln -sfn '{releaseDir}' '{DEPLOY_PATH}'") + + # 4. Cleanup old releases (Keep only last 5) + dir(f"{DEPLOY_BASE}/releases") { + sh f"ls -1t | grep 'blog-{DEPLOY_BRANCH}' | tail -n +6 | xargs rm -rf || true" + } + def restart_production_server(): + print('Restart Service') + sh f"sudo systemctl restart {SERVICE_NAME}" + def verify_service(): + print("VerifyService") + timeout = 30 + elapsed = 0 + success = false + while (elapsed < timeout): + result = sh(script: f"curl --max-time 2 --silent --fail '{SERVER_SCHEMA}://{SERVER_DOMAIN}/health -I' > /dev/null || true", returnStatus: true) + if result == 0: + success = true + break + sleep 1 + elapsed += 1 + if not success: + print(SERVER_LOG_FILE) + self.fail( f"Service did not become available within {timeout}s.") diff --git a/shell.nix b/shell.nix index e34e8c4..662c049 100644 --- a/shell.nix +++ b/shell.nix @@ -1,18 +1,17 @@ # NodeJS PM2 # shell.nix - -{ pkgs ? import {} }: - +{pkgs ? 
import {}}: pkgs.mkShell { - packages = with pkgs; [ - which - nodejs_latest - nodePackages.pnpm - chromium - # nodePackages.pm2mk - imagemagick - ]; - + packages = with pkgs; + [ + which + nodejs_latest + nodePackages.pnpm + chromium + # nodePackages.pm2mk + imagemagick + ] + ++ (with python313Packages; [tomli lupa]); shellHook = '' export PUPPETEER_SKIP_CHROMIUM_DOWNLOAD=true export PUPPETEER_EXECUTABLE_PATH=${pkgs.chromium}/bin/chromium