snapshot orchestration

This commit is contained in:
iceBear67
2026-06-03 15:31:27 +08:00
parent b27a0a564e
commit 0e5c3d6c65
3 changed files with 276 additions and 62 deletions

View File

@@ -1,24 +1,21 @@
import argparse
import subprocess
from collections import defaultdict
from dataclasses import dataclass
import os
import re
import shutil
import subprocess
import sys
import docker
from pydantic import BaseModel, ValidationError
import yaml
@dataclass
class BuildSpec(BaseModel):
context: str
dockerfile: str
args: dict[str, str] = defaultdict
args: dict[str, str] = {}
@dataclass
class Service(BaseModel):
build: str | BuildSpec | None = None
image: str | None = None
@@ -30,17 +27,15 @@ class Service(BaseModel):
environment: dict[str, str] | None = None
@dataclass
class ComposeSpec(BaseModel):
services: dict[str, Service]
# Command-line interface for the orchestration script.
# NOTE(review): this view is a rendered diff without +/- markers, so --dry and
# --root each appear twice below; the first pair looks like the pre-change
# form and the second pair its replacement — confirm against the real file.
parser = argparse.ArgumentParser()
# type=bool converts the raw string with bool(), so any non-empty value
# (including "False") parses as True.
parser.add_argument("--dry", type=bool, default=False)
# required=True means the default="." given here can never take effect.
parser.add_argument("--root", default=".", type=str, required=True)
# Plain flag: present -> True, absent -> False.
parser.add_argument("--dry", action="store_true", default=False)
parser.add_argument("--root", default=".", type=str)
parser.add_argument("--network", default="cloud", type=str)
parser.add_argument("--volume-parent", type=str, required=True)
parser.add_argument("--lock", type=str)
args = parser.parse_args()
dry_run = args.dry
@@ -61,7 +56,7 @@ userNamePattern = r"^[a-z0-9]+$"
users: list[tuple[int, str]] = []
for userDir in os.listdir(f"{root}/users"):
spl = userDir.split("-")
spl = userDir.split("-", 1)
if len(spl) != 2:
err(f"ERR: Valid priority isn't set for userDir {spl}")
continue
@@ -77,23 +72,6 @@ users.sort(key=lambda x: x[0])
serviceNamePattern = r"^[a-z0-9]+$"
userWorkDir: dict[str, str] = {}
userComposeFiles: dict[str, ComposeSpec] = {}
for prio, name in users:
path = f"{root}/users/{prio}-{name}/compose.yml"
with open(path) as csf:
data = yaml.safe_load(csf)
try:
spec = ComposeSpec(**data)
except ValidationError as e:
err(f"Cannot validate compose spec at {path}")
print(e.errors())
continue
userComposeFiles[name] = spec
userWorkDir[name] = f"{root}/users/{prio}-{name}/"
def validate_compose_spec(spec: ComposeSpec, workdir: str):
invalid_services = [serviceName for serviceName, _ in spec.services.items() if
not re.match(serviceNamePattern, serviceName)]
@@ -101,16 +79,68 @@ def validate_compose_spec(spec: ComposeSpec, workdir: str):
err(f"ERR: Invalid service names: {', '.join([x for x in invalid_services])}")
for invalid_service in invalid_services:
spec.services.pop(invalid_service)
for key, spec in spec.services.items():
validate_service_spec(spec, workdir)
for key, svc in spec.services.items():
validate_service_spec(svc, workdir, key)
depended_services = [(spec.depends_on or []) for name, spec in spec.services.items()]
depended_services = [(svc.depends_on or []) for _name, svc in spec.services.items()]
depended_services = [item for sublist in depended_services for item in sublist]
for serviceName in depended_services:
if serviceName not in spec.services:
err(f"ERR: Service {serviceName} is depended on but not defined in the compose spec")
# Fields recognized by our Pydantic models — anything else in the YAML
# is silently dropped by Pydantic and could interfere with our injection.
_TOP_LEVEL_KNOWN = {"services"}
_SERVICE_KNOWN = {"build", "image", "volumes", "command", "depends_on",
                  "entrypoint", "env_file", "environment"}
_BUILD_KNOWN = {"context", "dockerfile", "args"}


