import gradio as gr
import requests
import time
import argparse
import os
from datetime import datetime
from typing import Dict, List, Any
from dotenv import load_dotenv
from PIL import Image
from io import BytesIO
from database_manager import DatabaseManager # works when run from root # works when run from inside src/
# Pull settings from a local .env file (if any) into the process environment
# before any os.getenv() call below reads them.
load_dotenv()

# --- Configuration -----------------------------------------------------------
# Default value shown in the API-key textbox; empty when the variable is unset.
API_KEY = os.getenv("OPENROUTER_API_KEY", "")

# An explicit BACKEND_URL wins; otherwise the URL is assembled from HOST/PORT.
BACKEND_URL = os.getenv("BACKEND_URL")
if not BACKEND_URL:
    BACKEND_URL = f"http://{os.getenv('HOST', 'localhost')}:{os.getenv('PORT', '8000')}"

# --- Logging -----------------------------------------------------------------
import logging

logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Client-side state ---------------------------------------------------------
# Id of the most recently submitted job (written by process_query).
current_job_id = None
# The four caches below are filled by init_client() from the backend.
model_info = None
domains = None
question_types = None
examples_by_domain = None
[docs]
def init_client(retries: int = 3, delay: float = 1.5) -> bool:
    """
    Connect to the backend and cache the metadata the UI needs.

    ``/models`` is requested first as a connectivity probe. When it answers
    with HTTP 200 its JSON body is stored in ``model_info`` and the remaining
    endpoints (``/domains``, ``/question_types``, ``/examples``) are fetched.
    A non-200 reply from one of those three endpoints is tolerated and leaves
    the corresponding cache at an empty default.

    Every request carries a timeout so that an unresponsive backend cannot
    block start-up indefinitely, and no delay is spent after the final failed
    attempt.

    Args:
        retries (int): Maximum number of connection attempts. Defaults to 3.
        delay (float): Seconds to wait between two attempts. Defaults to 1.5.

    Returns:
        bool: True when the metadata was fetched, False when every attempt failed.

    Raises:
        None: Exceptions are caught and logged; failure is reported via the
        return value.

    Global Variables:
        model_info: JSON body returned by ``/models``.
        domains (dict): Value of the ``domains`` key from ``/domains``.
        question_types (list): Value of the ``question_types`` key from ``/question_types``.
        examples_by_domain (dict): Value of the ``examples`` key from ``/examples``.
    """
    global model_info, domains, question_types, examples_by_domain

    # Same bound the connectivity probe always used, now applied to every call.
    request_timeout = 5

    def fetch_section(path, key, default):
        """Return ``key`` from the JSON body of GET ``path``, or ``default`` on a non-200 reply."""
        reply = requests.get(f"{BACKEND_URL}{path}", timeout=request_timeout)
        if reply.status_code != 200:
            return default
        return reply.json().get(key, default)

    for attempt in range(1, retries + 1):
        try:
            logger.info(f"Attempting to connect to backend ({attempt}/{retries}) at {BACKEND_URL}")
            # Try one representative endpoint first to check connectivity
            response = requests.get(f"{BACKEND_URL}/models", timeout=request_timeout)
            if response.status_code != 200:
                raise Exception(f"Status code {response.status_code} on /models")
            # Models are reachable: fetch everything else
            model_info = response.json()
            domains = fetch_section("/domains", "domains", {})
            question_types = fetch_section("/question_types", "question_types", [])
            examples_by_domain = fetch_section("/examples", "examples", {})
            return True
        except Exception as e:
            logger.warning(f"Failed to connect to backend: {e}")
            # Waiting is only useful when another attempt will follow.
            if attempt < retries:
                time.sleep(delay)

    logger.error("Failed to connect after multiple attempts.")
    return False
[docs]
def update_aggregator(aggregator_id):
    """
    Ask the backend to switch to the given aggregator model.

    Sends ``{"aggregator_id": aggregator_id}`` as JSON to
    ``POST /update_aggregator``. The request is bounded by a timeout so a
    stalled backend cannot freeze the calling UI callback.

    Args:
        aggregator_id (str): Identifier of the model that should aggregate.

    Returns:
        str or None: The ``aggregator_id`` field of the backend's JSON reply on
        HTTP 200; ``None`` on any other status code or on any exception
        (including a timeout).

    Raises:
        None: Exceptions are caught and logged.
    """
    try:
        response = requests.post(
            f"{BACKEND_URL}/update_aggregator",
            json={"aggregator_id": aggregator_id},
            timeout=10,
        )
        if response.status_code != 200:
            logger.error(f"Failed to update aggregator: {response.status_code}")
            return None
        return response.json().get("aggregator_id")
    except Exception as e:
        logger.error(f"Error updating aggregator: {str(e)}")
        return None
[docs]
def get_example_choices(domain):
    """
    Fetch the example prompts available for a domain.

    Sends ``{"domain": domain}`` as JSON to ``POST /get_example_choices``.
    The request is bounded by a timeout so a stalled backend cannot freeze
    the UI callback.

    Args:
        domain (str): Domain whose examples are requested. A falsy value
            (``None`` or an empty string) short-circuits without contacting
            the backend.

    Returns:
        gr.update: Update carrying the new ``choices`` and ``value=None``.
        ``choices`` is the ``examples`` list from the backend reply on
        HTTP 200 and an empty list in every other case.

    Raises:
        None: Exceptions are caught and logged.
    """
    no_choices = gr.update(choices=[], value=None)

    # Nothing to look up without a domain
    if not domain:
        return no_choices

    try:
        response = requests.post(
            f"{BACKEND_URL}/get_example_choices",
            json={"domain": domain},
            timeout=10,
        )
        if response.status_code != 200:
            logger.error(f"Failed to get example choices: {response.status_code}")
            return no_choices
        examples = response.json().get("examples", [])
        return gr.update(choices=examples, value=None)
    except Exception as e:
        logger.error(f"Error getting example choices: {str(e)}")
        return no_choices
