"""Poll a VK playlist with yt-dlp, download new videos as mp3, and publish
them as an RSS podcast feed served from DOMAIN.

First run only indexes existing playlist entries (downloads nothing); each
subsequent tick downloads new entries, prunes episodes older than
RETENTION_DAYS, regenerates the feed, and persists state to STATE_FILE.
"""

import os
import time
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime

import yt_dlp
from feedgen.feed import FeedGenerator

# -------------------------
# Config
# -------------------------
VK_PLAYLIST_URL = os.environ["VK_PLAYLIST_URL"]            # required: playlist to poll
DOMAIN = os.environ["DOMAIN"].rstrip("/")                  # required: public base URL for enclosures
FEED_FILE = os.environ["FEED_FILE"]                        # required: RSS filename under RSS_DIR
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300"))   # seconds between playlist polls
RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "14"))    # episodes older than this are deleted

DATA_DIR = Path("/data")
AUDIO_DIR = DATA_DIR / "audio"
RSS_DIR = DATA_DIR / "rss"
STATE_FILE = DATA_DIR / "state/rss.json"

AUDIO_DIR.mkdir(parents=True, exist_ok=True)
RSS_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)

# -------------------------
# Logging
# -------------------------
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("vk-podcast-rss")


def load_state():
    """Return the persisted state dict, or None if the file is missing or unreadable."""
    if not STATE_FILE.exists():
        return None
    try:
        return json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except Exception as e:
        # Corrupt state is treated as "no state"; caller decides what to do.
        log.exception("Failed to read state file %s: %s", STATE_FILE, e)
        return None


def save_state(state):
    """Persist the state dict as pretty JSON.

    Raises on write failure: losing state silently would cause re-downloads.
    """
    try:
        STATE_FILE.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8")
        log.info("State saved: %s (known=%d, items=%d)",
                 STATE_FILE, len(state.get("known", [])), len(state.get("items", [])))
    except Exception as e:
        log.exception("Failed to save state file %s: %s", STATE_FILE, e)
        raise


def get_playlist():
    """Fetch the playlist (flat, no downloads) and return {video_id: video_url}."""
    log.info("Fetching playlist: %s", VK_PLAYLIST_URL)
    ydl_opts = {"extract_flat": True, "quiet": True}
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(VK_PLAYLIST_URL, download=False)
    entries = info.get("entries", []) or []
    # Flat-extracted entries sometimes carry the link under "webpage_url"
    # instead of "url"; skip entries missing an id or any usable URL rather
    # than raising KeyError and aborting the whole tick.
    videos = {}
    for e in entries:
        if not e or not e.get("id"):
            continue
        url = e.get("url") or e.get("webpage_url")
        if url:
            videos[e["id"]] = url
    log.info("Playlist fetched: %d entries", len(videos))
    return videos


def download_audio(video_url: str):
    """
    Downloads audio and converts to mp3.
    Filename is video id: <id>.mp3 (ASCII-safe; best for Apple Podcasts)

    Returns (mp3_path, title, video_id).
    Raises FileNotFoundError if the postprocessor did not produce the mp3.
    """
    log.info("Downloading audio from: %s", video_url)
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(AUDIO_DIR / "%(id)s.%(ext)s"),
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
            "preferredquality": "192",
        }],
        "quiet": True,
        "noplaylist": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
    vid = info["id"]
    title = info.get("title", vid)
    mp3_path = AUDIO_DIR / f"{vid}.mp3"
    if not mp3_path.exists():
        raise FileNotFoundError(f"Expected mp3 not found after download: {mp3_path}")
    size = mp3_path.stat().st_size
    log.info("Downloaded: %s (%d bytes) title=%r", mp3_path.name, size, title)
    return mp3_path, title, vid


