import time
from lib.task_types import SuiteTask, SuiteSubTask
from lib.types import Stage
from core.tasks import YarnBuild
class StartTestApp(SuiteSubTask):
    """Launch the built application in the background for integration tests.

    The task first stops the systemd unit named by
    ``env.testing.service_name`` (best effort), then starts
    ``yarn run prod`` from ``env.build_dir`` under ``nohup``. Output is
    appended to ``env.test_log`` and the PID of the background process is
    written to ``env.pidfile``.
    """

    # Stage and prerequisite declarations; presumably consumed by the task
    # framework that schedules this class.
    _stage = Stage.TEST
    _deps = [YarnBuild]

    def __init__(self, *args, **kwargs):
        # The display name is assigned before the base initialiser runs.
        self.name = "Start Application for Test"
        super().__init__(*args, **kwargs)

    def _run(self):
        """Start the test instance.

        Returns:
            ``True`` in every case, including when the step is skipped
            because ``skip_tests`` is set and ``enforce_testing`` is not.
        """
        if self._owner.args.get("skip_tests"):
            # Only consult the enforcement flag once a skip was requested.
            if not self.get_arg("enforce_testing"):
                self.print(" [SKIP] Skipping per user request.")
                return True

        env = self.env
        self.print(f" [EXEC] Starting app in {env.build_dir}")

        # Free the port: a previously running service may still hold it.
        # `|| true` keeps a "unit not running" result from counting as failure.
        self.sh(f"sudo systemctl stop {env.testing.service_name} || true")

        # Detach the app from this shell, append its output to the test log
        # and store the PID of the background job ($!) in the pidfile.
        launch = f"nohup yarn run prod >> '{env.test_log}' 2>&1 & echo $! > '{env.pidfile}'"
        self.sh(launch, cwd=env.build_dir)
        return True
class WaitForReadiness(SuiteSubTask):
    """Polls the health endpoint of the TEST instance.

    The endpoint is read from ``env.toml["testing"]["network"]["health_endpoint"]``
    and handed to ``poll_health_endpoint``. When polling reports failure the
    test log is dumped and the task is marked as failed.
    """

    # Stage and prerequisite declarations; presumably consumed by the task
    # framework that schedules this class.
    _stage = Stage.TEST
    _deps = [StartTestApp]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Wait for Service Readiness"

    def _run(self):
        """Wait until the test instance answers on its health endpoint.

        Returns:
            ``True`` when the step is skipped or the endpoint became healthy,
            ``False`` when the service failed to come up (if ``fail()``
            returns instead of raising), and ``None`` in dry-run mode.
        """
        # Same skip rule as the sibling test tasks: honour ``skip_tests``
        # unless ``enforce_testing`` is set. No fallback value is passed to
        # ``get_arg`` so that an unset ``enforce_testing`` does not silently
        # force polling of an application that was never started.
        if self._owner.args.get("skip_tests") and not self.get_arg("enforce_testing"):
            return True

        uri = self.env.toml["testing"]["network"]["health_endpoint"]
        status = self.poll_health_endpoint(uri, label="Test Service")

        if self.do_dry_run():
            # NOTE(review): the poll result is ignored in dry-run mode and the
            # method keeps returning None here - confirm callers expect that.
            return None

        if not status:
            # Show the application log before failing so the cause is visible.
            self.sh(f"cat '{self.env.test_log}'")
            self.fail(f"Test service at {uri} failed to start.")
            # Reached only if fail() does not raise; never report success
            # after a failed readiness check.
            return False

        return True
class RunMochaTests(SuiteSubTask):
    """Run ``npm run test:postreceive`` from the build directory.

    The command is executed through ``sh_thread`` with ``env.build_dir`` as
    the working directory.
    """

    # Stage and prerequisite declarations; presumably consumed by the task
    # framework that schedules this class.
    _stage = Stage.TEST
    _deps = [WaitForReadiness]

    def __init__(self, *args, **kwargs):
        # The display name is assigned before the base initialiser runs.
        self.name = "Run Tests"
        super().__init__(*args, **kwargs)

    def _run(self):
        """Execute the test command.

        Returns:
            ``True`` both when the step is skipped (``skip_tests`` set and
            ``enforce_testing`` not set) and after the command was issued.
        """
        if self._owner.args.get("skip_tests"):
            # Only consult the enforcement flag once a skip was requested.
            if not self.get_arg("enforce_testing"):
                return True

        self.print(" [RUN] npm run test:postreceive")

        # sh_thread is used so log output streams in real time for Jenkins.
        # NOTE(review): its return value is not inspected here; presumably a
        # failing test run is signalled by an exception - confirm.
        build_dir = self.env.build_dir
        self.sh_thread("npm run test:postreceive", cwd=build_dir)
        return True
