import os
import subprocess
import time
import tomllib
import socket
from lupa import LuaRuntime
from pathlib import Path
from lib.task_types import SuiteTask
from lib.types import Stage
class GetDeploymentConfig(SuiteTask):
_stage = Stage.BOOTSTRAP
_deps = []
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = "Get the deployment configuration"
def _run(self):
# 1. Load Lua
lua = LuaRuntime(unpack_returned_tuples=True)
config_path = self.get_arg("config")
with open(config_path, "r") as f:
cfg = lua.execute(f.read())
# 4. Hydrate self.env
self.env.lua_cfg = cfg # Store the lua object for functional calls later
self.env.app_name = cfg.app_name
self.env.repo = cfg.repo
self.env.timestamp_format = cfg.timestamp_format
self.env.deploy_branch = self.get_arg("branch").split("/")[-1]
self.env.release = cfg.release
self.env.testing = cfg.testing
self.print(f"✅ Context hydrated for {self.env.app_name}")
# self.env.build_dir = Path(config.paths.build)
return True
class LoadServerConfig(SuiteTask):
"""Verifies TOML existence and hydrates the environment with health check URI components"""
_stage = Stage.BOOTSTRAP
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = "Verify and Hydrate Server Configuration"
def _run(self):
# 1. Physical existence check
config_path = self.env.testing.config_file
self.print(f" [CHECK] Verifying configuration: {config_path}")
if not os.path.exists(config_path):
self.fail(
f"CRITICAL: Configuration file not found at {config_path}. "
"Pipeline terminated."
)
# 2. Parse TOML for internal deployment metadata
try:
with open(config_path, "rb") as f:
data = tomllib.load(f)
server = data.get("server", {})
# 3. Hydrate self.env for HealthCheck and WaitForReadiness tasks
self.env.server_schema = server.get("schema")
self.env.server_address = server.get("address")
self.env.server_port = str(server.get("port"))
self.env.server_health_path = server.get("health_check")
# Construct the dynamic URI used by curl in later stages
self.env.test_endpoint_uri = (
f"{self.env.server_schema}://{self.env.server_address}:"
f"{self.env.server_port}{self.env.server_health_path}"
)
self.print(
f" [READY] Health check URI constructed: {self.env.test_endpoint_uri}"
)
except Exception as e:
self.fail(f"FAILED to parse TOML at {config_path}: {e}")
self.print(" [OK] Configuration verified and environment hydrated.")
return True
class YarnBuild(SuiteTask):
"""Executes dependency installation and asset compilation"""
_stage = Stage.BUILD
_deps = [GetDeploymentConfig, LoadServerConfig]
skip: bool = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = "Running Yarn build process"
def _run(self):
# Use a temporary build directory with -BUILDING suffix
# This is finalized in AtomicDeploy
timestamp = time.strftime(self.env.timestamp_format)
self.env.release_dir = f"{Path(self.env.testing.deploy_link)}-{timestamp}"
self.env.build_dir = os.getcwd()
self.print(f" [BUILD] Target: {self.env.build_dir}")
self.sh(
f"git clone --branch {self.env.deploy_branch} {self.env.repo} {self.env.build_dir}"
)
self.sh("git submodule update --init --recursive", cwd=self.env.build_dir)
self.sh("yarn install", cwd=self.env.build_dir)
self.sh("yarn combine:css", cwd=self.env.build_dir)
return True
class AtomicDeploy(SuiteTask):
"""Performs rsync to release directory and updates environment symlink"""
_stage = Stage.DEPLOY
_deps = [YarnBuild]
skip: bool = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = "Executing atomic symlink swap"
def _run(self):
# Determine success from the TestRunner flag
test_success = getattr(self.env, "test_success", False)
# Select appropriate Lua config table
cfg = self.env.release if test_success else self.env.testing
# Generate the versioned directory path using Lua function
# Note: Use the actual formatted timestamp, not the format string
timestamp = time.strftime(self.env.timestamp_format)
final_release_dir = Path(cfg.get_release_dir(timestamp))
self.print(f" [MOVE] Finalizing build to: {final_release_dir}")
# 1. Finalize the directory (Rename from -BUILDING to versioned path)
self.sh(f"mv {self.env.build_dir} {final_release_dir}")
# 2. Atomic Symlink Swap - ONLY if tests passed
if test_success:
deploy_link = Path(cfg.deploy_link)
# Create a temporary symlink name in the same parent directory
temp_link = deploy_link.with_name(deploy_link.name + "_tmp")
self.print(
f" [LINK] Swapping symlink: {deploy_link} -> {final_release_dir}"
)
# Create temporary symlink pointing to the new version
self.sh(f"ln -sfn {final_release_dir} {temp_link}")
# Atomic rename of the symlink itself (overwrites the old link)
self.sh(f"mv -Tf {temp_link} {deploy_link}")
# Restart service
self.sh(f"sudo systemctl restart {cfg.service_name}")
else:
self.print(" [SKIP] Test failure detected. Symlink swap bypassed.")
self.print(f" [INFO] Artifact preserved at: {final_release_dir}")
return True
class HealthCheck(SuiteTask):
"""Polls the local service endpoint to verify readiness"""
_stage = Stage.DEPLOY
_deps = [AtomicDeploy]
skip: bool = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = "Verifying service health"
def _run(self):
self.msg(self.name)
if self.do_dry_run:
return
for i in range(15):
res = subprocess.run(
["curl", "-s", "-I", self.env.test_endpoint_uri],
capture_output=True,
)
if res.returncode == 0:
self.print("✅ Service is healthy")
return True
else:
print(res)
time.sleep(2)
self.fail("Service failed health check after 30 seconds")