def build_feed(items):
    """Write the RSS feed for `items` (newest first) to RSS_DIR/FEED_FILE."""
    out_path = RSS_DIR / FEED_FILE
    log.info("Generating RSS feed: %s (items=%d)", out_path, len(items))
    fg = FeedGenerator()
    fg.title("TrueNews Podcast")
    fg.link(href=DOMAIN, rel="alternate")
    fg.description("Pvlx personal podcast feed")
    fg.language("ru")
    # Optional but nice
    fg.lastBuildDate(datetime.now(timezone.utc))
    for item in items:
        fe = fg.add_entry()
        fe.title(item["title"])
        fe.guid(item["guid"], permalink=False)
        fe.pubDate(item["pubDate"])
        fe.enclosure(
            url=f"{DOMAIN}/audio/{item['filename']}",
            length=str(item["size"]),
            type="audio/mpeg",
        )
    # Older feedgen releases expect a str filename, not a Path.
    fg.rss_file(str(out_path))
    log.info("RSS written: %s (size=%d bytes)",
             out_path, out_path.stat().st_size if out_path.exists() else -1)


def prune_old(items):
    """Drop items older than RETENTION_DAYS and delete their audio files.

    Items with an unparsable "iso" timestamp are kept to avoid accidental
    deletion. Returns the surviving items list.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)
    log.info("Pruning episodes older than %d days (cutoff=%s)",
             RETENTION_DAYS, cutoff.isoformat())
    kept = []
    removed = 0
    deleted_files = 0
    for item in items:
        try:
            dt = datetime.fromisoformat(item["iso"])
            # if stored without tz, assume UTC
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
        except Exception:
            # If parsing fails, keep it to avoid accidental deletion
            kept.append(item)
            continue
        if dt > cutoff:
            kept.append(item)
            continue
        removed += 1
        path = AUDIO_DIR / item["filename"]
        if path.exists():
            try:
                path.unlink()
                deleted_files += 1
                log.info("Deleted old audio: %s", path.name)
            except Exception as e:
                log.warning("Failed to delete %s: %s", path, e)
    log.info("Prune done: removed=%d items, deleted_files=%d, kept=%d",
             removed, deleted_files, len(kept))
    return kept


def first_run_index_only():
    """
    First run behavior: create state with all current IDs; download nothing.
    """
    playlist = get_playlist()
    state = {"known": list(playlist.keys()), "items": []}
    save_state(state)
    build_feed([])  # create empty feed file so URL exists immediately
    log.info("First run complete: indexed %d existing videos; no downloads.",
             len(playlist))


def main_loop():
    """Main polling loop: download new episodes, prune, rebuild feed, persist state."""
    log.info("RSS bot starting. domain=%s feed=%s interval=%ss retention=%sd",
             DOMAIN, FEED_FILE, CHECK_INTERVAL, RETENTION_DAYS)
    log.info("Paths: data=%s audio=%s rss=%s state=%s",
             DATA_DIR, AUDIO_DIR, RSS_DIR, STATE_FILE)

    state = load_state()
    if state is None:
        log.warning("State file not found. Performing first-run indexing only.")
        first_run_index_only()
        state = load_state() or {"known": [], "items": []}

    known = set(state.get("known", []))
    items = state.get("items", [])

    while True:
        try:
            log.info("Tick: checking playlist...")
            playlist = get_playlist()
            new_ids = [vid for vid in playlist.keys() if vid not in known]
            log.info("New videos found: %d", len(new_ids))

            for vid in new_ids:
                url = playlist[vid]
                # A single broken video must not abort the whole tick (it
                # would also block every video after it in new_ids); log and
                # retry it on the next tick instead.
                try:
                    mp3_path, title, guid = download_audio(url)
                except Exception:
                    log.exception("Download failed for %s; will retry next tick", url)
                    continue
                size = mp3_path.stat().st_size
                now = datetime.now(timezone.utc)
                item = {
                    "title": title,
                    "guid": guid,               # stable GUID = VK id
                    "filename": mp3_path.name,  # <id>.mp3
                    "size": size,
                    "pubDate": format_datetime(now),  # RFC 2822 for RSS
                    "iso": now.isoformat(),           # machine-parsable for pruning
                }
                items.insert(0, item)  # newest first
                known.add(vid)
                log.info("Added episode: guid=%s file=%s", guid, mp3_path.name)

            # retention + feed + state
            items = prune_old(items)
            build_feed(items)
            save_state({"known": list(known), "items": items})
            log.info("Sleep %ds...", CHECK_INTERVAL)
        except Exception as e:
            log.exception("Loop error: %s", e)
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main_loop()