import os import sqlite3 import streamlit as st from werkzeug.security import generate_password_hash, check_password_hash from langchain_groq import ChatGroq from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.document_loaders.csv_loader import CSVLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_core.vectorstores import InMemoryVectorStore from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.tools import Tool from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_core.messages import HumanMessage, AIMessage from langchain.docstore.document import Document # --- Database Setup --- @st.cache_resource def init_db(): conn = sqlite3.connect('users.db', check_same_thread=False) c = conn.cursor() # Users table c.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL, previous_chat_history TEXT, previous_products_bought TEXT)''') # Company settings table c.execute('''CREATE TABLE IF NOT EXISTS company_settings (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, business TEXT NOT NULL, agent_name TEXT NOT NULL, key_features TEXT NOT NULL)''') # Products table with inventory c.execute('''CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, category TEXT NOT NULL, price REAL NOT NULL, description TEXT NOT NULL, features TEXT NOT NULL, stock INTEGER NOT NULL DEFAULT 0)''') # Check and update schema if needed c.execute("PRAGMA table_info(products)") columns = [column[1] for column in c.fetchall()] if 'stock' not in columns: c.execute('ALTER TABLE products ADD COLUMN stock INTEGER NOT NULL DEFAULT 0') # Insert default company settings if empty c.execute('SELECT COUNT(*) FROM company_settings') if c.fetchone()[0] == 0: c.execute('''INSERT INTO company_settings (name, business, agent_name, key_features) VALUES (?, ?, ?, ?)''', ('TechElectronics', 'Consumer Electronics Retailer', 'Alex', 'Cutting-edge technology, Competitive pricing, Excellent customer service')) conn.commit() return conn conn = init_db() # --- Admin Classes --- class Company: @staticmethod def get_settings(): c = conn.cursor() c.execute('SELECT * FROM company_settings LIMIT 1') return c.fetchone() @staticmethod def update_settings(name, business, agent_name, key_features): c = conn.cursor() c.execute('''UPDATE company_settings SET name=?, business=?, agent_name=?, key_features=? WHERE id=1''', (name, business, agent_name, key_features)) conn.commit() class Product: @staticmethod def get_all(): c = conn.cursor() c.execute('SELECT * FROM products') return c.fetchall() @staticmethod def get_by_id(product_id): c = conn.cursor() c.execute('SELECT * FROM products WHERE id = ?', (product_id,)) return c.fetchone() @staticmethod def add(name, category, price, description, features, stock): c = conn.cursor() c.execute('''INSERT INTO products (name, category, price, description, features, stock) VALUES (?, ?, ?, ?, ?, ?)''', (name, category, price, description, features, stock)) conn.commit() @staticmethod def delete(product_id): c = conn.cursor() c.execute('DELETE FROM products WHERE id=?', (product_id,)) conn.commit() @staticmethod def update_stock(product_id, new_stock): c = conn.cursor() c.execute('UPDATE products SET stock=? WHERE id=?', (new_stock, product_id)) conn.commit() # --- User Class --- class User: def __init__(self, id, username, password, chat_history=None, products_bought=None): self.id = id self.username = username self.password = password self.chat_history = chat_history or [] self.products_bought = products_bought or [] @classmethod def create(cls, username, password): hashed_pw = generate_password_hash(password) conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('INSERT INTO users (username, password) VALUES (?, ?)', (username, hashed_pw)) user_id = c.lastrowid conn.commit() conn.close() return cls(user_id, username, hashed_pw) @classmethod def get_by_username(cls, username): conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('SELECT * FROM users WHERE username = ?', (username,)) user = c.fetchone() conn.close() if user: return cls(user[0], user[1], user[2], eval(user[3]) if user[3] else [], eval(user[4]) if user[4] else []) return None def update_chat_history(self, new_messages): updated_history = self.chat_history + new_messages conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('UPDATE users SET previous_chat_history = ? WHERE id = ?', (str(updated_history), self.id)) conn.commit() conn.close() self.chat_history = updated_history # Update in-memory def update_products_bought(self, new_products): updated_products = self.products_bought + new_products conn = sqlite3.connect('users.db') c = conn.cursor() c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?', (str(updated_products), self.id)) conn.commit() conn.close() self.products_bought = updated_products # Update in-memory # --- AI Setup --- os.environ["GROQ_API_KEY"] = st.secrets["GROQ_API_KEY"] llm = ChatGroq( temperature=0.1, model_name="llama3-8b-8192", api_key=st.secrets["GROQ_API_KEY"], ) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") @st.cache_resource(show_spinner=False) def load_data(): products = Product.get_all() docs = [] for p in products: content = f"Name: {p[1]}\nCategory: {p[2]}\nPrice: {p[3]}\nDescription: {p[4]}\nFeatures: {p[5]}\nStock: {p[6]}" metadata = {"id": p[0], "name": p[1], "category": p[2], "price": p[3], "stock": p[6]} docs.append(Document(page_content=content, metadata=metadata)) text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20) splits = text_splitter.split_documents(docs) vectorstore = InMemoryVectorStore.from_documents(documents=splits, embedding=embeddings) return vectorstore.as_retriever() retriever = load_data() def retrieve_query(query: str): docs = retriever.get_relevant_documents(query) st.session_state.last_retrieved_docs = docs return docs tool = Tool( name="product_retriever", func=retrieve_query, description="Useful for retrieving product information including current stock levels" ) # --- Admin Dashboard --- def admin_dashboard(): st.header("Admin Dashboard") with st.expander("Company Settings"): current_settings = Company.get_settings() with st.form("Company Settings Form"): name = st.text_input("Company Name", value=current_settings[1]) business = st.text_input("Business", value=current_settings[2]) agent_name = st.text_input("Agent Name", value=current_settings[3]) key_features = st.text_area("Key Features", value=current_settings[4]) if st.form_submit_button("Update Settings"): Company.update_settings(name, business, agent_name, key_features) st.success("Settings updated!") with st.expander("Product Management"): with st.form("Add Product"): st.subheader("Add New Product") name = st.text_input("Product Name") category = st.text_input("Category") price = st.number_input("Price", min_value=0.0) description = st.text_area("Description") features = st.text_area("Features") stock = st.number_input("Initial Stock", min_value=0, value=0) if st.form_submit_button("Add Product"): Product.add(name, category, price, description, features, stock) st.success("Product added!") load_data.clear() st.subheader("Manage Inventory") products = Product.get_all() if products: for p in products: cols = st.columns([3,2,2,1]) cols[0].write(f"**{p[1]}** ({p[2]})") cols[1].write(f"Price: ${p[3]}") new_stock = cols[2].number_input( "Stock", min_value=0, value=p[6], key=f"stock_{p[0]}" ) if new_stock != p[6]: Product.update_stock(p[0], new_stock) st.rerun() if cols[3].button("❌", key=f"del_{p[0]}"): Product.delete(p[0]) st.rerun() else: st.info("No products found in database") # --- Main App --- def main(): company_settings = Company.get_settings() company_name = company_settings[1] st.title("AI Sales Assistant 🤖") if 'user' not in st.session_state: st.session_state.user = None st.session_state.chat_history = [] st.session_state.admin_mode = False st.session_state.last_retrieved_docs = [] # Authentication if not st.session_state.user and not st.session_state.admin_mode: st.header("Login/Register Admin") tab1, tab2, tab3 = st.tabs(["Login", "Register", "Admin"]) with tab1: with st.form("Login"): username = st.text_input("Username") password = st.text_input("Password", type="password") if st.form_submit_button("Login"): user = User.get_by_username(username) if user and check_password_hash(user.password, password): st.session_state.user = user st.session_state.chat_history = user.chat_history st.rerun() else: st.error("Invalid credentials") with tab2: with st.form("Register"): new_user = st.text_input("New Username") new_pass = st.text_input("New Password", type="password") if st.form_submit_button("Register"): if User.get_by_username(new_user): st.error("Username already exists") else: user = User.create(new_user, new_pass) st.session_state.user = user st.session_state.chat_history = [] st.rerun() with tab3: with st.form("Admin Login"): admin_pin = st.text_input("Admin PIN", type="password") if st.form_submit_button("Admin Login"): if admin_pin == st.secrets["ADMIN_PIN"]: st.session_state.admin_mode = True st.rerun() else: st.error("Invalid Admin PIN") elif st.session_state.admin_mode: admin_dashboard() if st.button("Exit Admin Mode"): st.session_state.admin_mode = False st.rerun() else: # Chat Interface st.header(f"Welcome to {company_name}, {st.session_state.user.username} 😊!") st.subheader("Chat with our AI Sales Assistant") # Display Chat History for msg in st.session_state.chat_history: if msg["type"] == "human": with st.chat_message("user"): st.write(msg["content"]) else: with st.chat_message("assistant"): st.write(msg["content"]) # Enhanced System Prompt company_settings = Company.get_settings() current_products = "\n".join([f"- {p[1]} (${p[3]}, Stock: {p[6]})" for p in Product.get_all()]) system_prompt = f""" You are {company_settings[3]}, the AI Sales Assistant for {company_settings[1]} ({company_settings[2]}). Company Profile: - Company Name: {company_settings[1]} - Business: {company_settings[2]} - Key Features: {company_settings[4]} Product Policy & Inventory Management: 1. Product Recommendations: - Only recommend products with available stock (stock > 0) - Always check current stock levels before making recommendations - For out-of-stock items, suggest similar in-stock alternatives - Never promote or discuss products not in our inventory 2. Stock Management: - Track real-time inventory levels for all products - Update stock automatically after successful purchases - Alert customers about low stock items (stock ≤ 3) - Inform when items are temporarily out of stock Current Inventory: {current_products} Sales Process & Conversation Flow: 1. Greeting & Need Assessment - Warm, professional welcome - Ask relevant questions to understand customer needs - Listen actively and acknowledge customer preferences 2. Product Consultation - Recommend suitable products based on customer needs - Highlight relevant features and benefits - Provide accurate pricing and stock information - Compare products when appropriate 3. Objection Handling - Address concerns professionally - Provide factual information - Offer alternatives when necessary - Focus on value proposition 4. Purchase Process - Clear explanation of payment process - Generate payment link when customer is ready [https://www.example.com/payment] - Confirm stock availability before purchase - Update inventory after successful transaction Communication Style: - Professional yet friendly tone - Clear and concise explanations - Empathetic to customer needs - Use simple, non-technical language - Maintain consistent branding - Limited emoji use (max 1-2 per message) Response Templates: For out-of-stock items: "I apologize, but [product] is currently out of stock. However, I can recommend [alternative product] which has similar features and is available now. Would you like to learn more about it?" For low stock items: "Just to let you know, we only have [X] units of [product] remaining in stock. I'd recommend making your decision soon if you're interested." For unrelated products: "I apologize, but as a {company_settings[2]}, we don't carry that item. However, I can suggest [relevant alternative] from our current inventory that might meet your needs. Would you like to learn more?" For payment process: "I'm glad you're interested in [product]. I can generate a secure payment link for you to complete your purchase. The current price is $[price]. Would you like to proceed with the purchase of [product]?" Remember to: 1. Always verify stock before recommending products 2. Keep track of customer preferences 3. Provide accurate product information 4. Update inventory after successful purchases 5. Maintain professional communication """ prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), MessagesPlaceholder(variable_name="chat_history"), ("human", "{input}"), MessagesPlaceholder(variable_name="agent_scratchpad") ]) tools = [tool] agent = create_tool_calling_agent(llm, tools, prompt) agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) if prompt_input := st.chat_input("Type your message here..."): with st.chat_message("user"): st.write(prompt_input) with st.chat_message("assistant"): response = agent_executor.invoke({ "input": prompt_input, "chat_history": [HumanMessage(content=msg["content"]) if msg["type"] == "human" else AIMessage(content=msg["content"]) for msg in st.session_state.chat_history] })["output"] st.write(response) # Handle inventory update on purchase if "https://www.example.com/payment" in response: # Extract product name from the agent's response product_name = None if "proceed with the purchase of" in response: start = response.find("proceed with the purchase of") + len("proceed with the purchase of") end = response.find("?", start) product_name = response[start:end].strip() if product_name: # Retrieve current product details docs = retrieve_query(product_name) if docs: product_doc = docs[0] product_id = product_doc.metadata.get("id") current_stock = product_doc.metadata.get("stock") if product_id and current_stock > 0: try: conn = sqlite3.connect('users.db') c = conn.cursor() # Verify current stock c.execute('SELECT stock FROM products WHERE id = ?', (product_id,)) actual_stock = c.fetchone()[0] if actual_stock > 0: # Update product stock c.execute('UPDATE products SET stock = ? WHERE id = ?', (actual_stock - 1, product_id)) # Update user's purchase history c.execute('SELECT previous_products_bought FROM users WHERE id = ?', (st.session_state.user.id,)) current_purchases = c.fetchone()[0] updated_purchases = eval(current_purchases) + [product_name] if current_purchases else [product_name] c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?', (str(updated_purchases), st.session_state.user.id)) conn.commit() st.session_state.user.products_bought.append(product_name) st.success(f"Successfully purchased {product_name}! Stock updated.") load_data.clear() # Check for low stock if (actual_stock - 1) <= 3 and (actual_stock - 1) > 0: st.warning(f"Low stock alert: Only {actual_stock - 1} units of {product_name} remaining!") else: st.error(f"Sorry, {product_name} is now out of stock!") except Exception as e: st.error(f"Error processing purchase: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close() else: st.error("Product is out of stock or not found in the inventory.") else: st.error("Could not retrieve product details. Please try again.") else: st.error("Please specify which product you'd like to purchase.") new_messages = [ {"type": "human", "content": prompt_input}, {"type": "ai", "content": response} ] st.session_state.user.update_chat_history(new_messages) st.session_state.chat_history += new_messages if __name__ == "__main__": main()