Analyze billing data
Goal: answer the questions a billing partner asks — where is the money, who is realizing
their rate, what work is driving spend. This generates ~180
synthetic billing entries, loads them into
kaos-tabular’s DuckDB engine, and runs the rollups. Deterministic, offline, no model.
uv run examples/analyze-billing.py

180 entries | $370,892 fees | 727 hours | blended $510/hr

top matters by fees:
 snappy-tenancy-2026 $ 81,062 ( 152 hrs)
 ...

realized rate by role:
 Partner $ 165,944 @ $ 903/hr
 Associate $ 156,158 @ $ 479/hr
 Paralegal $ 48,789 @ $ 224/hr

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.13"
# dependencies = ["kaos-names>=0.1.0a5,<0.2", "kaos-tabular>=0.1.0,<0.2"]
# ///
"""Analyze a firm's billing — on synthetic data generated for the demo.
This is the post-bill analytics use case. It generates ~180 synthetic billing
line items (see generate-billing-data.py for the technique), loads them into
`kaos-tabular`'s DuckDB engine, and runs the rollups a billing partner actually
wants: spend by matter, blended rate by role, and spend by UTBMS task code.
Fully offline, deterministic, no model.
Run it:
    uv run examples/analyze-billing.py
"""
from __future__ import annotations
import csvimport hashlibimport randomimport tempfilefrom datetime import date, timedeltafrom pathlib import Path
import kaos_names as knfrom kaos_tabular.engine import TabularEngine
# Timekeeper roster as (surname, role, hourly rate) tuples. The rate is
# multiplied by hours to produce each entry's amount.
TIMEKEEPERS = [
    ("Okafor", "Partner", 925),
    ("Nguyen", "Partner", 880),
    ("Alvarez", "Associate", 520),
    ("Brandt", "Associate", 470),
    ("Cohen", "Associate", 440),
    ("Devi", "Paralegal", 240),
    ("Eriksson", "Paralegal", 215),
]

# Task codes an entry can be billed under (reported as UTBMS task codes).
TASKS = [
    "L120",
    "L160",
    "L210",
    "L240",
    "L320",
    "L330",
]

# Practice areas a generated matter can belong to.
PRACTICE_AREAS = [
    "Litigation",
    "M&A",
    "Employment",
    "Intellectual Property",
    "Regulatory",
]
def generate_billing_rows(seed: int = 42, n: int = 180) -> list[dict]:
    """Build ``n`` synthetic billing line items, reproducibly for a given ``seed``.

    Six matters are created first from a generator seeded with ``seed``. Every
    entry then uses its own generator, seeded from a BLAKE2b digest of
    ``"{seed}:{index}"``, so an entry's random draws depend only on the seed
    and its index.

    Args:
        seed: Seed for the matter generator and for each per-entry digest.
        n: Number of entries to produce.

    Returns:
        One dict per entry with the keys ``entry_id``, ``date``, ``matter``,
        ``practice_area``, ``timekeeper``, ``role``, ``task_code``, ``hours``,
        ``rate`` and ``amount``, in that order.
    """
    matter_rng = random.Random(seed)
    matter_pool = []
    for _ in range(6):
        # Name first, practice area second: both consume matter_rng, so the
        # order of these two draws is part of the output.
        matter_name = kn.generate_session_name(
            rng=matter_rng, number_min=2026, number_max=2026
        )
        area = matter_rng.choice(PRACTICE_AREAS)
        matter_pool.append({"matter": matter_name, "practice_area": area})

    period_start = date(2026, 1, 1)
    entries = []
    for index in range(n):
        key = f"{seed}:{index}".encode()
        entry_seed = int.from_bytes(
            hashlib.blake2b(key, digest_size=8).digest(), "big"
        )
        entry_rng = random.Random(entry_seed)

        # Draw order matters for reproducibility: matter, timekeeper, hours,
        # day offset, task code.
        picked = entry_rng.choice(matter_pool)
        timekeeper, role, rate = entry_rng.choice(TIMEKEEPERS)
        hours = round(entry_rng.uniform(0.3, 8.0), 1)
        day_offset = entry_rng.randint(0, 89)  # 2026-01-01 .. 2026-03-31
        task_code = entry_rng.choice(TASKS)

        entries.append(
            {
                "entry_id": f"E{index:04d}",
                "date": (period_start + timedelta(days=day_offset)).isoformat(),
                "matter": picked["matter"],
                "practice_area": picked["practice_area"],
                "timekeeper": timekeeper,
                "role": role,
                "task_code": task_code,
                "hours": hours,
                "rate": rate,
                "amount": round(hours * rate, 2),
            }
        )
    return entries