[docs]
def fill_query_and_type(selected_example, domain):
    """
    Resolve a selected example into its query text and question type.

    Sends ``{"selected_example": ..., "domain": ...}`` as JSON to
    ``POST /fill_query_and_type``. The request is bounded by a timeout so a
    stalled backend cannot freeze the UI callback.

    Args:
        selected_example (str): The example picked in the dropdown. A falsy
            value returns the defaults without contacting the backend
            (the original code notes this avoids HTTP 422 replies).
        domain (str): The domain the example belongs to.

    Returns:
        tuple: ``(query, question_type)`` taken from the ``query`` and
        ``question_type`` fields of the backend reply. Falls back to
        ``("", "None")`` when nothing is selected, the status is not 200,
        or an exception occurs.
    """
    fallback = ("", "None")

    if not selected_example:
        return fallback  # Prevent 422 errors

    try:
        response = requests.post(
            f"{BACKEND_URL}/fill_query_and_type",
            json={"selected_example": selected_example, "domain": domain},
            timeout=10,
        )
        if response.status_code != 200:
            logger.error(f"Failed to fill query and type: {response.status_code}")
            return fallback
        result = response.json()
        return result.get("query", ""), result.get("question_type", "None")
    except Exception as e:
        logger.error(f"Error filling query and type: {str(e)}")
        return fallback
[docs]
def update_output_labels(aggregator_id):
    """
    Refresh ``model_info`` from the backend and build labels for the output boxes.

    The aggregator box is labelled with a shortened model name followed by
    ``(Aggregator)`` when the model's entry has a truthy ``aggregator`` flag,
    and ``(Fallback Aggregator)`` otherwise. The three agent boxes are
    labelled with the shortened names of the first three models in
    ``model_info`` other than ``aggregator_id``; unused boxes read
    ``Agent Not Available``.

    Args:
        aggregator_id (str): Id of the model that acted as aggregator. When it
            is falsy the aggregator box is labelled plain ``Aggregator``.

    Returns:
        dict: Maps ``output_aggregator``, ``output_model1``, ``output_model2``
        and ``output_model3`` to ``gr.update`` objects. All four keys are
        always present: when the model list cannot be refreshed the values are
        no-op ``gr.update()`` objects, so callers that index the result by
        component do not hit a ``KeyError``.
    """
    global model_info

    def short_label(raw_name):
        """Turn a provider-style model name into a compact, title-cased label."""
        tail = str(raw_name).split("/")[-1].split(":")[0]
        return tail.replace("-instruct", "").replace("-", " ").title()

    def unchanged_labels():
        """Return no-op updates for all four boxes (used on refresh failure)."""
        return {
            output_aggregator: gr.update(),
            output_model1: gr.update(),
            output_model2: gr.update(),
            output_model3: gr.update(),
        }

    try:
        response = requests.get(f"{BACKEND_URL}/models", timeout=10)
        if response.status_code != 200:
            logger.error(f"Failed to refresh models: {response.status_code}")
            return unchanged_labels()
        model_info = response.json()
    except Exception as e:
        logger.error(f"Error refreshing models: {str(e)}")
        return unchanged_labels()

    # Always build a fresh label based on the aggregator actually used
    if aggregator_id:
        aggregator_entry = model_info.get(aggregator_id, {})
        short_name = short_label(aggregator_entry.get("name", aggregator_id))
        if aggregator_entry.get("aggregator", False):
            agg_label = f"{short_name} (Aggregator)"
        else:
            agg_label = f"{short_name} (Fallback Aggregator)"
    else:
        # No id to derive a name from; avoid calling str methods on None.
        agg_label = "Aggregator"

    # Agent boxes: every other model, in model_info order, capped at three
    agent_labels = [
        short_label(model_info[model_id].get("name", model_id))
        for model_id in model_info
        if model_id != aggregator_id
    ][:3]
    agent_labels += ["Agent Not Available"] * (3 - len(agent_labels))

    return {
        output_aggregator: gr.update(label=agg_label),
        output_model1: gr.update(label=agent_labels[0]),
        output_model2: gr.update(label=agent_labels[1]),
        output_model3: gr.update(label=agent_labels[2]),
    }
