Spaces:
Sleeping
Sleeping
| import os | |
| import sqlite3 | |
| import streamlit as st | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from langchain_groq import ChatGroq | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.document_loaders.csv_loader import CSVLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_core.vectorstores import InMemoryVectorStore | |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain.tools import Tool | |
| from langchain.agents import AgentExecutor, create_tool_calling_agent | |
| from langchain_core.messages import HumanMessage, AIMessage | |
| from langchain.docstore.document import Document | |
| # --- Database Setup --- | |
| def init_db(): | |
| conn = sqlite3.connect('users.db', check_same_thread=False) | |
| c = conn.cursor() | |
| # Users table | |
| c.execute('''CREATE TABLE IF NOT EXISTS users | |
| (id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT UNIQUE NOT NULL, | |
| password TEXT NOT NULL, | |
| previous_chat_history TEXT, | |
| previous_products_bought TEXT)''') | |
| # Company settings table | |
| c.execute('''CREATE TABLE IF NOT EXISTS company_settings | |
| (id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| business TEXT NOT NULL, | |
| agent_name TEXT NOT NULL, | |
| key_features TEXT NOT NULL)''') | |
| # Products table with inventory | |
| c.execute('''CREATE TABLE IF NOT EXISTS products | |
| (id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| category TEXT NOT NULL, | |
| price REAL NOT NULL, | |
| description TEXT NOT NULL, | |
| features TEXT NOT NULL, | |
| stock INTEGER NOT NULL DEFAULT 0)''') | |
| # Check and update schema if needed | |
| c.execute("PRAGMA table_info(products)") | |
| columns = [column[1] for column in c.fetchall()] | |
| if 'stock' not in columns: | |
| c.execute('ALTER TABLE products ADD COLUMN stock INTEGER NOT NULL DEFAULT 0') | |
| # Insert default company settings if empty | |
| c.execute('SELECT COUNT(*) FROM company_settings') | |
| if c.fetchone()[0] == 0: | |
| c.execute('''INSERT INTO company_settings | |
| (name, business, agent_name, key_features) | |
| VALUES (?, ?, ?, ?)''', | |
| ('TechElectronics', | |
| 'Consumer Electronics Retailer', | |
| 'Alex', | |
| 'Cutting-edge technology, Competitive pricing, Excellent customer service')) | |
| conn.commit() | |
| return conn | |
| conn = init_db() | |
| # --- Admin Classes --- | |
| class Company: | |
| def get_settings(): | |
| c = conn.cursor() | |
| c.execute('SELECT * FROM company_settings LIMIT 1') | |
| return c.fetchone() | |
| def update_settings(name, business, agent_name, key_features): | |
| c = conn.cursor() | |
| c.execute('''UPDATE company_settings | |
| SET name=?, business=?, agent_name=?, key_features=? | |
| WHERE id=1''', | |
| (name, business, agent_name, key_features)) | |
| conn.commit() | |
| class Product: | |
| def get_all(): | |
| c = conn.cursor() | |
| c.execute('SELECT * FROM products') | |
| return c.fetchall() | |
| def get_by_id(product_id): | |
| c = conn.cursor() | |
| c.execute('SELECT * FROM products WHERE id = ?', (product_id,)) | |
| return c.fetchone() | |
| def add(name, category, price, description, features, stock): | |
| c = conn.cursor() | |
| c.execute('''INSERT INTO products | |
| (name, category, price, description, features, stock) | |
| VALUES (?, ?, ?, ?, ?, ?)''', | |
| (name, category, price, description, features, stock)) | |
| conn.commit() | |
| def delete(product_id): | |
| c = conn.cursor() | |
| c.execute('DELETE FROM products WHERE id=?', (product_id,)) | |
| conn.commit() | |
| def update_stock(product_id, new_stock): | |
| c = conn.cursor() | |
| c.execute('UPDATE products SET stock=? WHERE id=?', (new_stock, product_id)) | |
| conn.commit() | |
| # --- User Class --- | |
| class User: | |
| def __init__(self, id, username, password, chat_history=None, products_bought=None): | |
| self.id = id | |
| self.username = username | |
| self.password = password | |
| self.chat_history = chat_history or [] | |
| self.products_bought = products_bought or [] | |
| def create(cls, username, password): | |
| hashed_pw = generate_password_hash(password) | |
| conn = sqlite3.connect('users.db') | |
| c = conn.cursor() | |
| c.execute('INSERT INTO users (username, password) VALUES (?, ?)', (username, hashed_pw)) | |
| user_id = c.lastrowid | |
| conn.commit() | |
| conn.close() | |
| return cls(user_id, username, hashed_pw) | |
| def get_by_username(cls, username): | |
| conn = sqlite3.connect('users.db') | |
| c = conn.cursor() | |
| c.execute('SELECT * FROM users WHERE username = ?', (username,)) | |
| user = c.fetchone() | |
| conn.close() | |
| if user: | |
| return cls(user[0], user[1], user[2], | |
| eval(user[3]) if user[3] else [], | |
| eval(user[4]) if user[4] else []) | |
| return None | |
| def update_chat_history(self, new_messages): | |
| updated_history = self.chat_history + new_messages | |
| conn = sqlite3.connect('users.db') | |
| c = conn.cursor() | |
| c.execute('UPDATE users SET previous_chat_history = ? WHERE id = ?', | |
| (str(updated_history), self.id)) | |
| conn.commit() | |
| conn.close() | |
| self.chat_history = updated_history # Update in-memory | |
| def update_products_bought(self, new_products): | |
| updated_products = self.products_bought + new_products | |
| conn = sqlite3.connect('users.db') | |
| c = conn.cursor() | |
| c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?', | |
| (str(updated_products), self.id)) | |
| conn.commit() | |
| conn.close() | |
| self.products_bought = updated_products # Update in-memory | |
| # --- AI Setup --- | |
| os.environ["GROQ_API_KEY"] = st.secrets["GROQ_API_KEY"] | |
| llm = ChatGroq( | |
| temperature=0.1, | |
| model_name="llama3-8b-8192", | |
| api_key=st.secrets["GROQ_API_KEY"], | |
| ) | |
| embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
| def load_data(): | |
| products = Product.get_all() | |
| docs = [] | |
| for p in products: | |
| content = f"Name: {p[1]}\nCategory: {p[2]}\nPrice: {p[3]}\nDescription: {p[4]}\nFeatures: {p[5]}\nStock: {p[6]}" | |
| metadata = {"id": p[0], "name": p[1], "category": p[2], "price": p[3], "stock": p[6]} | |
| docs.append(Document(page_content=content, metadata=metadata)) | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20) | |
| splits = text_splitter.split_documents(docs) | |
| vectorstore = InMemoryVectorStore.from_documents(documents=splits, embedding=embeddings) | |
| return vectorstore.as_retriever() | |
| retriever = load_data() | |
| def retrieve_query(query: str): | |
| docs = retriever.get_relevant_documents(query) | |
| st.session_state.last_retrieved_docs = docs | |
| return docs | |
| tool = Tool( | |
| name="product_retriever", | |
| func=retrieve_query, | |
| description="Useful for retrieving product information including current stock levels" | |
| ) | |
| # --- Admin Dashboard --- | |
| def admin_dashboard(): | |
| st.header("Admin Dashboard") | |
| with st.expander("Company Settings"): | |
| current_settings = Company.get_settings() | |
| with st.form("Company Settings Form"): | |
| name = st.text_input("Company Name", value=current_settings[1]) | |
| business = st.text_input("Business", value=current_settings[2]) | |
| agent_name = st.text_input("Agent Name", value=current_settings[3]) | |
| key_features = st.text_area("Key Features", value=current_settings[4]) | |
| if st.form_submit_button("Update Settings"): | |
| Company.update_settings(name, business, agent_name, key_features) | |
| st.success("Settings updated!") | |
| with st.expander("Product Management"): | |
| with st.form("Add Product"): | |
| st.subheader("Add New Product") | |
| name = st.text_input("Product Name") | |
| category = st.text_input("Category") | |
| price = st.number_input("Price", min_value=0.0) | |
| description = st.text_area("Description") | |
| features = st.text_area("Features") | |
| stock = st.number_input("Initial Stock", min_value=0, value=0) | |
| if st.form_submit_button("Add Product"): | |
| Product.add(name, category, price, description, features, stock) | |
| st.success("Product added!") | |
| load_data.clear() | |
| st.subheader("Manage Inventory") | |
| products = Product.get_all() | |
| if products: | |
| for p in products: | |
| cols = st.columns([3,2,2,1]) | |
| cols[0].write(f"**{p[1]}** ({p[2]})") | |
| cols[1].write(f"Price: ${p[3]}") | |
| new_stock = cols[2].number_input( | |
| "Stock", | |
| min_value=0, | |
| value=p[6], | |
| key=f"stock_{p[0]}" | |
| ) | |
| if new_stock != p[6]: | |
| Product.update_stock(p[0], new_stock) | |
| st.rerun() | |
| if cols[3].button("❌", key=f"del_{p[0]}"): | |
| Product.delete(p[0]) | |
| st.rerun() | |
| else: | |
| st.info("No products found in database") | |
| # --- Main App --- | |
| def main(): | |
| company_settings = Company.get_settings() | |
| company_name = company_settings[1] | |
| st.title("AI Sales Assistant 🤖") | |
| if 'user' not in st.session_state: | |
| st.session_state.user = None | |
| st.session_state.chat_history = [] | |
| st.session_state.admin_mode = False | |
| st.session_state.last_retrieved_docs = [] | |
| # Authentication | |
| if not st.session_state.user and not st.session_state.admin_mode: | |
| st.header("Login/Register Admin") | |
| tab1, tab2, tab3 = st.tabs(["Login", "Register", "Admin"]) | |
| with tab1: | |
| with st.form("Login"): | |
| username = st.text_input("Username") | |
| password = st.text_input("Password", type="password") | |
| if st.form_submit_button("Login"): | |
| user = User.get_by_username(username) | |
| if user and check_password_hash(user.password, password): | |
| st.session_state.user = user | |
| st.session_state.chat_history = user.chat_history | |
| st.rerun() | |
| else: | |
| st.error("Invalid credentials") | |
| with tab2: | |
| with st.form("Register"): | |
| new_user = st.text_input("New Username") | |
| new_pass = st.text_input("New Password", type="password") | |
| if st.form_submit_button("Register"): | |
| if User.get_by_username(new_user): | |
| st.error("Username already exists") | |
| else: | |
| user = User.create(new_user, new_pass) | |
| st.session_state.user = user | |
| st.session_state.chat_history = [] | |
| st.rerun() | |
| with tab3: | |
| with st.form("Admin Login"): | |
| admin_pin = st.text_input("Admin PIN", type="password") | |
| if st.form_submit_button("Admin Login"): | |
| if admin_pin == st.secrets["ADMIN_PIN"]: | |
| st.session_state.admin_mode = True | |
| st.rerun() | |
| else: | |
| st.error("Invalid Admin PIN") | |
| elif st.session_state.admin_mode: | |
| admin_dashboard() | |
| if st.button("Exit Admin Mode"): | |
| st.session_state.admin_mode = False | |
| st.rerun() | |
| else: | |
| # Chat Interface | |
| st.header(f"Welcome to {company_name}, {st.session_state.user.username} 😊!") | |
| st.subheader("Chat with our AI Sales Assistant") | |
| # Display Chat History | |
| for msg in st.session_state.chat_history: | |
| if msg["type"] == "human": | |
| with st.chat_message("user"): | |
| st.write(msg["content"]) | |
| else: | |
| with st.chat_message("assistant"): | |
| st.write(msg["content"]) | |
| # Enhanced System Prompt | |
| company_settings = Company.get_settings() | |
| current_products = "\n".join([f"- {p[1]} (${p[3]}, Stock: {p[6]})" for p in Product.get_all()]) | |
| system_prompt = f""" | |
| You are {company_settings[3]}, the AI Sales Assistant for {company_settings[1]} ({company_settings[2]}). | |
| Company Profile: | |
| - Company Name: {company_settings[1]} | |
| - Business: {company_settings[2]} | |
| - Key Features: {company_settings[4]} | |
| Product Policy & Inventory Management: | |
| 1. Product Recommendations: | |
| - Only recommend products with available stock (stock > 0) | |
| - Always check current stock levels before making recommendations | |
| - For out-of-stock items, suggest similar in-stock alternatives | |
| - Never promote or discuss products not in our inventory | |
| 2. Stock Management: | |
| - Track real-time inventory levels for all products | |
| - Update stock automatically after successful purchases | |
| - Alert customers about low stock items (stock ≤ 3) | |
| - Inform when items are temporarily out of stock | |
| Current Inventory: | |
| {current_products} | |
| Sales Process & Conversation Flow: | |
| 1. Greeting & Need Assessment | |
| - Warm, professional welcome | |
| - Ask relevant questions to understand customer needs | |
| - Listen actively and acknowledge customer preferences | |
| 2. Product Consultation | |
| - Recommend suitable products based on customer needs | |
| - Highlight relevant features and benefits | |
| - Provide accurate pricing and stock information | |
| - Compare products when appropriate | |
| 3. Objection Handling | |
| - Address concerns professionally | |
| - Provide factual information | |
| - Offer alternatives when necessary | |
| - Focus on value proposition | |
| 4. Purchase Process | |
| - Clear explanation of payment process | |
| - Generate payment link when customer is ready [https://www.example.com/payment] | |
| - Confirm stock availability before purchase | |
| - Update inventory after successful transaction | |
| Communication Style: | |
| - Professional yet friendly tone | |
| - Clear and concise explanations | |
| - Empathetic to customer needs | |
| - Use simple, non-technical language | |
| - Maintain consistent branding | |
| - Limited emoji use (max 1-2 per message) | |
| Response Templates: | |
| For out-of-stock items: | |
| "I apologize, but [product] is currently out of stock. However, I can recommend [alternative product] which has similar features and is available now. Would you like to learn more about it?" | |
| For low stock items: | |
| "Just to let you know, we only have [X] units of [product] remaining in stock. I'd recommend making your decision soon if you're interested." | |
| For unrelated products: | |
| "I apologize, but as a {company_settings[2]}, we don't carry that item. However, I can suggest [relevant alternative] from our current inventory that might meet your needs. Would you like to learn more?" | |
| For payment process: | |
| "I'm glad you're interested in [product]. I can generate a secure payment link for you to complete your purchase. The current price is $[price]. Would you like to proceed with the purchase of [product]?" | |
| Remember to: | |
| 1. Always verify stock before recommending products | |
| 2. Keep track of customer preferences | |
| 3. Provide accurate product information | |
| 4. Update inventory after successful purchases | |
| 5. Maintain professional communication | |
| """ | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_prompt), | |
| MessagesPlaceholder(variable_name="chat_history"), | |
| ("human", "{input}"), | |
| MessagesPlaceholder(variable_name="agent_scratchpad") | |
| ]) | |
| tools = [tool] | |
| agent = create_tool_calling_agent(llm, tools, prompt) | |
| agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) | |
| if prompt_input := st.chat_input("Type your message here..."): | |
| with st.chat_message("user"): | |
| st.write(prompt_input) | |
| with st.chat_message("assistant"): | |
| response = agent_executor.invoke({ | |
| "input": prompt_input, | |
| "chat_history": [HumanMessage(content=msg["content"]) if msg["type"] == "human" else AIMessage(content=msg["content"]) | |
| for msg in st.session_state.chat_history] | |
| })["output"] | |
| st.write(response) | |
| # Handle inventory update on purchase | |
| if "https://www.example.com/payment" in response: | |
| # Extract product name from the agent's response | |
| product_name = None | |
| if "proceed with the purchase of" in response: | |
| start = response.find("proceed with the purchase of") + len("proceed with the purchase of") | |
| end = response.find("?", start) | |
| product_name = response[start:end].strip() | |
| if product_name: | |
| # Retrieve current product details | |
| docs = retrieve_query(product_name) | |
| if docs: | |
| product_doc = docs[0] | |
| product_id = product_doc.metadata.get("id") | |
| current_stock = product_doc.metadata.get("stock") | |
| if product_id and current_stock > 0: | |
| try: | |
| conn = sqlite3.connect('users.db') | |
| c = conn.cursor() | |
| # Verify current stock | |
| c.execute('SELECT stock FROM products WHERE id = ?', (product_id,)) | |
| actual_stock = c.fetchone()[0] | |
| if actual_stock > 0: | |
| # Update product stock | |
| c.execute('UPDATE products SET stock = ? WHERE id = ?', | |
| (actual_stock - 1, product_id)) | |
| # Update user's purchase history | |
| c.execute('SELECT previous_products_bought FROM users WHERE id = ?', | |
| (st.session_state.user.id,)) | |
| current_purchases = c.fetchone()[0] | |
| updated_purchases = eval(current_purchases) + [product_name] if current_purchases else [product_name] | |
| c.execute('UPDATE users SET previous_products_bought = ? WHERE id = ?', | |
| (str(updated_purchases), st.session_state.user.id)) | |
| conn.commit() | |
| st.session_state.user.products_bought.append(product_name) | |
| st.success(f"Successfully purchased {product_name}! Stock updated.") | |
| load_data.clear() | |
| # Check for low stock | |
| if (actual_stock - 1) <= 3 and (actual_stock - 1) > 0: | |
| st.warning(f"Low stock alert: Only {actual_stock - 1} units of {product_name} remaining!") | |
| else: | |
| st.error(f"Sorry, {product_name} is now out of stock!") | |
| except Exception as e: | |
| st.error(f"Error processing purchase: {str(e)}") | |
| if conn: | |
| conn.rollback() | |
| finally: | |
| if conn: | |
| conn.close() | |
| else: | |
| st.error("Product is out of stock or not found in the inventory.") | |
| else: | |
| st.error("Could not retrieve product details. Please try again.") | |
| else: | |
| st.error("Please specify which product you'd like to purchase.") | |
| new_messages = [ | |
| {"type": "human", "content": prompt_input}, | |
| {"type": "ai", "content": response} | |
| ] | |
| st.session_state.user.update_chat_history(new_messages) | |
| st.session_state.chat_history += new_messages | |
| if __name__ == "__main__": | |
| main() |