feat(normalizer): generate structured tags from Schlagwort + Inhalt fields

Adds tags.py module implementing a three-outcome heuristic:
- Individual-to-individual correspondence tags ("Clara an Herbert") → dropped
- Group/collective correspondence ("Clara an Kinder", "Walter an Geschwister") → Briefwechsel/<value>
- Semantic/event tags ("Brautbriefe", "Alltag", "zur Hochzeit") → Themen/<value>

Three correspondence patterns detected: space-an-space, starts-with-"an ",
and abbreviated-sender form ("Maria W.an Clara").

COLLECTIVE_TERMS in config.py extended with 17 plural/group relational terms
(söhne, brüder, schwiegereltern, cousinen, etc.) confirmed against the full Excel.

Also adds two-phase summary mining: every run emits review/tag-candidates.csv;
subsequent runs apply keywords from overrides/approved-themes.csv as Themen tags.

Outputs: canonical-documents.xlsx gets pipe-separated "Parent/Child" tag paths;
canonical-tag-tree.xlsx provides the full tag hierarchy for backend pre-import.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Marcel
2026-05-25 19:47:36 +02:00
parent 5efe3b8a7c
commit 94a40237f4
9 changed files with 405 additions and 6 deletions

View File

@@ -116,6 +116,10 @@ RELATIONAL_TERMS = {
COLLECTIVE_TERMS = { COLLECTIVE_TERMS = {
"familie", "fam", "kinder", "eltern", "geschwister", "großeltern", "familie", "fam", "kinder", "eltern", "geschwister", "großeltern",
"grosseltern", "alle", "diverse", "div", "gebrüder", "gebr", "grosseltern", "alle", "diverse", "div", "gebrüder", "gebr",
# Plural/group relational terms — added for tag generation heuristic
"söhne", "töchter", "brüder", "schwestern", "schwiegereltern",
"vettern", "kusinen", "cousinen", "nichten", "neffen", "tanten",
"freunde", "bekannte", "geschw", "enkelkinder", "jungens", "verwandten",
} }
# Markers of an unknown/illegible name (the literal "?" is handled separately in code). # Markers of an unknown/illegible name (the literal "?" is handled separately in code).
# All long enough to be safe as SUBSTRING matches — do NOT add short tokens like "nn" # All long enough to be safe as SUBSTRING matches — do NOT add short tokens like "nn"

View File

@@ -3,6 +3,7 @@ from dataclasses import dataclass, field
from enum import Enum, auto from enum import Enum, auto
import dates as _dates import dates as _dates
import tags as _tags
class Triage(Enum): class Triage(Enum):
@@ -88,7 +89,7 @@ def index_file_mismatch(index: str, file_path: str) -> bool:
return stem != index return stem != index
def to_canonical(raw, ctx, date_overrides: dict) -> CanonicalDocument: def to_canonical(raw, ctx, date_overrides: dict, approved_themes: frozenset = frozenset()) -> CanonicalDocument:
pd = _dates.parse_date(raw.date, date_overrides) pd = _dates.parse_date(raw.date, date_overrides)
flags = [] flags = []
@@ -113,6 +114,6 @@ def to_canonical(raw, ctx, date_overrides: dict) -> CanonicalDocument:
receiver_person_ids=[r[0] for r in receivers], receiver_person_ids=[r[0] for r in receivers],
receiver_names=[r[1] for r in receivers], receiver_names=[r[1] for r in receivers],
date_iso=pd.iso or "", date_raw=raw.date, date_precision=str(pd.precision), date_iso=pd.iso or "", date_raw=raw.date, date_precision=str(pd.precision),
location=raw.location, tags=[raw.tags] if raw.tags else [], summary=raw.summary, location=raw.location, tags=_tags.generate_tags(raw.tags, raw.summary, approved_themes), summary=raw.summary,
source_row=raw.source_row, needs_review=flags, source_row=raw.source_row, needs_review=flags,
) )

View File

