mirror of
https://github.com/pvlnes/homelab.git
synced 2026-04-05 16:01:45 +00:00
236 lines
7.2 KiB
Python
236 lines
7.2 KiB
Python
import os
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta, timezone
|
|
from email.utils import format_datetime
|
|
import logging
|
|
import yt_dlp
|
|
from feedgen.feed import FeedGenerator
|
|
|
|
# -------------------------
# Config
# -------------------------
# Required settings come from the environment; optional ones have defaults.
VK_PLAYLIST_URL = os.environ["VK_PLAYLIST_URL"]
DOMAIN = os.environ["DOMAIN"].rstrip("/")          # no trailing slash in URLs
FEED_FILE = os.environ["FEED_FILE"]
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300"))   # seconds between ticks
RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "14"))    # episode lifetime

# On-disk layout under the /data volume.
DATA_DIR = Path("/data")
AUDIO_DIR = DATA_DIR / "audio"
RSS_DIR = DATA_DIR / "rss"
STATE_FILE = DATA_DIR / "state/rss.json"

# Make sure every directory exists before any download/feed work starts.
for _required_dir in (AUDIO_DIR, RSS_DIR, STATE_FILE.parent):
    _required_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# -------------------------
# Logging
# -------------------------
# Level is configurable via LOG_LEVEL; format is timestamp + level + message.
_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=_log_level,
)
log = logging.getLogger("vk-podcast-rss")
|
|
|
|
|
|
def load_state():
    """Load persisted state from STATE_FILE.

    Returns the parsed state dict, or None when the file does not exist or
    cannot be read/parsed (the failure is logged, never raised — callers
    treat None as "start fresh").
    """
    if not STATE_FILE.exists():
        return None
    try:
        raw = STATE_FILE.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception as e:
        # Corrupt or unreadable state must not kill the bot.
        log.exception("Failed to read state file %s: %s", STATE_FILE, e)
        return None
|
|
|
|
|
|
def save_state(state):
    """Persist *state* to STATE_FILE as pretty-printed UTF-8 JSON.

    Logs a summary on success; on failure the error is logged and then
    re-raised so the caller notices the loss of persistence.
    """
    try:
        payload = json.dumps(state, indent=2, ensure_ascii=False)
        STATE_FILE.write_text(payload, encoding="utf-8")
        log.info(
            "State saved: %s (known=%d, items=%d)",
            STATE_FILE,
            len(state.get("known", [])),
            len(state.get("items", [])),
        )
    except Exception as e:
        log.exception("Failed to save state file %s: %s", STATE_FILE, e)
        raise
|
|
|
|
|
|
def get_playlist():
    """Fetch the VK playlist and return a mapping of video id -> video URL.

    Uses flat extraction (metadata only, nothing is downloaded).
    """
    log.info("Fetching playlist: %s", VK_PLAYLIST_URL)
    opts = {"quiet": True, "extract_flat": True}
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(VK_PLAYLIST_URL, download=False)

    videos = {}
    # `entries` may be missing or None; skip falsy placeholder entries.
    for entry in info.get("entries", []) or []:
        if entry:
            videos[entry["id"]] = entry["url"]

    log.info("Playlist fetched: %d entries", len(videos))
    return videos
|
|
|
|
|
|
def download_audio(video_url: str):
    """Download the audio track of *video_url* and convert it to mp3.

    The output file is named after the video id (<id>.mp3), which is
    ASCII-safe; best for Apple Podcasts.

    Returns a tuple (mp3_path, title, video_id). Raises FileNotFoundError
    if the expected mp3 is not present after the download/postprocessing.
    """
    log.info("Downloading audio from: %s", video_url)

    extract_mp3 = {
        "key": "FFmpegExtractAudio",
        "preferredcodec": "mp3",
        "preferredquality": "192",
    }
    opts = {
        "format": "bestaudio/best",
        "outtmpl": str(AUDIO_DIR / "%(id)s.%(ext)s"),
        "postprocessors": [extract_mp3],
        "quiet": True,
        "noplaylist": True,
    }

    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(video_url, download=True)

    vid = info["id"]
    title = info.get("title", vid)
    mp3_path = AUDIO_DIR / f"{vid}.mp3"

    # The ffmpeg postprocessor should have produced <id>.mp3; fail loudly if not.
    if not mp3_path.exists():
        raise FileNotFoundError(f"Expected mp3 not found after download: {mp3_path}")

    size = mp3_path.stat().st_size
    log.info("Downloaded: %s (%d bytes) title=%r", mp3_path.name, size, title)

    return mp3_path, title, vid
