import argparse import subprocess from collections import defaultdict from dataclasses import dataclass import os import re import docker from pydantic import BaseModel, ValidationError import yaml @dataclass class BuildSpec(BaseModel): context: str dockerfile: str args: dict[str, str] = defaultdict @dataclass class Service(BaseModel): build: str | BuildSpec | None = None image: str | None = None volumes: list[str] | None = None command: str | None = None depends_on: list[str] | None = None entrypoint: list[str] | None = None env_file: str | None = None environment: dict[str, str] | None = None @dataclass class ComposeSpec(BaseModel): services: dict[str, Service] parser = argparse.ArgumentParser() parser.add_argument("--dry", type=bool, default=False) parser.add_argument("--root", default=".", type=str, required=True) parser.add_argument("--network", default="cloud", type=str) parser.add_argument("--volume-parent", type=str, required=True) parser.add_argument("--lock", type=str) args = parser.parse_args() dry_run = args.dry root = args.root network = args.network vol_parent = args.volume_parent has_error = False def err(msg: str): global has_error has_error = True print(msg) userNamePattern = r"^[a-z0-9]+$" users: list[tuple[int, str]] = [] for userDir in os.listdir(f"{root}/users"): spl = userDir.split("-") if len(spl) != 2: err(f"ERR: Valid priority isn't set for userDir {spl}") continue elif not spl[0].isdigit(): err(f"ERR: UserDir {spl} has an invalid priority.") continue elif not re.match(userNamePattern, spl[1]): err(f"ERR: Name {spl[1]} doesn't match {userNamePattern}") continue users.append((int(spl[0]), spl[1])) users.sort(key=lambda x: x[0]) serviceNamePattern = r"^[a-z0-9]+$" userWorkDir: dict[str, str] = {} userComposeFiles: dict[str, ComposeSpec] = {} for prio, name in users: path = f"{root}/users/{prio}-{name}/compose.yml" with open(path) as csf: data = yaml.safe_load(csf) try: spec = ComposeSpec(**data) except ValidationError as e: err(f"Cannot validate compose spec at {path}") print(e.errors()) continue userComposeFiles[name] = spec userWorkDir[name] = f"{root}/users/{prio}-{name}/" def validate_compose_spec(spec: ComposeSpec, workdir: str): invalid_services = [serviceName for serviceName, _ in spec.services.items() if not re.match(serviceNamePattern, serviceName)] if len(invalid_services) > 0: err(f"ERR: Invalid service names: {', '.join([x for x in invalid_services])}") for invalid_service in invalid_services: spec.services.pop(invalid_service) for key, spec in spec.services.items(): validate_service_spec(spec, workdir) depended_services = [(spec.depends_on or []) for name, spec in spec.services.items()] depended_services = [item for sublist in depended_services for item in sublist] for serviceName in depended_services: if serviceName not in spec.services: err(f"ERR: Service {serviceName} is depended on but not defined in the compose spec") def validate_env_file(path_: str) -> bool: try: with open(path_, "r") as f: for idx, raw in enumerate(f, start=1): line = raw.strip() if not line or line.startswith("#"): continue if line.startswith("export "): line = line[len("export "):].strip() if "=" not in line: err(f"ERR: Invalid line {idx} in env file {path_}: no '=' found") return False key, _ = line.split("=", 1) key = key.strip() if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", key): err(f"ERR: Invalid env var name '{key}' in {path_} at line {idx}") return False return True except Exception as e: err(f"ERR: Cannot read env_file {path_}: {e}") return False def validate_service_spec(serv: Service, workdir: str): if serv.image is None and serv.build is None: err(f"ERR: Service {serv.image} doesn't have an image or build spec") if isinstance(serv.build, str) and not os.path.exists(f"{workdir}/{serv.build}/Dockerfile"): err(f"ERR: Dockerfile doesn't exist at {workdir}/{serv.build}/Dockerfile") for volume in serv.volumes: spl = volume.split(":") if (len(spl) != 2 and len(spl) != 3) or \ not os.path.normpath(spl[0]).startswith(vol_parent) or \ (len(spl) == 3 and spl[2] != "ro"): err(f"ERR: Invalid volume spec {volume} in service {serv.image}") continue if serv.env_file and not os.path.exists(f"{workdir}/{serv.env_file}"): err(f"ERR: env_file {serv.env_file} doesn't exist in {workdir}") elif serv.env_file: validate_env_file(f"{workdir}/{serv.env_file}") if isinstance(serv.build, BuildSpec) and not os.path.exists(f"{workdir}/{serv.build.dockerfile}"): err(f"ERR: Dockerfile {serv.build.dockerfile} doesn't exist in {workdir}") dk = docker.from_env() def orchestrate(user: str, spec: ComposeSpec, workdir: str): for name, serv in spec.services.items(): if isinstance(serv.build, str): serv.build = BuildSpec(context=serv.build, dockerfile=f"{serv.build}/Dockerfile") if serv.build: command = [ "docker", "buildx", "build", "-t", f"{user}-{name}:latest" ] build_args = [["--build-arg", f"{k}={v}"] for k, v in serv.build.args.items()] command += [arg for build_arg in build_args for arg in build_arg] command += ["-f", serv.build.dockerfile, serv.build.context] print(f"Building image for {user}:{name} with command: {' '.join(command)}") if not dry_run: try: subprocess.run(command, check=True) except subprocess.CalledProcessError as e: err(f"ERR: Failed to build image for {user}:{name} with error: {e}") continue else: print("Build skipped due to dry run mode.")