"""
AutoJobs API — FastAPI Backend
Multi-user job search with per-user API keys + subscription plans
"""

import json
import os
import sqlite3
import uuid
from contextlib import asynccontextmanager, closing
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

DB_PATH = os.getenv("DB_PATH", "/var/www/autojobs/autojobs.db")

# Router with /autojobs/api prefix
router = APIRouter(prefix="/autojobs/api")

# --- Plans & Pricing ---

# Private User Plans (job seekers)
PRIVATE_PLANS = {
    "free": {"name": "Free", "applications": 5, "ai_customize": 5, "cover_letters": 5, "price": 0},
    "starter": {"name": "Starter", "applications": 20, "ai_customize": 20, "cover_letters": 20, "price": 29},
    "pro": {"name": "Pro", "applications": 100, "ai_customize": 100, "cover_letters": 100, "price": 69},
    "ultra": {"name": "Ultra", "applications": 200, "ai_customize": 200, "cover_letters": 200, "price": 149},
    "unlimited": {"name": "Unlimited", "applications": 99999, "ai_customize": 99999, "cover_letters": 99999, "price": 199},
}

# Alias for backwards compatibility
PLANS = PRIVATE_PLANS

# Agency Plans (recruiting agencies - manage multiple clients)
AGENCY_PLANS = {
    "agency_starter": {"name": "Agency Starter", "submissions": 1000, "clients": 10, "price": 555},
    "agency_growth": {"name": "Agency Growth", "submissions": 3000, "clients": 50, "price": 999},
    "agency_scale": {"name": "Agency Scale", "submissions": 5000, "clients": 150, "price": 1499},
    "agency_enterprise": {"name": "Agency Enterprise", "submissions": 10000, "clients": 500, "price": 2222},
}

# --- Database ---

# One CREATE TABLE statement per table; executed in order by init_db().
_SCHEMA_STATEMENTS = (
    # Users with plan info
    """
    CREATE TABLE IF NOT EXISTS users (
        id TEXT PRIMARY KEY,
        email TEXT UNIQUE NOT NULL,
        name TEXT,
        user_type TEXT DEFAULT 'private',
        plan TEXT DEFAULT 'free',
        api_keys TEXT DEFAULT '{}',
        profile TEXT DEFAULT '{}',
        linkedin_access_token TEXT DEFAULT '',
        linkedin_refresh_token TEXT DEFAULT '',
        linkedin_profile_id TEXT DEFAULT '',
        monthly_count INTEGER DEFAULT 0,
        ai_customize_count INTEGER DEFAULT 0,
        billing_cycle_start TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """,
    # Applications tracking
    """
    CREATE TABLE IF NOT EXISTS applications (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        job_title TEXT,
        company TEXT,
        location TEXT,
        job_url TEXT,
        status TEXT DEFAULT 'pending',
        resume_version TEXT,
        cover_letter TEXT,
        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users(id)
    )
    """,
    # API usage tracking
    """
    CREATE TABLE IF NOT EXISTS api_usage (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id TEXT NOT NULL,
        source TEXT,
        jobs_found INTEGER,
        searched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users(id)
    )
    """,
    # Agency client profiles
    """
    CREATE TABLE IF NOT EXISTS client_profiles (
        id TEXT PRIMARY KEY,
        agency_id TEXT NOT NULL,
        client_name TEXT,
        client_email TEXT,
        profile_data TEXT DEFAULT '{}',
        submission_count INTEGER DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (agency_id) REFERENCES users(id)
    )
    """,
)


def init_db():
    """Create the users, applications, api_usage and client_profiles tables if missing."""
    # closing() guarantees the connection is released even if a statement fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        for statement in _SCHEMA_STATEMENTS:
            cursor.execute(statement)
        conn.commit()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: make sure the schema exists before serving."""
    init_db()
    yield


app = FastAPI(title="AutoJobs API", version="1.1.0", lifespan=lifespan)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# NOTE: include_router() copies the routes that exist on `router` at the moment
# of the call; routes decorated onto `router` after this line are not added to
# the app by this call.
app.include_router(router)

# auto_error=False: a missing Authorization header yields None so that
# get_current_user() can raise its own 401.
security = HTTPBearer(auto_error=False)

# --- Helper Functions ---


def get_user_row(user_id: str):
    """Return the ``users`` row for *user_id* as a dict, or None when absent."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    return dict(row) if row else None


