import os import sqlite3 import streamlit as st from werkzeug.security import generate_password_hash, check_password_hash from langchain_groq import ChatGroq from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.document_loaders.csv_loader import CSVLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_core.vectorstores import InMemoryVectorStore from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.tools import Tool from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_core.messages import HumanMessage, AIMessage from langchain.docstore.document import Document # --- Database Setup --- @st.cache_resource def init_db(): conn = sqlite3.connect('users.db', check_same_thread=False) c = conn.cursor() # Users table c.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL, previous_chat_history TEXT, previous_products_bought TEXT)''') # Company settings table c.execute('''CREATE TABLE IF NOT EXISTS company_settings (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, business TEXT NOT NULL, agent_name TEXT NOT NULL, key_features TEXT NOT NULL)''') # Products table with inventory c.execute('''CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, category TEXT NOT NULL, price REAL NOT NULL, description TEXT NOT NULL, features TEXT NOT NULL, stock INTEGER NOT NULL DEFAULT 0)''') # Check and update schema if needed c.execute("PRAGMA table_info(products)") columns = [column[1] for column in c.fetchall()] if 'stock' not in columns: c.execute('ALTER TABLE products ADD COLUMN stock INTEGER NOT NULL DEFAULT 0') # Insert default company settings if empty c.execute('SELECT COUNT(*) FROM company_settings') if c.fetchone()[0] == 0: c.execute('''INSERT INTO company_settings (name, business, agent_name, key_features) VALUES (?, ?, ?, ?)''', ('TechElectronics', 'Consumer Electronics Retailer', 'Alex', 'Cutting-edge technology, Competitive pricing, Excellent customer service')) conn.commit() return conn conn = init_db() # --- Admin Classes --- class Company: @staticmethod def get_settings(): c = conn.cursor() c.execute('SELECT * FROM company_settings LIMIT 1') return c.fetchone() @staticmethod def update_settings(name, business, agent_name, key_features): c = conn.cursor() c.execute('''UPDATE company_settings SET name=?, business=?, agent_name=?, key_features=? WHERE id=1''', (name, business, agent_name, key_features)) conn.commit() class Product: @staticmethod def get_all(): c = conn.cursor() c.execute('SELECT * FROM products') return c.fetchall() @staticmethod def get_by_id(product_id): c = conn.cursor() c.execute('SELECT * FROM products WHERE id = ?', (product_id,)) return c.fetchone() @staticmethod def add(name, category, price, description, features, stock): c = conn.cursor() c.execute('''INSERT INTO products (name, category, price, description, features, stock) VALUES (?, ?, ?, ?, ?, ?)''', (name, category, price, description, features, stock)) conn.commit() @staticmethod def delete(product_id): c = conn.cursor() c.execute('DELETE FROM products WHERE id=?', (product_id,)) conn.commit() @staticmethod def update_stock(product_id, new_stock): c = conn.cursor() c.execute('UPDATE products SET stock=? WHERE id=?', (new_stock, product_id)) conn.commit() # --- User Class --- class User: def __init__(self, id, username, password, chat_history=None, products_bought=None): self.id = id self.username = username self.password = password self.chat_history = chat_history or [] self.products_bought = products_bought or [] @classmethod def create(cls, username, password): hashed_pw = generate_password_hash(password) conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('INSERT INTO users (username, password) VALUES (?, ?)', (username, hashed_pw)) user_id = c.lastrowid conn.commit() conn.close() return cls(user_id, username, hashed_pw) @classmethod def get_by_username(cls, username): conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('SELECT * FROM users WHERE username = ?', (username,)) user = c.fetchone() conn.close() if user: return cls(user[0], user[1], user[2], eval(user[3]) if user[3] else [], eval(user[4]) if user[4] else []) return None def update_chat_history(self, new_messages): updated_history = self.chat_history + new_messages conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('UPDATE users SET previous_chat_history = ? WHERE id = ?', (str(updated_history), self.id)) conn.commit() conn.close() self.chat_history = updated_history # Update in-memory def update_products_bought(self, new_products): updated_products = self.products_bought + new_products conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?', (str(updated_products), self.id)) conn.commit() conn.close() self.products_bought = updated_products # Update in-memory # --- AI Setup --- os.environ["GROQ_API_KEY"] = st.secrets["GROQ_API_KEY"] llm = ChatGroq( temperature=0.1, model_name="llama3-8b-8192", api_key=st.secrets["GROQ_API_KEY"], ) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") @st.cache_resource(show_spinner=False) def load_data(): products = Product.get_all() docs = [] for p in products: content = f"Name: {p[1]}\nCategory: {p[2]}\nPrice: {p[3]}\nDescription: {p[4]}\nFeatures: {p[5]}\nStock: {p[6]}" metadata = {"id": p[0], "name": p[1], "category": p[2], "price": p[3], "stock": p[6]} docs.append(Document(page_content=content, metadata=metadata)) text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20) splits = text_splitter.split_documents(docs) vectorstore = InMemoryVectorStore.from_documents(documents=splits, embedding=embeddings) return vectorstore.as_retriever() retriever = load_data() def retrieve_query(query: str): docs = retriever.get_relevant_documents(query) st.session_state.last_retrieved_docs = docs return docs tool = Tool( name="product_retriever", func=retrieve_query, description="Useful for retrieving product information including current stock levels" ) # --- Admin Dashboard --- def admin_dashboard(): st.header("Admin Dashboard") with st.expander("Company Settings"): current_settings = Company.get_settings() with st.form("Company Settings Form"): name = st.text_input("Company Name", value=current_settings[1]) business = st.text_input("Business", value=current_settings[2]) agent_name = st.text_input("Agent Name", value=current_settings[3]) key_features = st.text_area("Key Features", value=current_settings[4]) if st.form_submit_button("Update Settings"): Company.update_settings(name, business, agent_name, key_features) st.success("Settings updated!") st.rerun() with st.expander("Product Management"): with st.form("Add Product"): st.subheader("Add New Product") name = st.text_input("Product Name") category = st.text_input("Category") price = st.number_input("Price", min_value=0.0) description = st.text_area("Description") features = st.text_area("Features") stock = st.number_input("Initial Stock", min_value=0, value=0) if st.form_submit_button("Add Product"): Product.add(name, category, price, description, features, stock) st.success("Product added!") load_data.clear() st.rerun() st.subheader("Manage Inventory") products = Product.get_all() if products: for p in products: cols = st.columns([3,2,2,1]) cols[0].write(f"**{p[1]}** ({p[2]})") cols[1].write(f"Price: ${p[3]}") new_stock = cols[2].number_input( "Stock", min_value=0, value=p[6], key=f"stock_{p[0]}" ) if new_stock != p[6]: Product.update_stock(p[0], new_stock) load_data.clear() st.rerun() if cols[3].button("❌", key=f"del_{p[0]}"): Product.delete(p[0]) load_data.clear() st.rerun() else: st.info("No products found in database") # --- Main App --- def main(): company_settings = Company.get_settings() company_name = company_settings[1] st.title("AI Sales Assistant 🤖") if 'user' not in st.session_state: st.session_state.user = None st.session_state.chat_history = [] st.session_state.admin_mode = False st.session_state.last_retrieved_docs = [] # Authentication if not st.session_state.user and not st.session_state.admin_mode: st.header("Login/Register Admin") tab1, tab2, tab3 = st.tabs(["Login", "Register", "Admin"]) with tab1: with st.form("Login"): username = st.text_input("Username") password = st.text_input("Password", type="password") if st.form_submit_button("Login"): user = User.get_by_username(username) if user and check_password_hash(user.password, password): st.session_state.user = user st.session_state.chat_history = user.chat_history st.rerun() else: st.error("Invalid credentials") with tab2: with st.form("Register"): new_user = st.text_input("New Username") new_pass = st.text_input("New Password", type="password") if st.form_submit_button("Register"): if User.get_by_username(new_user): st.error("Username already exists") else: user = User.create(new_user, new_pass) st.session_state.user = user st.session_state.chat_history = [] st.rerun() with tab3: with st.form("Admin Login"): admin_pin = st.text_input("Admin PIN", type="password") if st.form_submit_button("Admin Login"): if admin_pin == st.secrets["ADMIN_PIN"]: st.session_state.admin_mode = True st.rerun() else: st.error("Invalid Admin PIN") elif st.session_state.admin_mode: admin_dashboard() if st.button("Exit Admin Mode"): st.session_state.admin_mode = False st.rerun() else: # Chat Interface st.header(f"Welcome to {company_name}, {st.session_state.user.username}!") st.subheader("Chat with our AI Sales Assistant") # Display Chat History for msg in st.session_state.chat_history: if msg["type"] == "human": with st.chat_message("user"): st.write(msg["content"]) else: with st.chat_message("assistant"): st.write(msg["content"]) # Enhanced System Prompt company_settings = Company.get_settings() current_products = "\n".join([f"- {p[1]} (${p[3]}, Stock: {p[6]})" for p in Product.get_all()]) system_prompt = f""" You are {company_settings[3]}, the AI Sales Assistant for {company_settings[1]} ({company_settings[2]}). Your primary role is to assist customers with product inquiries, make appropriate recommendations, and facilitate purchases while strictly adhering to company policies. ## Company Profile - Company Name: {company_settings[1]} - Business Type: {company_settings[2]} - Key Features: {company_settings[4]} - Agent Name: {company_settings[3]} ## Inventory Management Policy 1. **Stock Verification**: - ALWAYS check current stock before recommending any product - Never suggest out-of-stock items (stock = 0) - For low stock items (stock ≤ 3), mention: "Only [X] left in stock!" 2. **Product Recommendations**: - Only recommend products from our current inventory - If asked for unavailable items, respond with: "I apologize, we don't currently carry that item. As a {company_settings[2]}, we specialize in [relevant products]. May I suggest [alternative]?" - When suggesting alternatives, ensure they're in stock 3. **Purchase Process**: - Confirm product availability before purchase - Generate payment link only after stock verification - Update inventory immediately after successful purchase ## Conversation Flow Examples ### 1. Greeting & Need Assessment User: "Hi, I need a new laptop" You: "Hello! I'd be happy to help you find the perfect laptop. Could you tell me what you'll primarily be using it for and your budget range?" ### 2. Product Recommendation (In-Stock) User: "I need a gaming laptop under $1500" You: "We have the XYZ Gaming Laptop available for $1399 (3 in stock). It features [key specs]. Would you like more details?" ### 3. Product Recommendation (Out-of-Stock) User: "Do you have ABC Smartphone?" You: "I apologize, the ABC Smartphone is currently out of stock. However, we have the DEF Smartphone with similar features available for $599 (5 in stock). Would you like me to tell you more about it?" ### 4. Handling Off-Topic Requests User: "Do you sell snacks?" You: "I apologize, as a {company_settings[2]}, we specialize in [tech products]. We don't carry food items. Is there a tech product I can assist you with today?" ### 5. Purchase Process User: "I want to buy the XYZ Laptop" You: "Great choice! The XYZ Laptop is available for $1399 (2 in stock). I can generate a secure payment link for you. Would you like to proceed with the purchase? [https://www.example.com/payment]" ## Communication Guidelines - Tone: Professional yet friendly (like a knowledgeable salesperson) - Language: Clear, concise, avoid technical jargon unless requested - Emojis: Use sparingly (1-2 per message max) - Branding: Consistently reference {company_settings[1]} when appropriate ## Current Inventory Status {current_products} ## Important Reminders 1. NEVER recommend products not in our inventory 2. ALWAYS verify stock before discussing any product 3. Update inventory immediately after purchases 4. Politely redirect off-topic requests to our product offerings 5. For complex queries, offer to connect with human support """ prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), MessagesPlaceholder(variable_name="chat_history"), ("human", "{input}"), MessagesPlaceholder(variable_name="agent_scratchpad") ]) tools = [tool] agent = create_tool_calling_agent(llm, tools, prompt) agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) if prompt_input := st.chat_input("Type your message here..."): with st.chat_message("user"): st.write(prompt_input) with st.chat_message("assistant"): response = agent_executor.invoke({ "input": prompt_input, "chat_history": [HumanMessage(content=msg["content"]) if msg["type"] == "human" else AIMessage(content=msg["content"]) for msg in st.session_state.chat_history] })["output"] st.write(response) # Handle inventory update on purchase if "https://www.example.com/payment" in response: # Extract product name from the agent's response product_name = None if "proceed with the purchase of" in response: start = response.find("proceed with the purchase of") + len("proceed with the purchase of") end = response.find("?", start) product_name = response[start:end].strip() if product_name: # Retrieve current product details docs = retrieve_query(product_name) if docs: product_doc = docs[0] product_id = product_doc.metadata.get("id") current_stock = product_doc.metadata.get("stock") if product_id and current_stock > 0: try: conn = sqlite3.connect('users.db') c = conn.cursor() # Verify current stock c.execute('SELECT stock FROM products WHERE id = ?', (product_id,)) actual_stock = c.fetchone()[0] if actual_stock > 0: # Update product stock c.execute('UPDATE products SET stock = ? WHERE id = ?', (actual_stock - 1, product_id)) # Update user's purchase history c.execute('SELECT previous_products_bought FROM users WHERE id = ?', (st.session_state.user.id,)) current_purchases = c.fetchone()[0] updated_purchases = eval(current_purchases) + [product_name] if current_purchases else [product_name] c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?', (str(updated_purchases), st.session_state.user.id)) conn.commit() st.session_state.user.products_bought.append(product_name) st.success(f"Successfully purchased {product_name}! Stock updated.") load_data.clear() # Check for low stock if (actual_stock - 1) <= 3 and (actual_stock - 1) > 0: st.warning(f"Low stock alert: Only {actual_stock - 1} units of {product_name} remaining!") else: st.error(f"Sorry, {product_name} is now out of stock!") except Exception as e: st.error(f"Error processing purchase: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close() else: st.error("Product is out of stock or not found in the inventory.") else: st.error("Could not retrieve product details. Please try again.") else: st.error("Please specify which product you'd like to purchase.") new_messages = [ {"type": "human", "content": prompt_input}, {"type": "ai", "content": response} ] st.session_state.user.update_chat_history(new_messages) st.session_state.chat_history += new_messages if __name__ == "__main__": main()