homelab/services/lebedev-podcast-rss/rss_bot.py

236 lines
7.2 KiB
Python

import os
import time
import json
from pathlib import Path
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime
import logging
import yt_dlp
from feedgen.feed import FeedGenerator
# -------------------------
# Config
# -------------------------
# Required settings -- a missing VK_PLAYLIST_URL / DOMAIN / FEED_FILE raises
# KeyError at import time (fail fast for a daemon container).
VK_PLAYLIST_URL = os.environ["VK_PLAYLIST_URL"]
DOMAIN = os.environ["DOMAIN"].rstrip("/")  # no trailing slash: joined later as f"{DOMAIN}/audio/..."
FEED_FILE = os.environ["FEED_FILE"]
# Optional settings with defaults: poll every 300s, keep episodes 14 days.
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300"))
RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "14"))
# Fixed container paths -- /data is presumably a mounted volume (TODO confirm).
DATA_DIR = Path("/data")
AUDIO_DIR = DATA_DIR / "audio"
RSS_DIR = DATA_DIR / "rss"
STATE_FILE = DATA_DIR / "state/rss.json"
# Create the directory tree up front so later writes cannot fail on a
# missing parent directory.
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
RSS_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
# -------------------------
# Logging
# -------------------------
# LOG_LEVEL env var (default INFO) controls verbosity for the whole process.
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("vk-podcast-rss")
def load_state():
    """Load persisted bot state from STATE_FILE.

    Returns the decoded JSON object, or None when the file is absent or
    unreadable/corrupt (the error is logged, never raised).
    """
    if not STATE_FILE.exists():
        return None
    try:
        raw = STATE_FILE.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception as exc:
        log.exception("Failed to read state file %s: %s", STATE_FILE, exc)
        return None
def save_state(state):
    """Persist *state* to STATE_FILE as pretty-printed JSON.

    Logs a summary on success; on failure logs the traceback and re-raises
    so the caller knows the state was not durably written.
    """
    try:
        payload = json.dumps(state, indent=2, ensure_ascii=False)
        STATE_FILE.write_text(payload, encoding="utf-8")
        known_count = len(state.get("known", []))
        item_count = len(state.get("items", []))
        log.info("State saved: %s (known=%d, items=%d)", STATE_FILE, known_count, item_count)
    except Exception as exc:
        log.exception("Failed to save state file %s: %s", STATE_FILE, exc)
        raise
def get_playlist():
    """Fetch the VK playlist (flat extraction) and map video id -> video URL."""
    log.info("Fetching playlist: %s", VK_PLAYLIST_URL)
    extractor_opts = {"extract_flat": True, "quiet": True}
    with yt_dlp.YoutubeDL(extractor_opts) as extractor:
        info = extractor.extract_info(VK_PLAYLIST_URL, download=False)
    videos = {}
    for entry in info.get("entries", []) or []:
        if entry:  # skip None placeholders yt-dlp may leave in the list
            videos[entry["id"]] = entry["url"]
    log.info("Playlist fetched: %d entries", len(videos))
    return videos
def download_audio(video_url: str):
    """Download one video's audio track and transcode it to mp3 via ffmpeg.

    The output file is named <id>.mp3 under AUDIO_DIR (ASCII-safe; best for
    Apple Podcasts). Returns (mp3_path, title, video_id).

    Raises FileNotFoundError when the expected mp3 is missing after the
    download/postprocessing step.
    """
    log.info("Downloading audio from: %s", video_url)
    download_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(AUDIO_DIR / "%(id)s.%(ext)s"),
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
            "preferredquality": "192",
        }],
        "quiet": True,
        "noplaylist": True,
    }
    with yt_dlp.YoutubeDL(download_opts) as downloader:
        info = downloader.extract_info(video_url, download=True)
    video_id = info["id"]
    title = info.get("title", video_id)
    mp3_path = AUDIO_DIR / f"{video_id}.mp3"
    if not mp3_path.exists():
        raise FileNotFoundError(f"Expected mp3 not found after download: {mp3_path}")
    file_size = mp3_path.stat().st_size
    log.info("Downloaded: %s (%d bytes) title=%r", mp3_path.name, file_size, title)
    return mp3_path, title, video_id
