Newer
Older
express-blog / deployment / core / tasks.py
import os
import subprocess
import time
import tomllib
import socket
from lupa import LuaRuntime
from pathlib import Path
from lib.task_types import SuiteTask
from lib.types import Stage


class GetDeploymentConfig(SuiteTask):

    _stage = Stage.BOOTSTRAP
    _deps = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Get the deployment configuration"

    def _run(self):
        # 1. Load Lua
        lua = LuaRuntime(unpack_returned_tuples=True)
        config_path = self.get_arg("config")

        with open(config_path, "r") as f:
            cfg = lua.execute(f.read())

        # 4. Hydrate self.env
        self.env.lua_cfg = cfg  # Store the lua object for functional calls later
        self.env.app_name = cfg.app_name
        self.env.repo = cfg.repo
        self.env.timestamp_format = cfg.timestamp_format
        self.env.yarn_path = cfg.yarn_path
        self.env.corepack_home = cfg.corepack_home
        self.env.user = cfg.user

        self.env.deploy_branch = self.get_arg("branch").split("/")[-1]
        self.env.release = cfg.release
        self.env.testing = cfg.testing

        self.print(f"✅ Context hydrated for {self.env.app_name}")
        # self.env.build_dir = Path(config.paths.build)
        return True


class LoadServerConfig(SuiteTask):
    """Verifies TOML existence and hydrates the environment with health check URI components"""

    _stage = Stage.BOOTSTRAP

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Verify and Hydrate Server Configuration"

    def _run(self):
        # 1. Physical existence check
        self.env.toml["release"] = self.get_config("release")
        self.env.toml["testing"] = self.get_config("testing")

    def get_config(self, env_type):
        config_file = getattr(self.env, env_type).config_file

        self.print(f"  [CHECK] Verifying {env_type} configuration: {config_file}")

        if not os.path.exists(config_file):
            self.fail(f"CRITICAL: {env_type} config not found at {config_file}.")
        # 1. Physical existence check

        # 2. Parse TOML for internal deployment metadata
        try:
            with open(config_file, "rb") as f:
                data = tomllib.load(f)

            return {
                "public": self.get_server_cfg(data, "public"),
                "network": self.get_server_cfg(data, "network"),
            }

        except Exception as e:
            self.fail(f"FAILED to parse {env_type} TOML: ", e)

    def get_server_cfg(self, data, server_type):
        try:
            server = data.get(server_type)

            # 3. Hydrate self.env for HealthCheck and WaitForReadiness tasks
            config = {
                "schema": server.get("schema"),
                "domain": server.get("domain"),
                "address": server.get("address"),
                "port": str(server.get("port")),
            }
            health_path = data.get("meta").get("health_check")

            if server_type == "network":
                config["loc"] = config.get("address")
            elif server_type == "public":
                config["loc"] = config.get("domain")

            config["health_endpoint"] = (
                f"{config['schema']}://{config['loc']}:"
                f"{config['port']}{health_path}"
            )

            self.print(
                f"  [READY] {server_type} Health URI: {config['health_endpoint']}"
            )
            return config

        except Exception as e:
            self.fail(f"FAILED to parse {server_type} TOML: ", e)


class PipelineSuccess(Exception):
    pass


class HotFix(SuiteTask):
    """Bypasses the full build to update the current live deployment"""

    _stage = Stage.DEPLOY

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Hot fix"

    def _run(self):
        if not self.get_arg("hotfix"):
            return

        cfg = self.env.release
        # 1. Target the current active symlink
        live_path = self.env.release.deploy_link

        # 2. Pull changes
        try:
            self.sh(
                "git pull origin " + self.env.deploy_branch,
                cwd=live_path,
                handle_exception=False,
            )
        except:
            self.sh("git fetch origin ", cwd=live_path)
            self.sh("git reset --hard origin/" + self.env.deploy_branch, cwd=live_path)

        # 3. Quick Asset Rebuild (Skip yarn install unless package.json changed)
        # We check for changes in package.json to decide if we need a full install
        self.sh("yarn combine:css", cwd=live_path)

        # 4. Restart to pick up Node.js changes
        self.sh(f"sudo systemctl restart {cfg.service_name}")

        raise PipelineSuccess("Hot fix applied successfully")


class YarnBuild(SuiteTask):
    """Executes dependency installation and asset compilation"""

    _stage = Stage.BUILD
    _deps = [GetDeploymentConfig, LoadServerConfig]
    skip: bool = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Running Yarn build process"

    def _run(self):
        timestamp = time.strftime(self.env.timestamp_format)
        self.env.release_dir = f"{Path(self.env.testing.deploy_link)}-{timestamp}"

        self.sh(
            f"git clone --branch {self.env.deploy_branch} {self.env.repo} {self.env.build_dir}"
        )
        self.sh("git submodule update --init --recursive", cwd=self.env.build_dir)
        self.sh("yarn config set enableGlobalCache true", cwd=self.env.build_dir)
        self.sh(
            f"yarn config set globalFolder {self.env.yarn_path}", cwd=self.env.build_dir
        )
        self.sh("yarn config set nodeLinker pnp", cwd=self.env.build_dir)
        self.sh("yarn install", cwd=self.env.build_dir)
        self.sh("yarn combine:css", cwd=self.env.build_dir)
        return True


class AtomicDeploy(SuiteTask):
    """Performs rsync to release directory and updates environment symlink"""

    _stage = Stage.DEPLOY
    _deps = [YarnBuild]
    skip: bool = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Executing atomic symlink swap"

    def _run(self):
        # Determine success from the TestRunner flag
        test_success = getattr(self.env, "test_success", False)

        # Select appropriate Lua config table
        cfg = self.env.release if test_success else self.env.testing

        # Generate the versioned directory path using Lua function
        # Note: Use the actual formatted timestamp, not the format string
        timestamp = time.strftime(self.env.timestamp_format)
        final_release_dir = Path(cfg.get_release_dir(timestamp))

        # 1. Finalize the directory (Rename from -BUILDING to versioned path)
        self.sh(f"mv {self.env.build_dir} {final_release_dir}")

        # 2. Atomic Symlink Swap - ONLY if tests passed
        if test_success:
            deploy_link = Path(cfg.deploy_link)
            # Create a temporary symlink name in the same parent directory
            temp_link = deploy_link.with_name(deploy_link.name + "_tmp")

            # Create temporary symlink pointing to the new version
            self.sh(f"ln -sfn {final_release_dir} {temp_link}")

            # Atomic rename of the symlink itself (overwrites the old link)
            self.sh(f"mv -Tf {temp_link} {deploy_link}")

            # Restart service
            self.sh(f"sudo systemctl restart {cfg.service_name}")
        else:
            self.print("  [SKIP] Test failure detected. Symlink swap bypassed.")
            self.print(f"  [INFO] Artifact preserved at: {final_release_dir}")

        return True


class HealthCheck(SuiteTask):
    """Polls the local production service endpoint"""

    _stage = Stage.DEPLOY
    _deps = [AtomicDeploy]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Verifying service health"

    def _run(self):
        # Base run handles dry_run check already
        uri = self.env.toml["release"]["network"]["health_endpoint"]

        status = self.poll_health_endpoint(uri, label="Production Service")
        if self.do_dry_run():
            return
        if not status:
            self.fail(f"Production service failed health check at {uri}")

        return True