Source code for modules.job_search

"""Job search module - scrapes/queries job listings from multiple sources.

Source tiers
------------
Tier 1 - Always-on free public JSON APIs (no auth):
  remote-ok, remotive, arbeitnow, himalayas, jobicy, the-muse

Tier 2 - RSS feed (no auth):
  we-work-remotely

Tier 3 - API-key required (configure in .env):
  adzuna, indeed, ziprecruiter

Tier 4 - OAuth 2.0 (configure LinkedIn app in .env):
  linkedin

Tier 5 - ATS aggregators (company career pages, public API):
  lever, ashby, greenhouse
"""

from __future__ import annotations

import asyncio
import json
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path

import aiohttp
from config import logger
from settings import get_settings

from modules.auth import get_auth_manager

# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

_HEADERS = {"User-Agent": "TalentAcquisitionAgent/2.0 (+https://github.com/rakrsh)"}
_TIMEOUT = aiohttp.ClientTimeout(total=20)


async def _get_json(
    session: aiohttp.ClientSession, url: str, **kwargs
) -> dict | list | None:
    try:
        async with session.get(url, timeout=_TIMEOUT, headers=_HEADERS, **kwargs) as r:
            if r.status == 200:
                # content_type=None: some boards serve JSON with a non-JSON MIME type.
                return await r.json(content_type=None)
            logger.debug(f"GET {url} -> {r.status}")
    except asyncio.TimeoutError:
        logger.warning(f"Timeout: {url}")
    except Exception as exc:
        logger.error(f"Error fetching {url}: {exc}")
    return None


async def _get_text(session: aiohttp.ClientSession, url: str, **kwargs) -> str | None:
    try:
        async with session.get(url, timeout=_TIMEOUT, headers=_HEADERS, **kwargs) as r:
            if r.status == 200:
                return await r.text()
            logger.debug(f"GET {url}{r.status}")
    except asyncio.TimeoutError:
        logger.warning(f"Timeout: {url}")
    except Exception as exc:
        logger.error(f"Error fetching {url}: {exc}")
    return None


# ---------------------------------------------------------------------------
# Job dataclass
# ---------------------------------------------------------------------------


@dataclass
class Job:
    """Job listing data structure."""

    title: str
    company: str
    location: str
    url: str
    source: str
    posted_date: str | None = None
    salary: str | None = None
    description: str | None = None
    applied: bool = False
    applied_date: str | None = None
    search_term: str = ""
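
# Persistence sketch (hypothetical file name; the real tracking store lives
# outside this module) - dataclass rows serialize cleanly:
#
#   from dataclasses import asdict
#   json.dump([asdict(j) for j in jobs], open("jobs.json", "w"), indent=2)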

# ---------------------------------------------------------------------------
# JobSearcher
# ---------------------------------------------------------------------------

class JobSearcher:
    """Searches job boards for new listings."""

    def __init__(self, config_path: str | None = None):
        settings = get_settings()
        self.config_path = Path(config_path or settings.config_file_path)
        self.search_config = self._load_config()
        self.jobs: list[Job] = []

    def _load_config(self) -> dict:
        if self.config_path.exists():
            with open(self.config_path) as f:
                return json.load(f)
        return {"search_terms": []}

    @property
    def _roles(self) -> list[dict]:
        return self.search_config.get("search_terms", [])
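
    # Example of the search-term config this class loads (a minimal sketch with
    # hypothetical values; the real schema is whatever JSON sits at
    # settings.config_file_path - "role" and "keywords" are the keys read below):
    #
    #   {
    #     "search_terms": [
    #       {"role": "python", "keywords": ["django", "fastapi"]},
    #       {"role": "site reliability engineer"}
    #     ]
    #   }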

    # ------------------------------------------------------------------
    # Orchestrator
    # ------------------------------------------------------------------

    async def search_all(self) -> list[Job]:
        """Search all configured/enabled sources in parallel."""
        settings = get_settings()
        auth = await get_auth_manager()
        tasks = []

        # Tier 1 - free public APIs
        if settings.remote_ok_enabled:
            tasks.append(self._search_remote_ok())
        if settings.remotive_enabled:
            tasks.append(self._search_remotive())
        if settings.arbeitnow_enabled:
            tasks.append(self._search_arbeitnow())
        if settings.himalayas_enabled:
            tasks.append(self._search_himalayas())
        if settings.jobicy_enabled:
            tasks.append(self._search_jobicy())
        if settings.the_muse_enabled:
            tasks.append(self._search_the_muse())

        # Tier 2 - RSS
        if settings.we_work_remotely_enabled:
            tasks.append(self._search_weworkremotely_rss())

        # Tier 3 - API key
        if settings.adzuna_enabled:
            if auth.adzuna.is_configured:
                tasks.append(self._search_adzuna(auth))
            else:
                logger.warning(
                    "Adzuna enabled but ADZUNA_APP_ID/ADZUNA_API_KEY not set."
                )
        if settings.indeed_enabled:
            if auth.indeed.is_configured:
                tasks.append(self._search_indeed(auth))
            else:
                logger.warning("Indeed enabled but INDEED_PUBLISHER_ID not set.")
        if settings.ziprecruiter_enabled:
            if auth.ziprecruiter.is_configured:
                tasks.append(self._search_ziprecruiter(auth))
            else:
                logger.warning("ZipRecruiter enabled but ZIPRECRUITER_API_KEY not set.")

        # Tier 4 - OAuth
        if settings.linkedin_enabled:
            if auth.linkedin.is_configured:
                tasks.append(self._search_linkedin(auth))
            else:
                logger.warning(
                    "LinkedIn enabled but LINKEDIN_CLIENT_ID/SECRET not set."
                )

        # Tier 5 - ATS aggregators
        if settings.lever_enabled:
            tasks.append(self._search_lever(settings))
        if settings.ashby_enabled:
            tasks.append(self._search_ashby(settings))
        if settings.greenhouse_enabled:
            tasks.append(self._search_greenhouse(settings))

        if not tasks:
            logger.warning("No job boards enabled.")
            return []

        logger.info(f"Searching {len(tasks)} job board task(s)…")
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_jobs: list[Job] = []
        for result in results:
            if isinstance(result, list):
                all_jobs.extend(result)
            elif isinstance(result, Exception):
                logger.error(f"Board search failed: {result}")

        # Deduplicate by URL
        seen: set[str] = set()
        unique: list[Job] = []
        for job in all_jobs:
            if job.url and job.url not in seen:
                seen.add(job.url)
                unique.append(job)

        self.jobs = unique
        logger.info(f"Found {len(unique)} unique jobs across all sources.")
        return unique

    # ------------------------------------------------------------------
    # Tier 1 - Free Public JSON APIs
    # ------------------------------------------------------------------

    async def _search_remote_ok(self) -> list[Job]:
        """RemoteOK public JSON API."""
        jobs: list[Job] = []
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                tag = role.lower().replace(" ", "-")
                data = await _get_json(s, f"https://remoteok.com/api?tag={tag}")
                if not isinstance(data, list):
                    continue
                # Index 0 of the RemoteOK payload is API metadata, not a listing.
                for item in data[1:21]:
                    if (
                        isinstance(item, dict)
                        and item.get("company")
                        and item.get("position")
                    ):
                        jobs.append(
                            Job(
                                title=item.get("position", ""),
                                company=item.get("company", ""),
                                location=item.get("location", "Remote"),
                                url=item.get("url", ""),
                                source="remote-ok",
                                posted_date=item.get("date", ""),
                                salary=item.get("salary", ""),
                                search_term=role,
                            )
                        )
        return jobs

    async def _search_remotive(self) -> list[Job]:
        """Remotive public JSON API."""
        jobs: list[Job] = []
        category_map = {
            "python": "software-dev",
            "devops": "devops",
            "site reliability engineer": "devops",
            "javascript": "frontend",
            "ai": "data",
        }
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                cat = category_map.get(role.lower(), "software-dev")
                data = await _get_json(
                    s, f"https://remotive.com/api/remote-jobs?category={cat}&limit=20"
                )
                if not isinstance(data, dict):
                    continue
                for item in data.get("jobs", [])[:20]:
                    jobs.append(
                        Job(
                            title=item.get("title", ""),
                            company=item.get("company_name", ""),
                            location=item.get("candidate_required_location", "Remote"),
                            url=item.get("url", ""),
                            source="remotive",
                            posted_date=item.get("published_at", ""),
                            salary=item.get("salary", ""),
                            search_term=role,
                        )
                    )
        return jobs

    async def _search_arbeitnow(self) -> list[Job]:
        """Arbeitnow free public API - international remote jobs."""
        jobs: list[Job] = []
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                url = f"https://www.arbeitnow.com/api/job-board-api?search={role.replace(' ', '+')}&remote=true"
                data = await _get_json(s, url)
                if not isinstance(data, dict):
                    continue
                for item in data.get("data", [])[:20]:
                    jobs.append(
                        Job(
                            title=item.get("title", ""),
                            company=item.get("company_name", ""),
                            location=item.get("location", "Remote"),
                            url=item.get("url", ""),
                            source="arbeitnow",
                            posted_date=item.get("created_at", ""),
                            description=item.get("description", "")[:300]
                            if item.get("description")
                            else None,
                            search_term=role,
                        )
                    )
        return jobs

    async def _search_himalayas(self) -> list[Job]:
        """Himalayas.app free public API."""
        jobs: list[Job] = []
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                url = f"https://himalayas.app/jobs/api?q={role.replace(' ', '+')}&limit=20"
                data = await _get_json(s, url)
                if not isinstance(data, dict):
                    continue
                for item in data.get("jobs", [])[:20]:
                    jobs.append(
                        Job(
                            title=item.get("title", ""),
                            company=item.get("companyName", ""),
                            location=item.get("location", "Remote"),
                            url=item.get("applicationLink", item.get("url", "")),
                            source="himalayas",
                            posted_date=item.get("createdAt", ""),
                            salary=item.get("salary", ""),
                            search_term=role,
                        )
                    )
        return jobs

    async def _search_jobicy(self) -> list[Job]:
        """Jobicy free public API - remote jobs."""
        jobs: list[Job] = []
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                url = f"https://jobicy.com/api/v2/remote-jobs?count=20&tag={role.replace(' ', '+')}"
                data = await _get_json(s, url)
                if not isinstance(data, dict):
                    continue
                for item in data.get("jobs", [])[:20]:
                    jobs.append(
                        Job(
                            title=item.get("jobTitle", ""),
                            company=item.get("companyName", ""),
                            location=item.get("jobGeo", "Remote"),
                            url=item.get("url", ""),
                            source="jobicy",
                            posted_date=item.get("pubDate", ""),
                            salary=item.get("annualSalaryMin", ""),
                            search_term=role,
                        )
                    )
        return jobs

    async def _search_the_muse(self) -> list[Job]:
        """The Muse public API."""
        jobs: list[Job] = []
        settings = get_settings()
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                params: dict = {"category": role, "page": 1, "descending": "true"}
                if settings.the_muse_api_key:
                    params["api_key"] = settings.the_muse_api_key
                # Let aiohttp encode the query string (roles can contain spaces).
                data = await _get_json(
                    s, "https://www.themuse.com/api/public/jobs", params=params
                )
                if not isinstance(data, dict):
                    continue
                for item in data.get("results", [])[:20]:
                    company = item.get("company", {}).get("name", "")
                    locations = item.get("locations", [])
                    location = (
                        locations[0].get("name", "Remote") if locations else "Remote"
                    )
                    jobs.append(
                        Job(
                            title=item.get("name", ""),
                            company=company,
                            location=location,
                            url=item.get("refs", {}).get("landing_page", ""),
                            source="the-muse",
                            posted_date=item.get("publication_date", ""),
                            search_term=role,
                        )
                    )
        return jobs

    # ------------------------------------------------------------------
    # Tier 2 - RSS Feeds
    # ------------------------------------------------------------------

    async def _search_weworkremotely_rss(self) -> list[Job]:
        """We Work Remotely - parse public RSS feed."""
        jobs: list[Job] = []
        feed_urls = [
            "https://weworkremotely.com/remote-jobs.rss",
            "https://weworkremotely.com/categories/remote-programming-jobs.rss",
            "https://weworkremotely.com/categories/remote-devops-sysadmin-jobs.rss",
        ]
        keywords = {t.get("role", "").lower() for t in self._roles} | {
            kw.lower() for t in self._roles for kw in t.get("keywords", [])
        }
        async with aiohttp.ClientSession() as s:
            for feed_url in feed_urls:
                text = await _get_text(s, feed_url)
                if not text:
                    continue
                try:
                    root = ET.fromstring(text)
                    channel = root.find("channel")
                    if channel is None:
                        continue
                    for item in channel.findall("item")[:30]:
                        title_el = item.find("title")
                        link_el = item.find("link")
                        region_el = item.find("region")
                        company_el = item.find("company")
                        date_el = item.find("pubDate")
                        title = title_el.text or "" if title_el is not None else ""
                        link = link_el.text or "" if link_el is not None else ""
                        region = (
                            region_el.text or "Remote"
                            if region_el is not None
                            else "Remote"
                        )
                        company = (
                            company_el.text or "" if company_el is not None else ""
                        )
                        pub_date = date_el.text or "" if date_el is not None else ""
                        # Basic keyword relevance filter
                        title_lower = title.lower()
                        if keywords and not any(kw in title_lower for kw in keywords):
                            continue
                        if link:
                            jobs.append(
                                Job(
                                    title=title,
                                    company=company,
                                    location=region,
                                    url=link,
                                    source="we-work-remotely",
                                    posted_date=pub_date,
                                    search_term="rss",
                                )
                            )
                except ET.ParseError as exc:
                    logger.error(f"WWR RSS parse error: {exc}")
        return jobs

    # ------------------------------------------------------------------
    # Tier 3 - API-key boards
    # ------------------------------------------------------------------

    async def _search_adzuna(self, auth) -> list[Job]:
        """Adzuna Jobs API (requires ADZUNA_APP_ID + ADZUNA_API_KEY)."""
        jobs: list[Job] = []
        settings = get_settings()
        country = settings.adzuna_country or "us"
        base = f"https://api.adzuna.com/v1/api/jobs/{country}/search"
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                params = {
                    **auth.adzuna.auth_params(),
                    "what": role,
                    "results_per_page": 20,
                    "content-type": "application/json",
                }
                data = await _get_json(s, f"{base}/1", params=params)
                if not isinstance(data, dict):
                    continue
                for item in data.get("results", [])[:20]:
                    company = item.get("company", {}).get("display_name", "")
                    location = item.get("location", {}).get("display_name", "")
                    jobs.append(
                        Job(
                            title=item.get("title", ""),
                            company=company,
                            location=location,
                            url=item.get("redirect_url", ""),
                            source="adzuna",
                            posted_date=item.get("created", ""),
                            salary=str(item.get("salary_min", "")),
                            description=item.get("description", "")[:300],
                            search_term=role,
                        )
                    )
        return jobs

    async def _search_indeed(self, auth) -> list[Job]:
        """Indeed Publisher API (requires INDEED_PUBLISHER_ID)."""
        jobs: list[Job] = []
        base_params = auth.indeed.auth_params()
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                params = {
                    **base_params,
                    "q": role,
                    "l": "",
                    "sort": "date",
                    "radius": 25,
                    "st": "",
                    "jt": "fulltime",
                    "start": 0,
                    "limit": 20,
                    "latlong": 1,
                    "co": "us",
                    "chnl": "",
                    "userip": "1.2.3.4",
                    "useragent": "Mozilla/5.0",
                }
                data = await _get_json(
                    s, "http://api.indeed.com/ads/apisearch", params=params
                )
                if not isinstance(data, dict):
                    continue
                for item in data.get("results", [])[:20]:
                    jobs.append(
                        Job(
                            title=item.get("jobtitle", ""),
                            company=item.get("company", ""),
                            location=f"{item.get('city', '')}, {item.get('state', '')}".strip(
                                ", "
                            ),
                            url=item.get("url", ""),
                            source="indeed",
                            posted_date=item.get("date", ""),
                            search_term=role,
                        )
                    )
        return jobs

    async def _search_ziprecruiter(self, auth) -> list[Job]:
        """ZipRecruiter Partner API (requires ZIPRECRUITER_API_KEY)."""
        jobs: list[Job] = []
        headers = {**_HEADERS, **auth.ziprecruiter.auth_headers()}
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                params = {"search": role, "jobs_per_page": 20}
                data = await _get_json(
                    s,
                    "https://api.ziprecruiter.com/jobs/v1",
                    params=params,
                    headers=headers,
                )
                if not isinstance(data, dict):
                    continue
                for item in data.get("jobs", [])[:20]:
                    jobs.append(
                        Job(
                            title=item.get("name", ""),
                            company=item.get("hiring_company", {}).get("name", ""),
                            location=item.get("location", ""),
                            url=item.get("job_url", ""),
                            source="ziprecruiter",
                            posted_date=item.get("posted_time", ""),
                            salary=item.get("salary_interval", ""),
                            search_term=role,
                        )
                    )
        return jobs

    # ------------------------------------------------------------------
    # Tier 4 - OAuth (LinkedIn)
    # ------------------------------------------------------------------

    async def _search_linkedin(self, auth) -> list[Job]:
        """LinkedIn Jobs API (requires OAuth client credentials)."""
        jobs: list[Job] = []
        token = await auth.linkedin.get_token()
        if not token:
            logger.debug("LinkedIn: no valid token - skipping.")
            return jobs
        headers = {**_HEADERS, **token.auth_header}
        async with aiohttp.ClientSession() as s:
            for term in self._roles:
                role = term.get("role", "")
                params = {"keywords": role, "count": 20, "start": 0}
                data = await _get_json(
                    s,
                    "https://api.linkedin.com/v2/jobSearch",
                    params=params,
                    headers=headers,
                )
                if not isinstance(data, dict):
                    continue
                for item in data.get("elements", [])[:20]:
                    job_data = item.get("jobPosting", {})
                    title = job_data.get("title", "")
                    company = (
                        job_data.get("company", {}).get("name", "")
                        if isinstance(job_data.get("company"), dict)
                        else ""
                    )
                    location_data = job_data.get("formattedLocation", "")
                    job_id = job_data.get("id", "")
                    url = (
                        f"https://www.linkedin.com/jobs/view/{job_id}/"
                        if job_id
                        else ""
                    )
                    jobs.append(
                        Job(
                            title=title,
                            company=company,
                            location=location_data,
                            url=url,
                            source="linkedin",
                            posted_date=str(job_data.get("listedAt", "")),
                            search_term=role,
                        )
                    )
        return jobs

    # ------------------------------------------------------------------
    # Tier 5 - ATS Aggregators
    # ------------------------------------------------------------------

    async def _search_lever(self, settings) -> list[Job]:
        """Lever ATS public postings API."""
        jobs: list[Job] = []
        companies = [
            c.strip() for c in settings.lever_companies.split(",") if c.strip()
        ]
        keywords = {t.get("role", "").lower() for t in self._roles}
        async with aiohttp.ClientSession() as s:
            for company in companies:
                url = f"https://api.lever.co/v0/postings/{company}?mode=json"
                data = await _get_json(s, url)
                if not isinstance(data, list):
                    continue
                for item in data[:15]:
                    title = item.get("text", "")
                    # Filter to relevant roles only
                    if keywords and not any(kw in title.lower() for kw in keywords):
                        continue
                    jobs.append(
                        Job(
                            title=title,
                            company=company.title(),
                            location=item.get("categories", {}).get("location", ""),
                            url=item.get("hostedUrl", item.get("applyUrl", "")),
                            source="lever",
                            posted_date=str(item.get("createdAt", "")),
                            description=item.get("description", "")[:300],
                            search_term=f"lever/{company}",
                        )
                    )
        return jobs

    async def _search_ashby(self, settings) -> list[Job]:
        """Ashby ATS public postings API."""
        jobs: list[Job] = []
        companies = [
            c.strip() for c in settings.ashby_companies.split(",") if c.strip()
        ]
        keywords = {t.get("role", "").lower() for t in self._roles}
        async with aiohttp.ClientSession() as s:
            for company in companies:
                url = f"https://api.ashbyhq.com/posting-api/job-board/{company}"
                data = await _get_json(s, url)
                if not isinstance(data, dict):
                    continue
                for item in data.get("jobPostings", [])[:15]:
                    title = item.get("title", "")
                    if keywords and not any(kw in title.lower() for kw in keywords):
                        continue
                    location = ""
                    loc = item.get("location")
                    if isinstance(loc, dict):
                        location = loc.get("locationStr", "")
                    elif isinstance(loc, str):
                        location = loc
                    jobs.append(
                        Job(
                            title=title,
                            company=company.title(),
                            location=location,
                            url=item.get("jobUrl", item.get("applyUrl", "")),
                            source="ashby",
                            posted_date=item.get("publishedAt", ""),
                            search_term=f"ashby/{company}",
                        )
                    )
        return jobs

    async def _search_greenhouse(self, settings) -> list[Job]:
        """Greenhouse ATS public job board API."""
        jobs: list[Job] = []
        companies = [
            c.strip() for c in settings.greenhouse_companies.split(",") if c.strip()
        ]
        keywords = {t.get("role", "").lower() for t in self._roles}
        async with aiohttp.ClientSession() as s:
            for company in companies:
                url = f"https://boards-api.greenhouse.io/v1/boards/{company}/jobs?content=true"
                data = await _get_json(s, url)
                if not isinstance(data, dict):
                    continue
                for item in data.get("jobs", [])[:15]:
                    title = item.get("title", "")
                    if keywords and not any(kw in title.lower() for kw in keywords):
                        continue
                    location = (
                        item.get("location", {}).get("name", "")
                        if isinstance(item.get("location"), dict)
                        else ""
                    )
                    jobs.append(
                        Job(
                            title=title,
                            company=company.title(),
                            location=location,
                            url=item.get("absolute_url", ""),
                            source="greenhouse",
                            posted_date=item.get("updated_at", ""),
                            search_term=f"greenhouse/{company}",
                        )
                    )
        return jobs

    # ------------------------------------------------------------------
    # Utility
    # ------------------------------------------------------------------

    def filter_new_jobs(self, existing_jobs: list[Job]) -> list[Job]:
        """Filter out jobs already tracked."""
        existing_urls = {job.url for job in existing_jobs}
        return [job for job in self.jobs if job.url not in existing_urls]
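
    # Typical use after search_all(), with a hypothetical tracking file (sketch):
    #
    #   existing = [Job(**row) for row in json.load(open("tracked_jobs.json"))]
    #   fresh = searcher.filter_new_jobs(existing)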

# ---------------------------------------------------------------------------
# Quick test
# ---------------------------------------------------------------------------

if __name__ == "__main__":

    async def test():
        searcher = JobSearcher()
        jobs = await searcher.search_all()
        print(f"Found {len(jobs)} jobs")

        from collections import Counter

        counts = Counter(j.source for j in jobs)
        for source, count in sorted(counts.items()):
            print(f"  {source:25s} {count:3d} jobs")

    asyncio.run(test())