def check_plan_limit(user: dict, action: str, count: int = 1) -> dict:
    """Check whether *user* may perform *count* more units of *action*.

    The plan catalog is chosen by plan name: names found in AGENCY_PLANS are
    metered on ``"submission"``; everything else is treated as a private plan
    (unknown names fall back to ``"free"``) and metered on ``"application"``
    and ``"ai_customize"``. Actions that are not metered for the plan are
    always allowed.

    Returns:
        For metered actions a dict with ``allowed``, ``remaining``, ``limit``,
        ``used``, ``type`` and ``user_type``; for unmetered actions only
        ``allowed`` and ``remaining``.
    """
    plan_name = user.get("plan") or "free"

    # Map each metered action to (usage counter column, plan quota).
    if plan_name in AGENCY_PLANS:
        metered = {
            "submission": ("monthly_count", AGENCY_PLANS[plan_name]["submissions"]),
        }
        kind, user_type = "agency", "agency"
    else:
        plan = PRIVATE_PLANS.get(plan_name, PRIVATE_PLANS["free"])
        metered = {
            "application": ("monthly_count", plan["applications"]),
            "ai_customize": ("ai_customize_count", plan["ai_customize"]),
        }
        kind, user_type = "jobseeker", "private"

    if action not in metered:
        return {"allowed": True, "remaining": 99999}

    counter, limit = metered[action]
    used = user.get(counter) or 0  # tolerate a NULL counter column
    remaining = max(0, limit - used)
    return {
        "allowed": remaining >= count,
        "remaining": remaining,
        "limit": limit,
        "used": used,
        "type": kind,
        "user_type": user_type,
    }


# Fixed statements per action. "application" (job seekers) and "submission"
# (agencies) share monthly_count, matching what check_plan_limit() reads.
_INCREMENT_SQL = {
    "application": "UPDATE users SET monthly_count = monthly_count + 1 WHERE id = ?",
    "submission": "UPDATE users SET monthly_count = monthly_count + 1 WHERE id = ?",
    "ai_customize": "UPDATE users SET ai_customize_count = ai_customize_count + 1 WHERE id = ?",
}


def increment_usage(user_id: str, action: str):
    """Add one to the usage counter that backs *action* for *user_id*.

    Unknown actions are ignored. Previously ``"application"`` was ignored as
    well, so job-seeker application limits were never reached.
    """
    statement = _INCREMENT_SQL.get(action)
    if statement is None:
        return
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(statement, (user_id,))
        conn.commit()


def reset_monthly_counts():
    """Reset counts at the start of a new billing cycle (run daily via cron)"""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # Only users whose cycle started 30 or more days ago are reset.
        conn.execute("""
            UPDATE users
            SET monthly_count = 0,
                ai_customize_count = 0,
                billing_cycle_start = CURRENT_TIMESTAMP
            WHERE date('now') >= date(billing_cycle_start, '+30 days')
        """)
        conn.commit()


# --- Auth ---


def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the caller from the bearer token.

    The token is used verbatim as the user id; it is not verified here.

    Raises:
        HTTPException: 401 when no Authorization header was supplied.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing auth")
    return {"id": credentials.credentials}
# --- Models ---


class UserProfile(BaseModel):
    """Profile payload accepted by ``POST /users/me/profile``."""

    name: str
    email: str
    title: str
    summary: str
    skills: List[str]
    experience: List[dict]
    education: str
    linkedin_url: str = ""
    job_keywords: List[str]
    job_locations: List[str]


class APIKeys(BaseModel):
    """Per-user provider API keys; an empty string means "not set"."""

    jooble: str = ""
    jsearch: str = ""
    perplexity: str = ""


class JobSearchRequest(BaseModel):
    """Body of ``POST /jobs/search``."""

    query: str
    location: str = ""
    page: int = 1


class ApplicationSubmit(BaseModel):
    """Body of ``POST /applications``."""

    job: dict
    resume: str
    cover_letter: str
    status: str = "applied"


class PlanUpdate(BaseModel):
    """Body of ``POST /users/me/plan``."""

    plan: str


# --- Internal helpers ---

_LINKEDIN_REDIRECT_URI = "https://hostpioneers.com/autojobs/dashboard"

# Upper bound for caller-supplied LIMIT values (SQLite treats a negative
# LIMIT as "no limit", so the value must be clamped before use).
_MAX_PAGE_SIZE = 1000


class _Database:
    """Context manager that opens a SQLite connection and always closes it.

    Closing does not commit; callers commit explicitly after writes.
    """

    def __init__(self, *, rows: bool = False):
        self._rows = rows
        self._conn = None

    def __enter__(self):
        self._conn = sqlite3.connect(DB_PATH)
        if self._rows:
            self._conn.row_factory = sqlite3.Row
        return self._conn

    def __exit__(self, exc_type, exc, tb):
        self._conn.close()
        return False


def _load_json(raw) -> dict:
    """Decode a JSON text column, treating NULL/empty as an empty object."""
    return json.loads(raw) if raw else {}