@@ -8,13 +8,17 @@ import ingest
import persons import persons
import documents import documents
import overrides as overrides_mod import overrides as overrides_mod
import tags as _tags
import writers import writers
def run(*, document_workbook, document_sheet, person_workbook, person_sheet, def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
out_dir, review_dir, date_overrides, name_overrides) -> dict: out_dir, review_dir, date_overrides, name_overrides,
approved_themes_path=None) -> dict:
out_dir, review_dir = Path(out_dir), Path(review_dir) out_dir, review_dir = Path(out_dir), Path(review_dir)
approved_themes = _tags.load_approved_themes(Path(approved_themes_path)) if approved_themes_path else set()
# --- persons --- # --- persons ---
person_rows = ingest.read_sheet(person_workbook, person_sheet) person_rows = ingest.read_sheet(person_workbook, person_sheet)
p_fields, _ = ingest.build_header_map(person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS) p_fields, _ = ingest.build_header_map(person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
@@ -52,7 +56,7 @@ def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
seen_index[raw.index] += 1 seen_index[raw.index] += 1
if raw.date.strip() and raw.date.strip() in date_overrides: if raw.date.strip() and raw.date.strip() in date_overrides:
dates_by_override += 1 dates_by_override += 1
doc = documents.to_canonical(raw, ctx, date_overrides) doc = documents.to_canonical(raw, ctx, date_overrides, frozenset(approved_themes))
if "unparsed_date" in doc.needs_review: if "unparsed_date" in doc.needs_review:
unparsed_by_raw.setdefault(raw.date, []).append(source_row) unparsed_by_raw.setdefault(raw.date, []).append(source_row)
if "index_file_mismatch" in doc.needs_review: if "index_file_mismatch" in doc.needs_review:
@@ -74,6 +78,9 @@ def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx") writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx") writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")
all_tag_paths = [path for doc in canon_docs for path in doc.tags]
writers.write_tag_tree_xlsx(_tags.build_tag_tree(all_tag_paths), out_dir / "canonical-tag-tree.xlsx")
# --- review files --- # --- review files ---
# unparsed dates: most-frequent first, with example source rows + blank override cells so a # unparsed dates: most-frequent first, with example source rows + blank override cells so a
# corrected row can be pasted straight into overrides/dates.csv (same raw,iso,precision shape). # corrected row can be pasted straight into overrides/dates.csv (same raw,iso,precision shape).
@@ -97,6 +104,11 @@ def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
["category", "raw", "count", "example_rows"], unresolved_rows) ["category", "raw", "count", "example_rows"], unresolved_rows)
writers.write_review_csv(review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches) writers.write_review_csv(review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches)
all_summaries = [doc.summary for doc in canon_docs if doc.summary]
candidates = _tags.mine_summary_candidates(all_summaries)
writers.write_review_csv(review_dir / "tag-candidates.csv", ["candidate", "count"],
[[c, n] for c, n in candidates])
dated = sum(1 for d in canon_docs if d.date_raw.strip()) dated = sum(1 for d in canon_docs if d.date_raw.strip())
unknown = sum(1 for d in canon_docs if d.date_raw.strip() and d.date_precision == "UNKNOWN") unknown = sum(1 for d in canon_docs if d.date_raw.strip() and d.date_precision == "UNKNOWN")
unknown_rate = f"{(100 * unknown / dated):.1f}%" if dated else "0.0%" unknown_rate = f"{(100 * unknown / dated):.1f}%" if dated else "0.0%"
@@ -148,7 +160,8 @@ def main():
document_workbook=config.DOCUMENT_WORKBOOK, document_sheet=config.DOCUMENT_SHEET, document_workbook=config.DOCUMENT_WORKBOOK, document_sheet=config.DOCUMENT_SHEET,
person_workbook=config.PERSON_WORKBOOK, person_sheet=config.PERSON_SHEET, person_workbook=config.PERSON_WORKBOOK, person_sheet=config.PERSON_SHEET,
out_dir=config.OUT_DIR, review_dir=config.REVIEW_DIR, out_dir=config.OUT_DIR, review_dir=config.REVIEW_DIR,
date_overrides=date_overrides, name_overrides=name_overrides) date_overrides=date_overrides, name_overrides=name_overrides,
approved_themes_path=config.OVERRIDES_DIR / "approved-themes.csv")
print("Normalization complete:") print("Normalization complete:")
for k, v in stats.items(): for k, v in stats.items():
print(f" {k}: {v}") print(f" {k}: {v}")

View File

@@ -0,0 +1 @@
candidate
1 candidate

View File