def build_feed(items):
    """Render *items* (list of episode dicts) into the RSS file under RSS_DIR.

    Each item dict provides: title, guid, pubDate, filename, size.
    """
    out_path = RSS_DIR / FEED_FILE
    log.info("Generating RSS feed: %s (items=%d)", out_path, len(items))
    feed = FeedGenerator()
    feed.title("TrueNews Podcast")
    feed.link(href=DOMAIN, rel="alternate")
    feed.description("Pvlx personal podcast feed")
    feed.language("ru")
    # Optional but nice: advertise when the feed was last regenerated.
    feed.lastBuildDate(datetime.now(timezone.utc))
    # NOTE(review): feedgen's add_entry prepends by default, so the rendered
    # feed order is the reverse of *items* -- confirm this matches the
    # intended newest-first ordering.
    for episode in items:
        entry = feed.add_entry()
        entry.title(episode["title"])
        entry.guid(episode["guid"], permalink=False)
        entry.pubDate(episode["pubDate"])
        enclosure_url = f"{DOMAIN}/audio/{episode['filename']}"
        entry.enclosure(
            url=enclosure_url,
            length=str(episode["size"]),
            type="audio/mpeg"
        )
    feed.rss_file(out_path)
    written_size = out_path.stat().st_size if out_path.exists() else -1
    log.info("RSS written: %s (size=%d bytes)", out_path, written_size)
def prune_old(items):
    """Drop episodes older than RETENTION_DAYS and delete their audio files.

    Items with an unparsable "iso" timestamp are kept -- never delete on
    doubt. Returns the surviving items with their original order preserved.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)
    log.info("Pruning episodes older than %d days (cutoff=%s)", RETENTION_DAYS, cutoff.isoformat())
    survivors = []
    removed = 0
    deleted_files = 0
    for episode in items:
        try:
            stamp = datetime.fromisoformat(episode["iso"])
            if stamp.tzinfo is None:
                # Stored naive -> interpret as UTC.
                stamp = stamp.replace(tzinfo=timezone.utc)
        except Exception:
            # Parsing failed: keep the item to avoid accidental deletion.
            survivors.append(episode)
            continue
        if stamp > cutoff:
            survivors.append(episode)
            continue
        removed += 1
        audio_path = AUDIO_DIR / episode["filename"]
        if audio_path.exists():
            try:
                audio_path.unlink()
            except Exception as exc:
                log.warning("Failed to delete %s: %s", audio_path, exc)
            else:
                deleted_files += 1
                log.info("Deleted old audio: %s", audio_path.name)
    log.info("Prune done: removed=%d items, deleted_files=%d, kept=%d", removed, deleted_files, len(survivors))
    return survivors
def first_run_index_only():
    """Bootstrap: record every current playlist id, download nothing.

    Also writes an empty feed so the feed URL responds immediately.
    """
    playlist = get_playlist()
    save_state({"known": list(playlist.keys()), "items": []})
    build_feed([])  # create empty feed file so URL exists immediately
    log.info("First run complete: indexed %d existing videos; no downloads.", len(playlist))
def main_loop():
    """Poll the VK playlist forever, downloading new episodes and rebuilding the feed.

    Each tick: fetch the playlist, download any unseen videos, prune episodes
    past retention, regenerate the RSS file, and persist state. Robustness
    fix: a failure while downloading ONE video no longer aborts the whole
    batch -- it is logged and skipped; since the id is not added to ``known``
    it will be retried on the next tick.
    """
    log.info("RSS bot starting. domain=%s feed=%s interval=%ss retention=%sd", DOMAIN, FEED_FILE, CHECK_INTERVAL, RETENTION_DAYS)
    log.info("Paths: data=%s audio=%s rss=%s state=%s", DATA_DIR, AUDIO_DIR, RSS_DIR, STATE_FILE)
    state = load_state()
    if state is None:
        log.warning("State file not found. Performing first-run indexing only.")
        first_run_index_only()
        state = load_state() or {"known": [], "items": []}
    known = set(state.get("known", []))
    items = state.get("items", [])
    while True:
        try:
            log.info("Tick: checking playlist...")
            playlist = get_playlist()
            new_ids = [vid for vid in playlist.keys() if vid not in known]
            log.info("New videos found: %d", len(new_ids))
            for vid in new_ids:
                url = playlist[vid]
                try:
                    mp3_path, title, guid = download_audio(url)
                except Exception as e:
                    # One broken download must not block the rest of the
                    # batch; the id stays unknown so it is retried later.
                    log.exception("Failed to download %s (%s): %s", vid, url, e)
                    continue
                size = mp3_path.stat().st_size
                now = datetime.now(timezone.utc)
                item = {
                    "title": title,
                    "guid": guid,  # stable GUID = VK id
                    "filename": mp3_path.name,  # <id>.mp3
                    "size": size,
                    "pubDate": format_datetime(now),
                    "iso": now.isoformat(),
                }
                items.insert(0, item)  # newest first
                known.add(vid)
                log.info("Added episode: guid=%s file=%s", guid, mp3_path.name)
            # retention + feed + state
            items = prune_old(items)
            build_feed(items)
            save_state({"known": list(known), "items": items})
            log.info("Sleep %ds...", CHECK_INTERVAL)
        except Exception as e:
            log.exception("Loop error: %s", e)
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main_loop()