def _clamp_limit(limit: int) -> int:
    """Clamp a caller-supplied page size into ``1.._MAX_PAGE_SIZE``."""
    return max(1, min(limit, _MAX_PAGE_SIZE))


def _require_user(user_id: str) -> dict:
    """Return the user's row or raise 404 when the id is unknown."""
    row = get_user_row(user_id)
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return row


def _require_agency(user_id: str) -> dict:
    """Return the user's row or raise 403 unless they are on an agency plan."""
    row = get_user_row(user_id)
    if not row or row["plan"] not in AGENCY_PLANS:
        raise HTTPException(status_code=403, detail="Agency plan required")
    return row


def _update_user(sql: str, params: tuple) -> None:
    """Run a single-user UPDATE, commit, and raise 404 if no row matched."""
    with _Database() as conn:
        cursor = conn.execute(sql, params)
        conn.commit()
        matched = cursor.rowcount
    if matched == 0:
        raise HTTPException(status_code=404, detail="User not found")


def _require_admin(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Opt-in guard for the /admin routes.

    When the AUTOJOBS_ADMIN_TOKEN environment variable is set, the request
    must carry it as the bearer token. When it is unset the routes behave as
    before, i.e. they are reachable without authentication.

    NOTE(review): set AUTOJOBS_ADMIN_TOKEN in every deployment — without it
    the admin routes are open, and /admin/users returns user ids, which
    get_current_user() accepts as bearer tokens.
    """
    import hmac

    expected = os.getenv("AUTOJOBS_ADMIN_TOKEN", "")
    if not expected:
        return
    supplied = credentials.credentials if credentials else ""
    if not hmac.compare_digest(supplied.encode(), expected.encode()):
        raise HTTPException(status_code=403, detail="Admin access required")


# --- Routes ---


@app.get("/")
def root():
    """Service banner."""
    return {"status": "ok", "service": "AutoJobs API", "version": "1.1.0"}


@app.get("/health")
def health():
    """Liveness probe; the timestamp is a naive UTC ISO-8601 string."""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}


# --- User Routes ---

# --- LinkedIn OAuth ---


@router.get("/auth/linkedin")
def linkedin_auth(redirect_uri: str = "https://hostpioneers.com/autojobs/dashboard"):
    """Start LinkedIn OAuth flow

    Returns the authorization URL plus the ``state`` value embedded in it so
    the client can compare it with the one LinkedIn echoes back.

    Raises:
        HTTPException: 500 when LINKEDIN_CLIENT_ID is not configured.
    """
    from urllib.parse import quote, urlencode

    client_id = os.getenv("LINKEDIN_CLIENT_ID", "")
    if not client_id:
        raise HTTPException(status_code=500, detail="LinkedIn OAuth not configured")

    state = str(uuid.uuid4())
    # urlencode escapes the redirect URI and the space-separated scope list;
    # quote_via=quote encodes spaces as %20 rather than "+".
    query = urlencode(
        {
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "scope": "openid profile email w_member_social",
            "state": state,
        },
        quote_via=quote,
    )
    return {
        "auth_url": f"https://www.linkedin.com/oauth/v2/authorization?{query}",
        "state": state,
    }


@router.post("/auth/linkedin/callback")
def linkedin_callback(
    code: str,
    user=Depends(get_current_user),
    redirect_uri: str = _LINKEDIN_REDIRECT_URI,
):
    """Exchange LinkedIn code for access token and store it

    ``redirect_uri`` is sent with the token exchange; pass the same value that
    was given to ``/auth/linkedin`` when it was not the default.

    Raises:
        HTTPException: 500 when OAuth is not configured or the request to
            LinkedIn fails; 400 when LinkedIn does not return a token;
            404 when the calling user does not exist.
    """
    import requests

    client_id = os.getenv("LINKEDIN_CLIENT_ID", "")
    client_secret = os.getenv("LINKEDIN_CLIENT_SECRET", "")
    if not client_id or not client_secret:
        raise HTTPException(status_code=500, detail="LinkedIn OAuth not configured")

    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    # Only the network call sits inside the try, so the HTTPExceptions raised
    # below are not re-wrapped as a 500.
    try:
        resp = requests.post(
            "https://www.linkedin.com/oauth/v2/accessToken", data=form, timeout=10
        )
    except requests.RequestException as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    if resp.status_code != 200:
        raise HTTPException(status_code=400, detail="Failed to get LinkedIn token")
    try:
        token_data = resp.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Failed to get LinkedIn token") from exc

    access_token = token_data.get("access_token", "")
    if not access_token:
        raise HTTPException(status_code=400, detail="Failed to get LinkedIn token")

    _update_user(
        """
        UPDATE users
        SET linkedin_access_token = ?, linkedin_refresh_token = ?
        WHERE id = ?
        """,
        (access_token, token_data.get("refresh_token", ""), user["id"]),
    )
    return {"success": True, "message": "LinkedIn connected"}


@router.get("/linkedin/resumes")
def get_linkedin_resumes(user=Depends(get_current_user)):
    """Get user's LinkedIn saved resumes

    Currently returns a fixed placeholder entry once the profile endpoint is
    reachable.

    Raises:
        HTTPException: 400 when the user has no stored LinkedIn access token.
    """
    import requests

    user_row = get_user_row(user["id"])
    if not user_row or not user_row.get("linkedin_access_token"):
        raise HTTPException(status_code=400, detail="LinkedIn not connected")

    headers = {"Authorization": f"Bearer {user_row['linkedin_access_token']}"}
    try:
        # The response body is not used; the call only checks that the
        # request to LinkedIn completes.
        requests.get("https://api.linkedin.com/v2/me", headers=headers, timeout=10)
    except requests.RequestException:
        return {
            "resumes": [],
            "message": "Could not access LinkedIn. Please re-connect.",
        }

    # For MVP - return placeholder (LinkedIn API requires special approval for resume access)
    return {
        "resumes": [
            {"id": "linkedin_default", "name": "LinkedIn Saved Resume", "source": "linkedin"}
        ],
        "message": "Connect your LinkedIn to import your saved resumes",
    }


@router.post("/linkedin/disconnect")
def linkedin_disconnect(user=Depends(get_current_user)):
    """Disconnect LinkedIn account by clearing the stored tokens and profile id."""
    with _Database() as conn:
        conn.execute(
            """
            UPDATE users
            SET linkedin_access_token = '', linkedin_refresh_token = '', linkedin_profile_id = ''
            WHERE id = ?
            """,
            (user["id"],),
        )
        conn.commit()
    return {"success": True}


@router.post("/users")
def create_user(data: dict):
    """Create new user (private or agency)

    Returns the new ``user_id``, the email and the plan that was stored.

    Raises:
        HTTPException: 400 for a missing email, an unknown plan name, or an
            email that is already registered.
    """
    email = str(data.get("email") or "").strip()
    if not email:
        raise HTTPException(status_code=400, detail="Email is required")

    name = data.get("name", "")
    user_type = data.get("user_type", "private")
    plan = data.get("plan", "free")
    # NOTE(review): the plan is taken from the request body with no payment
    # check; only its existence is validated here.
    if plan not in PRIVATE_PLANS and plan not in AGENCY_PLANS:
        raise HTTPException(status_code=400, detail="Invalid plan")

    user_id = str(uuid.uuid4())
    with _Database() as conn:
        try:
            conn.execute(
                """
                INSERT INTO users (id, email, name, user_type, plan, monthly_count, ai_customize_count)
                VALUES (?, ?, ?, ?, ?, 0, 0)
                """,
                (user_id, email, name, user_type, plan),
            )
            conn.commit()
        except sqlite3.IntegrityError as exc:
            raise HTTPException(status_code=400, detail="Email already registered") from exc

    # Report the plan actually stored (this used to be hard-coded to "free").
    return {"user_id": user_id, "email": email, "plan": plan}


@router.post("/users/login")
def login(data: dict):
    """Login by email — returns user_id as token (MVP auth)

    Raises:
        HTTPException: 404 when no user has that email.
    """
    email = data.get("email", "")
    with _Database(rows=True) as conn:
        row = conn.execute(
            "SELECT id, email, name, user_type, plan FROM users WHERE email = ?",
            (email,),
        ).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return {
        "user_id": row["id"],
        "email": row["email"],
        "name": row["name"],
        "user_type": row["user_type"],
        "plan": row["plan"],
    }


@router.get("/users/me")
def get_me(user=Depends(get_current_user)):
    """Get current user info with plan details

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    user_row = _require_user(user["id"])
    plan_name = user_row["plan"]
    # Pick the catalog by plan name, the same rule check_plan_limit() applies,
    # so the reported quota matches the enforced one.
    plan = AGENCY_PLANS.get(plan_name) or PRIVATE_PLANS.get(plan_name, PRIVATE_PLANS["free"])

    applications_used = user_row["monthly_count"] or 0
    ai_used = user_row["ai_customize_count"] or 0
    application_quota = plan.get("applications", plan.get("submissions", 99999))
    return {
        "id": user_row["id"],
        "email": user_row["email"],
        "name": user_row["name"],
        "user_type": user_row.get("user_type", "private"),
        "plan": plan_name,
        "plan_details": {"name": plan["name"], "price": plan["price"]},
        "linkedin_connected": bool(user_row.get("linkedin_access_token")),
        "usage": {
            "applications_this_month": user_row["monthly_count"],
            "ai_customizations_this_month": user_row["ai_customize_count"],
            "applications_remaining": max(0, application_quota - applications_used),
            "ai_customize_remaining": max(0, plan.get("ai_customize", 99999) - ai_used),
        },
        "billing_cycle_start": user_row["billing_cycle_start"],
        "created_at": user_row["created_at"],
    }


@router.post("/users/me/plan")
def update_plan(plan_update: PlanUpdate, user=Depends(get_current_user)):
    """Update user's subscription plan (private or agency)

    Raises:
        HTTPException: 400 for an unknown plan name; 404 for an unknown user.
    """
    catalog = {**PRIVATE_PLANS, **AGENCY_PLANS}
    if plan_update.plan not in catalog:
        raise HTTPException(status_code=400, detail="Invalid plan")
    _update_user("UPDATE users SET plan = ? WHERE id = ?", (plan_update.plan, user["id"]))
    return {"plan": plan_update.plan, "price": catalog[plan_update.plan]["price"]}


@router.post("/users/me/api-keys")
def save_api_keys(keys: APIKeys, user=Depends(get_current_user)):
    """Save user's own API keys (replaces the whole stored key set)

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    _update_user("UPDATE users SET api_keys = ? WHERE id = ?", (keys.json(), user["id"]))
    return {"saved": True}


@router.get("/users/me/api-keys")
def get_api_keys(user=Depends(get_current_user)):
    """Get user's stored API keys (masked for security)

    Keys longer than 8 characters show their first and last four characters;
    shorter non-empty keys are fully starred out.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    keys = _load_json(_require_user(user["id"]).get("api_keys"))

    def mask(value):
        if not value:
            return value
        if len(value) > 8:
            return value[:4] + "..." + value[-4:]
        return "*" * len(value)

    return {provider: mask(value) for provider, value in keys.items()}


@router.post("/users/me/profile")
def save_profile(profile: UserProfile, user=Depends(get_current_user)):
    """Save user's profile

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    _update_user("UPDATE users SET profile = ? WHERE id = ?", (profile.json(), user["id"]))
    return {"saved": True}


@router.get("/users/me/profile")
def get_profile(user=Depends(get_current_user)):
    """Get user's profile

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    return _load_json(_require_user(user["id"]).get("profile"))


@router.get("/plans")
def list_plans():
    """List all available plans separated by user type"""
    return {"private_plans": PRIVATE_PLANS, "agency_plans": AGENCY_PLANS}


# --- Job Search ---


@router.post("/jobs/search")
def search_jobs(req: JobSearchRequest, user=Depends(get_current_user)):
    """Search for jobs using user's API keys

    Queries every provider the user has a key for, then de-duplicates by
    ``job_url`` (results without a URL are dropped).

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    keys = _load_json(_require_user(user["id"]).get("api_keys"))

    results = []
    for provider, search in (("jooble", search_jooble), ("jsearch", search_jsearch)):
        api_key = keys.get(provider)
        if not api_key:
            continue
        try:
            results.extend(search(req.query, req.location, api_key, req.page))
        except Exception:
            # Best-effort: one failing provider must not fail the whole search.
            continue

    # Deduplicate by job URL
    seen = set()
    unique = []
    for job in results:
        url = job.get("job_url")
        if url and url not in seen:
            seen.add(url)
            unique.append(job)
    return {"jobs": unique, "total": len(unique)}


def search_jooble(query: str, location: str, api_key: str, page: int = 1) -> List[dict]:
    """Search Jooble API

    Returns a list of normalized job dicts, or an empty list on any
    transport, HTTP or payload-shape problem.
    """
    import requests

    # NOTE(review): this host and the response fields parsed below do not look
    # like Jooble's documented REST API — confirm the endpoint before sending
    # user API keys to it.
    url = "https://ukr-dev.com/api/v1/vacancies/search"
    params = {"keywords": query, "location": location, "page": page, "results_per_page": 20}
    headers = {"X-Api-Key": api_key}
    try:
        resp = requests.get(url, params=params, headers=headers, timeout=10)
        if resp.status_code != 200:
            return []
        return [
            {
                "title": item.get("title", ""),
                "company": (item.get("employer") or {}).get("name", ""),
                "location": (item.get("area") or {}).get("name", ""),
                "job_url": item.get("link", ""),
                "source": "jooble",
            }
            for item in resp.json().get("items", [])
        ]
    except (requests.RequestException, ValueError, AttributeError, TypeError):
        # Network failure, non-JSON body, or unexpected payload shape.
        return []


def search_jsearch(query: str, location: str, api_key: str, page: int = 1) -> List[dict]:
    """Search RapidAPI JSearch

    Returns a list of normalized job dicts, or an empty list on any
    transport, HTTP or payload-shape problem.
    """
    import requests

    url = "https://jsearch.p.rapidapi.com/search"
    params = {"query": f"{query} {location}".strip(), "page": page, "num_pages": 1}
    headers = {"X-RapidAPI-Key": api_key, "X-RapidAPI-Host": "jsearch.p.rapidapi.com"}
    try:
        resp = requests.get(url, params=params, headers=headers, timeout=10)
        if resp.status_code != 200:
            return []
        return [
            {
                "title": item.get("job_title", ""),
                "company": item.get("employer_name", ""),
                "location": item.get("job_location", ""),
                "job_url": item.get("job_apply_link", "") or item.get("job_google_link", ""),
                "source": "jsearch",
            }
            for item in resp.json().get("data", [])
        ]
    except (requests.RequestException, ValueError, AttributeError, TypeError):
        # Network failure, non-JSON body, or unexpected payload shape.
        return []


# --- AI Customize ---


@router.post("/ai/customize")
def customize_job(req: dict, user=Depends(get_current_user)):
    """AI customize resume + cover letter for a job

    Usage is counted only after a successful AI response.

    Raises:
        HTTPException: 404 unknown user; 402 plan limit reached; 400 no
            Perplexity key stored; 504 AI request timed out; 502 AI service
            returned an error or an unexpected payload; 500 other transport
            failure.
    """
    import requests

    user_row = _require_user(user["id"])

    # Check plan limit
    check = check_plan_limit(user_row, "ai_customize")
    if not check["allowed"]:
        raise HTTPException(
            status_code=402,
            detail={
                "error": "AI customization limit reached",
                "plan": user_row["plan"],
                "limit": check["limit"],
                "used": check["used"],
                "message": f"You've used all {check['used']} AI customizations for this billing cycle. Upgrade your plan for more.",
            },
        )

    job_description = req.get("job_description", "")
    resume_text = req.get("resume", "")
    company_name = req.get("company", "")
    job_title = req.get("title", "")

    perplexity_key = _load_json(user_row.get("api_keys")).get("perplexity", "")
    if not perplexity_key:
        raise HTTPException(status_code=400, detail="Perplexity API key required")

    user_prompt = (
        f"Job Title: {job_title}\n"
        f"Company: {company_name}\n"
        f"Job Description:\n{job_description}\n\n"
        f"My Resume:\n{resume_text}\n\n"
        "Please provide:\n"
        "1. A customized resume summary highlighting the most relevant skills and experience for this job\n"
        "2. A tailored cover letter (200-300 words)\n\n"
        "Format your response as:\n"
        "## RESUME_CUSTOMIZATION\n"
        "[your customized resume content here]\n\n"
        "## COVER_LETTER\n"
        "[your cover letter here]"
    )
    # Call Perplexity Sonar API
    payload = {
        "model": "sonar",
        "messages": [
            {
                "role": "system",
                "content": "You are a professional resume writer and career consultant. Customize the resume and write a cover letter based on the job description.",
            },
            {"role": "user", "content": user_prompt},
        ],
    }
    headers = {"Authorization": f"Bearer {perplexity_key}", "Content-Type": "application/json"}

    # Only the network call sits inside the try, so the 502 raised below is
    # not re-wrapped as a 500.
    try:
        resp = requests.post(
            "https://api.perplexity.ai/chat/completions",
            json=payload,
            headers=headers,
            timeout=30,
        )
    except requests.exceptions.Timeout as exc:
        raise HTTPException(status_code=504, detail="AI request timed out") from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    if resp.status_code != 200:
        raise HTTPException(status_code=502, detail="AI service error")
    try:
        content = resp.json()["choices"][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        raise HTTPException(status_code=502, detail="AI service error") from exc

    # Parse response: everything before the cover-letter marker is the resume part.
    resume_part, _, cover_part = content.partition("## COVER_LETTER")
    increment_usage(user["id"], "ai_customize")
    return {
        "resume_customization": resume_part.replace("## RESUME_CUSTOMIZATION", "").strip(),
        "cover_letter": cover_part.strip(),
    }


# --- Applications ---


@router.post("/applications")
def submit_application(app_data: ApplicationSubmit, user=Depends(get_current_user)):
    """Submit a job application (tracks usage against plan limit)

    Resume and cover letter are truncated to 5000 characters before storage.

    Raises:
        HTTPException: 404 unknown user; 402 plan limit reached.
    """
    user_row = _require_user(user["id"])

    # Check plan limit (use "submission" for agency, "application" for jobseeker)
    plan_name = user_row.get("plan", "free")
    action = "submission" if plan_name in AGENCY_PLANS else "application"
    check = check_plan_limit(user_row, action)
    if not check["allowed"]:
        raise HTTPException(
            status_code=402,
            detail={
                "error": "Limit reached",
                "plan": plan_name,
                "limit": check["limit"],
                "used": check["used"],
                "type": check.get("type", "jobseeker"),
                "message": f"You've used {check['used']} of your {check['limit']} submissions this billing cycle. Upgrade your plan for more.",
            },
        )

    # Create application record
    app_id = str(uuid.uuid4())
    job = app_data.job
    with _Database() as conn:
        conn.execute(
            """
            INSERT INTO applications
                (id, user_id, job_title, company, location, job_url, status, resume_version, cover_letter)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                app_id,
                user["id"],
                job.get("title", ""),
                job.get("company", ""),
                job.get("location", ""),
                job.get("job_url", ""),
                app_data.status,
                app_data.resume[:5000] if app_data.resume else "",
                app_data.cover_letter[:5000] if app_data.cover_letter else "",
            ),
        )
        conn.commit()

    # Increment usage
    increment_usage(user["id"], action)
    return {"id": app_id, "status": "submitted"}


