ncdu on files to back up

I use borg and restic to backup files in my system. Sometimes I run a huge download or clone a large git repo and forget to mark it with CACHEDIR.TAG, and it gets picked up slowing the backup process and wasting backup space uselessly.

I would like to occasionally audit the system to have an idea of what is a candidate for backup. ncdu would be great for this, but it doesn't know about backup exclusion filters.

Let's teach it then.

Here's a script that simulates a backup and feeds the results to ncdu:

#!/usr/bin/python3

import argparse
import os
import sys
import time
import stat
import json
import subprocess
import tempfile
from pathlib import Path
from typing import Any

FILTER_ARGS = [
    "--one-file-system",
    "--exclude-caches",
    "--exclude",
    "*/.cache",
]
BACKUP_PATHS = [
    "/home",
]


class Dir:
    """
    Dispatch borg output into a hierarchical directory structure.

    borg prints a flat file list, ncdu needs a hierarchical JSON.
    """

    def __init__(self, path: Path, name: str):
        self.path = path
        self.name = name
        self.subdirs: dict[str, "Dir"] = {}
        self.files: list[str] = []

    def print(self, indent: str = "") -> None:
        for name, subdir in self.subdirs.items():
            print(f"{indent}{name:}/")
            subdir.print(indent + " ")
        for name in self.files:
            print(f"{indent}{name}")

    def add(self, parts: tuple[str, ...]) -> None:
        if len(parts) == 1:
            self.files.append(parts[0])
            return

        subdir = self.subdirs.get(parts[0])
        if subdir is None:
            subdir = Dir(self.path / parts[0], parts[0])
            self.subdirs[parts[0]] = subdir

        subdir.add(parts[1:])

    def to_data(self) -> list[Any]:
        res: list[Any] = []
        st = self.path.stat()
        res.append(self.collect_stat(self.name, st))
        for name, subdir in self.subdirs.items():
            res.append(subdir.to_data())

        dir_fd = os.open(self.path, os.O_DIRECTORY)
        try:
            for name in self.files:
                try:
                    st = os.lstat(name, dir_fd=dir_fd)
                except FileNotFoundError:
                    print(
                        "Possibly broken encoding:",
                        self.path,
                        repr(name),
                        file=sys.stderr,
                    )
                    continue
                if stat.S_ISDIR(st.st_mode):
                    continue
                res.append(self.collect_stat(name, st))
        finally:
            os.close(dir_fd)

        return res

    def collect_stat(self, fname: str, st) -> dict[str, Any]:
        res = {
            "name": fname,
            "ino": st.st_ino,
            "asize": st.st_size,
            "dsize": st.st_blocks * 512,
        }
        if stat.S_ISDIR(st.st_mode):
            res["dev"] = st.st_dev
        return res


class Scanner:
    def __init__(self) -> None:
        self.root = Dir(Path("/"), "/")
        self.data = None

    def scan(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir_name:
            mock_backup_dir = Path(tmpdir_name) / "backup"
            subprocess.run(
                ["borg", "init", mock_backup_dir.as_posix(), "--encryption", "none"],
                cwd=Path.home(),
                check=True,
            )

            proc = subprocess.Popen(
                [
                    "borg",
                    "create",
                    "--list",
                    "--dry-run",
                ]
                + FILTER_ARGS
                + [
                    f"{mock_backup_dir}::test",
                ]
                + BACKUP_PATHS,
                cwd=Path.home(),
                stderr=subprocess.PIPE,
            )
            assert proc.stderr is not None
            for line in proc.stderr:
                match line[0:2]:
                    case b"- ":
                        path = Path(line[2:].strip().decode())
                    case b"x ":
                        continue
                    case _:
                        raise RuntimeError(f"Unparsable borg output: {line!r}")

                if path.parts[0] != "/":
                    raise RuntimeError(f"Unsupported path: {path.parts!r}")
                self.root.add(path.parts[1:])

    def to_json(self) -> list[Any]:
        return [
            1,
            0,
            {
                "progname": "backup-ncdu",
                "progver": "0.1",
                "timestamp": int(time.time()),
            },
            self.root.to_data(),
        ]

    def export(self):
        return json.dumps(self.to_json()).encode()


def main():
    parser = argparse.ArgumentParser(
        description="Run ncdu to estimate sizes of files to backup."
    )
    parser.parse_args()

    scanner = Scanner()
    scanner.scan()
    # scanner.root.print()
    res = subprocess.run(["ncdu", "-f-"], input=scanner.export())
    sys.exit(res.returncode)


if __name__ == "__main__":
    main()