[docs]
def process_query(query, api_key, question_type, domain, aggregator_id, ethical_views, session, progress=gr.Progress()):
    """
    Submit a query to the backend, wait for the job, and collect its outputs.

    The query is posted to ``/process_query``; the returned job id is stored
    in the module-level ``current_job_id`` and ``/job_status/<id>`` is polled
    once per second (at most 120 times) while the reported status is
    ``processing``. When the status becomes ``completed`` the result is read
    from ``/job_result/<id>`` and the four charts from ``/image/<id>/<chart>``.

    Every return path yields the same 14-slot list, so the number of values
    handed back to Gradio does not depend on where a failure occurred.

    Args:
        query (str): The user query to be processed.
        api_key (str): OpenRouter API key forwarded to the backend.
        question_type (str): The type of question being asked.
        domain (str): The domain or context of the query.
        aggregator_id (str): Id of the aggregator model requested by the user.
        ethical_views (list): Ethical perspectives selected in the UI; forwarded
            unchanged as ``ethical_views``.
        session (str): Current session value; forwarded as ``username``.
        progress (gr.Progress, optional): Progress tracker for the UI.

    Returns:
        list: 14 values, in this order:
            - aggregator response text, or an error message
            - three agent response texts
            - consensus score
            - heatmap, emotion, polarity and radar images (``None`` when unavailable)
            - ``gr.update`` for the plot warning box
            - four ``gr.update`` objects for the aggregator/agent box labels

    Raises:
        None: Exceptions raised while talking to the backend are caught,
        logged, and reported through the first slot of the returned list.
    """
    global current_job_id

    # Upper bound for each individual HTTP call so the callback cannot hang.
    request_timeout = 30
    max_polls = 120

    def error_outputs(message):
        """Build the full 14-slot output list for a run that produced no result."""
        return [
            message, "", "", "", 0,
            None, None, None, None,
            gr.update(value="", visible=False),  # plot warning
            gr.update(label="Aggregator"),
            gr.update(label="Model 1"),
            gr.update(label="Model 2"),
            gr.update(label="Model 3"),
        ]

    def fetch_image(url):
        """Return the image at ``url`` as a PIL image, or None on any failure."""
        try:
            res = requests.get(url, timeout=request_timeout)
            if res.status_code == 200:
                return Image.open(BytesIO(res.content))
        except Exception as e:
            logger.warning(f"Failed to fetch image from {url}: {e}")
        return None

    # Check for required inputs
    if not api_key:
        return error_outputs("Error: OpenRouter API key is required")
    if not query or not query.strip():
        return error_outputs("Please enter a query")

    try:
        progress(0.05, desc="Sending request to backend...")

        # Submit query to backend
        response = requests.post(
            f"{BACKEND_URL}/process_query",
            json={
                "query": query,
                "api_key": api_key,
                "question_type": question_type,
                "domain": domain,
                "aggregator_id": aggregator_id,
                "username": session,
                "ethical_views": ethical_views,
            },
            timeout=request_timeout,
        )
        if response.status_code != 200:
            logger.error(f"Failed to process query: {response.status_code}")
            return error_outputs(f"Error: Backend returned status code {response.status_code}")

        # Get job ID
        current_job_id = response.json().get("job_id")
        if not current_job_id:
            logger.error("No job ID returned from backend")
            return error_outputs("Error: No job ID returned from backend")

        # Poll for job status
        status = "processing"
        poll_count = 0
        progress(0.1, desc="Models generating responses...")
        while status == "processing" and poll_count < max_polls:
            time.sleep(1)
            poll_count += 1
            status_response = requests.get(
                f"{BACKEND_URL}/job_status/{current_job_id}",
                timeout=request_timeout,
            )
            if status_response.status_code != 200:
                # A failed poll still counts against max_polls; try again.
                logger.error(f"Failed to get job status: {status_response.status_code}")
                continue
            status_data = status_response.json()
            status = status_data.get("status", "processing")
            current_progress = status_data.get("progress", 0) or 0
            # Job progress is mapped onto the 10%..90% band of the UI bar.
            progress(0.1 + (current_progress / 100) * 0.8, desc=f"Processing... {current_progress}%")

        if status != "completed":
            return error_outputs("Error: Backend processing timed out or failed")

        # Get job result
        progress(0.95, desc="Retrieving results...")
        result_response = requests.get(
            f"{BACKEND_URL}/job_result/{current_job_id}",
            timeout=request_timeout,
        )
        if result_response.status_code != 200:
            logger.error(f"Failed to get job result: {result_response.status_code}")
            return error_outputs(
                f"Error: Failed to retrieve results (status code {result_response.status_code})"
            )

        # Parse result
        result = result_response.json()
        responses = result.get("responses") or {}
        analysis = result.get("analysis") or {}
        warning_msg = analysis.get("warning", "")
        consensus_score = result.get("consensus_score", 0)
        # Use the aggregator the backend reports; if it reports none, fall back
        # to the one that was requested.
        aggregator_id = result.get("aggregator_id") or aggregator_id
        label_updates = update_output_labels(aggregator_id)

        # Three agent boxes: real responses first, fallback text for the rest
        agent_texts = [
            responses[model_id] for model_id in responses if model_id != aggregator_id
        ][:3]
        agent_texts += ["No agent available"] * (3 - len(agent_texts))

        # Charts are withheld when the analysis reports an error.
        if "error" in analysis:
            heatmap_img = emotion_img = polarity_img = radar_img = None
        else:
            image_base = f"{BACKEND_URL}/image/{current_job_id}"
            heatmap_img = fetch_image(f"{image_base}/heatmap")
            emotion_img = fetch_image(f"{image_base}/emotion_chart")
            polarity_img = fetch_image(f"{image_base}/polarity_chart")
            radar_img = fetch_image(f"{image_base}/radar_chart")

        progress(1.0, desc="Complete!")
        return [
            responses.get(aggregator_id, "Error: Aggregator model failed to respond"),
            agent_texts[0],
            agent_texts[1],
            agent_texts[2],
            consensus_score,
            heatmap_img,
            emotion_img,
            polarity_img,
            radar_img,
            gr.update(value=warning_msg, visible=bool(warning_msg)),
            # .get with a no-op update: the label refresh may return no entries.
            label_updates.get(output_aggregator, gr.update()),
            label_updates.get(output_model1, gr.update()),
            label_updates.get(output_model2, gr.update()),
            label_updates.get(output_model3, gr.update()),
        ]
    except Exception as e:
        logger.error(f"Error processing query: {str(e)}")
        return error_outputs(f"Error: {str(e)}")
# Create the Gradio interface
[docs]
def create_gradio_interface():
"""
Creates a Gradio interface for a Distributed Multi-Agent LLM System.
This function initializes the Gradio application with multiple interactive components,
including login/signup functionality, query submission, model response display,
analysis visualizations, and interaction history management. It also handles user
authentication, session management, and backend communication for processing queries
and retrieving historical data.
Returns:
gr.Blocks: The Gradio Blocks application instance.
"""
global output_aggregator, output_model1, output_model2, output_model3, consensus_score
global output_heatmap, output_emotion, output_polarity, output_radar
global plot_warning_box
# Initialize client
init_success = init_client()
if not init_success:
raise RuntimeError(f"Failed to connect to backend at {BACKEND_URL} after 3 attempts. Please check the server and try again.")
with gr.Blocks(title="Multi-Agent LLM System") as app:
gr.Markdown(
"<h1 style='text-align: center;'>Distributed Multi-Agent LLM System</h1>",
elem_id="title"
)
# Login/Signup UI block
session = gr.State(value=None) # to store current user session
with gr.Row():
with gr.Column():
auth_toggle = gr.Radio(choices=["Login", "Signup"], value="Login", label="Choose Action")
username_input = gr.Textbox(label="Username")
password_input = gr.Textbox(
label="Password",
type="password"
)
show_password_checkbox = gr.Checkbox(
label="👁 Show Password",
value=False
)
login_button = gr.Button("Login", visible=True)
signup_button = gr.Button("Create Account", visible=False)
with gr.Row():
logout_button = gr.Button("Logout", visible=False)
delete_account_button = gr.Button("Delete Account", visible=False)
login_status = gr.Markdown("Not logged in")
delete_password_input = gr.Textbox(
label="Confirm your password to delete account",
type="password",
visible=False
)
show_delete_password_checkbox = gr.Checkbox(
label="👁 Show Password",
value=False,
visible=False
)
confirm_delete_button = gr.Button("Confirm Deletion", visible=False)
def toggle_auth_mode(mode, session):
    """
    Decide which of the two authentication buttons should be visible.

    Args:
        mode (str): Value of the auth radio; ``"Signup"`` selects the signup
            button, any other value selects the login button.
        session: Current session value; when truthy (a user is logged in)
            both buttons are hidden regardless of ``mode``.

    Returns:
        tuple: ``(login_update, signup_update)`` — two ``gr.update`` objects
        carrying the ``visible`` flag for the login and signup buttons.
    """
    if session:
        # Already logged in: neither button is shown
        login_visible = signup_visible = False
    else:
        signup_visible = mode == "Signup"
        login_visible = not signup_visible
    return gr.update(visible=login_visible), gr.update(visible=signup_visible)