@router.get("/applications")
def get_applications(user=Depends(get_current_user), status: Optional[str] = None, limit: int = 50):
    """Get user's applications, newest first, optionally filtered by status.

    ``limit`` is clamped to ``1..1000``.
    """
    sql = "SELECT * FROM applications WHERE user_id = ?"
    params = [user["id"]]
    if status:
        sql += " AND status = ?"
        params.append(status)
    sql += " ORDER BY applied_at DESC LIMIT ?"
    params.append(_clamp_limit(limit))

    with _Database(rows=True) as conn:
        rows = conn.execute(sql, params).fetchall()
    return {"applications": [dict(r) for r in rows]}


@router.patch("/applications/{app_id}")
def update_application(app_id: str, status_update: dict, user=Depends(get_current_user)):
    """Update application status (defaults to ``"applied"`` when omitted)

    Raises:
        HTTPException: 404 when the application does not exist or belongs to
            another user.
    """
    new_status = status_update.get("status", "applied")
    with _Database() as conn:
        cursor = conn.execute(
            "UPDATE applications SET status = ? WHERE id = ? AND user_id = ?",
            (new_status, app_id, user["id"]),
        )
        conn.commit()
        matched = cursor.rowcount
    if matched == 0:
        raise HTTPException(status_code=404, detail="Application not found")
    return {"updated": True, "status": new_status}


