Source code for load_balancer

from fastapi import FastAPI, Request, Response
import httpx
import itertools
import re
import hashlib

app = FastAPI()

# Define backend servers
BACKENDS = ["http://10.250.201.148:8001", "http://10.250.201.148:8002", "http://10.250.150.216:8003"]
backend_cycle = itertools.cycle(BACKENDS)
MAX_RETRIES = 3

[docs] def consistent_hash(job_id): """ Generates a consistent hash value for a given job ID. This function uses the MD5 hashing algorithm to compute a hash for the provided job ID string. The resulting hash is then converted to an integer in base 16. Args: job_id (str): The unique identifier for the job to be hashed. Returns: int: A consistent integer hash value derived from the job ID. """ return int(hashlib.md5(job_id.encode()).hexdigest(), 16)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def proxy(path: str, request: Request): """ Smart proxy: - Sticky routing for job_id-specific paths (status/result/image) - Round-robin with retry logic for everything else (2-fault tolerant) """ job_id_match = re.search(r"/(?:image|job_status|job_result)/([a-f0-9\-]+)", path) for attempt in range(MAX_RETRIES): if job_id_match: job_id = job_id_match.group(1) server_idx = consistent_hash(job_id + str(attempt)) % len(BACKENDS) backend = BACKENDS[server_idx] elif path == "process_query": backend = next(backend_cycle) # still random for new jobs else: backend = next(backend_cycle) url = f"{backend}/{path}" print(f"[Load Balancer] Attempt {attempt + 1}: Routing '{path}' to → {backend}") try: async with httpx.AsyncClient() as client: req_data = await request.body() headers = dict(request.headers) resp = await client.request( method=request.method, url=url, headers=headers, content=req_data, params=request.query_params ) content_type = resp.headers.get("content-type", "application/octet-stream") print(f"{backend} joined successfully for '{path}'") return Response( content=resp.content, status_code=resp.status_code, media_type=content_type ) except httpx.RequestError as e: print(f"⚠️ Backend {backend} failed with error: {e}") continue # try next backend return Response(content="❌ All backends failed.", status_code=503)