auth_toggle.change(
fn=toggle_auth_mode,
inputs=[auth_toggle, session],
outputs=[login_button, signup_button]
)
with gr.Row():
with gr.Column():
api_key_input = gr.Textbox(
label="OpenRouter API Key",
placeholder="Enter your OpenRouter API key",
value=API_KEY,
type="password"
)
# Add aggregator selection radio button
aggregator_options = list(model_info.keys()) if model_info else []
default_aggregator = next((model_id for model_id, info in model_info.items()
if info.get("aggregator", False)), None) if model_info else None
aggregator_radio = gr.Radio(
choices=aggregator_options,
label="Select Aggregator Model",
value=default_aggregator
)
domain_options = list(domains.keys()) if domains else []
domain_radio = gr.Radio(
choices=domain_options,
label="Select Domain Expertise",
value="Custom" if "Custom" in domain_options else None
)
ethical_view_selector = gr.CheckboxGroup(
choices=[
"None",
"Utilitarian",
"Deontologist",
"Virtue Ethicist",
"Libertarian",
"Rawlsian",
"Precautionary"
],
value=["None"],
label="Choose Ethical View(s) for Agents",
info="Select upto ethical 3 perspectives, or choose 'None' to skip ethics."
)
ethical_warning_box = gr.Markdown("", visible=False)
ethical_description_box = gr.Markdown("", visible=False)
question_type_options = question_types if question_types else []
question_type = gr.Radio(
choices=question_type_options,
label="Question Type",
value="None" if "None" in question_type_options else None
)
input_query = gr.Textbox(
label="Your Query",
placeholder="Enter your question or prompt",
lines=3
)
example_options = examples_by_domain.get("Custom", []) if examples_by_domain else []
example_selector = gr.Dropdown(
choices=[ex[0] for ex in example_options] if example_options else [],
label="Choose an Example (Optional)"
)
submit_btn = gr.Button("Submit", variant="primary")
with gr.Tabs():
with gr.TabItem("Model Responses"):
# Get current aggregator model ID
current_aggregator_id = default_aggregator
# Get non-aggregator model IDs
non_aggregator_models = [model_id for model_id, info in model_info.items()
if not info.get("aggregator", False)] if model_info else []
# Full-width aggregator response
with gr.Row():
output_aggregator = gr.Textbox(
label=model_info[current_aggregator_id]["display_name"] if model_info and current_aggregator_id in model_info else "Consensus Summary",
lines=10,
max_lines=10
)
# Three equal columns for agent models
with gr.Row():
output_model1 = gr.Textbox(
label=model_info[non_aggregator_models[0]]["display_name"] if model_info and len(non_aggregator_models) > 0 else "Model 1",
lines=8,
max_lines=8
)
output_model2 = gr.Textbox(
label=model_info[non_aggregator_models[1]]["display_name"] if model_info and len(non_aggregator_models) > 1 else "Model 2",
lines=8,
max_lines=8
)
output_model3 = gr.Textbox(
label=model_info[non_aggregator_models[2]]["display_name"] if model_info and len(non_aggregator_models) > 2 else "Model 3",
lines=8,
max_lines=8
)
with gr.TabItem("Analysis Visualizations"):
with gr.Row():
plot_warning_box = gr.Markdown("", visible=False)
with gr.Row():
with gr.Column():
consensus_score = gr.Slider(
label="Consensus Score",
minimum=0,
maximum=100,
value=0,
interactive=False
)
with gr.Row():
with gr.Column():
output_heatmap = gr.Image(label="Response Similarity Matrix")
with gr.Column():
output_emotion = gr.Image(label="Emotional Tone Analysis")
with gr.Row():
with gr.Column():
output_polarity = gr.Image(label="Sentiment Polarity")
with gr.Column():
output_radar = gr.Image(label="Response Feature Comparison")
with gr.TabItem("Interaction History"):
with gr.Row():
refresh_history_btn = gr.Button("🔄 Refresh History")
# Use Gradio Dataframe with appropriate columns
history_list = gr.Dataframe(
headers=["Time", "Query", "Domain", "Question Type", "Consensus", "Roles"],
datatype=["str", "str", "str", "str", "number", "str"],
interactive=False,
row_count=10
)
# Hidden state to store job IDs
history_job_ids = gr.State([])
def refresh_history(session_user):
    """
    Fetch the query history of a user and shape it for the history table.

    Requests ``GET /history`` with ``username`` and ``limit=100`` passed as
    query parameters, so the username is URL-encoded rather than pasted into
    the URL verbatim. The request is bounded by a timeout.

    Args:
        session_user (str): Username whose history is requested. It is sent in
            its string form, exactly as the former f-string interpolation did.

    Returns:
        tuple: ``(rows, job_ids)``.
            - rows (list): One list per history item holding, in order:
                - timestamp formatted as ``YYYY-MM-DD HH:MM``
                - the query, truncated to 100 characters plus ``...`` if longer
                - domain (default ``"Custom"``)
                - question type (default ``"None"``)
                - consensus score (default 0)
                - roles (default ``""``)
            - job_ids (list): Job id of each row, in the same order.
        Both lists are empty when the status is not 200, the history is
        empty, or any exception occurs (including a malformed item).
    """
    try:
        response = requests.get(
            f"{BACKEND_URL}/history",
            params={"username": f"{session_user}", "limit": 100},
            timeout=10,
        )
        if response.status_code != 200:
            logger.error(f"Failed to fetch history: {response.status_code}")
            return [], []

        history = response.json().get("history", [])
        if not history:
            return [], []

        rows = []
        job_ids = []
        for item in history:
            timestamp = datetime.fromtimestamp(item["timestamp"]).strftime("%Y-%m-%d %H:%M")
            query = item["query"]
            preview = query[:100] + "..." if len(query) > 100 else query
            rows.append([
                timestamp,
                preview,
                item.get("domain", "Custom"),
                item.get("question_type", "None"),
                item.get("consensus_score", 0),
                item.get("roles", ""),
            ])
            job_ids.append(item["job_id"])
        return rows, job_ids
    except Exception as e:
        logger.error(f"Error fetching history: {str(e)}")
        return [], []