# --- Admin Routes ---


@router.get("/admin/stats", dependencies=[Depends(_require_admin)])
def admin_stats():
    """Platform-wide statistics"""
    with _Database(rows=True) as conn:

        def count(sql):
            return conn.execute(sql).fetchone()["count"]

        def listing(sql):
            return [dict(r) for r in conn.execute(sql).fetchall()]

        total_users = count("SELECT COUNT(*) as count FROM users")
        total_applications = count("SELECT COUNT(*) as count FROM applications")
        applied = count("SELECT COUNT(*) as count FROM applications WHERE status = 'applied'")
        interviews = count("SELECT COUNT(*) as count FROM applications WHERE status = 'interview'")
        plan_distribution = listing("SELECT plan, COUNT(*) as count FROM users GROUP BY plan")
        daily_applications = listing("""
            SELECT DATE(applied_at) as date, COUNT(*) as count
            FROM applications
            GROUP BY DATE(applied_at)
            ORDER BY date DESC
            LIMIT 30
        """)
        top_companies = listing("""
            SELECT company, COUNT(*) as count
            FROM applications
            WHERE company != ''
            GROUP BY company
            ORDER BY count DESC
            LIMIT 10
        """)

    return {
        "total_users": total_users,
        "total_applications": total_applications,
        "applied": applied,
        "interviews": interviews,
        "conversion_rate": round(interviews / applied * 100, 1) if applied > 0 else 0,
        "plan_distribution": plan_distribution,
        "daily_applications": daily_applications,
        "top_companies": top_companies,
    }


