#!/usr/bin/env python3 import argparse import configparser import subprocess import sys from dataclasses import dataclass from datetime import datetime from enum import Enum, auto from pathlib import Path class State(Enum): UP = 1 DOWN = 2 UNSTABLE = 3 UNKNOWN = auto() @staticmethod def from_rc(rc: int) -> "State": if rc == 0: return State.UP else: return State.DOWN def to_colour(self) -> str: return { State.UP: "green", State.DOWN: "red", State.UNSTABLE: "orange", State.UNKNOWN: "gray" }[self] class StateChange(Enum): NONE = 0 FAIL = 1 RECOVER = 2 @dataclass class Service: name: str cmd: str url: str on_fail: str | None = None on_recover: str | None = None def get_services_from_config(config_path: Path) -> list[Service]: config = configparser.ConfigParser() config.read(config_path) services = [] for section in config.sections(): if not section.startswith("service:"): continue name = section.split("service:")[1] cmd = config[section].get("cmd") url = config[section].get("url") or "#" on_fail = config[section].get("on_fail") on_recover = config[section].get("on_recover") if cmd is None: raise ValueError(f"Section service:{name} missing 'cmd'.") services.append(Service(name=name, url=url, cmd=cmd, on_fail=on_fail, on_recover=on_recover)) return services def do_check(service: Service) -> int: print(f"Checking {service.name}...", file=sys.stderr, end="") rc = subprocess.run( service.cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ).returncode print(f" {State.from_rc(rc).name}", file=sys.stderr) return rc def run_on_fail(service: Service): if service.on_fail: subprocess.run(service.on_fail, shell=True) def run_on_recover(service: Service): if service.on_recover: subprocess.run(service.on_recover, shell=True) def write_status(rc: int, status_file: Path): now = datetime.now().isoformat(timespec="seconds") with open(status_file, "a") as f: f.write(f"{now} {rc}\n") def check_for_state_change(status_file: Path) -> StateChange: with open(status_file, "r") as f: lines = f.readlines()[-2:] try: prev_rc = int(lines[0].split()[1]) cur_rc = int(lines[1].split()[1]) except IndexError: return StateChange.NONE if prev_rc == 0 and cur_rc != 0: return StateChange.FAIL elif prev_rc != 0 and cur_rc == 0: return StateChange.RECOVER else: return StateChange.NONE def check_service(service: Service, workdir: Path): status_dir = workdir / service.name if not status_dir.exists(): status_dir.mkdir(parents=True) today = datetime.today().strftime("%Y-%m-%d") status_file = status_dir / today rc = do_check(service) write_status(rc, status_file) state_change = check_for_state_change(status_file) if state_change == StateChange.FAIL: run_on_fail(service) elif state_change == StateChange.RECOVER: run_on_recover(service) def write_report_header(f): f.write(""" Service Status

Service Status

""") def write_service_row(f, service: Service, days: list[tuple[str, State, State]]): latest_state = days[-1][2] badge = f'{latest_state.name}' f.write(f' \n \n \n \n") def write_report_footer(f): now = datetime.now().strftime("%Y-%m-%d %H:%M") f.write(f"""
Service Status Last 30 days
{service.name}{badge}\n') for day, overall_state, _ in days: f.write(f' \n') f.write("
""") def generate_report(services: list[Service], workdir: Path, output: Path): if not output.exists(): output.mkdir(parents=True) report_file = output / "status.html" with open(report_file, "w") as report_f: write_report_header(report_f) for service in services: status_dir = workdir / service.name # Last 30 days status_files = sorted(status_dir.glob("*"))[-30:] days: list[tuple[str, State, State]] = [] # (date, overall_state, latest_state) for status_file in status_files: with open(status_file, "r") as status_f: overall_state = State.UNKNOWN for line in status_f: when, raw_rc = line.strip().split() rc = int(raw_rc) latest_state = State.from_rc(rc) if overall_state == State.UNKNOWN: overall_state = latest_state else: if latest_state != overall_state: overall_state = State.UNSTABLE days.append((status_file.name, overall_state, latest_state)) write_service_row(report_f, service, days) write_report_footer(report_f) def run(args: argparse.Namespace) -> int: services = get_services_from_config(args.config) for service in services: check_service(service, args.workdir) generate_report(services, args.workdir, args.output) return 0 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Monitor the uptime of your services.") parser.add_argument( "-c", "--config", type=Path, required=True, help="Path to the configuration file." ) parser.add_argument( # TODO: Should probably be database "-w", "--workdir", type=Path, default=Path.cwd(), help="Working directory." ) parser.add_argument( "-o", "--output", type=Path, default=Path.cwd() / "html", help="Output directory." ) return parser.parse_args() def main() -> int: args = parse_args() return run(args) if __name__ == "__main__": sys.exit(main())