@@ -0,0 +1,119 @@
import csv
import re
from collections import Counter
from pathlib import Path
import config
# Group/collective terms taken from the project config; a correspondence
# Schlagwort containing any of these tokens is treated as group correspondence.
_COLLECTIVE = config.COLLECTIVE_TERMS
# Lowercase German function words that are never useful as tag candidates when
# mining summaries. Grouped by word class purely for readability; the set is
# only ever used for membership tests.
_GERMAN_STOP_WORDS = {
    # Articles
    "der", "die", "das", "dem", "den", "des",
    "ein", "eine", "einer", "einen", "einem", "eines",
    # Conjunctions, adverbs and particles
    "und", "oder", "aber", "auch", "nicht", "noch", "dann", "doch",
    "wenn", "weil", "ob", "so", "da", "dort", "hier", "nun", "ja", "nein",
    "als", "wie", "dass",
    # Prepositions
    "an", "in", "auf", "für", "mit", "von", "zu", "bei", "nach", "vor",
    "aus", "durch", "um", "über", "unter", "zwischen",
    "bis", "seit", "gegen", "ohne",
    # Contracted prepositions common in German Inhalt summaries
    "im", "am", "ans", "ins", "zum", "zur", "vom", "beim",
    # Pronouns, possessives and demonstratives
    "ich", "du", "er", "sie", "es", "wir", "ihr", "ihn", "ihm", "ihnen",
    "mich", "mir", "dich", "dir", "sich",
    "ihre", "ihren", "ihrer", "ihrem",
    "seinem", "seinen", "seiner", "seine",
    "dieses", "dieser", "diesem", "diesen",
    # Interrogatives and quantifiers
    "all", "alle", "was", "wer", "wo", "wann", "welche", "welcher",
    "mehr", "sehr", "nur", "schon", "dabei", "dazu",
    # Auxiliary and modal verbs
    "ist", "sind", "war", "waren", "hat", "hatte", "haben",
    "wird", "werden", "wurde", "wurden", "worden",
    "kann", "konnte", "soll", "sollte", "will", "wollte",
}
def _is_correspondence(raw: str) -> bool:
    """Report whether *raw* looks like a "<sender> an <receiver>" tag.

    The check is case-insensitive and accepts three shapes: the value starts
    with "an ", it contains " an ", or it contains ".an " (abbreviated sender
    written without a space, e.g. "Maria W.an Clara").
    """
    lowered = raw.lower()
    if lowered.startswith("an "):
        return True
    return any(marker in lowered for marker in (" an ", ".an "))
def _tokenize(text: str) -> list[str]:
    """Split *text* into lowercase words made of ASCII letters, umlauts and ß.

    Digits, punctuation and whitespace act as separators and are discarded.
    """
    words = re.findall(r"[a-zA-ZäöüÄÖÜß]+", text)
    return list(map(str.lower, words))
def _has_collective(tokens: list[str]) -> bool:
    """Return True as soon as one of *tokens* is a known collective term."""
    for token in tokens:
        if token in _COLLECTIVE:
            return True
    return False
def classify_schlagwort(raw: str) -> list[str]:
    """Turn one raw Schlagwort cell into zero or one structured tag paths.

    Outcomes:
      * blank value                                -> []
      * not a correspondence tag                   -> ["Themen/<value>"]
      * correspondence mentioning a collective term -> ["Briefwechsel/<value>"]
      * any other correspondence tag               -> [] (dropped)

    The value is stripped before it is classified and embedded in the path, so
    a cell padded with whitespace produces the same tag as the unpadded value
    instead of a near-duplicate path that differs only in trailing spaces.
    """
    value = (raw or "").strip()
    if not value:
        return []
    if not _is_correspondence(value):
        return [f"Themen/{value}"]
    if _has_collective(_tokenize(value)):
        return [f"Briefwechsel/{value}"]
    # Individual-to-individual correspondence carries no tag of its own.
    return []
def mine_summary_candidates(summaries: list[str]) -> list[tuple[str, int]]:
    """Count candidate theme words across all summaries.

    Each summary is lowercased and split on commas, semicolons and whitespace;
    every piece then has all characters other than letters/umlauts/ß removed.
    Pieces shorter than two characters and German stop words are ignored.

    Returns:
        (word, count) pairs, most frequent first.
    """
    tally: Counter = Counter()
    for text in summaries:
        pieces = re.split(r"[,;\s]+", text.lower())
        cleaned = (re.sub(r"[^a-zA-ZäöüÄÖÜß]", "", piece) for piece in pieces)
        tally.update(
            word
            for word in cleaned
            if len(word) >= 2 and word not in _GERMAN_STOP_WORDS
        )
    return tally.most_common()