class StopTestApp(SuiteSubTask):
    """Cleans up the test process regardless of test outcome.

    Sends the default ``kill`` signal to the PID stored in ``env.pidfile``
    and then removes the pidfile.
    """

    # Stage and prerequisite declarations; presumably consumed by the task
    # framework that schedules this class.
    _stage = Stage.TEST
    _deps = [RunMochaTests]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Stop Test App"

    def _run(self):
        """Terminate the test instance.

        There is deliberately no ``skip_tests`` check: the stop is attempted
        even when tests were skipped, to be safe.

        Returns:
            ``True`` always; both shell steps are best effort.
        """
        pidfile = self.env.pidfile
        self.print(f" [KILL] Terminating process in {pidfile}")
        # The pidfile is removed after the kill attempt. Because this task
        # runs even when no app was started, and may run more than once, a
        # stale pidfile could otherwise make a later invocation signal an
        # unrelated process that happens to have reused the PID.
        self.sh(f"kill $(cat '{pidfile}') || true; rm -f '{pidfile}' || true")
        return True
class TestRunner(SuiteTask):
    """
    Sub-orchestrator for the Integration Testing lifecycle.
    Manages the environment setup, execution, and teardown for Mocha tests.

    Sub-tasks are instantiated and run in the order given by ``_sub_tasks``;
    the run stops at the first one whose ``run()`` returns ``False``. The
    overall result is stored in ``env.test_success``.
    """

    _stage = Stage.TEST
    _deps = []  # Dependent on YarnBuild completion in the main TaskRunner
    skip: bool = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "Integration Test Runner"
        self._sub_tasks = [StartTestApp, WaitForReadiness, RunMochaTests, StopTestApp]

    def _run(self):
        """Run the integration-test sub-tasks with guaranteed teardown.

        Returns:
            ``True`` when testing was skipped by user flag or every sub-task
            completed without returning ``False``; ``False`` when a sub-task
            reported failure or raised an ``Exception``.
        """
        # 1. Check if we should even be here
        skip_param = self.args.get("skip_tests", False)
        enforced = self.get_arg("enforce_testing")
        if skip_param and not enforced:
            self.print(" [SKIP] Integration tests bypassed by user flag.")
            return True

        self.print(f"--- Entering Stage: {self._stage.value.upper()} ---")

        # 2. Sequential Execution
        # We manually iterate to maintain control over the 'StopTestApp' cleanup.
        # Start pessimistic: success is only granted once the loop has run to
        # completion, so an abort that bypasses `except Exception` (e.g.
        # SystemExit, KeyboardInterrupt) is never recorded as a pass.
        success = False
        teardown_done = False
        try:
            for task_class in self._sub_tasks:
                # Instantiate as SubTask to maintain ID hierarchy (e.g., [4.1], [4.2])
                task = task_class(parent=self, owner=self._owner)
                result = task.run()
                if task_class is StopTestApp:
                    # Teardown already happened as a regular step; remember
                    # it so the forced teardown below does not run it again.
                    teardown_done = True
                if result is False:
                    self.print(f" [FAIL] Test suite halted at: {task.name}")
                    break
            else:
                success = True
        except Exception as e:
            self.print(f" [ERROR] Critical failure during test execution: {e}")
        finally:
            # No `return` in this clause: returning from `finally` would
            # silently discard any exception that is still propagating.
            # In dry-run mode neither the result nor the teardown is touched.
            if not self.do_dry_run():
                self.env.test_success = success
                # 3. Forced Teardown
                # If the loop broke or failed before reaching StopTestApp,
                # make sure the test instance is stopped exactly once.
                if not teardown_done:
                    self.print(" [CLEAN] Ensuring test environment teardown...")
                    cleanup = StopTestApp(parent=self, owner=self._owner)
                    cleanup.run()

        return success