def _unknown_keys(mapping: dict, known: set) -> str:
    """Return the keys of *mapping* that are not in *known*, sorted and comma-joined.

    Keys are converted with ``str`` before sorting and joining: YAML permits
    non-string keys (``1:``, ``true:``), and ``str.join`` raises ``TypeError``
    on anything that is not a string.
    """
    return ", ".join(sorted(str(key) for key in mapping if key not in known))


def detect_injected_fields(path_: str, data: dict):
    """Report YAML keys that the Pydantic models do not capture.

    Args:
        path_: Path of the compose file, used only in the messages.
        data: Parsed YAML document. Anything that is not a dict is ignored.

    Every finding is reported through ``err`` with a ``WARN:`` prefix. Checks
    cover the top level, each service mapping, and each mapping-style
    ``build`` block; non-mapping services are skipped.
    """
    if not isinstance(data, dict):
        return

    extra_toplevel = _unknown_keys(data, _TOP_LEVEL_KNOWN)
    if extra_toplevel:
        err(f"WARN: {path_} has unexpected top-level keys (ignored by model): "
            f"{extra_toplevel}. "
            f"These may interfere with injected networks/config.")

    services = data.get("services")
    if not isinstance(services, dict):
        return

    for svc_name, svc_data in services.items():
        if not isinstance(svc_data, dict):
            continue

        extra_svc = _unknown_keys(svc_data, _SERVICE_KNOWN)
        if extra_svc:
            err(f"WARN: {path_} service '{svc_name}' has unexpected keys "
                f"(ignored by model): {extra_svc}")

        build = svc_data.get("build")
        if isinstance(build, dict):
            extra_build = _unknown_keys(build, _BUILD_KNOWN)
            if extra_build:
                err(f"WARN: {path_} service '{svc_name}' build block has "
                    f"unexpected keys: {extra_build}")
# Load, screen and validate every user's compose.yml in priority order.
# A file that cannot be read or parsed is reported through err() and skipped,
# so one broken user directory does not abort validation of the others.
# NOTE(review): in this view validate_service_spec / validate_env_file are
# defined *after* this loop, while validate_compose_spec calls them. If that
# ordering is real, the validate_compose_spec call below raises NameError —
# confirm against the full file.
for prio, name in users:
    workdir = f"{root}/users/{prio}-{name}/"
    path = f"{workdir}compose.yml"
    try:
        with open(path) as csf:
            data = yaml.safe_load(csf)
    except (OSError, yaml.YAMLError) as e:
        err(f"ERR: Cannot read compose spec at {path}: {e}")
        continue
    # safe_load returns None for an empty file and may return a list or scalar;
    # ComposeSpec(**data) would then raise TypeError instead of ValidationError.
    if not isinstance(data, dict):
        err(f"ERR: Compose spec at {path} must be a YAML mapping, "
            f"got {type(data).__name__}")
        continue
    detect_injected_fields(path, data)
    try:
        spec = ComposeSpec(**data)
    except ValidationError as e:
        err(f"Cannot validate compose spec at {path}")
        print(e.errors())
        continue
    except TypeError as e:
        # Keyword expansion rejects non-string top-level keys (e.g. `1:`).
        err(f"ERR: Compose spec at {path} has an invalid top-level key: {e}")
        continue
    validate_compose_spec(spec, workdir)
def validate_env_file(path_: str) -> bool:
try:
with open(path_, "r") as f:
@@ -134,46 +164,113 @@ def validate_env_file(path_: str) -> bool:
return False
def _path_is_within(path: str, parent: str) -> bool:
    """Return True when *path* is *parent* itself or lies beneath it (lexical check)."""
    parent_norm = os.path.normpath(parent)
    try:
        return os.path.commonpath([os.path.normpath(path), parent_norm]) == parent_norm
    except ValueError:
        # commonpath refuses a mix of absolute and relative paths.
        return False


