Source code for database_manager

import os
import sqlite3
import json
import time
import logging
import hashlib
from typing import Dict, List, Any, Optional

logger = logging.getLogger(__name__)

[docs] class DatabaseManager: """ DatabaseManager is a class responsible for managing interactions with an SQLite database. It provides methods to initialize the database, store and retrieve data related to user interactions, responses, and analysis, as well as manage user accounts. Attributes: db_path (str): The file path to the SQLite database. """ def __init__(self, db_path = os.path.join(os.path.dirname(__file__), "../agent_history.db")): """ Initializes the DatabaseManager instance. Args: db_path (str): The file path to the database. Defaults to a path pointing to "../agent_history.db" relative to the current file. Attributes: db_path (str): The file path to the database. Calls: initialize_db(): Sets up the database if it does not already exist. """ self.db_path = db_path self.initialize_db()
[docs] def initialize_db(self): """ Initializes the database by creating necessary tables if they do not already exist. Tables created: - interactions: Stores interaction details such as job ID, query, domain, question type, timestamp, username, and roles. - responses: Stores agent responses linked to interactions, including job ID, agent ID, response text, aggregator status, and timestamp. - analysis: Stores analysis data for interactions, including consensus score, analysis data, and timestamp. - users: Stores user credentials with unique usernames and passwords. Logs a success message upon successful initialization or an error message if an exception occurs. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create interactions table cursor.execute(''' CREATE TABLE IF NOT EXISTS interactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT NOT NULL, query TEXT NOT NULL, domain TEXT, question_type TEXT, timestamp INTEGER NOT NULL, username TEXT NOT NULL, roles TEXT ) ''') # Create responses table cursor.execute(''' CREATE TABLE IF NOT EXISTS responses ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT NOT NULL, agent_id TEXT NOT NULL, response TEXT NOT NULL, is_aggregator BOOLEAN NOT NULL, timestamp INTEGER NOT NULL, FOREIGN KEY (job_id) REFERENCES interactions(job_id) ) ''') # Create analysis table cursor.execute(''' CREATE TABLE IF NOT EXISTS analysis ( job_id TEXT PRIMARY KEY, consensus_score REAL, analysis_data TEXT, timestamp INTEGER NOT NULL, FOREIGN KEY (job_id) REFERENCES interactions(job_id) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL ) ''') conn.commit() conn.close() logger.info("Database initialized successfully") except Exception as e: logger.error(f"Error initializing database: {str(e)}")
[docs] def get_user_history(self, username: str, limit: int = 50) -> List[Dict[str, Any]]: """ Retrieves the interaction history of a specific user from the database. Args: username (str): The username of the user whose history is to be retrieved. limit (int, optional): The maximum number of interactions to retrieve. Defaults to 50. Returns: List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents an interaction. """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(""" SELECT i.job_id, i.query, i.domain, i.question_type, i.timestamp, i.roles, a.consensus_score FROM interactions i LEFT JOIN analysis a ON i.job_id = a.job_id WHERE i.username = ? ORDER BY i.timestamp DESC LIMIT ? """, (username, limit)) interactions = [] for row in cursor.fetchall(): interaction = dict(row) resp_cursor = conn.cursor() resp_cursor.execute( "SELECT agent_id, response, is_aggregator FROM responses WHERE job_id = ?", (interaction['job_id'],) ) responses = {r['agent_id']: r['response'] for r in resp_cursor.fetchall()} interaction['responses'] = responses interactions.append(interaction) conn.close() return interactions except Exception as e: logger.error(f"Error getting user history: {str(e)}") return []
[docs] def save_interaction(self, job_id: str, query: str, domain: str, question_type: str, username: str, roles: Optional[str] = "") -> bool: """ Saves an interaction record to the database. Args: job_id (str): The unique identifier for the job associated with the interaction. query (str): The query or input provided by the user. domain (str): The domain or category of the interaction. question_type (str): The type of question being asked. username (str): The username of the individual initiating the interaction. roles (Optional[str]): Additional roles or metadata associated with the interaction. Defaults to an empty string. Returns: bool: True if the interaction was successfully saved, False otherwise. Raises: Logs an error message if an exception occurs during the database operation. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() timestamp = int(time.time()) cursor.execute( "INSERT INTO interactions (job_id, query, domain, question_type, timestamp, username, roles) VALUES (?, ?, ?, ?, ?, ?, ?)", (job_id, query, domain, question_type, timestamp, username, roles) ) conn.commit() conn.close() return True except Exception as e: logger.error(f"Error saving interaction: {str(e)}") return False
[docs] def save_responses(self, job_id: str, responses: Dict[str, str], aggregator_id: Optional[str] = None) -> bool: """ Saves agent responses to the database for a specific job. Args: job_id (str): The unique identifier for the job. responses (Dict[str, str]): A dictionary mapping agent IDs to their responses. aggregator_id (Optional[str]): The ID of the aggregator agent, if any. Defaults to None. Returns: bool: True if the responses were successfully saved, False otherwise. Raises: Exception: Logs an error message if an exception occurs during the database operation. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() timestamp = int(time.time()) for agent_id, response in responses.items(): is_aggregator = agent_id == aggregator_id cursor.execute( "INSERT INTO responses (job_id, agent_id, response, is_aggregator, timestamp) VALUES (?, ?, ?, ?, ?)", (job_id, agent_id, response, is_aggregator, timestamp) ) conn.commit() conn.close() return True except Exception as e: logger.error(f"Error saving responses: {str(e)}") return False
[docs] def save_analysis(self, job_id: str, consensus_score: float, analysis_data: Dict[str, Any]) -> bool: """ Saves the analysis data for a specific job into the database. Args: job_id (str): The unique identifier for the job. consensus_score (float): The consensus score associated with the analysis. analysis_data (Dict[str, Any]): A dictionary containing the analysis data to be saved. Returns: bool: True if the analysis data was successfully saved, False otherwise. Raises: Exception: Logs any exception that occurs during the database operation. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() timestamp = int(time.time()) # Convert analysis data to JSON analysis_json = json.dumps(analysis_data) cursor.execute( "INSERT INTO analysis (job_id, consensus_score, analysis_data, timestamp) VALUES (?, ?, ?, ?)", (job_id, consensus_score, analysis_json, timestamp) ) conn.commit() conn.close() return True except Exception as e: logger.error(f"Error saving analysis: {str(e)}") return False
[docs] def get_interaction_history(self, limit: int = 50) -> List[Dict[str, Any]]: """ Retrieves the interaction history from the database, including associated responses. Args: limit (int): The maximum number of interactions to retrieve. Defaults to 50. Returns: List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents an interaction. """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(""" SELECT i.job_id, i.query, i.domain, i.question_type, i.timestamp, i.roles, a.consensus_score FROM interactions i LEFT JOIN analysis a ON i.job_id = a.job_id ORDER BY i.timestamp DESC LIMIT ? """, (limit,)) interactions = [] for row in cursor.fetchall(): interaction = dict(row) # Get responses for this interaction resp_cursor = conn.cursor() resp_cursor.execute( "SELECT agent_id, response, is_aggregator FROM responses WHERE job_id = ?", (interaction['job_id'],) ) responses = {} for resp_row in resp_cursor.fetchall(): responses[resp_row['agent_id']] = resp_row['response'] interaction['responses'] = responses interactions.append(interaction) conn.close() return interactions except Exception as e: logger.error(f"Error getting interaction history: {str(e)}") return []
[docs] def get_interaction(self, job_id: str) -> Optional[Dict[str, Any]]: """ Retrieves interaction details from the database for a given job ID. Args: job_id (str): The unique identifier for the job whose interaction details are to be retrieved. Returns: Optional[Dict[str, Any]]: A dictionary containing the interaction details if found, or None if no interaction exists for the given job ID. Raises: Logs an error and returns None if an exception occurs during database operations. """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute( "SELECT * FROM interactions WHERE job_id = ?", (job_id,) ) row = cursor.fetchone() if not row: conn.close() return None interaction = dict(row) # Get responses resp_cursor = conn.cursor() resp_cursor.execute( "SELECT agent_id, response, is_aggregator FROM responses WHERE job_id = ?", (job_id,) ) responses = {} for resp_row in resp_cursor.fetchall(): responses[resp_row['agent_id']] = { 'response': resp_row['response'], 'is_aggregator': bool(resp_row['is_aggregator']) } interaction['responses'] = responses # Get analysis analysis_cursor = conn.cursor() analysis_cursor.execute( "SELECT consensus_score, analysis_data FROM analysis WHERE job_id = ?", (job_id,) ) analysis_row = analysis_cursor.fetchone() if analysis_row: interaction['consensus_score'] = analysis_row['consensus_score'] interaction['analysis'] = json.loads(analysis_row['analysis_data']) conn.close() return interaction except Exception as e: logger.error(f"Error getting interaction: {str(e)}") return None
[docs] def delete_interaction(self, job_id: str, username: str) -> bool: """ Deletes an interaction and its associated data from the database. This method removes entries from the `responses`, `analysis`, and `interactions` tables in the database for a given job ID, provided the specified username matches the owner of the interaction. Args: job_id (str): The unique identifier of the job to be deleted. username (str): The username of the user attempting to delete the interaction. Returns: bool: True if the interaction and associated data were successfully deleted, False if the username does not match the owner or an error occurs. Raises: None: Any exceptions encountered are logged and handled internally. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Check ownership first cursor.execute("SELECT username FROM interactions WHERE job_id = ?", (job_id,)) row = cursor.fetchone() if not row or row[0] != username: conn.close() return False # Delete from responses table first (foreign key constraint) cursor.execute("DELETE FROM responses WHERE job_id = ?", (job_id,)) # Delete from analysis table cursor.execute("DELETE FROM analysis WHERE job_id = ?", (job_id,)) # Delete from interactions table cursor.execute("DELETE FROM interactions WHERE job_id = ?", (job_id,)) conn.commit() conn.close() return True except Exception as e: logger.error(f"Error deleting interaction: {str(e)}") return False
[docs] def create_user(self, username: str, password: str) -> bool: """ Creates a new user in the database with the given username and password. Args: username (str): The username of the new user. password (str): The plaintext password of the new user. Returns: bool: True if the user was successfully created. str: "duplicate" if the username already exists in the database. str: "error" if an unexpected error occurs during user creation. Raises: None: All exceptions are handled internally and logged. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() hashed_password = hashlib.sha256(password.encode()).hexdigest() cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, hashed_password)) conn.commit() conn.close() return True except sqlite3.IntegrityError: logger.warning("Attempted to create duplicate user.") return "duplicate" except Exception as e: logger.error(f"Error creating user: {str(e)}") return "error"
[docs] def verify_user(self, username: str, password: str) -> bool: """ Verifies the credentials of a user by checking the provided username and password against the stored records in the database. Args: username (str): The username of the user to verify. password (str): The plaintext password of the user to verify. Returns: bool: True if the username and hashed password match a record in the database, False otherwise. Raises: Exception: Logs an error message if an exception occurs during the verification process. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() hashed_password = hashlib.sha256(password.encode()).hexdigest() cursor.execute("SELECT * FROM users WHERE username = ? AND password = ?", (username, hashed_password)) user = cursor.fetchone() conn.close() return user is not None except Exception as e: logger.error(f"Error verifying user: {str(e)}") return False
[docs] def delete_user(self, username: str, password: str) -> bool: """ Deletes a user from the database if the provided username and password match. The password is hashed using SHA-256. Args: username (str): The username of the user to be deleted. password (str): The plaintext password of the user to be deleted. Returns: bool: True if the user was successfully deleted, False otherwise. Raises: Exception: Logs an error if an exception occurs during the deletion process. """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() hashed_password = hashlib.sha256(password.encode()).hexdigest() cursor.execute("DELETE FROM users WHERE username = ? AND password = ?", (username, hashed_password)) conn.commit() conn.close() return cursor.rowcount > 0 except Exception as e: logger.error(f"Error deleting user: {str(e)}") return False