# Selection indication
with gr.Row():
gr.Markdown("First, select a row by clicking on it, then use these buttons:")
# Add explicit buttons with selected row indicator
with gr.Row():
selected_row_info = gr.Markdown("No row selected")
selected_row_idx = gr.State(-1)
with gr.Row():
load_btn = gr.Button("📥 Load Selected Entry")
delete_btn = gr.Button("🗑️ Delete Selected Entry")
def select_history_row(evt: gr.SelectData):
    """
    Record which row of the history table was clicked.

    Args:
        evt (gr.SelectData): Selection event; the first element of
            ``evt.index`` is used as the row index.

    Returns:
        tuple: ``(row_index, message)`` where ``row_index`` is the zero-based
        row and ``message`` shows the same row as a one-based number.
    """
    clicked_row = evt.index[0]
    message = f"Selected row: {clicked_row + 1}"
    return clicked_row, message
# Connect selection event
history_list.select(
fn=select_history_row,
outputs=[selected_row_idx, selected_row_info]
)
# Load selected row
def load_selected_row(row_idx, job_ids):
    """
    Load a stored job and produce values for the 14 linked UI components.

    The job is read from ``GET /job_result/<job_id>`` and its charts from
    ``GET /image/<job_id>/<chart>``. Agent responses are taken from the job's
    own ``responses`` mapping, skipping the job's ``aggregator_id`` — the same
    rule ``process_query`` applies — so a job that ran with a non-default
    aggregator is displayed without duplicating or dropping a response.

    Args:
        row_idx (int): Index of the selected row; ``None`` or an index
            outside ``job_ids`` means "nothing selected".
        job_ids (list): Job ids corresponding to the table rows.

    Returns:
        list: 14 values, in this order:
            - ``gr.update(value=query)`` for the query box
            - domain (default ``"Custom"``)
            - question type (default ``"None"``)
            - ``None`` (clears the example dropdown)
            - aggregator response text
            - three agent response texts (``""`` when fewer are stored)
            - consensus score
            - heatmap, emotion, polarity and radar images (``None`` on fetch failure)
            - aggregator id
        A list of 14 ``None`` values is returned when nothing is selected,
        the backend answers with an unexpected status, or an exception
        occurs. On HTTP 404 a "job not found" placeholder set is returned.

    Raises:
        None: Exceptions are caught and logged.
    """
    output_count = 14

    if row_idx is None or row_idx < 0 or row_idx >= len(job_ids):
        return [None] * output_count

    def fetch_image(url):
        """Return the image at ``url`` as a PIL image, or None on any failure."""
        try:
            res = requests.get(url, timeout=30)
            if res.status_code == 200:
                return Image.open(BytesIO(res.content))
        except Exception as e:
            logger.warning(f"Failed to fetch image from {url}: {e}")
        return None

    try:
        job_id = job_ids[row_idx]

        # Get full job details
        result_response = requests.get(f"{BACKEND_URL}/job_result/{job_id}", timeout=30)

        if result_response.status_code == 404:
            logger.warning(f"Job {job_id} not found. It may have been deleted.")
            return [
                "Job not found - refresh history",
                gr.update(value="Custom"),
                gr.update(value="None"),
                None,
                "", "", "", "",
                0,
                None, None, None, None,
                gr.update(value=None),
            ]

        if result_response.status_code != 200:
            logger.error(f"Failed to fetch job {job_id}: {result_response.status_code}")
            return [None] * output_count

        result = result_response.json()
        responses = result.get("responses") or {}
        aggregator_id = result.get("aggregator_id", "")

        # Agents are whatever models answered besides this job's aggregator.
        agent_texts = [
            responses[model_id] for model_id in responses if model_id != aggregator_id
        ][:3]
        agent_texts += [""] * (3 - len(agent_texts))

        image_base = f"{BACKEND_URL}/image/{job_id}"
        return [
            gr.update(value=result.get("query", "")),
            result.get("domain", "Custom"),
            result.get("question_type", "None"),
            None,  # Clear example dropdown
            responses.get(aggregator_id, ""),
            agent_texts[0],
            agent_texts[1],
            agent_texts[2],
            result.get("consensus_score", 0),
            fetch_image(f"{image_base}/heatmap"),
            fetch_image(f"{image_base}/emotion_chart"),
            fetch_image(f"{image_base}/polarity_chart"),
            fetch_image(f"{image_base}/radar_chart"),
            aggregator_id,
        ]
    except Exception as e:
        logger.error(f"Error loading job: {str(e)}")
        return [None] * output_count
# Delete selected row
def delete_selected_row(row_idx, job_ids, session_user):
    """
    Delete the selected history entry on the backend and reload the table.

    Sends ``DELETE /history/<job_id>`` with ``username`` as a query parameter,
    so the username is URL-encoded rather than pasted into the URL verbatim.
    The request is bounded by a timeout. After a successful deletion the
    history is fetched again through ``refresh_history``.

    Args:
        row_idx (int): Index of the row to delete; ``None`` or an index
            outside ``job_ids`` means "nothing selected".
        job_ids (list): Job ids corresponding to the table rows.
        session_user (str): Username of the current session, sent in its
            string form.

    Returns:
        tuple: ``(rows, job_ids, message)``. On success the first two items are
        the refreshed table rows and job ids; otherwise they are no-op
        ``gr.update()`` objects so the table stays as it is. ``message``
        describes the outcome.

    Raises:
        None: Exceptions are caught and logged.
    """
    if row_idx is None or row_idx < 0 or row_idx >= len(job_ids):
        return gr.update(), gr.update(), "No row selected"

    try:
        job_id = job_ids[row_idx]

        # Delete the job
        delete_response = requests.delete(
            f"{BACKEND_URL}/history/{job_id}",
            params={"username": f"{session_user}"},
            timeout=10,
        )
        if delete_response.status_code != 200:
            logger.error(f"Failed to delete job {job_id}: {delete_response.status_code}")
            return gr.update(), gr.update(), f"Failed to delete: {delete_response.status_code}"

        logger.info(f"Deleted job {job_id}")
        # Refresh the history so the table no longer lists the deleted job
        fresh_rows, fresh_job_ids = refresh_history(session_user)
        return fresh_rows, fresh_job_ids, "Row deleted successfully"
    except Exception as e:
        logger.error(f"Error deleting job: {str(e)}")
        return gr.update(), gr.update(), f"Error: {str(e)}"