def load_approved_themes(path: Path) -> set[str]:
    """Load the approved theme keywords from a CSV file with a "candidate" column.

    Args:
        path: Location of the CSV file. A missing file is not an error.

    Returns:
        The stripped, lowercased values of the "candidate" column. Rows whose
        candidate cell is missing, empty or whitespace-only are skipped.
    """
    if not path.exists():
        return set()
    themes: set[str] = set()
    # "utf-8-sig" drops a leading byte-order mark (written by some spreadsheet
    # exports); with plain "utf-8" the header would read "\ufeffcandidate" and
    # every row would be silently ignored.
    with open(path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            # Normalise before testing for emptiness: a whitespace-only cell
            # must not become the empty theme "", which would match any text.
            theme = (row.get("candidate") or "").strip().lower()
            if theme:
                themes.add(theme)
    return themes
def apply_approved_themes(summary: str, themes: set[str]) -> list[str]:
    """Return a "Themen/<theme>" path for every approved theme found in *summary*.

    Matching is done against the lowercased summary and requires the theme to
    appear as a whole word (regex word boundaries on both sides), so "reise"
    does not match inside "geschäftsreise".

    Args:
        summary: Free-text summary to search.
        themes: Approved keywords; expected to be lowercase already.

    Returns:
        Matching tag paths in alphabetical order of the theme. Sorting makes
        the result independent of set iteration order, which for strings can
        differ from one interpreter run to the next.
    """
    lower = summary.lower()
    return [
        f"Themen/{theme}"
        for theme in sorted(themes)
        # An empty theme would reduce the pattern to r"\b\b", which matches
        # any summary containing a word, so it is skipped explicitly.
        if theme and re.search(r"\b" + re.escape(theme) + r"\b", lower)
    ]
def generate_tags(schlagwort: str, summary: str, themes: set[str]) -> list[str]:
    """Build the full tag list for one document.

    The Schlagwort classification comes first; approved-theme matches from the
    summary are appended only when both a summary and a non-empty theme set
    are supplied. A missing Schlagwort is treated as an empty string.
    """
    tags = classify_schlagwort(schlagwort or "")
    if not (summary and themes):
        return tags
    return tags + apply_approved_themes(summary, themes)
def encode_tags(tag_list: list[str]) -> str:
    """Serialise tag paths into a single pipe-separated string ("" if empty)."""
    separator = "|"
    return separator.join(tag_list)
def build_tag_tree(all_tag_paths: list[str]) -> list[dict]:
    """Flatten tag paths into de-duplicated hierarchy rows.

    Each path is split at its first "/" into a root name and a child name.
    A path without "/" is a root on its own.

    Returns:
        Rows with keys "tag_path", "parent_name" and "tag_name": first one row
        per distinct root (empty parent), then one row per distinct child
        path, each group in order of first appearance.
    """
    root_names: dict[str, None] = {}
    child_rows: dict[str, dict] = {}
    for tag_path in all_tag_paths:
        parent, slash, child = tag_path.partition("/")
        root_names.setdefault(parent)
        if slash and tag_path not in child_rows:
            child_rows[tag_path] = {
                "tag_path": tag_path,
                "parent_name": parent,
                "tag_name": child,
            }
    root_rows = [
        {"tag_path": name, "parent_name": "", "tag_name": name}
        for name in root_names
    ]
    return root_rows + list(child_rows.values())

View File

@@ -51,7 +51,7 @@ def test_to_canonical_resolves_and_flags():
assert doc.sender_person_id == "de-gruyter-walter" assert doc.sender_person_id == "de-gruyter-walter"
assert doc.receiver_person_ids == ["de-gruyter-eugenie"] # matched via maiden alias assert doc.receiver_person_ids == ["de-gruyter-eugenie"] # matched via maiden alias
assert doc.date_iso == "1888-02-15" and doc.date_precision == "DAY" assert doc.date_iso == "1888-02-15" and doc.date_precision == "DAY"
assert doc.tags == ["Brautbriefe"] assert doc.tags == ["Themen/Brautbriefe"]
assert doc.needs_review == [] assert doc.needs_review == []
def test_to_canonical_unmatched_and_unparsed(): def test_to_canonical_unmatched_and_unparsed():

View File

@@ -62,3 +62,60 @@ def test_run_end_to_end(tmp_path):
assert _matrix(out_dir / "canonical-persons.xlsx") == persons1 assert _matrix(out_dir / "canonical-persons.xlsx") == persons1
assert (review_dir / "unparsed-dates.csv").read_text(encoding="utf-8") == unparsed1 assert (review_dir / "unparsed-dates.csv").read_text(encoding="utf-8") == unparsed1
assert len(docs1) == 4 # header + 3 docs assert len(docs1) == 4 # header + 3 docs
def _run_tag_pipeline(tmp_path, **extra):
    """Run normalize.run on the fixture workbooks; return (out_dir, review_dir).

    Extra keyword arguments are forwarded to normalize.run unchanged.
    """
    out_dir = tmp_path / "out"
    review_dir = tmp_path / "review"
    normalize.run(
        document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
        person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
        out_dir=out_dir, review_dir=review_dir,
        date_overrides={}, name_overrides={}, **extra)
    return out_dir, review_dir


def _document_tag_cells(out_dir):
    """Return the "tags" cell value of every data row in canonical-documents.xlsx."""
    wb = openpyxl.load_workbook(out_dir / "canonical-documents.xlsx")
    ws = wb.active
    header = [c.value for c in ws[1]]
    tag_col = header.index("tags")
    # openpyxl columns are 1-based; row 1 is the header.
    return [ws.cell(row=r, column=tag_col + 1).value for r in range(2, ws.max_row + 1)]


def test_tag_tree_output_emitted(tmp_path):
    out_dir, _ = _run_tag_pipeline(tmp_path)
    assert (out_dir / "canonical-tag-tree.xlsx").exists()


def test_tag_candidates_review_emitted(tmp_path):
    _, review_dir = _run_tag_pipeline(tmp_path)
    assert (review_dir / "tag-candidates.csv").exists()
    text = (review_dir / "tag-candidates.csv").read_text(encoding="utf-8")
    assert "candidate" in text and "count" in text


def test_schlagwort_encoded_as_themen_in_documents(tmp_path):
    out_dir, _ = _run_tag_pipeline(tmp_path)
    tag_values = _document_tag_cells(out_dir)
    assert any(v and "Themen/Brautbriefe" in v for v in tag_values)
    # The bare, unprefixed Schlagwort must no longer appear as a tag.
    assert not any(v and v.strip() == "Brautbriefe" for v in tag_values)


def test_approved_themes_applied(tmp_path):
    themes_file = tmp_path / "approved-themes.csv"
    themes_file.write_text("candidate\ngeschäftsreise\n", encoding="utf-8")
    out_dir, _ = _run_tag_pipeline(tmp_path, approved_themes_path=themes_file)
    tag_values = _document_tag_cells(out_dir)
    # W-0001 has Inhalt "Geschäftsreise" — should get an extra Themen/geschäftsreise tag
    assert any(v and "Themen/geschäftsreise" in v for v in tag_values)

View File

@@ -0,0 +1,191 @@
import tags
# --- classify_schlagwort ---
# Non-correspondence values are kept verbatim under "Themen/".
def test_semantic_tag_kept_as_themen():
    assert tags.classify_schlagwort("Brautbriefe") == ["Themen/Brautbriefe"]


def test_everyday_tag_kept_as_themen():
    assert tags.classify_schlagwort("Alltag in Ruhrort") == ["Themen/Alltag in Ruhrort"]


def test_event_tag_kept_as_themen():
    assert tags.classify_schlagwort("zur Hochzeit") == ["Themen/zur Hochzeit"]


# Correspondence between individuals produces no tag at all.
def test_individual_correspondence_dropped():
    assert tags.classify_schlagwort("Clara an Herbert") == []


def test_individual_correspondence_with_year_dropped():
    assert tags.classify_schlagwort("Herbert an Clara 1918") == []


def test_individual_with_role_dropped():
    assert tags.classify_schlagwort("Vater Juan an Herbert") == []


def test_relational_receiver_dropped():
    assert tags.classify_schlagwort("Clara an ihre Mutter") == []


# Correspondence mentioning a collective term is kept under "Briefwechsel/".
def test_group_receiver_kinder_kept_as_briefwechsel():
    assert tags.classify_schlagwort("Clara an Kinder") == ["Briefwechsel/Clara an Kinder"]


def test_group_receiver_eltern_kept():
    assert tags.classify_schlagwort("Herbert an seine Eltern") == ["Briefwechsel/Herbert an seine Eltern"]


def test_group_receiver_geschwister_kept():
    assert tags.classify_schlagwort("Walter an Geschwister") == ["Briefwechsel/Walter an Geschwister"]


def test_group_receiver_schwiegereltern_kept():
    assert tags.classify_schlagwort("Clara an Schwiegereltern") == ["Briefwechsel/Clara an Schwiegereltern"]


def test_group_receiver_soehne_kept():
    assert tags.classify_schlagwort("Mutter Cram an ihre Söhne") == ["Briefwechsel/Mutter Cram an ihre Söhne"]


def test_group_receiver_brueder_kept():
    assert tags.classify_schlagwort("Hans an Brüder") == ["Briefwechsel/Hans an Brüder"]


def test_group_receiver_cousinen_kept():
    assert tags.classify_schlagwort("Clara an Cousinen in Göttingen") == ["Briefwechsel/Clara an Cousinen in Göttingen"]


def test_group_receiver_freunde_kept():
    assert tags.classify_schlagwort("Freunde an Herbert") == ["Briefwechsel/Freunde an Herbert"]


def test_group_sender_geschwister_kept():
    # collective on the LEFT side of "an"
    assert tags.classify_schlagwort("Geschwister Cram an Herbert") == ["Briefwechsel/Geschwister Cram an Herbert"]


def test_receiver_only_individual_dropped():
    # starts with "an " — single individual receiver
    assert tags.classify_schlagwort("an Walter de Gruyter") == []


def test_receiver_only_group_kept():
    # starts with "an " — collective receiver
    assert tags.classify_schlagwort("an ihre Geschwister") == ["Briefwechsel/an ihre Geschwister"]


def test_abbreviated_sender_individual_dropped():
    # "Maria W.an Clara" — abbreviated name + ".an"
    assert tags.classify_schlagwort("Maria W.an Clara") == []


def test_abbreviated_sender_group_kept():
    assert tags.classify_schlagwort("Eugenie sen.an Kinder") == ["Briefwechsel/Eugenie sen.an Kinder"]


def test_empty_schlagwort_returns_empty():
    assert tags.classify_schlagwort("") == []


def test_enkelkinder_sender_kept():
    # Named after what it exercises: "Enkelkinder" as the collective sender.
    assert tags.classify_schlagwort("Enkelkinder an Clara") == ["Briefwechsel/Enkelkinder an Clara"]


def test_geschw_abbreviation_kept():
    # "Geschw." abbreviation for Geschwister — appears after "u" in receiver side
    assert tags.classify_schlagwort("Bruder Hans an Herbert u Geschw.") == ["Briefwechsel/Bruder Hans an Herbert u Geschw."]
# --- mine_summary_candidates ---
def test_mine_candidates_counts_words():
    summaries = ["Reise, Hochzeit", "Reise", "Krieg"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    assert candidates["reise"] == 2
    assert candidates["hochzeit"] == 1
    assert candidates["krieg"] == 1


def test_mine_candidates_filters_stop_words():
    summaries = ["und die Reise", "das ist eine Reise"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    assert "reise" in candidates
    assert "und" not in candidates
    assert "die" not in candidates
    assert "das" not in candidates
    assert "ist" not in candidates
    assert "eine" not in candidates


def test_mine_candidates_filters_contracted_prepositions():
    # im=in+dem, ans=an+das, zum=zu+dem, zur=zu+der, vom=von+dem, sich, am, beim
    # Every word asserted below must actually occur in the input; otherwise
    # its "not in candidates" check passes without testing the filter.
    summaries = ["im Sommer ans Meer zum Besuch, zur Hochzeit vom Vater, sich gefreut am Morgen beim Fest"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    for stop in ("im", "zum", "zur", "vom", "sich", "am", "beim", "ans"):
        assert stop not in candidates, f"stop word '{stop}' leaked through"
    assert "besuch" in candidates
    assert "hochzeit" in candidates


def test_mine_candidates_filters_single_chars():
    summaries = ["x Reise y"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    assert "x" not in candidates
    assert "y" not in candidates


def test_mine_candidates_sorted_descending():
    summaries = ["Reise", "Reise", "Hochzeit", "Reise", "Hochzeit", "Krieg"]
    result = tags.mine_summary_candidates(summaries)
    counts = [count for _, count in result]
    assert counts == sorted(counts, reverse=True)


def test_mine_candidates_empty_summaries():
    assert tags.mine_summary_candidates([]) == []
    assert tags.mine_summary_candidates([""]) == []
# --- load_approved_themes and apply_approved_themes ---
# apply_approved_themes is a pure function, so these tests need no fixtures.
def test_apply_themes_match_found():
    themes = {"reise", "hochzeit"}
    result = tags.apply_approved_themes("Reise nach Berlin", themes)
    assert "Themen/reise" in result


def test_apply_themes_case_insensitive():
    themes = {"reise"}
    result = tags.apply_approved_themes("REISE", themes)
    assert "Themen/reise" in result


def test_apply_themes_no_match():
    themes = {"krieg"}
    result = tags.apply_approved_themes("Alltag in Ruhrort", themes)
    assert result == []


def test_apply_themes_multiple_matches():
    themes = {"reise", "hochzeit"}
    result = tags.apply_approved_themes("Reise zur Hochzeit", themes)
    assert len(result) == 2
    assert "Themen/reise" in result
    assert "Themen/hochzeit" in result
# --- encode_tags ---
def test_encode_tags_single():
    # A single path is returned unchanged, without any separator.
    encoded = tags.encode_tags(["Themen/Brautbriefe"])
    assert encoded == "Themen/Brautbriefe"


def test_encode_tags_multiple():
    # Several paths are joined with "|" in the order given.
    paths = ["Themen/Brautbriefe", "Briefwechsel/Clara an Kinder"]
    assert tags.encode_tags(paths) == "Themen/Brautbriefe|Briefwechsel/Clara an Kinder"


def test_encode_tags_empty():
    # No tags encode to the empty string.
    encoded = tags.encode_tags([])
    assert encoded == ""
# --- build_tag_tree ---
def test_build_tag_tree_includes_roots():
    # Every distinct parent segment must be emitted as its own row.
    rows = tags.build_tag_tree(["Themen/Brautbriefe", "Briefwechsel/Clara an Kinder"])
    emitted = {row["tag_path"] for row in rows}
    assert "Themen" in emitted
    assert "Briefwechsel" in emitted


def test_build_tag_tree_includes_children():
    # A child row records its parent and its own name separately.
    rows = tags.build_tag_tree(["Themen/Brautbriefe"])
    by_path = {row["tag_path"]: row for row in rows}
    child = by_path["Themen/Brautbriefe"]
    assert child["parent_name"] == "Themen"
    assert child["tag_name"] == "Brautbriefe"


def test_build_tag_tree_root_has_empty_parent():
    # Root rows carry an empty parent and use the root name as tag name.
    rows = tags.build_tag_tree(["Themen/Brautbriefe"])
    by_path = {row["tag_path"]: row for row in rows}
    root = by_path["Themen"]
    assert root["parent_name"] == ""
    assert root["tag_name"] == "Themen"


def test_build_tag_tree_no_duplicates():
    # Repeated input paths must collapse to a single row each.
    rows = tags.build_tag_tree(["Themen/Brautbriefe", "Themen/Alltag", "Themen/Brautbriefe"])
    emitted = [row["tag_path"] for row in rows]
    assert len(set(emitted)) == len(emitted)

View File

@@ -47,6 +47,19 @@ def write_documents_xlsx(docs, path: Path):
_write_xlsx(docs, DOC_COLUMNS, path) _write_xlsx(docs, DOC_COLUMNS, path)
def write_tag_tree_xlsx(tree: list[dict], path: Path):
    """Write the tag hierarchy rows to an .xlsx workbook at *path*.

    The active sheet receives a header row ("tag_path", "parent_name",
    "tag_name") followed by one row per entry of *tree*; a key missing from an
    entry is written as an empty string. The workbook's created/modified
    properties are set to _FIXED_TS, and the parent directory of *path* is
    created if it does not exist yet.
    """
    header = ["tag_path", "parent_name", "tag_name"]
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.append(header)
    for entry in tree:
        cells = []
        for name in header:
            cells.append(entry.get(name, ""))
        sheet.append(cells)
    props = workbook.properties
    props.created = _FIXED_TS
    props.modified = _FIXED_TS
    target_dir = Path(path).parent
    target_dir.mkdir(parents=True, exist_ok=True)
    workbook.save(path)
def write_persons_xlsx(people, path: Path): def write_persons_xlsx(people, path: Path):
_write_xlsx(people, PERSON_COLUMNS, path) _write_xlsx(people, PERSON_COLUMNS, path)