"""Streamlit AI sales assistant.

Provides user registration/login, an admin dashboard for company settings
and inventory, and a chat interface backed by a Groq-hosted LLM agent that
retrieves product information from an in-memory vector store.
"""

import ast
import os
import sqlite3
from datetime import datetime
from typing import Optional

import streamlit as st
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.docstore.document import Document
from langchain.tools import Tool
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from werkzeug.security import check_password_hash, generate_password_hash

# Link the agent is instructed to emit when the customer wants to buy.
PAYMENT_URL = "https://www.example.com/payment"
# Stock level at or below which a product counts as "low stock".
LOW_STOCK_THRESHOLD = 3
# Phrase that precedes the product name in a purchase confirmation.
PURCHASE_MARKER = "proceed with the purchase of"
# Metadata id used for the placeholder document meaning "nothing in stock".
NOT_FOUND_ID = -1


# --- Database Setup ---
@st.cache_resource
def init_db():
    """Open ``users.db``, create the tables if missing and seed defaults.

    Returns:
        The shared ``sqlite3`` connection. It is cached by Streamlit, so the
        same connection object is reused across script reruns.
    """
    conn = sqlite3.connect('users.db', check_same_thread=False)
    c = conn.cursor()

    # Users table
    c.execute('''CREATE TABLE IF NOT EXISTS users
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  username TEXT UNIQUE NOT NULL,
                  password TEXT NOT NULL,
                  previous_chat_history TEXT,
                  previous_products_bought TEXT)''')

    # Company settings table
    c.execute('''CREATE TABLE IF NOT EXISTS company_settings
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  name TEXT NOT NULL,
                  business TEXT NOT NULL,
                  agent_name TEXT NOT NULL,
                  key_features TEXT NOT NULL)''')

    # Products table with inventory
    c.execute('''CREATE TABLE IF NOT EXISTS products
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  name TEXT NOT NULL,
                  category TEXT NOT NULL,
                  price REAL NOT NULL,
                  description TEXT NOT NULL,
                  features TEXT NOT NULL,
                  stock INTEGER NOT NULL DEFAULT 0)''')

    # Inventory log table
    c.execute('''CREATE TABLE IF NOT EXISTS inventory_log
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  timestamp TEXT NOT NULL,
                  product_name TEXT NOT NULL,
                  found INTEGER NOT NULL,
                  username TEXT)''')

    # Insert default company settings if empty
    c.execute('SELECT COUNT(*) FROM company_settings')
    if c.fetchone()[0] == 0:
        c.execute('''INSERT INTO company_settings (name, business, agent_name, key_features)
                     VALUES (?, ?, ?, ?)''',
                  ('TechElectronics', 'Consumer Electronics Retailer', 'Alex',
                   'Cutting-edge technology, Competitive pricing, Excellent customer service'))

    conn.commit()
    return conn


conn = init_db()


# --- Inventory Management Functions ---
def log_inventory_access(product_name: str, found: bool, username: Optional[str] = None):
    """Record one inventory lookup attempt in ``inventory_log``.

    Args:
        product_name: Text that was looked up.
        found: Whether a matching in-stock product existed.
        username: Name of the user who triggered the lookup, if any.
    """
    c = conn.cursor()
    c.execute('''INSERT INTO inventory_log (timestamp, product_name, found, username)
                 VALUES (?, ?, ?, ?)''',
              (datetime.now().isoformat(), product_name, int(found), username))
    conn.commit()


def validate_product_in_inventory(product_name: str) -> bool:
    """Return True if a product whose name contains *product_name* is in stock.

    The match is a case-insensitive substring match (SQL ``LIKE``).
    """
    c = conn.cursor()
    c.execute('''SELECT id FROM products
                 WHERE LOWER(name) LIKE LOWER(?) AND stock > 0''',
              (f'%{product_name}%',))
    return c.fetchone() is not None


def get_inventory_report():
    """Return ``(name, category, price, stock)`` rows ordered by category, name."""
    c = conn.cursor()
    c.execute('''SELECT name, category, price, stock FROM products
                 ORDER BY category, name''')
    return c.fetchall()