def validate_service_spec(serv: Service, workdir: str, name: str | None = None):
    """Check one service definition and report every problem through ``err``.

    Args:
        serv: The parsed service.
        workdir: Directory of the user's compose file; relative paths in the
            spec are checked against it.
        name: Service name used in messages. Optional so the two-argument call
            form keeps working; falls back to ``serv.image``.

    Checks performed: an image or build spec is present; the Dockerfile exists
    for both build forms; every volume is ``host:container[:ro]`` with the
    host path under ``vol_parent``; ``env_file`` exists either in plain form
    (then it is passed to ``validate_env_file``) or as ``<env_file>.gpg``.
    """
    label = name if name is not None else serv.image

    if serv.image is None and serv.build is None:
        err(f"ERR: Service {label} doesn't have an image or build spec")

    if isinstance(serv.build, str) and not os.path.exists(f"{workdir}/{serv.build}/Dockerfile"):
        err(f"ERR: Service {label}: Dockerfile doesn't exist at {workdir}/{serv.build}/Dockerfile")

    for volume in (serv.volumes or []):
        parts = volume.split(":")
        well_formed = len(parts) in (2, 3)
        # A third field is only accepted when it is exactly "ro".
        mode_ok = len(parts) != 3 or parts[2] == "ro"
        # Component-wise containment: a plain startswith() prefix test would
        # also accept siblings such as "/srv/volumes-evil" for "/srv/volumes".
        if not (well_formed and mode_ok and _path_is_within(parts[0], vol_parent)):
            err(f"ERR: Invalid volume spec {volume} in service {label}")

    if serv.env_file:
        plain_env = f"{workdir}/{serv.env_file}"
        if os.path.exists(plain_env):
            validate_env_file(plain_env)
        elif not os.path.exists(f"{plain_env}.gpg"):
            err(f"ERR: Service {label}: env_file {serv.env_file} (or .gpg) doesn't exist in {workdir}")

    if isinstance(serv.build, BuildSpec) and not os.path.exists(f"{workdir}/{serv.build.dockerfile}"):
        err(f"ERR: Service {label}: Dockerfile {serv.build.dockerfile} doesn't exist in {workdir}")
# Docker SDK client configured from the process environment.
dk = docker.from_env()
# Stop before any further work when validation recorded an error.
# NOTE(review): has_error is defined outside this view — presumably set by
# err(); confirm.
if has_error:
print("Errors found during validation. Exiting.")
sys.exit(1)
def orchestrate(user: str, spec: ComposeSpec, workdir: str):
    """Build a ``<user>-<service>:latest`` image for every service that has a build spec.

    Args:
        user: Owner name, used as the image-name prefix.
        spec: Parsed compose spec. A string ``build`` entry is replaced in
            place by the equivalent ``BuildSpec``.
        workdir: Directory of the user's compose file. The build runs with
            this as its working directory so the context and Dockerfile paths
            resolve against the same base validation checked them against.

    Build failures are reported through ``err`` and do not stop the remaining
    services. In dry-run mode the command is printed but not executed.
    """
    for name, serv in spec.services.items():
        if isinstance(serv.build, str):
            # Shorthand: the string is the context directory and the
            # Dockerfile is expected directly inside it.
            serv.build = BuildSpec(context=serv.build, dockerfile=f"{serv.build}/Dockerfile")
        if not serv.build:
            continue

        command = ["docker", "buildx", "build", "-t", f"{user}-{name}:latest"]
        for key, value in serv.build.args.items():
            command += ["--build-arg", f"{key}={value}"]
        command += ["-f", serv.build.dockerfile, serv.build.context]
        print(f"Building image for {user}:{name} with command: {' '.join(command)}")

        if dry_run:
            print("Build skipped due to dry run mode.")
            continue
        try:
            subprocess.run(command, check=True, cwd=workdir)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing docker binary or an unusable workdir.
            err(f"ERR: Failed to build image for {user}:{name} with error: {e}")
# ── Snapshot: copy userdir + inject networks into compose.yml ──────────
def _inject_networks(compose_data, shared_network: str, user_network: str) -> list[str]:
    """Attach the shared and per-user networks to a parsed compose document.

    Mutates *compose_data* in place and returns the names of the services
    that were updated. Raises ValueError when the document or one of its
    services is not a mapping, so the caller reports a readable message
    rather than an opaque TypeError.
    """
    if not isinstance(compose_data, dict):
        raise ValueError("compose.yml must contain a YAML mapping")
    networks = compose_data.get("networks")
    if not isinstance(networks, dict):
        # Covers both a missing key and an explicit `networks: null`.
        networks = {}
        compose_data["networks"] = networks
    networks[shared_network] = {"external": True}
    networks[user_network] = {"name": user_network}

    updated = []
    for svc_name, svc in (compose_data.get("services") or {}).items():
        if not isinstance(svc, dict):
            raise ValueError(f"service '{svc_name}' is not a mapping")
        svc["networks"] = [shared_network, user_network]
        updated.append(svc_name)
    return updated