def write_csv(rows: list[dict]) -> Path:
    """Write ``rows`` to ``billing.csv`` inside a fresh temporary directory.

    The header is the key order of the first row. The file is always written
    as UTF-8, so the result does not depend on the platform's default text
    encoding.

    Args:
        rows: Non-empty list of dicts that share the first row's keys.

    Returns:
        Path of the written CSV file. The temporary directory is not removed
        here; cleaning it up is left to the caller.

    Raises:
        IndexError: If ``rows`` is empty, since there is no first row to take
            the header from.
        ValueError: If a later row has a key the first row lacks
            (``csv.DictWriter`` rejects unknown fields by default).
    """
    # Read the header before touching the filesystem, so an empty input fails
    # without leaving a stray temp directory and empty file behind.
    fieldnames = list(rows[0])

    path = Path(tempfile.mkdtemp()) / "billing.csv"
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return path
def main():
    """Generate the demo data, load it into the engine, and print three rollups.

    Prints a one-line summary computed from the generated rows, then the top
    five matters by fees, fees and realized rate per role, and fees per task
    code, each computed by ``engine.aggregate`` over the ``billing`` table.

    Returns:
        The ``(by_matter, by_role, by_task)`` aggregate results, in that order.
    """
    entries = generate_billing_rows()
    engine = TabularEngine()
    csv_path = write_csv(entries)
    engine.register_file(csv_path, table_name="billing")

    # Headline numbers come straight from the generated rows, not the engine.
    total_fees = sum(entry["amount"] for entry in entries)
    total_hours = sum(entry["hours"] for entry in entries)
    blended = total_fees / total_hours
    print(
        f"{len(entries)} entries | ${total_fees:,.0f} fees | "
        f"{total_hours:,.0f} hours | blended ${blended:,.0f}/hr\n"
    )

    # Top matters by spend.
    by_matter = engine.aggregate(
        "billing",
        aggregates=[("sum", "amount", "fees"), ("sum", "hours", "hrs")],
        group_by=["matter"],
        order_by=[("fees", "desc")],
        limit=5,
    )
    print("top matters by fees:")
    for matter_name, fee_sum, hour_sum in by_matter.rows:
        print(f" {matter_name:24} ${fee_sum:>10,.0f} ({hour_sum:>5,.0f} hrs)")

    # Blended rate by role: fees divided by hours within each role.
    by_role = engine.aggregate(
        "billing",
        aggregates=[("sum", "amount", "fees"), ("sum", "hours", "hrs")],
        group_by=["role"],
        order_by=[("fees", "desc")],
    )
    print("\nrealized rate by role:")
    for role_name, fee_sum, hour_sum in by_role.rows:
        realized = fee_sum / hour_sum
        print(f" {role_name:12} ${fee_sum:>10,.0f} @ ${realized:>5,.0f}/hr")

    # Spend by UTBMS task code.
    by_task = engine.aggregate(
        "billing",
        aggregates=[("sum", "amount", "fees")],
        group_by=["task_code"],
        order_by=[("fees", "desc")],
    )
    print("\nspend by UTBMS task code:")
    for task_code, fee_sum in by_task.rows:
        print(f" {task_code} ${fee_sum:>10,.0f}")

    return by_matter, by_role, by_task
if __name__ == "__main__":
    by_matter, by_role, by_task = main()

    # Self-checks for the demo: the rollups are well-formed and complete.
    # Five matters survive the limit, every role appears, and every task code
    # has a row.
    assert by_matter.row_count == 5
    assert {role for role, *_ in by_role.rows} == {"Partner", "Associate", "Paralegal"}
    assert by_task.row_count == len(TASKS)

## What to notice
- `engine.register_file(csv, table_name=...)` loads the data into DuckDB; `engine.aggregate(...)` runs `GROUP BY` rollups with `aggregates=[("sum", "amount", "fees"), ...]`, `group_by`, `order_by`, and `limit` — no SQL to hand-write (though `engine.execute(sql)` is there when you want it).
- The same engine does `filter`, `pivot`, `join`, `top_k`, and `find_duplicates` — everything you’d reach for DuckDB to do, over CSV/XLSX/JSON. See “run SQL analytics”.
- Generated data, real analytics. The numbers are synthetic, but the pipeline is exactly what you’d run on a LEDES export — swap the CSV for a real one and the rollups are unchanged.
- This is the post-bill analytics half of billing; the pre-bill half is UTBMS coding (classifying narratives to task codes).