# Connect buttons
load_btn.click(
fn=load_selected_row,
inputs=[selected_row_idx, history_job_ids],
outputs=[
input_query, domain_radio, question_type, example_selector,
output_aggregator, output_model1, output_model2, output_model3,
consensus_score, output_heatmap, output_emotion, output_polarity,
output_radar, aggregator_radio
]
).then(fn=lambda: (-1, "No row selected"), outputs=[selected_row_idx, selected_row_info])
delete_btn.click(
fn=delete_selected_row,
inputs=[selected_row_idx, history_job_ids, session],
outputs=[history_list, history_job_ids, selected_row_info]
).then(fn=lambda: (-1, "No row selected"), outputs=[selected_row_idx, selected_row_info])
# Connect refresh button
refresh_history_btn.click(
fn=refresh_history,
inputs=[session],
outputs=[history_list, history_job_ids]
)
def is_valid_password(password):
    """
    Check a password against the signup rules.

    A password is accepted when it is at least 8 characters long and contains
    at least one uppercase letter, one lowercase letter, and one digit.

    Args:
        password (str): The password string to validate.

    Returns:
        bool: True if every rule is satisfied, False otherwise.
    """
    # Length is the cheapest rule, so reject on it first
    if len(password) < 8:
        return False

    if not any(ch.isupper() for ch in password):
        return False
    if not any(ch.islower() for ch in password):
        return False
    return any(ch.isdigit() for ch in password)
def login_user(username, password):
    """
    Verify the supplied credentials and build the matching UI updates.

    Args:
        username (str): The username of the user attempting to log in.
        password (str): The password associated with the username.

    Returns:
        tuple: Ten values, in the order of the ``login_button.click`` outputs:
        the session value (the username on success, None on failure), updates
        for the login, signup, logout and delete-account buttons, the
        delete-password field and the confirm-delete button, the status
        message, the show-password checkbox (unchecked) and the password
        field (switched back to masked input).
    """
    credentials_ok = DatabaseManager().verify_user(username, password)

    if not credentials_ok:
        # Failed login: no session, every auth control hidden, warning shown.
        return (
            None,
            *(gr.update(visible=False) for _ in range(6)),
            gr.update(value="⚠️ Invalid username or password."),
            gr.update(value=False),
            gr.update(type="password"),
        )

    # Successful login: expose logout / delete-account, hide everything else.
    return (
        username,
        gr.update(visible=False),  # login button
        gr.update(visible=False),  # signup button
        gr.update(visible=True),   # logout button
        gr.update(visible=True),   # delete-account button
        gr.update(visible=False),  # delete-password field
        gr.update(visible=False),  # confirm-delete button
        gr.update(value=f"✅ Logged in as **{username}**"),
        gr.update(value=False),    # show-password checkbox
        gr.update(type="password"),
    )
def signup_user(username, password):
    """
    Create a new account and build the matching UI updates.

    The password is first checked with ``is_valid_password``; only then is
    ``DatabaseManager.create_user`` called. A result of ``True`` means the
    account was created, the string ``"duplicate"`` means the username is
    taken, and any other result is reported as a server error.

    Args:
        username (str): Desired username for the new account.
        password (str): Password for the new account.

    Returns:
        tuple: Ten values, in the order of the ``signup_button.click``
        outputs: the session value (the username on success, otherwise None),
        six visibility updates for the auth controls, the status message, the
        show-password checkbox (unchecked) and the password field (masked).
    """

    def _rejected(message):
        # Every failure path shares one shape: no session, all controls hidden.
        return (
            None,
            *(gr.update(visible=False) for _ in range(6)),
            gr.update(value=message),
            gr.update(value=False),
            gr.update(type="password"),
        )

    db = DatabaseManager()

    if not is_valid_password(password):
        return _rejected(
            "❌ Password must be at least 8 characters, include a lowercase, uppercase letter, and a number."
        )

    outcome = db.create_user(username, password)
    if outcome is not True:
        if outcome == "duplicate":
            return _rejected("⚠️ **The username already exists.**")
        return _rejected("❌ **Signup failed due to a server error.**")

    # Account created: the new user is logged in immediately.
    return (
        username,
        gr.update(visible=False),  # login button
        gr.update(visible=False),  # signup button
        gr.update(visible=True),   # logout button
        gr.update(visible=True),   # delete-account button
        gr.update(visible=False),  # delete-password field
        gr.update(visible=False),  # confirm-delete button
        gr.update(value=f"✅ **Account created! Welcome, {username}**"),
        gr.update(value=False),    # show-password checkbox
        gr.update(type="password"),
    )
def logout_user():
    """
    Clear the session and return the UI updates for the logged-out state.

    Returns:
        tuple: Eleven values, in the order of the ``logout_button.click``
        outputs: None for the session, six "hidden" updates (login, signup,
        logout, delete-account, delete-password field, confirm-delete), the
        logout confirmation message, the delete-password checkbox (unchecked
        and hidden), the login show-password checkbox (unchecked) and the
        login password field (masked).
    """
    hidden_controls = [gr.update(visible=False) for _ in range(6)]
    status = gr.update(value="✅ **You have been logged out successfully.**")
    delete_checkbox = gr.update(value=False, visible=False)
    login_checkbox = gr.update(value=False)
    password_field = gr.update(type="password")

    return (
        None,
        *hidden_controls,
        status,
        delete_checkbox,
        login_checkbox,
        password_field,
    )