def get_inventory_alerts():
    """Return ``(name, stock)`` rows for products with 1 to 3 units left."""
    c = conn.cursor()
    c.execute('''SELECT name, stock FROM products
                 WHERE stock <= 3 AND stock > 0
                 ORDER BY stock ASC''')
    return c.fetchall()


# --- Admin Classes ---
class Company:
    """Accessors for the ``company_settings`` table."""

    @staticmethod
    def get_settings():
        """Return the first settings row ``(id, name, business, agent_name, key_features)``."""
        c = conn.cursor()
        c.execute('SELECT * FROM company_settings LIMIT 1')
        return c.fetchone()

    @staticmethod
    def update_settings(name, business, agent_name, key_features):
        """Overwrite the settings stored in the row with ``id = 1``."""
        c = conn.cursor()
        c.execute('''UPDATE company_settings
                     SET name=?, business=?, agent_name=?, key_features=?
                     WHERE id=1''',
                  (name, business, agent_name, key_features))
        conn.commit()


class Product:
    """Accessors for the ``products`` table.

    Rows are returned as tuples in column order:
    ``(id, name, category, price, description, features, stock)``.
    """

    @staticmethod
    def get_all():
        """Return every product row."""
        c = conn.cursor()
        c.execute('SELECT * FROM products')
        return c.fetchall()

    @staticmethod
    def search(query):
        """Return products whose name or category contains *query* (case-insensitive)."""
        c = conn.cursor()
        c.execute('''SELECT * FROM products
                     WHERE LOWER(name) LIKE LOWER(?)
                     OR LOWER(category) LIKE LOWER(?)''',
                  (f'%{query}%', f'%{query}%'))
        return c.fetchall()

    @staticmethod
    def get_by_id(product_id):
        """Return the product row with the given id, or None."""
        c = conn.cursor()
        c.execute('SELECT * FROM products WHERE id = ?', (product_id,))
        return c.fetchone()

    @staticmethod
    def add(name, category, price, description, features, stock):
        """Insert a new product and commit."""
        c = conn.cursor()
        c.execute('''INSERT INTO products (name, category, price, description, features, stock)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (name, category, price, description, features, stock))
        conn.commit()

    @staticmethod
    def delete(product_id):
        """Delete the product with the given id and commit."""
        c = conn.cursor()
        c.execute('DELETE FROM products WHERE id=?', (product_id,))
        conn.commit()

    @staticmethod
    def update_stock(product_id, new_stock):
        """Set the stock of the given product and commit."""
        c = conn.cursor()
        c.execute('UPDATE products SET stock=? WHERE id=?', (new_stock, product_id))
        conn.commit()


# --- User Class ---
def _parse_stored_list(raw):
    """Parse a list that was persisted with ``str(list)``.

    ``ast.literal_eval`` only accepts Python literals, so text read back from
    the database can never execute code (the previous ``eval`` could).
    Missing, malformed or non-list values yield an empty list so that one bad
    row does not prevent the user from logging in.
    """
    if not raw:
        return []
    try:
        value = ast.literal_eval(raw)
    except (ValueError, SyntaxError, TypeError):
        return []
    return value if isinstance(value, list) else []


class User:
    """A registered user together with their chat and purchase history."""

    def __init__(self, id, username, password, chat_history=None, products_bought=None):
        self.id = id
        self.username = username
        self.password = password  # werkzeug password hash, never the plain text
        self.chat_history = chat_history or []
        self.products_bought = products_bought or []

    @classmethod
    def create(cls, username, password):
        """Insert a new user with a hashed password and return it.

        Raises:
            sqlite3.IntegrityError: If the username already exists
                (``username`` is declared UNIQUE).
        """
        hashed_pw = generate_password_hash(password)
        c = conn.cursor()
        c.execute('INSERT INTO users (username, password) VALUES (?, ?)',
                  (username, hashed_pw))
        user_id = c.lastrowid
        conn.commit()
        return cls(user_id, username, hashed_pw)

    @classmethod
    def get_by_username(cls, username):
        """Return the user with the given username, or None if absent."""
        c = conn.cursor()
        c.execute('SELECT * FROM users WHERE username = ?', (username,))
        row = c.fetchone()
        if row is None:
            return None
        return cls(row[0], row[1], row[2],
                   _parse_stored_list(row[3]),
                   _parse_stored_list(row[4]))

    def update_chat_history(self, new_messages):
        """Append *new_messages* to the stored chat history and commit."""
        updated_history = self.chat_history + new_messages
        c = conn.cursor()
        c.execute('UPDATE users SET previous_chat_history = ? WHERE id = ?',
                  (str(updated_history), self.id))
        conn.commit()
        self.chat_history = updated_history

    def update_products_bought(self, new_products):
        """Append *new_products* to the stored purchase list and commit."""
        updated_products = self.products_bought + new_products
        c = conn.cursor()
        c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?',
                  (str(updated_products), self.id))
        conn.commit()
        self.products_bought = updated_products


# --- AI Setup ---
os.environ["GROQ_API_KEY"] = st.secrets["GROQ_API_KEY"]

llm = ChatGroq(
    temperature=0.1,
    model_name="llama3-8b-8192",
    api_key=st.secrets["GROQ_API_KEY"],
)


@st.cache_resource(show_spinner=False)
def _load_embeddings():
    """Build the embedding model once; Streamlit reruns this script on every interaction."""
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


embeddings = _load_embeddings()


@st.cache_resource(show_spinner=False)
def load_data():
    """Index all products in an in-memory vector store and return a retriever.

    The result is cached; callers invoke ``load_data.clear()`` after changing
    the products table so the index is rebuilt on the next rerun.
    """
    docs = []
    for p in Product.get_all():
        content = (f"Name: {p[1]}\nCategory: {p[2]}\nPrice: {p[3]}\n"
                   f"Description: {p[4]}\nFeatures: {p[5]}\nStock: {p[6]}")
        metadata = {"id": p[0], "name": p[1], "category": p[2], "price": p[3], "stock": p[6]}
        docs.append(Document(page_content=content, metadata=metadata))

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20)
    splits = text_splitter.split_documents(docs)
    vectorstore = InMemoryVectorStore.from_documents(documents=splits, embedding=embeddings)
    return vectorstore.as_retriever()


retriever = load_data()


def retrieve_query(query: str):
    """Retrieve in-stock product documents matching *query*.

    Documents whose ``stock`` metadata is 0 are dropped. When nothing
    remains, a single placeholder document with ``id == -1`` is returned.
    The result (including the placeholder) is stored in
    ``st.session_state.last_retrieved_docs`` so the caller can tell that the
    lookup ran and found nothing.
    """
    # ``get_relevant_documents`` is deprecated; retrievers are Runnables.
    docs = retriever.invoke(query)
    valid_docs = [doc for doc in docs if doc.metadata.get('stock', 0) > 0]

    if not valid_docs:
        valid_docs = [Document(
            page_content=f"No matching in-stock products found for '{query}'",
            metadata={"id": NOT_FOUND_ID, "stock": 0},
        )]

    st.session_state.last_retrieved_docs = valid_docs
    return valid_docs


tool = Tool(
    name="product_retriever",
    func=retrieve_query,
    description="Useful for retrieving product information from CURRENT INVENTORY only"
)


# --- Admin Dashboard ---
def admin_dashboard():
    """Render the admin UI: company settings, product management and reports."""
    st.header("Admin Dashboard")

    with st.expander("Company Settings"):
        current_settings = Company.get_settings()
        with st.form("Company Settings Form"):
            name = st.text_input("Company Name", value=current_settings[1])
            business = st.text_input("Business", value=current_settings[2])
            agent_name = st.text_input("Agent Name", value=current_settings[3])
            key_features = st.text_area("Key Features", value=current_settings[4])

            if st.form_submit_button("Update Settings"):
                Company.update_settings(name, business, agent_name, key_features)
                st.success("Settings updated!")
                st.rerun()

    with st.expander("Product Management"):
        with st.form("Add Product"):
            st.subheader("Add New Product")
            name = st.text_input("Product Name")
            category = st.text_input("Category")
            price = st.number_input("Price", min_value=0.0)
            description = st.text_area("Description")
            features = st.text_area("Features")
            stock = st.number_input("Initial Stock", min_value=0, value=0)

            if st.form_submit_button("Add Product"):
                # A nameless product cannot be found or managed afterwards.
                if not name.strip() or not category.strip():
                    st.error("Product name and category are required")
                else:
                    Product.add(name, category, price, description, features, stock)
                    st.success("Product added!")
                    load_data.clear()
                    st.rerun()

        st.subheader("Manage Inventory")
        products = Product.get_all()
        if products:
            for p in products:
                cols = st.columns([3, 2, 2, 1])
                cols[0].write(f"**{p[1]}** ({p[2]})")
                cols[1].write(f"Price: ${p[3]}")
                new_stock = cols[2].number_input(
                    "Stock",
                    min_value=0,
                    value=p[6],
                    key=f"stock_{p[0]}"
                )
                if new_stock != p[6]:
                    Product.update_stock(p[0], new_stock)
                    load_data.clear()
                    st.rerun()
                if cols[3].button("❌", key=f"del_{p[0]}"):
                    Product.delete(p[0])
                    load_data.clear()
                    st.rerun()
        else:
            st.info("No products found in database")

    with st.expander("Inventory Reports"):
        st.subheader("Current Inventory Status")
        inventory = get_inventory_report()
        if inventory:
            st.table(inventory)
        else:
            st.warning("No inventory data available")

        st.subheader("Low Stock Alerts")
        alerts = get_inventory_alerts()
        if alerts:
            for item in alerts:
                st.warning(f"{item[0]} - Only {item[1]} left!")
        else:
            st.info("No low stock alerts")

        st.subheader("Inventory Access Logs")
        c = conn.cursor()
        c.execute('''SELECT timestamp, product_name, found, username
                     FROM inventory_log
                     ORDER BY timestamp DESC
                     LIMIT 100''')
        logs = c.fetchall()
        if logs:
            st.table(logs)
        else:
            st.info("No inventory access logs yet")


# --- Chat helpers ---
def _build_system_prompt(company_settings, products) -> str:
    """Return the agent's system prompt for the given settings row and product rows.

    Braces are escaped at the end because the text is handed to
    ``ChatPromptTemplate``, which would otherwise treat any ``{...}`` typed
    into a company setting or product name as a template variable.
    """
    company_name, business, agent_name, key_features = company_settings[1:5]
    current_products = "\n".join(
        f"- {p[1]} (${p[3]}, Stock: {p[6]})" for p in products
    )

    prompt = f"""
You are {agent_name}, the AI Sales Assistant for {company_name} ({business}).
Your primary role is to assist customers with product inquiries, make appropriate recommendations,
and facilitate purchases while strictly adhering to company policies.

## Company Profile
- Company Name: {company_name}
- Business Type: {business}
- Key Features: {key_features}
- Agent Name: {agent_name}

CURRENT INVENTORY:
{current_products}

STRICT INVENTORY POLICY:
- You MUST ONLY recommend products that exist in our CURRENT INVENTORY
- NEVER suggest, mention, or describe products not in our database
- If asked for unavailable items, respond: "I apologize, we don't currently carry that product."

When recommending products:
1. FIRST verify the product exists in the above inventory
2. ONLY discuss products that appear in the inventory list
3. For unavailable items, DO NOT suggest alternatives unless they exist in inventory

## Inventory Management Policy
1. **Stock Verification**:
   - ALWAYS check current stock before recommending any product
   - Never suggest out-of-stock items (stock = 0)
   - For low stock items (stock ≤ 3), mention: "Only [X] left in stock!"

2. **Product Recommendations**:
   - Only recommend products from our current inventory
   - If asked for unavailable items, respond with:
     "I apologize, we don't currently carry that item. As a {business},
     we specialize in [relevant products]. May I suggest [alternative]?"
   - When suggesting alternatives, ensure they're in stock

3. **Purchase Process**:
   - Confirm product availability before purchase
   - Generate payment link only after stock verification
   - Update inventory immediately after successful purchase

## Conversation Flow Examples

### 1. Greeting & Need Assessment
User: "Hi, I need a new laptop"
You: "Hello! I'd be happy to help you find the perfect laptop. Could you tell me what you'll primarily be using it for and your budget range?"

### 2. Product Recommendation (In-Stock)
User: "I need a gaming laptop under $1500"
You: "We have the XYZ Gaming Laptop available for $1399 (3 in stock). It features [key specs]. Would you like more details?"

### 3. Product Recommendation (Out-of-Stock)
User: "Do you have ABC Smartphone?"
You: "I apologize, the ABC Smartphone is currently out of stock. However, we have the DEF Smartphone with similar features available for $599 (5 in stock). Would you like me to tell you more about it?"

### 4. Handling Off-Topic Requests
User: "Do you sell snacks?"
You: "I apologize, as a {business}, we specialize in [tech products]. We don't carry food items. Is there a tech product I can assist you with today?"

### 5. Purchase Process
User: "I want to buy the XYZ Laptop"
You: "Great choice! The XYZ Laptop is available for $1399 (2 in stock). I can generate a secure payment link for you. Would you like to proceed with the purchase? [{PAYMENT_URL}]"

## Communication Guidelines
- Tone: Professional yet friendly (like a knowledgeable salesperson)
- Language: Clear, concise, avoid technical jargon unless requested
- Emojis: Use sparingly (1-2 per message max)
- Branding: Consistently reference {company_name} when appropriate

## Important Reminders
1. NEVER recommend products not in our inventory
2. ALWAYS verify stock before discussing any product
3. Update inventory immediately after purchases
4. Politely redirect off-topic requests to our product offerings
5. For complex queries, offer to connect with human support
"""
    return prompt.replace("{", "{{").replace("}", "}}")


def _to_langchain_messages(history):
    """Convert stored ``{"type", "content"}`` dicts into LangChain message objects."""
    return [
        HumanMessage(content=msg["content"]) if msg["type"] == "human"
        else AIMessage(content=msg["content"])
        for msg in history
    ]


def _extract_product_name(response: str) -> Optional[str]:
    """Return the text between the purchase marker and the next ``?``, if any.

    When no ``?`` follows the marker the remainder of the response is used,
    instead of silently dropping its last character.
    """
    _, marker, tail = response.partition(PURCHASE_MARKER)
    if not marker:
        return None
    return tail.partition("?")[0].strip() or None


def _process_purchase(response: str):
    """Decrement stock and record the purchase when *response* confirms one.

    A purchase is recognised when the response contains the payment link and
    the purchase marker phrase followed by a product name.
    """
    if PAYMENT_URL not in response:
        return
    product_name = _extract_product_name(response)
    if not product_name:
        return

    docs = retrieve_query(product_name)
    if not docs or docs[0].metadata.get("id") == NOT_FOUND_ID:
        return

    product_id = docs[0].metadata.get("id")
    indexed_stock = docs[0].metadata.get("stock") or 0
    if not product_id or indexed_stock <= 0:
        return

    c = conn.cursor()
    try:
        # The vector index may be stale, so re-check the live stock value.
        c.execute('SELECT stock FROM products WHERE id = ?', (product_id,))
        row = c.fetchone()
        actual_stock = row[0] if row else 0  # row is None if the product was deleted

        if actual_stock > 0:
            remaining = actual_stock - 1
            c.execute('UPDATE products SET stock = ? WHERE id = ?',
                      (remaining, product_id))
            # update_products_bought() appends and persists; appending to
            # products_bought here as well would record the purchase twice.
            st.session_state.user.update_products_bought([product_name])
            conn.commit()

            st.success(f"Successfully purchased {product_name}! Stock updated.")
            load_data.clear()

            if remaining <= LOW_STOCK_THRESHOLD:
                st.warning(f"Low stock alert: Only {remaining} units remaining!")
        else:
            st.error(f"Sorry, {product_name} is now out of stock!")
    except sqlite3.Error as e:
        conn.rollback()
        st.error(f"Error processing purchase: {str(e)}")
    finally:
        c.close()


def _render_auth():
    """Render the login, registration and admin-PIN forms."""
    st.header("Login/Register")
    tab1, tab2, tab3 = st.tabs(["Login", "Register", "Admin"])

    with tab1:
        with st.form("Login"):
            username = st.text_input("Username")
            password = st.text_input("Password", type="password")
            if st.form_submit_button("Login"):
                user = User.get_by_username(username)
                if user and check_password_hash(user.password, password):
                    st.session_state.user = user
                    st.session_state.chat_history = user.chat_history
                    st.rerun()
                else:
                    st.error("Invalid credentials")

    with tab2:
        with st.form("Register"):
            new_user = st.text_input("New Username")
            new_pass = st.text_input("New Password", type="password")
            if st.form_submit_button("Register"):
                if not new_user.strip() or not new_pass:
                    st.error("Username and password are required")
                elif User.get_by_username(new_user):
                    st.error("Username already exists")
                else:
                    user = User.create(new_user, new_pass)
                    st.session_state.user = user
                    st.session_state.chat_history = []
                    st.rerun()

    with tab3:
        with st.form("Admin Login"):
            admin_pin = st.text_input("Admin PIN", type="password")
            if st.form_submit_button("Admin Login"):
                if admin_pin == st.secrets["ADMIN_PIN"]:
                    st.session_state.admin_mode = True
                    st.rerun()
                else:
                    st.error("Invalid Admin PIN")


def _render_chat(company_settings):
    """Render the chat page and process one user message, if submitted."""
    st.header(f"Welcome, {st.session_state.user.username}!")
    st.caption(f"You're chatting with {company_settings[3]}, your AI sales assistant")

    # Display Chat History
    for msg in st.session_state.chat_history:
        role = "user" if msg["type"] == "human" else "assistant"
        with st.chat_message(role):
            st.write(msg["content"])

    # Enhanced System Prompt
    system_prompt = _build_system_prompt(company_settings, Product.get_all())
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad")
    ])

    tools = [tool]
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    prompt_input = st.chat_input("Type your message here...")
    if not prompt_input:
        return

    # Log inventory access attempt
    log_inventory_access(
        product_name=prompt_input,
        found=validate_product_in_inventory(prompt_input),
        username=st.session_state.user.username
    )

    with st.chat_message("user"):
        st.write(prompt_input)

    with st.chat_message("assistant"):
        # Forget the previous turn's lookup so only this turn's tool calls count.
        st.session_state.last_retrieved_docs = []
        response = agent_executor.invoke({
            "input": prompt_input,
            "chat_history": _to_langchain_messages(st.session_state.chat_history)
        })["output"]

        # Post-process response to ensure inventory compliance: override the
        # answer only when the retriever ran and found nothing in stock. An
        # empty list means the agent did not look anything up (e.g. a greeting).
        retrieved = st.session_state.get("last_retrieved_docs") or []
        if retrieved and retrieved[0].metadata.get("id") == NOT_FOUND_ID:
            response = "I apologize, we don't currently carry that product in our inventory."

        st.write(response)

    # Handle inventory update on purchase
    _process_purchase(response)

    new_messages = [
        {"type": "human", "content": prompt_input},
        {"type": "ai", "content": response}
    ]
    st.session_state.user.update_chat_history(new_messages)
    st.session_state.chat_history += new_messages


# --- Main App ---
def main():
    """Entry point: route to the auth page, admin dashboard or chat page."""
    company_settings = Company.get_settings()
    company_name = company_settings[1]

    st.title(f"{company_name} AI Sales Assistant 🤖")

    if 'user' not in st.session_state:
        st.session_state.user = None
        st.session_state.chat_history = []
        st.session_state.admin_mode = False
        st.session_state.last_retrieved_docs = []

    # Authentication
    if not st.session_state.user and not st.session_state.admin_mode:
        _render_auth()
    elif st.session_state.admin_mode:
        admin_dashboard()
        if st.button("Exit Admin Mode"):
            st.session_state.admin_mode = False
            st.rerun()
    else:
        # Chat Interface
        _render_chat(company_settings)


if __name__ == "__main__":
    main()