|
|
|
|
|
|
def build_feed(items):
    """Render the RSS feed for *items* and write it to RSS_DIR/FEED_FILE.

    Each item dict must carry: title, guid, pubDate, filename, size.
    Enclosure URLs are built as <DOMAIN>/audio/<filename>.
    """
    out_path = RSS_DIR / FEED_FILE
    log.info("Generating RSS feed: %s (items=%d)", out_path, len(items))

    fg = FeedGenerator()
    fg.title("TrueNews Podcast")
    fg.link(href=DOMAIN, rel="alternate")
    fg.description("Pvlx personal podcast feed")
    fg.language("ru")
    # Optional but nice
    fg.lastBuildDate(datetime.now(timezone.utc))

    for episode in items:
        entry = fg.add_entry()
        entry.title(episode["title"])
        entry.guid(episode["guid"], permalink=False)
        entry.pubDate(episode["pubDate"])
        entry.enclosure(
            url=f"{DOMAIN}/audio/{episode['filename']}",
            length=str(episode["size"]),
            type="audio/mpeg",
        )

    fg.rss_file(out_path)
    written = out_path.stat().st_size if out_path.exists() else -1
    log.info("RSS written: %s (size=%d bytes)", out_path, written)
|
|
|
|
|
|
def prune_old(items):
    """Drop episodes older than RETENTION_DAYS and delete their audio files.

    Items with an unparseable "iso" timestamp are kept, to avoid accidental
    deletion. Returns the list of surviving items (original order preserved).
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)
    log.info("Pruning episodes older than %d days (cutoff=%s)", RETENTION_DAYS, cutoff.isoformat())

    survivors = []
    removed = 0
    deleted_files = 0

    for episode in items:
        try:
            when = datetime.fromisoformat(episode["iso"])
            # if stored without tz, assume UTC
            if when.tzinfo is None:
                when = when.replace(tzinfo=timezone.utc)
        except Exception:
            # If parsing fails, keep it to avoid accidental deletion
            survivors.append(episode)
            continue

        if when > cutoff:
            survivors.append(episode)
            continue

        # Past retention: drop the item and best-effort delete its audio file.
        removed += 1
        audio_path = AUDIO_DIR / episode["filename"]
        if audio_path.exists():
            try:
                audio_path.unlink()
            except Exception as e:
                log.warning("Failed to delete %s: %s", audio_path, e)
            else:
                deleted_files += 1
                log.info("Deleted old audio: %s", audio_path.name)

    log.info("Prune done: removed=%d items, deleted_files=%d, kept=%d", removed, deleted_files, len(survivors))
    return survivors
|
|
|
|
|
|
def first_run_index_only():
    """
    First-run behavior: record every currently-listed video id as "known"
    and publish an empty feed; nothing is downloaded.
    """
    playlist = get_playlist()
    save_state({"known": list(playlist.keys()), "items": []})
    build_feed([])  # create empty feed file so URL exists immediately
    log.info("First run complete: indexed %d existing videos; no downloads.", len(playlist))
|
|
|
|
|
|
def main_loop():
    """Poll the VK playlist forever: download new episodes, prune old ones,
    rebuild the RSS feed, and persist state once per tick.

    On the very first run (no state file) it only indexes existing videos so
    the bot does not backfill the entire playlist.
    """
    log.info("RSS bot starting. domain=%s feed=%s interval=%ss retention=%sd", DOMAIN, FEED_FILE, CHECK_INTERVAL, RETENTION_DAYS)
    log.info("Paths: data=%s audio=%s rss=%s state=%s", DATA_DIR, AUDIO_DIR, RSS_DIR, STATE_FILE)

    state = load_state()
    if state is None:
        log.warning("State file not found. Performing first-run indexing only.")
        first_run_index_only()
        state = load_state() or {"known": [], "items": []}

    known = set(state.get("known", []))
    items = state.get("items", [])

    while True:
        try:
            log.info("Tick: checking playlist...")
            playlist = get_playlist()

            new_ids = [vid for vid in playlist.keys() if vid not in known]
            log.info("New videos found: %d", len(new_ids))

            for vid in new_ids:
                url = playlist[vid]
                # A single broken download must not abort the remaining new
                # videos or skip the prune/feed/state step below. Log it,
                # leave the id out of `known`, and retry it next tick.
                try:
                    mp3_path, title, guid = download_audio(url)
                except Exception as e:
                    log.exception("Failed to download %s (%s): %s", vid, url, e)
                    continue

                size = mp3_path.stat().st_size
                now = datetime.now(timezone.utc)
                item = {
                    "title": title,
                    "guid": guid,                     # stable GUID = VK id
                    "filename": mp3_path.name,        # <id>.mp3
                    "size": size,
                    "pubDate": format_datetime(now),  # RFC 2822 date for RSS readers
                    "iso": now.isoformat(),           # machine-parseable copy for pruning
                }

                items.insert(0, item)  # newest first
                known.add(vid)
                log.info("Added episode: guid=%s file=%s", guid, mp3_path.name)

            # retention + feed + state
            items = prune_old(items)
            build_feed(items)
            save_state({"known": list(known), "items": items})

            log.info("Sleep %ds...", CHECK_INTERVAL)

        except Exception as e:
            # Keep the daemon alive across transient failures (network, VK).
            log.exception("Loop error: %s", e)

        time.sleep(CHECK_INTERVAL)
|
|
|
|
|
|
# Entry point: start the polling loop only when executed as a script.
if __name__ == "__main__":
    main_loop()
|