- Rewrote broad_scan.py with:
  - Caching layer (.agents/cache/research/) with 6-hour TTL
  - Proper Firefox User-Agent for Reddit RSS
  - Fallback to old.reddit.com RSS when www is blocked
  - Exponential backoff retry logic
  - Cache fallback when fresh fetch fails
- Seeded initial cache with all 3 subreddits (weightroom, advancedfitness, StrongerByScience)
- Fixed save path to logs/research/
- Added .agents/cache/ to .gitignore

First research scan now has real data from all sources.
314 lines
11 KiB
Python
314 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Broad research scan: fetches recent high-signal content from fitness/science
|
|
sources without targeted search bias. Run occasionally to catch trends,
|
|
new studies, and community discussions.
|
|
|
|
Sources:
|
|
- Reddit: r/weightroom, r/advancedfitness, r/StrongerByScience (via RSS + cache)
|
|
- PubMed: recent articles on general resistance training (via E-utilities)
|
|
|
|
Reddit RSS is rate-limited (~1 request per 10s without getting 429'd).
|
|
A local cache avoids re-fetching on every run.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import sys
|
|
import os
|
|
import urllib.parse
|
|
from datetime import datetime, timezone, timedelta
|
|
from xml.etree import ElementTree
|
|
|
|
# Requests is available in this project's venv (from fitness-workout deps)
|
|
try:
|
|
import requests as req_lib
|
|
except ImportError:
|
|
req_lib = None
|
|
|
|
# Browser-style User-Agent header sent with every request made by fetch_url().
REDDIT_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0"
CACHE_TTL_HOURS = 6  # Re-fetch Reddit RSS if cache older than this
CACHE_DIR = None  # Resolved lazily by get_cache_path() on its first call

# What main() scans: subreddit names for the Reddit section and search terms
# for the PubMed section.
SOURCES = {
    "reddit": [
        "weightroom",
        "advancedfitness",
        "StrongerByScience",
    ],
    "pubmed_terms": [
        "resistance training",
        "strength training programming",
        "exercise physiology",
    ],
}
|
|
|
|
|
|
def get_cache_path():
    """Return the research cache directory, resolving and creating it once.

    The first call derives the directory from this script's location, stores
    it in the module-level ``CACHE_DIR`` and makes sure it exists on disk.
    Later calls simply hand back the stored value.
    """
    global CACHE_DIR
    if CACHE_DIR is not None:
        return CACHE_DIR

    # Walk three directories up from the script's folder; per the original
    # layout note that is research/ -> scripts/ -> .agents/ -> project root.
    root = os.path.dirname(os.path.abspath(__file__))
    for _ in range(3):
        root = os.path.dirname(root)

    CACHE_DIR = os.path.join(root, ".agents", "cache", "research")
    os.makedirs(CACHE_DIR, exist_ok=True)
    return CACHE_DIR
|
|
|
|
|
|
def load_cache(key):
    """Load cached data for a given key if it is still fresh.

    Args:
        key: Cache entry name; the entry lives in ``<key>.json`` inside the
            directory returned by ``get_cache_path()``.

    Returns:
        The stored payload when the entry exists, can be read and is younger
        than ``CACHE_TTL_HOURS``; ``None`` for a missing, stale, unreadable
        or malformed entry.
    """
    cache_path = os.path.join(get_cache_path(), f"{key}.json")
    try:
        with open(cache_path, encoding="utf-8") as f:
            data = json.load(f)
        ts = datetime.fromisoformat(data["_cached_at"])
        payload = data["payload"]
    except (OSError, ValueError, KeyError, TypeError):
        # A missing file, bad JSON, a non-object root or a bad timestamp are
        # all just cache misses; the caller falls back to a fresh fetch.
        return None

    if ts.tzinfo is None:
        # Timestamps without an offset (e.g. hand-written cache files) cannot
        # be subtracted from an aware datetime; interpret them as UTC, which
        # is what save_cache() writes.
        ts = ts.replace(tzinfo=timezone.utc)

    age = datetime.now(timezone.utc) - ts
    if age < timedelta(hours=CACHE_TTL_HOURS):
        return payload
    return None  # Stale
|
|
|
|
|
|
def save_cache(key, payload):
    """Write *payload* to the cache entry for *key*, stamped with the time.

    The entry is a JSON object holding the current UTC time as an ISO string
    under ``_cached_at`` and the data itself under ``payload``; it is stored
    as ``<key>.json`` in the directory returned by ``get_cache_path()``.
    """
    target = os.path.join(get_cache_path(), f"{key}.json")
    stamped_at = datetime.now(timezone.utc).isoformat()
    record = dict(_cached_at=stamped_at, payload=payload)
    with open(target, "w") as handle:
        json.dump(record, handle, indent=2)
|
|
|
|
|
|
def fetch_url(url, max_retries=2):
    """Fetch content from a URL with retries using the requests library.

    Args:
        url: Address to request with HTTP GET.
        max_retries: Total number of attempts to make (not extra retries).

    Returns:
        The response body as text, or ``None`` when the requests library is
        unavailable or every attempt failed. Failures are reported on stderr
        rather than raised.
    """
    if req_lib is None:
        print(" [Warning: requests library not available]", file=sys.stderr)
        return None

    headers = {
        "User-Agent": REDDIT_USER_AGENT,
        "Accept": "application/xml, text/xml, text/html, */*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
    }

    last_error = None
    for attempt in range(max_retries):
        try:
            resp = req_lib.get(url, headers=headers, timeout=20)
            resp.raise_for_status()
            return resp.text
        except Exception as e:
            # Deliberately broad: the scan is best-effort, so any failure of
            # a single request is retried and then reported, never raised.
            last_error = e
            if attempt < max_retries - 1:
                # Exponential backoff: 3s, 6s, 12s, ... (with the default
                # max_retries=2 this is a single 3s pause).
                time.sleep(3 * 2 ** attempt)

    if last_error is not None:
        print(f" [Warning: giving up on {url}: {last_error}]", file=sys.stderr)
    return None
|
|
|
|
|
|
def _mark_from_cache(posts):
    """Flag every post dict in *posts* as cache-served; return the same list."""
    for post in posts:
        post["_from_cache"] = True
    return posts


def _parse_reddit_feed(content):
    """Convert the Atom XML text in *content* into a list of post dicts."""
    ns = {"": "http://www.w3.org/2005/Atom"}
    posts = []
    for entry in ElementTree.fromstring(content).findall(".//entry", ns):
        title_el = entry.find("title", ns)
        link_el = entry.find("link", ns)
        updated_el = entry.find("updated", ns)
        title = title_el.text if title_el is not None else None
        link = link_el.get("href", "") if link_el is not None else ""
        # Keep only the leading YYYY-MM-DD part of the timestamp.
        updated = updated_el.text[:10] if updated_el is not None and updated_el.text else "?"
        posts.append({
            "title": title or "No title",
            "url": link,
            # Counts are not extracted from the feed; "?" is a placeholder.
            "score": "?",
            "num_comments": "?",
            "date": updated,
            "_from_cache": False,
        })
    return posts


def fetch_reddit_posts(subreddit, limit=10):
    """Fetch top posts from a subreddit via RSS, with cache fallback.

    Order of attempts: fresh cache entry, www.reddit.com RSS, old.reddit.com
    RSS, then an expired cache entry as a last resort.

    Args:
        subreddit: Subreddit name without the ``r/`` prefix.
        limit: Value passed as the ``limit`` query parameter of the feed URL.

    Returns:
        A list of post dicts with ``title``, ``url``, ``score``,
        ``num_comments`` and ``date`` keys. Posts served from the cache carry
        ``_from_cache=True``. When nothing could be obtained the list holds a
        single placeholder dict describing the failure.
    """
    cache_key = f"reddit_{subreddit}"

    # Try cache first. Entries are stored with "_from_cache": False, so the
    # flag has to be overwritten here rather than only added when missing.
    cached = load_cache(cache_key)
    if cached is not None:
        return _mark_from_cache(cached)

    # Try main reddit RSS
    content = fetch_url(f"https://www.reddit.com/r/{subreddit}/hot/.rss?limit={limit}")
    if not content:
        # Fallback: try old.reddit.com (less aggressive blocking)
        time.sleep(8)  # Generous delay to avoid rate limiting
        content = fetch_url(f"https://old.reddit.com/r/{subreddit}/hot/.rss?limit={limit}")

    if not content:
        # Final fallback: try expired cache
        stale = load_expired_cache(cache_key)
        if stale is not None:
            return _mark_from_cache(stale)
        return [{"title": f"[Could not reach r/{subreddit}]", "url": "", "score": 0, "num_comments": 0, "date": "?"}]

    try:
        posts = _parse_reddit_feed(content)
    except Exception as e:
        # Best-effort: an unparseable feed should degrade to stale data or a
        # placeholder entry, never abort the whole scan.
        print(f" [Warning: RSS parse error for r/{subreddit}: {e}]", file=sys.stderr)
        stale = load_expired_cache(cache_key)
        if stale is not None:
            return _mark_from_cache(stale)
        return [{"title": f"[Parse error: {e}]", "url": "", "score": 0, "num_comments": 0, "date": "?"}]

    if not posts:
        return [{"title": "No posts found", "url": "", "score": 0, "num_comments": 0, "date": "?"}]

    # Save to cache; a write failure must not throw away the fetched posts.
    try:
        save_cache(cache_key, posts)
    except OSError as e:
        print(f" [Warning: could not cache r/{subreddit}: {e}]", file=sys.stderr)
    return posts
|
|
|
|
|
|
def load_expired_cache(key):
    """Load cached data even if expired (last resort fallback).

    Args:
        key: Cache entry name; the entry lives in ``<key>.json`` inside the
            directory returned by ``get_cache_path()``.

    Returns:
        The value stored under ``payload`` regardless of its age, or ``None``
        when the entry is missing, unreadable, not valid JSON, not a JSON
        object, or has no ``payload`` key.
    """
    cache_path = os.path.join(get_cache_path(), f"{key}.json")
    try:
        with open(cache_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON (JSONDecodeError is a
        # ValueError): there is simply nothing to fall back to.
        return None
    # Guard against a file whose JSON root is not an object.
    if not isinstance(data, dict):
        return None
    return data.get("payload")
|
|
|
|
|
|
def _pubmed_placeholder(title):
    """Build the single-entry result list used for PubMed status messages."""
    return [{"title": title, "url": "", "authors": "", "source": ""}]


def _format_pubmed_authors(article_data):
    """Return up to three author names as "Last Fore", comma-separated."""
    authors = []
    for author in article_data.findall(".//Author")[:3]:
        last = author.find("LastName")
        if last is None:
            # Entries without a LastName element are skipped.
            continue
        parts = [last.text or ""]
        fore = author.find("ForeName")
        if fore is not None:
            parts.append(fore.text or "")
        authors.append(" ".join(parts).strip())
    return ", ".join(authors) if authors else "Unknown"


def fetch_pubmed_articles(term, max_results=5):
    """Fetch recent PubMed articles on a broad topic.

    Runs an E-utilities ``esearch`` sorted by date to get article ids, then
    an ``efetch`` to get their details as XML.

    Args:
        term: Free-text PubMed search term.
        max_results: Maximum number of articles to request and return.

    Returns:
        A list of dicts with ``title``, ``url``, ``authors`` and ``source``
        keys. On any failure (network, JSON or XML parsing, no hits) the list
        contains one placeholder dict whose title describes the problem.
    """
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"

    search_url = (
        f"{base_url}esearch.fcgi?"
        f"db=pubmed&term={urllib.parse.quote(term)}&"
        f"retmax={max_results}&sort=date&retmode=json"
    )
    content = fetch_url(search_url)
    if not content:
        return _pubmed_placeholder(f"[Could not reach PubMed for '{term}']")

    try:
        search_data = json.loads(content)
        id_list = search_data.get("esearchresult", {}).get("idlist", [])
    except (json.JSONDecodeError, AttributeError):
        # AttributeError covers valid JSON whose shape is not the expected
        # nested objects.
        return _pubmed_placeholder(f"[Parse error for '{term}']")

    if not id_list:
        return _pubmed_placeholder("No recent articles found")

    time.sleep(1)  # Short pause between the two E-utilities requests

    details_url = (
        f"{base_url}efetch.fcgi?"
        f"db=pubmed&id={','.join(map(str, id_list))}&retmode=xml"
    )
    xml_data = fetch_url(details_url)
    if not xml_data:
        return _pubmed_placeholder(f"[Could not fetch details for '{term}']")

    try:
        root = ElementTree.fromstring(xml_data)
    except ElementTree.ParseError:
        # Malformed XML is reported the same way as malformed JSON above
        # instead of aborting the whole scan.
        return _pubmed_placeholder(f"[Parse error for '{term}']")

    articles = []
    for article in root.findall(".//PubmedArticle")[:max_results]:
        medline = article.find(".//MedlineCitation")
        article_data = medline.find(".//Article") if medline is not None else None
        if article_data is None:
            continue

        title_el = article_data.find("ArticleTitle")
        # itertext() keeps text nested inside child elements of the title.
        title = "".join(title_el.itertext()) if title_el is not None else ""

        journal = article_data.find(".//Journal/Title")
        journal_str = (journal.text if journal is not None else None) or "Unknown journal"

        pmid = medline.find(".//PMID")
        pmid_str = (pmid.text if pmid is not None else None) or ""
        link = f"https://pubmed.ncbi.nlm.nih.gov/{pmid_str}/" if pmid_str else ""

        articles.append({
            "title": title or "No title",
            "url": link,
            "authors": _format_pubmed_authors(article_data),
            "source": journal_str,
        })

    return articles if articles else _pubmed_placeholder("No structured data")
|
|
|
|
|
|
def main():
    """Run the broad scan, print the Markdown report and save it to disk.

    The report has a Reddit section (one subsection per subreddit in
    ``SOURCES["reddit"]``) and a PubMed section (one per search term in
    ``SOURCES["pubmed_terms"]``). It is printed to stdout and written to
    ``logs/research/<YYYY-MM-DD>-broad-scan.md`` under the project root.
    """
    # One timestamp for both the heading and the filename, so the two cannot
    # disagree when a run crosses midnight UTC.
    now = datetime.now(timezone.utc)

    output = []
    output.append(f"# Broad Research Scan — {now.strftime('%Y-%m-%d %H:%M UTC')}")
    output.append("_Auto-generated. Not targeted — general scan of recent content._\n")

    # --- Reddit ---
    output.append("## Reddit\n")
    for i, subreddit in enumerate(SOURCES["reddit"]):
        if i > 0:
            time.sleep(10)  # 10s delay between subreddits to avoid rate limiting
        output.append(f"### r/{subreddit}\n")
        for post in fetch_reddit_posts(subreddit):
            source_note = " (cached)" if post.get("_from_cache") else ""
            output.append(f"- **{post['title']}**{source_note}")
            output.append(f" - {post['url']} ({post.get('date', '?')})")
        output.append("")

    # --- PubMed ---
    output.append("## PubMed — Recent Articles\n")
    for term in SOURCES["pubmed_terms"]:
        output.append(f"### Topic: \"{term}\"\n")
        for article in fetch_pubmed_articles(term):
            output.append(f"- **{article['title']}**")
            if article.get("authors") and article.get("source"):
                output.append(f" - {article['authors']} | {article['source']}")
            if article.get("url"):
                output.append(f" - {article['url']}")
        output.append("")

    result = "\n".join(output)
    print(result)

    # Save to dated file
    script_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(script_dir)))
    logs_dir = os.path.join(project_root, "logs", "research")
    os.makedirs(logs_dir, exist_ok=True)
    filename = os.path.join(logs_dir, f"{now.strftime('%Y-%m-%d')}-broad-scan.md")
    # Explicit UTF-8: the report contains an em dash and arbitrary post and
    # article titles, which a non-UTF-8 default encoding may fail to write.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(result)
    print(f"\n--- Saved to {filename} ---", file=sys.stderr)
|
|
|
|
|
|
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|