def delete_account(username, password):
    """
    Delete the logged-in user's account after re-checking the password.

    Both branches return exactly nine values, because the
    ``confirm_delete_button.click`` handler declares nine outputs: session,
    login button, signup button, logout button, delete-account button,
    delete-password field, confirm-delete button, status message and the
    delete-password "show" checkbox. Returning fewer values makes Gradio
    raise an output-count error instead of showing the status message.

    Args:
        username (str): The username of the account to be deleted (the
            current session value).
        password (str): The password typed into the delete-password field.

    Returns:
        tuple: Nine UI updates. On success the session is cleared and the
        login/signup buttons are shown again. On failure the session is kept,
        logout/delete-account stay visible and an error message is displayed.
    """
    db = DatabaseManager()

    if db.delete_user(username, password):
        return (
            None,                                   # clear session
            gr.update(visible=True),                # login
            gr.update(visible=True),                # signup
            gr.update(visible=False),               # logout
            gr.update(visible=False),               # delete account
            gr.update(value=""),                    # clear password field
            gr.update(visible=False),               # hide confirm delete
            gr.update(value="✅ Account deleted successfully."),
            gr.update(value=False, visible=False),  # hide + uncheck the checkbox
        )

    return (
        username,                                   # session still active
        gr.update(visible=False),                   # login stays hidden
        gr.update(visible=False),                   # signup stays hidden
        gr.update(visible=True),                    # keep logout
        gr.update(visible=True),                    # keep delete account
        gr.update(value=""),                        # clear password input
        gr.update(visible=False),                   # hide confirm delete (to prevent retry loop)
        gr.update(value="❌ Invalid password. Account not deleted."),
        # Ninth output was missing here. The checkbox is hidden together with
        # the confirm button; clicking "delete account" shows both again.
        gr.update(value=False, visible=False),
    )
def show_delete_inputs():
    """
    Reveal the controls used to confirm an account deletion.

    Returns:
        tuple: Three ``gr.update(visible=True)`` objects, applied in order to
        ``delete_password_input``, ``confirm_delete_button`` and
        ``show_delete_password_checkbox`` (the outputs of the
        ``delete_account_button.click`` handler).
    """
    revealed_count = 3  # password field, confirm button, show-password checkbox
    return tuple(gr.update(visible=True) for _ in range(revealed_count))
def toggle_password_visibility(show):
    """
    Switch a password textbox between masked and plain-text display.

    Args:
        show (bool): Truthy to reveal the password, falsy to mask it.

    Returns:
        The ``gr.update`` object setting the textbox type to ``"text"`` when
        ``show`` is truthy and to ``"password"`` otherwise.
    """
    if show:
        field_type = "text"
    else:
        field_type = "password"
    return gr.update(type=field_type)
def validate_ethical_selection(selected):
    """
    Build the warning and description text for the chosen ethical views.

    A warning is produced when "None" is combined with any other choice, or
    when more than three views are chosen without "None". The function only
    reports the problem; it does not modify the selection.

    Args:
        selected (list of str): The currently selected ethical perspectives.
            Entries that are not one of the known perspectives are ignored
            when building the descriptions.

    Returns:
        tuple: Two ``gr.update`` objects:
            - the warning box (text plus visibility, hidden when there is no
              warning);
            - the description box, holding one paragraph per recognised
              selection in selection order (hidden when empty).
    """
    descriptions = {
        "Utilitarian": "👉 **Utilitarian**: Maximize overall well-being and minimize harm.",
        "Deontologist": "👉 **Deontologist**: Follow strict moral rules and duties.",
        "Virtue Ethicist": "👉 **Virtue Ethicist**: Act based on virtuous character traits like honesty or compassion.",
        "Libertarian": "👉 **Libertarian**: Prioritize individual freedom, consent, and autonomy.",
        "Rawlsian": "👉 **Rawlsian**: Maximize justice and fairness for the least advantaged.",
        "Precautionary": "👉 **Precautionary**: Avoid catastrophic risks and irreversible harm.",
        "None": "👉 **None**: Skip assigning any ethical role.",
    }

    picked_none = "None" in selected
    how_many = len(selected)
    if picked_none:
        # "None" is only valid on its own.
        warning = "⚠️ Cannot select 'None' with other ethical views." if how_many > 1 else ""
    else:
        warning = "⚠️ You can select up to 3 ethical perspectives only." if how_many > 3 else ""

    paragraphs = []
    for view in selected:
        if view in descriptions:
            paragraphs.append(descriptions[view])

    warning_update = gr.update(value=warning, visible=bool(warning))
    description_update = gr.update(value="\n\n".join(paragraphs), visible=bool(paragraphs))
    return (warning_update, description_update)
