Application Modules
This section provides documentation for each Python module in the Distributed Multi Agent LLMs application.
server module
- class server.AggregatorRequest(*, aggregator_id: str)[source]
Bases: BaseModel
Represents a request model for an aggregator.
- Attributes:
aggregator_id (str): A unique identifier for the aggregator.
- aggregator_id: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class server.ExampleRequest(*, domain: str)[source]
Bases: BaseModel
ExampleRequest is a data model representing a request structure.
- Attributes:
domain (str): The domain name or identifier associated with the request.
- domain: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class server.FillQueryRequest(*, selected_example: str, domain: str)[source]
Bases: BaseModel
FillQueryRequest is a data model representing a request to fill a query.
- Attributes:
selected_example (str): The selected example to be used in the query.
domain (str): The domain or context in which the query is being executed.
- domain: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- selected_example: str
- class server.OpenRouterClient(api_key, site_url=None, site_name=None)[source]
Bases: object
OpenRouterClient is a class designed to interact with the OpenRouter API for generating responses, analyzing text embeddings, and performing sentiment and emotional tone analysis.
- Attributes:
api_key (str): The API key used for authenticating with the OpenRouter API.
site_url (str, optional): The URL of the site making the request, used for HTTP-Referer header.
site_name (str, optional): The name of the site making the request, used for X-Title header.
url (str): The endpoint URL for the OpenRouter API.
sentence_encoder (SentenceTransformer): A pre-trained model for generating text embeddings.
sentiment_analyzer (SentimentIntensityAnalyzer): A VADER sentiment analyzer for sentiment analysis.
- Methods:
- generate_response(model_name: str, prompt: str, temperature: float = 0.7) -> str:
Asynchronously generates a response from a specified model via the OpenRouter API. Includes fault tolerance and error handling for API failures.
- get_embeddings(texts: List[str]) -> np.ndarray:
Generates embeddings for a list of input texts using a pre-trained sentence transformer.
- analyze_emotional_tones(text: str) -> Dict[str, float]:
Analyzes emotional tones in the input text using pattern matching and sentiment analysis. Returns a dictionary of emotional tone scores.
- analyze_sentiment(text: str) -> Dict[str, Any]:
Analyzes the sentiment of the input text using VADER and TextBlob. Returns a dictionary containing polarity, compound score, subjectivity, and emotional tones.
- analyze_emotional_tones(text: str) -> Dict[str, float][source]
Analyze the emotional tones present in a given text.
This method evaluates the input text for specific emotional tones such as Empathetic, Judgmental, Analytical, Ambivalent, Defensive, and Curious. It uses predefined regex patterns to detect tone-related keywords and incorporates sentiment analysis using VADER to adjust tone scores.
- Args:
text (str): The input text to analyze.
- Returns:
Dict[str, float]: A dictionary where keys are emotional tone categories and values are their respective normalized scores (rounded to 4 decimal places). The scores represent the relative presence of each tone in the text.
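The pattern-matching and normalization steps described above can be sketched as follows. This is a minimal illustration, not the module's implementation: the keyword patterns in `TONE_PATTERNS` are invented for the example, and the VADER-based adjustment is omitted so that the sketch needs only the standard library.

```python
import re
from typing import Dict

# Hypothetical keyword patterns; the real module's regexes are not documented here.
TONE_PATTERNS = {
    "Empathetic": r"\b(understand|sorry|feel)\b",
    "Judgmental": r"\b(should|must|wrong)\b",
    "Analytical": r"\b(because|therefore|data)\b",
    "Ambivalent": r"\b(maybe|perhaps|however)\b",
    "Defensive": r"\b(but|actually|never)\b",
    "Curious": r"\b(why|how|wonder)\b",
}


def analyze_emotional_tones(text: str) -> Dict[str, float]:
    """Count keyword hits per tone and normalize them to relative scores."""
    counts = {
        tone: len(re.findall(pattern, text, flags=re.IGNORECASE))
        for tone, pattern in TONE_PATTERNS.items()
    }
    total = sum(counts.values())
    if total == 0:
        # No tone keywords found: report zero for every category.
        return {tone: 0.0 for tone in counts}
    return {tone: round(count / total, 4) for tone, count in counts.items()}


scores = analyze_emotional_tones(
    "I wonder why the data changed, because maybe it should not."
)
```

Because the scores are normalized by the total number of hits, they describe the relative presence of each tone rather than an absolute intensity.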
- analyze_sentiment(text: str) -> Dict[str, Any][source]
Analyzes the sentiment and emotional tone of the given text. This method combines sentiment analysis from VADER and TextBlob to calculate an average polarity and subjectivity score. It also evaluates emotional tones and determines a contextual tone based on the polarity.
- Args:
text (str): The input text to analyze.
- Returns:
- Dict[str, Any]: A dictionary containing the following keys:
‘polarity’ (float): The average polarity score from VADER and TextBlob.
‘compound’ (float): The compound sentiment score from VADER.
‘subjectivity’ (float): The subjectivity score from TextBlob.
‘emotional_tones’ (Dict[str, float]): A dictionary of emotional tones and their scores.
‘tone_context’ (str): A contextual description of the top emotional tone combined with sentiment polarity.
- async generate_response(model_name: str, prompt: str, temperature: float = 0.7) -> str[source]
Asynchronously generates a response from a specified model based on the given prompt.
- Args:
model_name (str): The name of the model to use for generating the response.
prompt (str): The input prompt to send to the model.
temperature (float, optional): The sampling temperature for the model’s response.
- Returns:
str: The generated response from the model, or an error message if the operation fails.
- Raises:
asyncio.TimeoutError: If the API request times out.
Exception: For any other unexpected errors during the process.
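One way to combine a request timeout with an error-message return value is sketched below. It is an assumption-laden illustration: `fake_api_call` stands in for the real HTTP request to OpenRouter, the `timeout` parameter is hypothetical, and the sketch chooses to convert failures into an error string rather than re-raise them.

```python
import asyncio


async def fake_api_call(model_name: str, prompt: str, temperature: float) -> str:
    """Hypothetical stand-in for the HTTP request to the OpenRouter API."""
    await asyncio.sleep(0.05)
    return f"[{model_name}] echo: {prompt}"


async def generate_response(
    model_name: str, prompt: str, temperature: float = 0.7, timeout: float = 1.0
) -> str:
    """Return the model's reply, or an error message if the call fails."""
    try:
        return await asyncio.wait_for(
            fake_api_call(model_name, prompt, temperature), timeout=timeout
        )
    except asyncio.TimeoutError:
        return f"Error: request to {model_name} timed out"
    except Exception as exc:  # any other failure becomes an error string
        return f"Error: {exc}"


reply = asyncio.run(generate_response("demo-model", "hello"))
timed_out = asyncio.run(generate_response("demo-model", "hello", timeout=0.001))
```

Returning a string on failure lets a caller that gathers several model responses continue even when one model is unavailable.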
- class server.QueryRequest(*, query: str, api_key: str = None, question_type: str = 'None', domain: str = 'None', aggregator_id: str = None, username: str, ethical_views: List[str] = [])[source]
Bases: BaseModel
QueryRequest is a data model representing the structure of a query request.
- Attributes:
query (str): The query string provided by the user.
api_key (str, optional): An optional API key for authentication. Defaults to None.
question_type (str): The type of question being asked. Defaults to “None”.
domain (str): The domain or context of the query. Defaults to “None”.
aggregator_id (str, optional): An optional identifier for the aggregator. Defaults to None.
username (str): The username of the individual making the query.
ethical_views (List[str]): A list of ethical perspectives. Defaults to an empty list.
- aggregator_id: str
- api_key: str
- domain: str
- ethical_views: List[str]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- query: str
- question_type: str
- username: str
- class server.ResponseAggregator(openrouter_client)[source]
Bases: object
This class is responsible for processing queries through multiple AI models, aggregating their responses, and generating a consensus summary. It also provides analysis of the responses, including sentiment, similarity, and visualization of key metrics.
- Attributes:
client (Any): The client used to interact with AI models for generating responses.
- async analyze_responses(query: str, responses: Dict[str, str]) -> Dict[str, Any][source]
Analyze the responses from different models and generate insights. This method processes the responses from various models, performs sentiment analysis, calculates similarity metrics, and generates visualizations to provide insights into the responses. If there are fewer than two valid responses, it skips the visual analysis and returns basic metrics with a warning.
- Args:
query (str): The input query for which the responses were generated.
responses (Dict[str, str]): A dictionary with model IDs as keys and responses as values.
- Returns:
Dict[str, Any]: A dictionary containing the analysis results.
- Raises:
Exception: If an error occurs during the analysis process.
- async generate_consensus_summary(query: str, responses: Dict[str, str], aggregator_id: str) -> str[source]
Generate a summarized consensus from all model responses using the aggregator model.
- Args:
query (str): The query or prompt that was provided to the models.
responses (Dict[str, str]): A dictionary mapping model IDs to their respective responses.
aggregator_id (str): The ID of the designated aggregator model.
- Returns:
- str: A comprehensive consensus summary generated by the aggregator model. If all models fail
or encounter errors, a fallback message is returned indicating the inability to generate a summary.
- async process_query(query: str, question_type: str = 'none', ethical_views: List[str] = ['None']) -> Dict[str, Any][source]
Process a query through multiple agent models, aggregate their responses, and provide a consensus summary. This method performs the following steps:
1. Formats the query based on the specified question type.
2. Checks the health of agent models and selects up to three healthy models for processing.
3. Sends the query to the selected models asynchronously and collects their responses.
4. Identifies an aggregator model to summarize the responses, with failover to backup models if necessary.
5. Generates a consensus summary using the aggregator model.
6. Analyzes the responses and returns the results.
- Args:
query (str): The input query to process.
question_type (str, optional): The type of question, used to apply a prefix to the query. Defaults to “none”.
ethical_views (List[str], optional): A list of at most 3 ethical perspectives, or [“None”] to skip role assignments.
- Returns:
Dict[str, Any]: A dictionary containing the processing results.
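Steps 2 and 3 above (select up to three healthy models, then query them concurrently) can be sketched with `asyncio.gather`. The names `ask_model` and `fan_out`, and the shape of the `health` mapping, are hypothetical and chosen only for this illustration.

```python
import asyncio
from typing import Dict


async def ask_model(model_id: str, query: str) -> str:
    """Hypothetical stand-in for a call to one agent model."""
    await asyncio.sleep(0)
    return f"{model_id}: answer to {query!r}"


async def fan_out(query: str, health: Dict[str, str], max_agents: int = 3) -> Dict[str, str]:
    """Send the query to at most `max_agents` healthy models concurrently."""
    healthy = [m for m, status in health.items() if status == "healthy"][:max_agents]
    answers = await asyncio.gather(*(ask_model(m, query) for m in healthy))
    return dict(zip(healthy, answers))


result = asyncio.run(
    fan_out(
        "q",
        {"a": "healthy", "b": "failed", "c": "healthy", "d": "healthy", "e": "healthy"},
    )
)
```

`asyncio.gather` returns results in the order the awaitables were passed, so zipping them back onto the list of selected model IDs is safe.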
- async server.api_agent_health()
Asynchronously checks the health of agents and formats the health status response. This function calls check_agent_health() to update the health status of agents. It then processes the agent_health dictionary to create a formatted response containing the health status of each agent, including the time since the last heartbeat, the number of failures, and retries.
- Returns:
dict: A dictionary where each key is a model ID and the value is another dictionary.
- async server.api_delete_history_item(job_id: str, username: str)
Delete an interaction from the user’s history.
- Args:
job_id (str): The unique identifier of the job or interaction to be deleted.
username (str): The username of the user whose interaction history is being modified.
- Returns:
dict: A dictionary indicating the success of the operation with a key “success” set to True if the deletion was successful.
- Raises:
HTTPException: If the deletion fails or an unexpected error occurs, an HTTPException is raised with a status code of 500 and an appropriate error message.
- async server.api_fill_query_and_type(request: FillQueryRequest)
Handles the API request to fill a query and its corresponding question type based on a selected example.
- Args:
request (FillQueryRequest): The request object containing the domain and the selected example.
- Returns:
- dict: A dictionary containing:
“query” (str): The query string from the selected example.
“question_type” (str): The type of question associated with the query. Returns “None” if no matching example is found.
- Raises:
HTTPException: If an unexpected error occurs, returns a 500 status code with the error details.
- async server.api_get_domains()
Asynchronous function to retrieve a list of domains.
- Returns:
dict: A dictionary containing the key “domains” mapped to the value of the global variable DOMAINS.
- async server.api_get_example_choices(request: ExampleRequest)
Get example choices for a domain
- async server.api_get_examples()
Asynchronous function to retrieve example data.
- Returns:
dict: A dictionary containing examples categorized by domain.
- async server.api_get_history(username: str, limit: int = 50)
Fetches the interaction history for a specified user.
- Args:
username (str): The username of the user whose interaction history is to be retrieved.
limit (int, optional): The maximum number of history records to retrieve. Defaults to 50.
- Returns:
dict: A dictionary containing the user’s interaction history under the key “history”.
- Raises:
HTTPException: If an error occurs while retrieving the history, an HTTP 500 error is raised with the error details.
- async server.api_get_image(job_id: str, image_type: str)
Retrieve a specific type of image associated with a completed job.
- Args:
job_id (str): The unique identifier of the job.
image_type (str): The type of image to retrieve.
- Raises:
HTTPException: error codes
- Returns:
- StreamingResponse: A streaming response containing the requested image
in PNG format.
- async server.api_get_models()
Fetches the list of available models and their metadata.
This asynchronous function retrieves information about all available models, including their name, whether they use an aggregator, their associated color, and a formatted display name.
- Returns:
dict: A dictionary where each key is a model ID and the value is another dictionary.
- async server.api_get_question_types()
Asynchronous function to retrieve available question types.
This function returns a dictionary containing a list of question types derived from the keys of the QUESTION_PREFIXES dictionary, along with an additional “None” option.
- Returns:
dict: A dictionary with a single key “question_types”, whose value is a list of available question types.
- async server.api_job_result(job_id: str)
Retrieve the result of a job by its ID.
This function checks if the job is currently active and returns its status or result. If the job is not active, it attempts to retrieve the job’s result from the database. If found, the job’s data is rehydrated into the active jobs for further use.
- Args:
job_id (str): The unique identifier of the job.
- Returns:
- dict: A dictionary containing the job’s result, including responses, analysis,
consensus score, query, question type, domain, and aggregator ID.
- Raises:
HTTPException: If the job is not found in both active jobs and the database.
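The lookup order described above (active jobs first, then the database, with rehydration on a database hit) can be sketched with plain dictionaries. Everything here is illustrative: `fake_database`, `JobNotFound`, and the `status`/`result` keys are assumptions, and the real endpoint raises an `HTTPException` rather than a custom exception.

```python
from typing import Any, Dict

# Hypothetical in-memory stand-ins for the server's job table and database.
active_jobs: Dict[str, Dict[str, Any]] = {}
fake_database: Dict[str, Dict[str, Any]] = {
    "job-42": {"status": "completed", "result": {"query": "example"}},
}


class JobNotFound(Exception):
    """Placeholder for the HTTPException raised by the real endpoint."""


def get_job_result(job_id: str) -> Dict[str, Any]:
    job = active_jobs.get(job_id)
    if job is not None:
        # An active job reports its status until it has finished.
        if job["status"] != "completed":
            return {"status": job["status"]}
        return job["result"]
    stored = fake_database.get(job_id)
    if stored is None:
        raise JobNotFound(job_id)
    # Rehydrate the stored job so later status/image requests find it.
    active_jobs[job_id] = stored
    return stored["result"]


result = get_job_result("job-42")
```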
- async server.api_job_status(job_id: str)
Retrieve the status of a specific job.
- Args:
job_id (str): The unique identifier of the job.
- Returns:
dict: The status information of the job if it exists in active_jobs.
- Raises:
HTTPException: If the job_id is not found in active_jobs, a 404 error is raised.
- async server.api_process_query(request: QueryRequest, background_tasks: fastapi.BackgroundTasks)
Handles the processing of a query request asynchronously. This function updates agent heartbeats, validates the API key and query, applies domain prefixes if specified, updates the aggregator if provided, and starts a background task to process the query.
- Args:
- request (QueryRequest): The query request object containing the query,
API key, domain, question type, username, and optional aggregator ID.
- background_tasks (BackgroundTasks): The background tasks manager to
schedule asynchronous tasks.
- Returns:
- dict: A dictionary containing the job ID and the processing status.
If there are validation errors, an error message is returned.
- Raises:
HTTPException: If an unexpected error occurs during query processing.
- async server.api_reset_agent(model_id: str)
Handles the API request to reset a specific agent by its model ID.
- Args:
model_id (str): The unique identifier of the agent to be reset.
- Raises:
HTTPException: If the specified agent is not found in the system.
- Returns:
- dict: A dictionary containing the status and a success message indicating
that the agent has been reset.
- async server.api_update_aggregator(request: AggregatorRequest)
Handles the API request to update an aggregator.
- Args:
request (AggregatorRequest): The request object containing the aggregator ID to be updated.
- Returns:
dict: A dictionary containing the success status and the updated aggregator ID.
- Raises:
HTTPException: If an error occurs during the update process, an HTTP 500 error is raised with the error details.
- server.assign_ethics_to_agents(agent_ids: List[str], ethical_views: List[str]) -> Dict[str, str][source]
Assigns ethical perspectives to a list of agents based on the provided ethical views.
- Parameters:
agent_ids (List[str]): A list of agent identifiers.
ethical_views (List[str]): A list of ethical perspectives to assign.
- Returns:
Dict[str, str]: A dictionary mapping each agent ID to its assigned ethical perspective.
- Raises:
ValueError: If the number of ethical perspectives is not 1, 3, or “None”.
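A possible reading of the assignment rule is sketched below. It assumes that a single view is shared by every agent, that three views are distributed one per agent in order, and that an empty list is treated like [“None”]; none of these details are confirmed by the documentation above.

```python
from typing import Dict, List


def assign_ethics_to_agents(agent_ids: List[str], ethical_views: List[str]) -> Dict[str, str]:
    """Map each agent ID to an ethical perspective (illustrative rules only)."""
    if not ethical_views or ethical_views == ["None"]:
        # Assumption: no role assignment is requested.
        return {agent_id: "None" for agent_id in agent_ids}
    if len(ethical_views) == 1:
        # Assumption: one view is shared by all agents.
        return {agent_id: ethical_views[0] for agent_id in agent_ids}
    if len(ethical_views) == 3:
        # Assumption: views are handed out in order, one per agent.
        return {
            agent_id: ethical_views[index % 3]
            for index, agent_id in enumerate(agent_ids)
        }
    raise ValueError("ethical_views must contain 1 or 3 perspectives, or be ['None']")


shared = assign_ethics_to_agents(["a1", "a2", "a3"], ["Utilitarian"])
distinct = assign_ethics_to_agents(["a1", "a2", "a3"], ["Utilitarian", "Deontological", "Virtue"])
```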
- async server.check_agent_health()[source]
Periodically checks the health of agents and updates their status based on heartbeat activity.
- Raises:
Any exceptions raised by update_agent_heartbeat or other asynchronous operations.
- server.format_model_name(model_id)[source]
Formats the model name based on its configuration. This function retrieves the model configuration using the provided model_id from the MODELS dictionary. It extracts the model name from the path, removes unnecessary formatting (e.g., version numbers and suffixes like “-instruct”), capitalizes each word, and appends an “Aggregator” tag if applicable.
- Args:
model_id (str): The identifier for the model in the MODELS dictionary.
- Returns:
str: A human-readable, formatted model name.
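The formatting rules described above might look roughly like the following. The `MODELS` entries, the `name` key, and the exact cleanup rules (dropping any hyphen-separated token that contains a digit, and the wording of the aggregator tag) are assumptions made for the example.

```python
import re
from typing import Any, Dict

# Hypothetical registry entries; the real MODELS dictionary is not shown here.
MODELS: Dict[str, Dict[str, Any]] = {
    "llama": {"name": "meta-llama/llama-3-8b-instruct", "aggregator": False},
    "mistral": {"name": "mistralai/mistral-7b-instruct", "aggregator": True},
}


def format_model_name(model_id: str) -> str:
    config = MODELS[model_id]
    base = config["name"].split("/")[-1]       # keep the part after the vendor prefix
    base = re.sub(r"-instruct$", "", base)     # drop the "-instruct" suffix
    # Treat any token containing a digit as version or size information.
    words = [word for word in base.split("-") if not re.search(r"\d", word)]
    label = " ".join(word.capitalize() for word in words)
    if config.get("aggregator"):
        label += " (Aggregator)"
    return label
```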
- server.lifespan(app: fastapi.FastAPI)[source]
Handles the lifespan events of the FastAPI application. This function is a generator that manages the startup and shutdown events of the application. During startup, it initializes a background task to periodically check the health of agents. The background task runs indefinitely at a specified interval.
- Args:
app (FastAPI): The FastAPI application instance.
- Yields:
None: Control is passed to the application after startup tasks are initialized.
- async server.process_query_background(job_id: str, query: str, question_type: str, domain: str, ethical_views: List[str], username: str)[source]
Processes a query asynchronously in the background, updating job status and progress, interacting with agents, aggregating responses, and saving results to the database.
- Args:
job_id (str): A unique identifier for the job being processed.
query (str): The query string to be processed.
question_type (str): The type of question being asked (e.g., factual, analytical).
domain (str): The domain or context of the query.
ethical_views (List[str]): A list of ethical perspectives.
username (str): The username of the user submitting the query.
- Raises:
Exception: Captures and logs any errors that occur during processing.
- async server.reset_failed_agent(model_id)[source]
Reset the status of a failed agent to “healthy” and clear its failure and retry counts.
- Args:
model_id (str): The unique identifier of the agent to reset.
- async server.root()
Handles the root endpoint of the server.
This asynchronous function returns a JSON response indicating that the Multi-Agent LLM Backend is operational.
- Returns:
dict: A dictionary containing a message confirming the server’s status.
- async server.update_agent_heartbeat(model_id)[source]
Update the last heartbeat time and status for a specific agent.
This function updates the last_heartbeat timestamp, sets the agent’s status to “healthy”, and marks the agent as having processed a request in the agent_health dictionary.
- Args:
- model_id (str): The unique identifier of the agent whose heartbeat
information is being updated.
- Raises:
KeyError: If the provided model_id is not found in the agent_health dictionary.
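The heartbeat bookkeeping performed by `update_agent_heartbeat` and `reset_failed_agent` can be sketched as simple dictionary updates. The sketch is synchronous for brevity (the documented functions are async), and apart from `agent_health` and `last_heartbeat` the field names are assumptions.

```python
import time
from typing import Any, Dict

# Hypothetical health table with one failed agent.
agent_health: Dict[str, Dict[str, Any]] = {
    "model_a": {
        "status": "failed",
        "last_heartbeat": 0.0,
        "failures": 2,
        "retries": 1,
        "processed_request": False,
    },
}


def update_agent_heartbeat(model_id: str) -> None:
    """Record a fresh heartbeat; raises KeyError for an unknown model_id."""
    entry = agent_health[model_id]
    entry["last_heartbeat"] = time.time()
    entry["status"] = "healthy"
    entry["processed_request"] = True


def reset_failed_agent(model_id: str) -> None:
    """Mark the agent healthy again and clear its failure and retry counts."""
    entry = agent_health[model_id]
    entry["status"] = "healthy"
    entry["failures"] = 0
    entry["retries"] = 0


update_agent_heartbeat("model_a")
reset_failed_agent("model_a")
```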
- server.update_aggregator(new_aggregator_id)[source]
Update the aggregator status in the MODELS dictionary. This function resets the ‘aggregator’ status for all models in the MODELS dictionary and sets the ‘aggregator’ status to True for the model with the specified new_aggregator_id.
- Args:
new_aggregator_id (str): The ID of the model to be set as the new aggregator.
- Returns:
str: The ID of the model that was set as the new aggregator.
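The reset-then-set behavior can be expressed in a single pass over the registry. The `MODELS` contents below are placeholders; only the `aggregator` flag is taken from the description above.

```python
from typing import Any, Dict

# Hypothetical registry; the real entries carry more metadata.
MODELS: Dict[str, Dict[str, Any]] = {
    "model_a": {"name": "vendor/model-a", "aggregator": True},
    "model_b": {"name": "vendor/model-b", "aggregator": False},
}


def update_aggregator(new_aggregator_id: str) -> str:
    """Make exactly one model the aggregator and return its ID."""
    for model_id, config in MODELS.items():
        config["aggregator"] = model_id == new_aggregator_id
    return new_aggregator_id


chosen = update_aggregator("model_b")
```

Note that this sketch does not validate the ID: passing an unknown ID would leave no model flagged as the aggregator.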
client module
- client.create_env_template()[source]
Creates a .env file in the current working directory if it does not already exist.
The .env file contains default configuration settings for the OpenRouter API, including placeholders for the API key and backend URL.
Contents of the generated .env file:
- OPENROUTER_API_KEY: Placeholder for the OpenRouter API key.
- BACKEND_URL: Default backend URL set to http://localhost:8000.
This function ensures that the required environment configuration file is present for the application to function correctly.
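A create-if-missing helper along these lines is sketched below. The `directory` parameter, the boolean return value, and the placeholder text `your_api_key_here` are assumptions added for the example; the documented function takes no arguments and writes to the current working directory.

```python
from pathlib import Path
from typing import Optional, Union

# The placeholder value is hypothetical; the backend URL default is from the text above.
ENV_TEMPLATE = (
    "OPENROUTER_API_KEY=your_api_key_here\n"
    "BACKEND_URL=http://localhost:8000\n"
)


def create_env_template(directory: Optional[Union[str, Path]] = None) -> bool:
    """Write a default .env file unless one already exists.

    Returns True if the file was created, False if it was left untouched.
    """
    env_path = Path(directory or Path.cwd()) / ".env"
    if env_path.exists():
        return False
    env_path.write_text(ENV_TEMPLATE, encoding="utf-8")
    return True
```

Checking for an existing file first means a user's real API key is never overwritten by the template.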
- client.create_gradio_interface()[source]
Creates a Gradio interface for a Distributed Multi-Agent LLM System. This function initializes the Gradio application with multiple interactive components, including login/signup functionality, query submission, model response display, analysis visualizations, and interaction history management. It also handles user authentication, session management, and backend communication for processing queries and retrieving historical data.
- Returns:
gr.Blocks: The Gradio Blocks application instance.
- client.fill_query_and_type(selected_example, domain)[source]
Sends a POST request to the backend to fill in a query and determine the question type based on the provided example and domain.
- Args:
selected_example (str): The selected example to be processed.
domain (str): The domain context for the query.
- Returns:
- tuple: A tuple containing:
query (str): The filled query string. Returns an empty string if an error occurs.
question_type (str): The type of the question. Defaults to “None” if not determined.
- client.get_example_choices(domain)[source]
Fetches example choices for a given domain from a backend service.
- Args:
- domain (str): The domain for which to fetch example choices.
If the domain is None or empty, an empty list of choices is returned.
- Returns:
- gr.update: An object containing the updated choices and value.
If successful, the choices are populated with examples fetched from the backend.
If the request fails or an error occurs, the choices are empty.
- Raises:
Logs errors if the backend request fails or an exception occurs during execution.
- client.init_client(retries: int = 3, delay: float = 1.5) -> bool[source]
Initializes the client by attempting to connect to the backend and fetching necessary data.
This function tries to establish a connection to the backend server and retrieve information such as model details, domains, question types, and examples. It retries the connection a specified number of times with a delay between attempts if the connection fails.
- Args:
retries (int): The maximum number of connection attempts. Defaults to 3.
delay (float): The delay (in seconds) between consecutive connection attempts. Defaults to 1.5.
- Returns:
bool: True if the connection and data retrieval are successful, False otherwise.
- Raises:
None: All exceptions are caught and logged, and the function will return False if the connection fails after the specified number of retries.
- Global Variables:
model_info (dict): Stores information about models retrieved from the backend.
domains (dict): Stores domain data retrieved from the backend.
question_types (list): Stores question types retrieved from the backend.
examples_by_domain (dict): Stores examples categorized by domain retrieved from the backend.
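The retry-with-delay behavior can be sketched independently of any HTTP library. `init_with_retries` and `FlakyFetch` are hypothetical names; `fetch` stands in for whatever call retrieves the model, domain, and example data from the backend.

```python
import time
from typing import Callable


def init_with_retries(fetch: Callable[[], dict], retries: int = 3, delay: float = 1.5) -> bool:
    """Call `fetch` until it succeeds or the attempts are exhausted."""
    for attempt in range(1, retries + 1):
        try:
            fetch()
            return True
        except Exception as exc:  # all errors are logged, never propagated
            print(f"Attempt {attempt}/{retries} failed: {exc}")
            if attempt < retries:
                time.sleep(delay)
    return False


class FlakyFetch:
    """Test double that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def __call__(self) -> dict:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("backend unavailable")
        return {"models": {}}
```

With two simulated failures and `retries=3`, the third attempt succeeds and the function returns True; with more failures than attempts it returns False.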
- client.main()[source]
The main function for the Multi-Agent LLM Client application. This function parses command-line arguments, updates the backend URL, creates a .env template, and launches a Gradio interface.
- client.process_query(query, api_key, question_type, domain, aggregator_id, ethical_views, session, progress=gradio.Progress)[source]
Processes a user query by sending it to a backend service, polling for the job status, and retrieving the results, including responses from models, analysis, and visualizations.
- Args:
query (str): The user query to be processed.
api_key (str): The API key for authentication with the backend.
question_type (str): The type of question being asked.
domain (str): The domain or context of the query.
aggregator_id (str): The ID of the aggregator model to be used.
ethical_views (List[str]): The ethical perspectives to send with the query.
session (str): The session or username for tracking the request.
progress (gr.Progress, optional): A progress tracker for updating the UI.
- Returns:
- list: Contains:
str: Aggregator model response or error message.
str: Responses from up to three non-aggregator models.
float: Consensus score.
Image or None: Visualizations (heatmap, emotion, polarity, radar).
gr.update: Updates for warnings and model labels.
- Raises:
Exception: For errors during query processing, including network or backend issues.
- client.update_aggregator(aggregator_id)[source]
Updates the aggregator model on the backend server.
This function sends a POST request to the backend server to update the aggregator model identified by the given aggregator_id. If the request is successful, the updated aggregator ID is returned. Otherwise, an error is logged, and None is returned.
- Args:
aggregator_id (str): The unique identifier of the aggregator to update.
- Returns:
str or None: The updated aggregator ID if the request is successful, or None if the request fails or an exception occurs.
- Raises:
None: Any exceptions are caught and logged.
- client.update_output_labels(aggregator_id)[source]
Updates the output labels for the aggregator and agent models based on the provided aggregator ID and the model information retrieved from the backend.
- Args:
aggregator_id (str): The ID of the aggregator whose labels need to be updated.
- Returns:
- dict: A dictionary containing updated labels for the aggregator and up to
three agent models. The keys correspond to the output components (e.g., output_aggregator, output_model1, output_model2, output_model3), and the values are gr.update objects with the new labels.
database_manager module
- class database_manager.DatabaseManager(db_path='/home/runner/work/Distributed-Multi-Agents/Distributed-Multi-Agents/src/../agent_history.db')[source]
Bases: object
DatabaseManager is a class responsible for managing interactions with an SQLite database. It provides methods to initialize the database, store and retrieve data related to user interactions, responses, and analysis, as well as manage user accounts.
- Attributes:
db_path (str): The file path to the SQLite database.
- create_user(username: str, password: str) -> bool[source]
Creates a new user in the database with the given username and password.
- Args:
username (str): The username of the new user.
password (str): The plaintext password of the new user.
- Returns:
bool: True if the user was successfully created.
str: “duplicate” if the username already exists in the database.
str: “error” if an unexpected error occurs during user creation.
- Raises:
None: All exceptions are handled internally and logged.
- delete_interaction(job_id: str, username: str) -> bool[source]
Deletes an interaction and its associated data from the database. This method removes entries from the responses, analysis, and interactions tables in the database for a given job ID, provided the specified username matches the owner of the interaction.
- Args:
job_id (str): The unique identifier of the job to be deleted.
username (str): The username of the user attempting to delete the interaction.
- Returns:
- bool: True if the interaction and associated data were successfully deleted,
False if the username does not match the owner or an error occurs.
- Raises:
None: Any exceptions encountered are logged and handled internally.
- delete_user(username: str, password: str) -> bool[source]
Deletes a user from the database if the provided username and password match. The password is hashed using SHA-256.
- Args:
username (str): The username of the user to be deleted.
password (str): The plaintext password of the user to be deleted.
- Returns:
bool: True if the user was successfully deleted, False otherwise.
- Raises:
Exception: Logs an error if an exception occurs during the deletion process.
- get_interaction(job_id: str) -> Dict[str, Any] | None[source]
Retrieves interaction details from the database for a given job ID.
- Args:
job_id (str): The unique identifier for the job whose interaction details are to be retrieved.
- Returns:
Optional[Dict[str, Any]]: A dictionary containing the interaction details if found, or None if no interaction exists for the given job ID.
- Raises:
Logs an error and returns None if an exception occurs during database operations.
- get_interaction_history(limit: int = 50) -> List[Dict[str, Any]][source]
Retrieves the interaction history from the database, including associated responses.
- Args:
limit (int): The maximum number of interactions to retrieve. Defaults to 50.
- Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents an interaction.
- get_user_history(username: str, limit: int = 50) -> List[Dict[str, Any]][source]
Retrieves the interaction history of a specific user from the database.
- Args:
username (str): The username of the user whose history is to be retrieved.
limit (int, optional): The maximum number of interactions to retrieve. Defaults to 50.
- Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents an interaction.
- initialize_db()[source]
Initializes the database by creating necessary tables if they do not already exist.
Tables created:
- interactions: Stores interaction details such as job ID, query, domain, question type, timestamp, username, and roles.
- responses: Stores agent responses linked to interactions, including job ID, agent ID, response text, aggregator status, and timestamp.
- analysis: Stores analysis data for interactions, including consensus score, analysis data, and timestamp.
- users: Stores user credentials with unique usernames and passwords.
Logs a success message upon successful initialization or an error message if an exception occurs.
- save_analysis(job_id: str, consensus_score: float, analysis_data: Dict[str, Any]) -> bool[source]
Saves the analysis data for a specific job into the database.
- Args:
job_id (str): The unique identifier for the job.
consensus_score (float): The consensus score associated with the analysis.
analysis_data (Dict[str, Any]): A dictionary containing the analysis data to be saved.
- Returns:
bool: True if the analysis data was successfully saved, False otherwise.
- Raises:
Exception: Logs any exception that occurs during the database operation.
- save_interaction(job_id: str, query: str, domain: str, question_type: str, username: str, roles: str | None = '') -> bool[source]
Saves an interaction record to the database.
- Args:
job_id (str): The unique identifier for the job associated with the interaction.
query (str): The query or input provided by the user.
domain (str): The domain or category of the interaction.
question_type (str): The type of question being asked.
username (str): The username of the individual initiating the interaction.
roles (Optional[str]): Additional roles or metadata associated with the interaction. Defaults to an empty string.
- Returns:
bool: True if the interaction was successfully saved, False otherwise.
- Raises:
Logs an error message if an exception occurs during the database operation.
- save_responses(job_id: str, responses: Dict[str, str], aggregator_id: str | None = None) -> bool[source]
Saves agent responses to the database for a specific job.
- Args:
job_id (str): The unique identifier for the job.
responses (Dict[str, str]): A dictionary mapping agent IDs to their responses.
aggregator_id (Optional[str]): The ID of the aggregator agent, if any. Defaults to None.
- Returns:
bool: True if the responses were successfully saved, False otherwise.
- Raises:
Exception: Logs an error message if an exception occurs during the database operation.
- verify_user(username: str, password: str) -> bool[source]
Verifies the credentials of a user by checking the provided username and password against the stored records in the database.
- Args:
username (str): The username of the user to verify.
password (str): The plaintext password of the user to verify.
- Returns:
- bool: True if the username and hashed password match a record in the database,
False otherwise.
- Raises:
Exception: Logs an error message if an exception occurs during the verification process.
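The user-account methods can be illustrated with `sqlite3` and `hashlib` from the standard library. This is a sketch under stated assumptions: `UserStore` and `hash_password` are hypothetical names, the table layout is invented, and it assumes that `create_user` and `verify_user` use the same SHA-256 hashing that the documentation mentions for `delete_user`.

```python
import hashlib
import sqlite3
from typing import Union


def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of a plaintext password."""
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


class UserStore:
    """Hypothetical stand-in for the user-related part of DatabaseManager."""

    def __init__(self, db_path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS users ("
            "username TEXT PRIMARY KEY, password TEXT NOT NULL)"
        )

    def create_user(self, username: str, password: str) -> Union[bool, str]:
        try:
            with self.conn:  # commits on success, rolls back on error
                self.conn.execute(
                    "INSERT INTO users (username, password) VALUES (?, ?)",
                    (username, hash_password(password)),
                )
            return True
        except sqlite3.IntegrityError:
            return "duplicate"

    def verify_user(self, username: str, password: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM users WHERE username = ? AND password = ?",
            (username, hash_password(password)),
        ).fetchone()
        return row is not None


store = UserStore()
```

Parameterized queries (the `?` placeholders) keep user-supplied names and passwords out of the SQL text itself.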
load_balancer module
- load_balancer.consistent_hash(job_id)[source]
Generates a consistent hash value for a given job ID.
This function uses the MD5 hashing algorithm to compute a hash for the provided job ID string. The resulting hash is then converted to an integer in base 16.
- Args:
job_id (str): The unique identifier for the job to be hashed.
- Returns:
int: A consistent integer hash value derived from the job ID.
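The hashing step described above corresponds closely to the following sketch. The `BACKENDS` list and the `backend_for_job` helper are hypothetical additions that show how such a hash could be used to pin a job to one backend.

```python
import hashlib


def consistent_hash(job_id: str) -> int:
    """Map a job ID to a stable integer via MD5 (used for routing, not security)."""
    digest = hashlib.md5(job_id.encode("utf-8")).hexdigest()
    return int(digest, 16)


# Hypothetical backend list; the real load balancer's targets are not documented here.
BACKENDS = ["http://localhost:8001", "http://localhost:8002", "http://localhost:8003"]


def backend_for_job(job_id: str) -> str:
    """Pick the same backend every time for a given job ID."""
    return BACKENDS[consistent_hash(job_id) % len(BACKENDS)]
```

Python's built-in `hash()` for strings is randomized per process by default, so a digest-based hash is the usual choice when several processes must agree on the mapping.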
- async load_balancer.proxy(path: str, request: fastapi.Request)
Smart proxy:
- Sticky routing for job_id-specific paths (status/result/image)
- Round-robin with retry logic for everything else (2-fault tolerant)
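The routing decision can be sketched as a function that returns the backends to try, in order. The path prefixes, the `BACKENDS` list, and the helper names are assumptions; the sketch also reads "2-fault tolerant" as "with three backends, a request can survive two failed attempts", which the documentation does not spell out.

```python
import hashlib
import itertools
from typing import List, Optional

# Hypothetical backends and path shapes, chosen only for illustration.
BACKENDS = ["http://localhost:8001", "http://localhost:8002", "http://localhost:8003"]
STICKY_PREFIXES = ("status/", "result/", "image/")
_round_robin = itertools.cycle(range(len(BACKENDS)))


def extract_job_id(path: str) -> Optional[str]:
    """Return the job ID from a job-specific path, or None for other paths."""
    for prefix in STICKY_PREFIXES:
        if path.startswith(prefix):
            return path[len(prefix):].split("/")[0] or None
    return None


def candidate_backends(path: str) -> List[str]:
    """List the backends to try, in order, for a request path."""
    job_id = extract_job_id(path)
    if job_id is not None:
        # Sticky routing: the job's hash always selects the same backend.
        digest = hashlib.md5(job_id.encode("utf-8")).hexdigest()
        return [BACKENDS[int(digest, 16) % len(BACKENDS)]]
    # Round-robin start, then fall through to the remaining backends on failure.
    start = next(_round_robin)
    return [BACKENDS[(start + offset) % len(BACKENDS)] for offset in range(len(BACKENDS))]


first = candidate_backends("models")
second = candidate_backends("models")
```

A proxy built on this would forward the request to each candidate in turn and stop at the first one that responds successfully.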