Drop-in telemetry for any Python service. Exceptions, slow calls, watchdog stalls, RSS / CPU / FD anomalies, and your own notify() calls — streamed to a Telegram chat you already have open. No DSN, no dashboards to host, no blocking calls in your hot path.
Snippets pulled from examples/ — real working scripts, not pseudo-code. Pick a case to see the setup and the verbatim alerts it produces.
import snitchbot
snitchbot.init("orders-api")
# Unhandled exceptions are captured automatically,
# including stack, thread, and origin.
async def list_orders(user_id: int) -> list:
    return await svc.fetch_all(user_id)

# Somewhere down the stack:
#   raise DatabaseConnectionError("refused to ...")
# -> snitchbot captures and sends the alert below.

import asyncio
import time
import snitchbot
@snitchbot.watch_slow(threshold_ms=100)
async def fetch_user_profile(user_id: int) -> dict:
    await asyncio.sleep(0.25)  # 250 ms > threshold
    return {"name": "Alice"}

@snitchbot.watch_slow(threshold_ms=500)
def generate_report() -> str:
    time.sleep(0.6)  # sync, also captured
    return "report-data"

async def main():
    snitchbot.init("slow-demo")
    await fetch_user_profile(42)
    generate_report()

import snitchbot
from snitchbot import AnomalyConfig, WatchdogConfig
snitchbot.init("watchdog-demo")
# Zero-config: watchdog is on, threshold 500 ms,
# auto-escalates to error at 2 s, critical at 5 s.
# Full config with custom thresholds:
snitchbot.init(
    "watchdog-demo",
    anomaly=AnomalyConfig(
        watchdog=WatchdogConfig(
            threshold_ms=500,            # 🟠 warning
            error_threshold_ms=2000,     # 🔴 error
            critical_threshold_ms=5000,  # 🟣 critical
            escalation_window="1m",
            cooldown_sec=5,
        ),
    ),
)

import snitchbot
from snitchbot import (
    AnomalyConfig,
    RssAnomalyConfig,
    CpuAnomalyConfig,
    FdAnomalyConfig,
    ThreadAnomalyConfig,
)

snitchbot.init(
    "anomaly-demo",
    sample_interval_sec=5,
    anomaly=AnomalyConfig(
        rss=RssAnomalyConfig(
            duration="1m", baseline_duration="30m",
            max_mb=450,       # 🔴 ceiling
            spike_ratio=1.5,  # 🟠 +50% vs baseline
            min_spike_mb=50,  # and ≥ 50 MB absolute
        ),
        cpu=CpuAnomalyConfig(
            duration="2m", baseline_duration="20m",
            max_percent=90,      # 🔴 ceiling
            spike_ratio=2.5,     # 🟠 spike
            min_spike_delta=30,  # ≥ 30 pp
        ),
        fds=FdAnomalyConfig(
            max_fds=800,      # 🔴 ulimit guard
            spike_ratio=1.5,  # 🔴 fd leak
            drop_ratio=0.5,   # 🟠 pool collapse
        ),
        threads=ThreadAnomalyConfig(
            max_threads=100,
            spike_ratio=1.5,
        ),
    ),
)

import snitchbot
snitchbot.init("orders-api")
# ▶ lifecycle("startup", reason="init") — sent immediately
# ...your service does its thing...
# On any of these paths, a shutdown event is emitted:
# · Clean exit -> reason="clean_exit", exit_code=0
# · SIGTERM / SIGINT -> reason="sigterm" / "sigint"
# · Uncaught crash -> reason="crash" (+ traceback)
# · Thread crash -> reason="thread_crash"
# Nothing to call, nothing to decorate.

import snitchbot
snitchbot.init("notify-demo")
# Warning with extras — renders as a meta-table
snitchbot.notify(
    "Starting checkout process",
    severity="warning",
    extras={"cart_size": 3, "user": "Alice"},
)

# Error with live traceback
try:
    _ = 1 / 0
except ZeroDivisionError:
    snitchbot.notify(
        "Division failed in payment calculator",
        severity="error",
        exc_info=True,
    )

import asyncio
import snitchbot
@snitchbot.watch_slow(threshold_ms=100)
async def call_payment_api(amount: float) -> str:
    await asyncio.sleep(0.2)
    return "txn-12345"

async def handle_request(request_id: str, user_id: int):
    with snitchbot.request_context(
        trace_id=request_id,
        user_id=user_id,
        action="checkout",
    ):
        snitchbot.notify(
            "User started checkout",
            extras={"cart_size": 3},
        )
        await call_payment_api(99.99)  # inherits ctx

import logging
import snitchbot
snitchbot.init("log-demo")
snitchbot.setup_logging() # WARNING+ -> Telegram
# Or, for structlog users:
# processor = snitchbot.setup_structlog()
# structlog.configure(processors=[..., processor])
logger = logging.getLogger("myapp")
# Extras become a meta-table in the alert
logger.warning(
    "Cache miss rate too high",
    extra={"miss_pct": 42},
)

# exc_info=True attaches the traceback
try:
    _ = 1 / 0
except ZeroDivisionError:
    logger.error("Calculation failed", exc_info=True)

# Inside request_context — trace_id attached automatically
with snitchbot.request_context(trace_id="req-abc-123"):
    logger.warning("Slow DB query in checkout")

import snitchbot
snitchbot.init("web-demo")
snitchbot.setup_logging()
# ── FastAPI ───────────────────────────────────
from fastapi import FastAPI
from snitchbot.integrations.fastapi import install
app = FastAPI()
install(app)
# ── Flask ─────────────────────────────────────
# from flask import Flask
# from snitchbot.integrations.flask import install
# app = Flask(__name__); install(app)
# ── Litestar ──────────────────────────────────
# from litestar import Litestar
# from snitchbot.integrations.litestar import install
# app = Litestar(route_handlers=[...]); install(app)
@app.post("/checkout")
async def checkout(cart_value: int = 100):
    snitchbot.notify("Large checkout",
                     extras={"cart_value": cart_value})
    return {"status": "processing"}

@app.get("/search")
async def search(query: str):
    raise ValueError("Unknown search backend")

[Alert previews, as rendered in Telegram: deduplicated stack excerpts with hit counts (app/db/pool.py:47 in acquire() -> app/services/orders.py:88 in fetch_all() -> app/routes/orders.py:12 in list_orders()), watchdog stall reports listing each stuck thread and task with its current frame, and full tracebacks for the ZeroDivisionError and ValueError examples above.]
Exceptions: snitchbot.init() hooks into sys.excepthook, threading.excepthook, and the asyncio exception handler. Works in the main thread, worker threads, and async tasks. Fork-safe.

Slow calls: @snitchbot.watch_slow(threshold_ms=...). Fast path untouched — the alert fires only when duration exceeds the threshold. Works for sync functions too.

Watchdog: any event-loop stall past threshold_ms is reported with every stuck task's stack. Multi-threshold severity: threshold_ms -> 🟠 warning, error_threshold_ms -> 🔴 error, critical_threshold_ms -> 🟣 critical. All defaults sensible — zero-config works, full config unlocks the 3-tier escalation.

Anomalies: three detection modes per metric — ceiling (hard limit), spike (relative growth vs baseline), drop (relative decline). Windows and baselines are time-based ("15s", "1m", "1h"). The sidecar samples psutil every 5 s (tunable via sample_interval_sec). Turn any mode off by passing None — or skip the config entirely for sensible defaults.

Lifecycle: snitchbot.init() registers atexit / signal handlers. You see startup, clean exits, graceful shutdowns (SIGTERM / SIGINT), and crashes — with pid, role (worker / standalone), reason, and exit code. Multiworker-aware: gunicorn / uvicorn workers get their own role suffix.

Notifications: snitchbot.notify(text, severity, extras, exc_info). Severity drives the icon (🟠/🔴/🟣) and the rate-limit bucket. exc_info=True attaches the current traceback.

Request context: with snitchbot.request_context(trace_id=..., **extras). Everything inside — notify(), @watch_slow, crash reports — inherits the context. Propagates across await, create_task, and nested calls. Frameworks (FastAPI / Flask / Litestar) set this automatically per request.

Logging: snitchbot.setup_logging() attaches a handler to Python's logging. WARNING+ records become notifications, keeping level, message, extras, and exc_info. For structlog, call snitchbot.setup_structlog() and add the returned processor to your chain. Inside a request_context, trace_id is attached automatically.

Web frameworks: call install(app) from the matching integration module. Middleware attaches per-request context (http_method, path, client_ip, trace_id). 5xx errors are auto-captured with safe headers and query params. The response gets an X-Snitchbot-Trace-Id header. The logging bridge picks up the same context inside request scope.
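The hook mechanism behind the automatic exception capture can be sketched with the standard library alone — an illustrative stand-in, not snitchbot's actual implementation (the real reporter would serialize the traceback and send it to the sidecar):

```python
import sys
import threading

captured = []  # where a real reporter would enqueue alerts

def _report(exc_type, exc, tb, origin):
    # A real implementation would format the traceback and
    # ship a datagram to the sidecar here.
    captured.append((origin, exc_type.__name__, str(exc)))

def install_hooks():
    # Main-thread uncaught exceptions
    sys.excepthook = lambda t, e, tb: _report(t, e, tb, "main")
    # Worker-thread uncaught exceptions (Python 3.8+)
    threading.excepthook = lambda args: _report(
        args.exc_type, args.exc_value, args.exc_traceback, "thread"
    )

install_hooks()
t = threading.Thread(target=lambda: 1 / 0)
t.start()
t.join()
print(captured)  # [('thread', 'ZeroDivisionError', 'division by zero')]
```

The asyncio side works the same way via loop.set_exception_handler; it is omitted here to keep the sketch synchronous.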
The sidecar samples psutil and keeps a rolling buffer. Ask for a chart in any window from 1m to 1d, pin a live dashboard that updates itself, or pull the full history as a CSV and drop it into your notebook.
/chart all 5m replies with four plain-text line-chart panels over the requested window: CPU (cur=99.2% min=0.0 max=100.2), RSS (cur=80.2MB min=43.5 max=83.5), FDs (cur=26 min=14 max=34), and Threads (cur=12 min=6 max=13).
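The charts are plain Unicode, so nothing depends on image rendering. A toy renderer in the same spirit (not the sidecar's actual code) fits in a few lines:

```python
BARS = "▁▂▃▄▅▆▇█"

def sparkline(values):
    """Map each sample onto one of eight block characters,
    scaled between the window's min and max."""
    lo, hi = min(values), max(values)
    span = (hi - lo) or 1.0  # avoid division by zero on a flat series
    return "".join(
        BARS[int((v - lo) / span * (len(BARS) - 1))] for v in values
    )

line = sparkline([0, 25, 50, 75, 100])
print(line)  # ▁▂▄▆█
```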
/export offers cpu · mem · fds · threads · all, over windows of 1m · 5m · 15m · 1h · 6h · 1d; /chart accepts the same windows (e.g. /chart all 1h). For a standing view, pass live_dashboard=True at init(). The sidecar pins one message to the chat and rewrites it every sample_interval_sec with fresh vitals, so it never clutters the history.
The export arrives as <service>_vitals.csv via Telegram's sendDocument. Columns: sampled_at, cpu_pct, rss_mb, fds, threads.
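Assuming the rolling buffer is a list of per-sample tuples in that column order, producing the CSV is one csv.writer pass — a sketch, not the sidecar's code:

```python
import csv
import io

COLUMNS = ["sampled_at", "cpu_pct", "rss_mb", "fds", "threads"]

def vitals_csv(samples):
    """Serialize a rolling vitals buffer to the exported CSV layout."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(COLUMNS)   # header row matches the documented columns
    writer.writerows(samples)  # one row per sample
    return buf.getvalue()

csv_text = vitals_csv([
    ("2024-05-01T12:18:00", 99.2, 80.2, 26, 12),
    ("2024-05-01T12:18:05", 41.0, 80.3, 26, 12),
])
print(csv_text.splitlines()[0])  # sampled_at,cpu_pct,rss_mb,fds,threads
```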
Your service stays thin. A detached sidecar carries the weight — HTTP, dedup, rate-limit, vitals, interactive commands. They talk over an AF_UNIX datagram socket in microseconds, so nothing you write ever waits on Telegram.
Delivery needs HTTP. Vitals need psutil. Rate-limit state needs memory. None of that belongs in your hot path — so we put it in its own process. If the sidecar dies, your app doesn't.
SOCK_DGRAM = kernel-level datagram. A send() takes microseconds, buffered atomically by the kernel. No TCP handshake, no TLS, no user-space queue. msgpack keeps the payload compact — pydantic, httpx, psutil stay out of your process.
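The transport can be sketched with a connected AF_UNIX datagram pair. Here socketpair() stands in for the real named socket the sidecar binds, and JSON stands in for msgpack (which is not in the standard library) — the semantics are the same:

```python
import json
import socket

# In production the sidecar binds a named AF_UNIX socket;
# socketpair() gives the same one-datagram-per-event semantics
# for a self-contained demo.
app_side, sidecar_side = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)

event = {"kind": "notify", "text": "Large checkout", "severity": "warning"}
app_side.send(json.dumps(event).encode())  # fire-and-forget; never waits on Telegram

# The sidecar sits in a recv loop like this:
raw = sidecar_side.recv(65536)  # one datagram = one event, delivered atomically
received = json.loads(raw)
print(received["kind"])  # notify
```

Because the datagram is buffered by the kernel, the sender returns immediately; there is no handshake and no user-space queue to drain.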
The sidecar long-polls getUpdates — commands come back in the response. Works behind NAT, inside Docker, on a laptop. No webhook, no reverse proxy, no TLS cert. /status /chart /export /mute all travel this way.
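Long polling is just "ask with a timeout, acknowledge by offset". Per the Telegram Bot API, an update stays queued until a later getUpdates call passes an offset above its update_id, so the only subtle part is the offset bookkeeping — a pure function (illustrative; snitchbot's internals may differ):

```python
from typing import Optional

def next_offset(updates: list, current: Optional[int]) -> Optional[int]:
    """Telegram re-delivers anything not acknowledged, so the next
    getUpdates call must pass max(update_id) + 1 as its offset."""
    if not updates:
        return current
    return max(u["update_id"] for u in updates) + 1

# A poll loop would call getUpdates(offset=offset, timeout=30) repeatedly:
offset = None
offset = next_offset([{"update_id": 101}, {"update_id": 102}], offset)
print(offset)  # 103
```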
No account, no DSN, no dashboards. Three steps: a Telegram bot, a chat id, and one uv add. The rest is already wired.
Talk to @BotFather: /newbot, pick a name, copy the token. Takes 10 seconds, always has. Paste the token into .env.

Next, message @userinfobot — it replies with your numeric id. For groups, add your new bot and use the group id instead.

Fill .env, add one line to your Python — done. init() reads env vars and spawns the sidecar.
$ uv add snitchbot

# .env
SNITCHBOT_TOKEN="7824…:AAH…Dg"
SNITCHBOT_CHAT_ID="1387261905"

# app.py
import snitchbot
snitchbot.init("orders-api")