# Re-validate the ethical-view selection on every change and refresh the
# warning and description boxes.
ethical_view_selector.change(
fn=validate_ethical_selection,
inputs=[ethical_view_selector],
outputs=[ethical_warning_box, ethical_description_box]
)
# Login: login_user returns ten values, one per output below (session first).
# The chained step then calls toggle_auth_mode with the auth toggle and the
# new session value to set the login/signup buttons.
# NOTE(review): toggle_auth_mode is defined outside this section; presumably it
# decides which of the two buttons is visible -- confirm against its definition.
login_button.click(
fn=login_user,
inputs=[username_input, password_input],
outputs=[
session,
login_button, signup_button, logout_button,
delete_account_button, delete_password_input,
confirm_delete_button, login_status,
show_password_checkbox, password_input # main login checkbox & field
]
).then(
fn=toggle_auth_mode,
inputs=[auth_toggle, session],
outputs=[login_button, signup_button]
)
# Signup: same ten outputs and the same follow-up step as login.
signup_button.click(
fn=signup_user,
inputs=[username_input, password_input],
outputs=[
session,
login_button, signup_button, logout_button,
delete_account_button, delete_password_input,
confirm_delete_button, login_status,
show_password_checkbox, password_input # main login checkbox & field
]
).then(
fn=toggle_auth_mode,
inputs=[auth_toggle, session],
outputs=[login_button, signup_button]
)
# Logout: logout_user takes no inputs and returns eleven values (the extra one
# is the delete-password checkbox). The chained steps then clear the row
# selection, empty the history table and job-id list, and re-run
# toggle_auth_mode for the login/signup buttons.
logout_button.click(
fn=logout_user,
outputs=[
session,
login_button, signup_button, logout_button,
delete_account_button, delete_password_input,
confirm_delete_button, login_status,
show_delete_password_checkbox,
show_password_checkbox, password_input # main login checkbox & field
]
).then(
fn=lambda: (-1, "No row selected"),
outputs=[selected_row_idx, selected_row_info]
).then(
fn=lambda: ([], []), # This clears history table and job IDs
outputs=[history_list, history_job_ids]
).then(
fn=toggle_auth_mode,
inputs=[auth_toggle, session],
outputs=[login_button, signup_button]
)
# "Delete account" only reveals the confirmation controls; nothing is deleted
# until the confirm button is pressed.
delete_account_button.click(
fn=show_delete_inputs,
inputs=[],
outputs=[
delete_password_input,
confirm_delete_button,
show_delete_password_checkbox
]
)
# Confirm deletion: delete_account receives the session user and the typed
# password and must return nine values, one per output below. The chained
# steps mirror the logout flow (clear selection, clear history, refresh the
# login/signup buttons) and run regardless of what delete_account returned.
confirm_delete_button.click(
fn=delete_account,
inputs=[session, delete_password_input],
outputs=[
session,
login_button, signup_button,
logout_button, delete_account_button,
delete_password_input, confirm_delete_button,
login_status, show_delete_password_checkbox
]
).then(
fn=lambda: (-1, "No row selected"),
outputs=[selected_row_idx, selected_row_info]
).then(
fn=lambda: ([], []), # <-- This clears history table and job IDs
outputs=[history_list, history_job_ids]
).then(
fn=toggle_auth_mode,
inputs=[auth_toggle, session],
outputs=[login_button, signup_button]
)
# Each "show password" checkbox switches its own textbox between masked and
# plain-text input.
show_password_checkbox.change(
fn=toggle_password_visibility,
inputs=[show_password_checkbox],
outputs=[password_input]
)
show_delete_password_checkbox.change(
fn=toggle_password_visibility,
inputs=[show_delete_password_checkbox],
outputs=[delete_password_input]
)
# Load history on startup
app.load(fn=refresh_history, inputs=[session], outputs=[history_list, history_job_ids])
# Connect the domain radio button to update example choices
domain_radio.change(fn=get_example_choices, inputs=domain_radio, outputs=example_selector)
# Connect the example selector to fill query and type
example_selector.change(fn=fill_query_and_type, inputs=[example_selector, domain_radio], outputs=[input_query, question_type])
# Connect the radio button to update the aggregator
# NOTE(review): the first step writes back to aggregator_radio itself (the
# component that fired the event); the second step feeds the radio value to
# update_output_labels, which updates the four response boxes.
aggregator_radio.change(
fn=update_aggregator,
inputs=aggregator_radio,
outputs=aggregator_radio
).then(
fn=update_output_labels,
inputs=aggregator_radio,
outputs=[output_aggregator, output_model1, output_model2, output_model3]
)
def guarded_process(query, api_key, question_type, domain, aggregator_id, ethical_views, session_user):
    """
    Run ``process_query`` only when a user is logged in.

    Args:
        query: The question text entered by the user.
        api_key: The API key entered in the UI.
        question_type: The selected question type.
        domain: The selected domain.
        aggregator_id: The selected aggregator model.
        ethical_views: The selected ethical perspectives.
        session_user: The current session value; falsy when nobody is
            logged in.

    Returns:
        tuple: When ``session_user`` is truthy, whatever ``process_query``
        returns for the same seven arguments (the session user is passed
        through as well). Otherwise a 14-element placeholder: a "please log
        in" message for the aggregator box, empty strings for the three model
        boxes, 0 for the consensus score, None for the four plots, a hidden
        and empty plot-warning box, and default labels for the four response
        boxes.
    """
    if session_user:
        return process_query(query, api_key, question_type, domain, aggregator_id, ethical_views, session_user)

    # Not logged in: blank every output and reset the box labels.
    login_prompt = gr.update(value="❌ Please log in first.")
    empty_plots = (None, None, None, None)
    plot_warning = gr.update(value="", visible=False)
    default_labels = (
        gr.update(label="Aggregator"),
        gr.update(label="Model 1"),
        gr.update(label="Model 2"),
        gr.update(label="Model 3"),
    )
    return (login_prompt, "", "", "", 0, *empty_plots, plot_warning, *default_labels)
# Submit: guarded_process receives the query form values plus the session and
# returns 14 values, one per entry in the outputs list.
# NOTE(review): the four response boxes appear twice in the outputs list. In
# guarded_process's not-logged-in branch the first four slots carry values and
# the last four carry label updates -- confirm the installed Gradio version
# accepts the same component more than once in one outputs list.
submit_btn.click(
fn=guarded_process,
inputs=[input_query, api_key_input, question_type, domain_radio, aggregator_radio, ethical_view_selector, session],
outputs=[
output_aggregator, output_model1, output_model2, output_model3,
consensus_score, output_heatmap, output_emotion, output_polarity,
output_radar, plot_warning_box,
output_aggregator, output_model1, output_model2, output_model3
]
)
# Hand the fully wired interface object back to the caller.
return app
# Create .env file template
def create_env_template():
    """
    Write a starter `.env` file into the current working directory.

    Nothing happens when a `.env` path already exists, so an existing
    configuration is never overwritten. The generated file contains:
        - OPENROUTER_API_KEY: left empty for the user to fill in.
        - BACKEND_URL: preset to `http://localhost:8000`.
    """
    if os.path.exists(".env"):
        return

    template = (
        "# OpenRouter API Configuration\n"
        "OPENROUTER_API_KEY=\n"
        "BACKEND_URL=http://localhost:8000\n"
    )
    with open(".env", "w") as env_file:
        env_file.write(template)
# Main function to run the system
def main():
    """
    Entry point for the Multi-Agent LLM Client.

    Parses the command-line options, stores the chosen backend URL in the
    module-level ``BACKEND_URL``, makes sure a `.env` template exists, then
    builds the Gradio interface and launches it on the requested host/port.
    """
    # Must be declared before BACKEND_URL is read as the option default below.
    global BACKEND_URL

    option_specs = (
        ("--backend-url", {"type": str, "default": BACKEND_URL, "help": "URL of the backend server"}),
        ("--host", {"type": str, "default": "0.0.0.0", "help": "Host for Gradio client"}),
        ("--port", {"type": int, "default": 7860, "help": "Port to run the Gradio interface on"}),
        ("--share", {"action": "store_true", "help": "Create a shareable link"}),
    )
    cli = argparse.ArgumentParser(description="Multi-Agent LLM Client")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()

    # The command-line value (or its default) becomes the URL for this run.
    BACKEND_URL = options.backend_url

    create_env_template()

    interface = create_gradio_interface()
    interface.launch(
        server_name=options.host,
        server_port=options.port,
        share=options.share,
    )


if __name__ == "__main__":
    main()