@router.get("/admin/users", dependencies=[Depends(_require_admin)])
def admin_users():
    """All users with stats"""
    with _Database(rows=True) as conn:
        rows = conn.execute("""
            SELECT u.id, u.email, u.name, u.plan, u.monthly_count, u.ai_customize_count,
                   COUNT(a.id) as applications
            FROM users u
            LEFT JOIN applications a ON u.id = a.user_id
            GROUP BY u.id
            ORDER BY u.created_at DESC
        """).fetchall()
    return {"users": [dict(r) for r in rows]}


@router.get("/admin/applications", dependencies=[Depends(_require_admin)])
def admin_applications(limit: int = 100):
    """All applications, newest first; ``limit`` is clamped to ``1..1000``."""
    with _Database(rows=True) as conn:
        rows = conn.execute(
            """
            SELECT a.*, u.email as user_email
            FROM applications a
            JOIN users u ON a.user_id = u.id
            ORDER BY a.applied_at DESC
            LIMIT ?
            """,
            (_clamp_limit(limit),),
        ).fetchall()
    return {"applications": [dict(r) for r in rows]}


# --- Agency Routes ---


@router.post("/agency/clients")
def create_client(client_data: dict, user=Depends(get_current_user)):
    """Create a client profile under the agency account

    Raises:
        HTTPException: 403 when the caller is not on an agency plan; 402 when
            the plan's client limit is already reached.
    """
    user_row = _require_agency(user["id"])
    plan = AGENCY_PLANS[user_row["plan"]]

    client_id = str(uuid.uuid4())
    with _Database(rows=True) as conn:
        # Check client limit
        current_clients = conn.execute(
            "SELECT COUNT(*) as count FROM client_profiles WHERE agency_id = ?",
            (user["id"],),
        ).fetchone()["count"]
        if current_clients >= plan["clients"]:
            raise HTTPException(
                status_code=402,
                detail=f"Client limit reached. Your plan allows {plan['clients']} clients.",
            )
        conn.execute(
            """
            INSERT INTO client_profiles (id, agency_id, client_name, client_email, profile_data)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                client_id,
                user["id"],
                client_data.get("name", ""),
                client_data.get("email", ""),
                json.dumps(client_data.get("profile", {})),
            ),
        )
        conn.commit()
    return {"client_id": client_id, "name": client_data.get("name", "")}


@router.get("/agency/clients")
def list_clients(user=Depends(get_current_user)):
    """List all clients under agency account

    Raises:
        HTTPException: 403 when the caller is not on an agency plan.
    """
    _require_agency(user["id"])
    with _Database(rows=True) as conn:
        rows = conn.execute(
            "SELECT * FROM client_profiles WHERE agency_id = ? ORDER BY created_at DESC",
            (user["id"],),
        ).fetchall()
    return {"clients": [dict(r) for r in rows]}


@router.get("/agency/usage")
def agency_usage(user=Depends(get_current_user)):
    """Get agency submission usage and client count

    Raises:
        HTTPException: 403 when the caller is not on an agency plan.
    """
    user_row = _require_agency(user["id"])
    plan = AGENCY_PLANS[user_row["plan"]]
    used = user_row["monthly_count"] or 0

    # Count the agency's clients (this used to be a hard-coded 0).
    with _Database() as conn:
        clients_used = conn.execute(
            "SELECT COUNT(*) FROM client_profiles WHERE agency_id = ?",
            (user["id"],),
        ).fetchone()[0]

    return {
        "plan": user_row["plan"],
        "submissions_limit": plan["submissions"],
        "submissions_used": user_row["monthly_count"],
        "submissions_remaining": max(0, plan["submissions"] - used),
        "clients_limit": plan["clients"],
        "clients_used": clients_used,
    }


@router.delete("/agency/clients/{client_id}")
def delete_client(client_id: str, user=Depends(get_current_user)):
    """Delete a client profile owned by the calling agency

    ``deleted`` is False when no matching client existed.

    Raises:
        HTTPException: 403 when the caller is not on an agency plan.
    """
    _require_agency(user["id"])
    with _Database() as conn:
        cursor = conn.execute(
            "DELETE FROM client_profiles WHERE id = ? AND agency_id = ?",
            (client_id, user["id"]),
        )
        conn.commit()
        removed = cursor.rowcount
    return {"deleted": removed > 0}


@router.post("/admin/reset-usage", dependencies=[Depends(_require_admin)])
def admin_reset_usage(user_id: Optional[str] = None):
    """Admin: Reset usage counts (monthly billing cycle reset)

    Resets one user when ``user_id`` is given, otherwise every user.
    """
    reset_sql = (
        "UPDATE users SET monthly_count = 0, ai_customize_count = 0, "
        "billing_cycle_start = CURRENT_TIMESTAMP"
    )
    with _Database() as conn:
        if user_id:
            conn.execute(reset_sql + " WHERE id = ?", (user_id,))
            msg = f"Reset usage for user {user_id}"
        else:
            conn.execute(reset_sql)
            msg = "Reset all users"
        conn.commit()
    return {"success": True, "message": msg}


# include_router() copies the router's routes at call time, so it has to run
# after every @router route above is defined. The earlier call made right
# after the app was created ran while the router was still empty and therefore
# registered nothing.
app.include_router(router)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)