def _snapshot_user_dir(src_dir: str, dst_dir: str, user_name: str) -> None:
    """Recreate *dst_dir* from *src_dir*.

    compose.yml is rewritten with injected networks, top-level ``*.gpg`` files
    are decrypted to their suffix-less name, and everything else is copied.
    """
    if os.path.exists(dst_dir):
        shutil.rmtree(dst_dir)
    os.makedirs(dst_dir, exist_ok=True)

    # Sorted so the outcome never depends on directory listing order.
    for item in sorted(os.listdir(src_dir)):
        src_item = os.path.join(src_dir, item)
        dst_item = os.path.join(dst_dir, item)

        if item == "compose.yml":
            with open(src_item) as f:
                compose_data = yaml.safe_load(f)
            for svc_name in _inject_networks(compose_data, network, user_name):
                print(f" Injected networks into service '{svc_name}': "
                      f"[{network}, {user_name}]")
            with open(dst_item, "w") as f:
                yaml.dump(compose_data, f, default_flow_style=False, sort_keys=False)
            print(f" Wrote {dst_item} (networks injected)")
        elif item.endswith(".gpg") and len(item) > len(".gpg"):
            plain_name = item[:-4]  # strip ".gpg"
            if os.path.exists(os.path.join(src_dir, plain_name)):
                # Validation prefers the plaintext file when both exist, so
                # the snapshot does too instead of letting the two collide.
                print(f" Skipping {item}: plaintext {plain_name} exists and takes precedence")
                continue
            dst_plain = os.path.join(dst_dir, plain_name)
            print(f" Decrypting {item} -> {plain_name}")
            subprocess.run(
                ["gpg", "--batch", "--output", dst_plain, "--decrypt", src_item],
                check=True,
            )
        elif os.path.isdir(src_item):
            shutil.copytree(src_item, dst_item)
        else:
            shutil.copy2(src_item, dst_item)


snapshot_root = f"{root}/snapshots"
print(f"Snapshot target: {snapshot_root}")

snapshot_dirs: list[tuple[str, str]] = []  # (dst_dir, name) for compose-up later

for prio, name in users:
    src_dir = f"{root}/users/{prio}-{name}"
    dst_dir = f"{snapshot_root}/{prio}-{name}"

    if dry_run:
        print(f" [dry] would snapshot {src_dir} -> {dst_dir}")
        print(f" [dry] inject networks: .networks.{network}.external=true, "
              f".networks.{name}.name={name}, services[].networks=[{network}, {name}]")
        snapshot_dirs.append((dst_dir, name))
        continue

    print(f" Snapshotting {src_dir} -> {dst_dir}")
    try:
        _snapshot_user_dir(src_dir, dst_dir, name)
    except Exception as e:
        # Deliberately broad: any failure for one user is recorded and the
        # remaining users are still processed; compose-up is gated on has_error.
        err(f"ERR: Snapshot failed for {name}: {e}")
    else:
        snapshot_dirs.append((dst_dir, name))
        print(f" Done snapshotting {name}")
# ── Compose up in each snapshot ────────────────────────────────────────
if has_error:
    print("Snapshot errors detected — skipping compose up.")
    sys.exit(1)

for dst_dir, name in snapshot_dirs:
    if dry_run:
        print(f" [dry] would run: docker compose -f {dst_dir}/compose.yml "
              f"up --remove-orphans --build --detach")
        continue

    # The child runs with cwd set to the snapshot directory, so a relative
    # dst_dir (e.g. with --root ".") in the -f path would be resolved a second
    # time from inside that directory and not be found. Absolute paths are
    # correct whether or not root was already absolute.
    project_dir = os.path.abspath(dst_dir)
    compose_file = os.path.join(project_dir, "compose.yml")

    print(f" Starting services for {name} in {dst_dir}")
    try:
        subprocess.run(
            ["docker", "compose", "-f", compose_file,
             "up", "--remove-orphans", "--build", "--detach"],
            cwd=project_dir, check=True,
        )
        print(f"{name} started")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing docker binary or an unusable cwd.
        err(f"ERR: docker compose up failed for {name}: {e}")

if has_error:
    print("Errors during compose up. Exiting.")
    sys.exit(1)