Files
demo/pymesbot/backend/main.py
Renato 0d4e281829 Implementa sistema de skills para búsqueda inteligente
- Carga skills desde archivos .md en workspace/skills/
- Agrega skill vendedor_libreria con contexto completo
- Agrega skill busqueda_productos con manejo de sinónimos
- Agrega skill gestion_ventas para registrar ventas
- Nueva tool listar_todo_el_stock para ver inventario completo
- Mejora system prompt con reglas claras de uso de tools
- Agrega mapeo de sinónimos (fibras→marcadores, bloc→papel)
- Procesa múltiples productos en una sola consulta
- Ofrece alternativas inteligentes cuando no hay stock
2026-02-15 22:25:24 +01:00

1084 lines
36 KiB
Python

import json
import os
import sqlite3
import logging
import unicodedata
from datetime import datetime
from pathlib import Path
from typing import Optional, List
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def normalize_text(text: str) -> str:
"""Remove accents and normalize text for searching"""
# Normalize to NFKD and remove combining characters (accents)
normalized = unicodedata.normalize("NFKD", text)
return "".join(c for c in normalized if not unicodedata.combining(c))
from fastapi import (
FastAPI,
WebSocket,
WebSocketDisconnect,
HTTPException,
Request,
)
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
import httpx
# Anthropic API Configuration (Z.ai)
ANTHROPIC_API_KEY = "6fef8efda3d24eb9ad3d718daf1ae9a1.RcFc7QPe5uZLr2mS"
ANTHROPIC_BASE_URL = "https://api.z.ai/api/anthropic"
# Skills directory
SKILLS_DIR = Path(__file__).parent.parent / "picoclaw" / "workspace" / "skills"
def load_skills() -> str:
"""Carga todos los skills disponibles y los combina en un string"""
skills_content = []
if SKILLS_DIR.exists():
for skill_file in sorted(SKILLS_DIR.glob("*.md")):
try:
with open(skill_file, "r", encoding="utf-8") as f:
content = f.read()
skills_content.append(
f"\n{'=' * 60}\nSKILL: {skill_file.stem}\n{'=' * 60}\n{content}"
)
except Exception as e:
logger.warning(f"Error cargando skill {skill_file}: {e}")
return "\n\n".join(skills_content) if skills_content else ""
# Base system prompt
BASE_SYSTEM_PROMPT = """Sos el asistente de ventas de Demo Librería, una librería escolar en Argentina.
## REGLA CRÍTICA - SIEMPRE USAR HERRAMIENTAS
**NUNCA respondas de memoria.** Antes de dar cualquier información sobre productos, precios o stock, DEBÉS usar una herramienta.
## HERRAMIENTAS DISPONIBLES
1. `buscar_productos` - Busca productos específicos por nombre
Úsala cuando el usuario mencione un producto específico: "cuadernos", "lápices", "biromes", etc.
2. `listar_todo_el_stock` - Muestra TODO el inventario completo
Úsala cuando el usuario quiera ver todo: "mostrame todo", "qué tienen", "stock"
3. `confirmar_venta` - Registra una venta
Úsala solo cuando el usuario confirme que se vendió algo
## INSTRUCCIONES IMPORTANTES
1. **NUNCA inventar precios.** Siempre usar herramientas primero.
2. **MÚLTIPLES PRODUCTOS**: Cuando pidan varios productos ("bloc, lápices y fibras"), buscar CADA UNO por separado.
3. Si no hay stock, ofrecer alternativas.
4. Ser conciso (máximo 3-4 líneas).
5. Usar español argentino: "vos", "tenés", "querés".
6. Preguntar siempre al final: "¿Se concretó la venta?"
## MAPEO DE SINÓNIMOS
- "fibras" → buscar "marcadores" o "colores"
- "bloc" → buscar "papel"
- "lapices de colores" → buscar "caja colores"
- "biromes" → buscar "birome"
## EJEMPLOS
Usuario: "bloc de hojas, lápices de 12 y fibras"
Acción:
1. buscar_productos("bloc")
2. buscar_productos("caja colores")
3. buscar_productos("fibras")
Respuesta: Resultados de los 3 productos"""
# Combinar base prompt con skills
SKILLS_CONTENT = load_skills()
SYSTEM_PROMPT = f"{BASE_SYSTEM_PROMPT}\n\n{SKILLS_CONTENT}"
def db_buscar_productos(query: str) -> list:
"""Busca productos en la base de datos"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query_normalized = normalize_text(query.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT id, nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(marca), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(categoria), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
ORDER BY stock DESC
LIMIT 10
""",
(search_term, search_term, search_term),
)
productos = cursor.fetchall()
conn.close()
return [
{
"id": p["id"],
"nombre": p["nombre"],
"marca": p["marca"] or "",
"categoria": p["categoria"],
"precio": p["precio"],
"stock": p["stock"],
}
for p in productos
]
def db_listar_todo_el_stock() -> dict:
"""Lista todo el inventario organizado por categorías"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND stock > 0
ORDER BY categoria, nombre
""")
productos = cursor.fetchall()
conn.close()
# Organizar por categorías
categorias = {}
for p in productos:
cat = p["categoria"]
if cat not in categorias:
categorias[cat] = []
categorias[cat].append(
{
"nombre": p["nombre"],
"marca": p["marca"] or "",
"precio": p["precio"],
"stock": p["stock"],
}
)
return {"categorias": categorias, "total_productos": len(productos)}
def db_confirmar_venta(producto_nombre: str, cantidad: int) -> dict:
"""Registra una venta en la base de datos"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query_normalized = normalize_text(producto_nombre.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
LIMIT 1
""",
(search_term,),
)
producto = cursor.fetchone()
if not producto:
conn.close()
return {"error": f"Producto no encontrado: {producto_nombre}"}
if producto["stock"] < cantidad:
conn.close()
return {
"error": f"Stock insuficiente. Stock actual: {producto['stock']}, solicitado: {cantidad}",
"producto": producto["nombre"],
"stock_disponible": producto["stock"],
}
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(producto["id"], cantidad, producto["precio"], "AI Bot"),
)
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(cantidad, producto["id"]),
)
conn.commit()
cursor.execute("SELECT stock FROM productos WHERE id = ?", (producto["id"],))
nuevo_stock = cursor.fetchone()["stock"]
conn.close()
return {
"success": True,
"producto": producto["nombre"],
"cantidad": cantidad,
"precio_unitario": producto["precio"],
"total": cantidad * producto["precio"],
"stock_nuevo": nuevo_stock,
}
async def chat_with_ai(message: str, session_id: str = "pymesbot") -> Optional[str]:
"""Send message to Anthropic API with tools"""
# Definir las herramientas disponibles
tools = [
{
"name": "buscar_productos",
"description": "Busca productos específicos en el inventario por nombre o descripción. Usar SIEMPRE antes de dar información sobre productos.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Nombre o descripción del producto a buscar (ej: 'cuaderno', 'birome', 'lápices')",
}
},
"required": ["query"],
},
},
{
"name": "listar_todo_el_stock",
"description": "Muestra TODO el inventario completo de la librería organizado por categorías. Usar cuando el usuario quiera ver todo el stock disponible.",
"input_schema": {
"type": "object",
"properties": {},
},
},
{
"name": "confirmar_venta",
"description": "Registra una venta y descuenta el stock del inventario. Usar SOLO cuando el usuario confirme explícitamente que se vendió algo.",
"input_schema": {
"type": "object",
"properties": {
"producto_nombre": {
"type": "string",
"description": "Nombre exacto o similar del producto vendido",
},
"cantidad": {
"type": "integer",
"description": "Cantidad de unidades vendidas",
},
},
"required": ["producto_nombre", "cantidad"],
},
},
]
messages = [{"role": "user", "content": message}]
max_iterations = 5
async with httpx.AsyncClient(timeout=60.0) as client:
for iteration in range(max_iterations):
try:
response = await client.post(
f"{ANTHROPIC_BASE_URL}/v1/messages",
headers={
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
},
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1024,
"system": SYSTEM_PROMPT,
"messages": messages,
"tools": tools,
},
)
if response.status_code != 200:
logger.error(f"API error: {response.status_code} - {response.text}")
return (
f"Error al procesar la consulta. Por favor, intentá de nuevo."
)
result = response.json()
content = result.get("content", [])
# Verificar si hay tool calls
tool_calls = [
block for block in content if block.get("type") == "tool_use"
]
text_response = [
block for block in content if block.get("type") == "text"
]
if not tool_calls:
# No hay tool calls, devolver la respuesta de texto
if text_response:
return text_response[0].get(
"text", "No pude generar una respuesta."
)
return "No pude generar una respuesta."
# Procesar tool calls
tool_results = []
for tool_call in tool_calls:
tool_name = tool_call.get("name")
tool_input = tool_call.get("input", {})
if tool_name == "buscar_productos":
query = tool_input.get("query", "")
productos = db_buscar_productos(query)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps({"productos": productos}),
}
)
elif tool_name == "listar_todo_el_stock":
inventario = db_listar_todo_el_stock()
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps(inventario),
}
)
elif tool_name == "confirmar_venta":
producto_nombre = tool_input.get("producto_nombre", "")
cantidad = tool_input.get("cantidad", 1)
resultado = db_confirmar_venta(producto_nombre, cantidad)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps(resultado),
}
)
# Agregar el tool use y los resultados a los mensajes
messages.append({"role": "assistant", "content": json.dumps(content)})
messages.append({"role": "user", "content": json.dumps(tool_results)})
except Exception as e:
logger.error(f"Error in chat_with_ai: {e}")
return f"Error al procesar la consulta: {str(e)}"
# Si llegamos aquí, se agotaron las iteraciones
return "Lo siento, no pude completar la consulta en el tiempo permitido."
# Alias para compatibilidad
chat_with_openclaw = chat_with_ai
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "stock.db"
TEMPLATES_DIR = Path(__file__).parent / "templates"
def get_db():
if not DB_PATH.exists():
init_db()
return DB_PATH
def init_db():
DATA_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS productos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT NOT NULL,
marca TEXT,
categoria TEXT NOT NULL DEFAULT 'general',
precio REAL NOT NULL,
stock INTEGER NOT NULL DEFAULT 0,
variantes TEXT,
codigo TEXT,
activo INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS ventas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
producto_id INTEGER NOT NULL,
cantidad INTEGER NOT NULL,
precio_vendido REAL NOT NULL,
vendedor TEXT,
notas TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (producto_id) REFERENCES productos(id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS config (
clave TEXT PRIMARY KEY,
valor TEXT NOT NULL
)
""")
default_config = [
("nombre_negocio", "Demo Librería"),
("moneda", "ARS"),
("moneda_simbolo", "$"),
("alerta_stock_minimo", "5"),
("vendedor_pin", "1234"),
("admin_password", "admin123"),
("rubro", "libreria"),
]
for clave, valor in default_config:
cursor.execute(
"INSERT OR IGNORE INTO config (clave, valor) VALUES (?, ?)", (clave, valor)
)
sample_products = [
(
"Birome Bic Cristal Azul",
"Bic",
"escritura",
850,
50,
'{"color": ["azul"]}',
"7501031311309",
),
(
"Birome Bic Cristal Rojo",
"Bic",
"escritura",
850,
30,
'{"color": ["rojo"]}',
"7501031311316",
),
(
"Birome Bic Cristal Negro",
"Bic",
"escritura",
850,
45,
'{"color": ["negro"]}',
"7501031311323",
),
(
"Cuaderno Rivadavia 48 Hojas",
"Rivadavia",
"cuadernos",
2500,
20,
'{"tipo": ["rayado", "blanco"]}',
None,
),
("Lápiz Faber Castell 2B", "Faber Castell", "escritura", 450, 100, None, None),
("Goma de borrar Staedtler", "Staedtler", "escritura", 320, 25, None, None),
("Regla 30cm", "Maped", "geometria", 650, 15, None, None),
("Compás Prisma", "Prisma", "geometria", 2500, 8, None, None),
("Caja de colores 12", "Maped", "colores", 3200, 18, None, None),
("Papel glasé x 20", "Laprida", "colores", 850, 40, None, None),
]
for nombre, marca, categoria, precio, stock, variantes, codigo in sample_products:
v = variantes if variantes else None
cursor.execute(
"""
INSERT INTO productos (nombre, marca, categoria, precio, stock, variantes, codigo)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(nombre, marca, categoria, precio, stock, v, codigo),
)
conn.commit()
conn.close()
class BuscarStockRequest(BaseModel):
query: str
limit: int = 5
class ConfirmarVentaRequest(BaseModel):
producto_id: int
cantidad: int
precio_vendido: float
vendedor: str = "vendedor"
class LoginRequest(BaseModel):
pin: str
app = FastAPI(title="PymesBot Backend")
if TEMPLATES_DIR.exists():
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
else:
templates = None
@app.on_event("startup")
async def startup():
init_db()
@app.get("/health")
async def health():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT valor FROM config WHERE clave = 'nombre_negocio'")
nombre = cursor.fetchone()
conn.close()
return {"status": "ok", "negocio": nombre[0] if nombre else "Demo"}
@app.get("/stock/search")
async def buscar_stock(q: str, limit: int = 5):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
q_lower = q.lower()
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
LOWER(nombre) LIKE ? OR
LOWER(marca) LIKE ? OR
LOWER(categoria) LIKE ?
)
ORDER BY
CASE
WHEN LOWER(nombre) LIKE ? THEN 1
ELSE 2
END,
stock DESC
LIMIT ?
""",
(f"%{q_lower}%", f"%{q_lower}%", f"%{q_lower}%", f"%{q_lower}%", limit),
)
rows = cursor.fetchall()
resultados = []
for row in rows:
resultados.append(
{
"id": row["id"],
"nombre": row["nombre"],
"marca": row["marca"],
"categoria": row["categoria"],
"precio": row["precio"],
"precio_formateado": f"${row['precio']:.0f}",
"stock": row["stock"],
"variantes": json.loads(row["variantes"]) if row["variantes"] else {},
"hay_stock": row["stock"] > 0,
"promo_activa": None,
}
)
conn.close()
return {"resultados": resultados, "total": len(resultados), "query": q}
@app.get("/stock/producto/{producto_id}")
async def get_producto(producto_id: int):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM productos WHERE id = ?", (producto_id,))
row = cursor.fetchone()
conn.close()
if not row:
raise HTTPException(
status_code=404, detail=f"Producto con id {producto_id} no encontrado"
)
return {
"id": row["id"],
"nombre": row["nombre"],
"marca": row["marca"],
"categoria": row["categoria"],
"precio": row["precio"],
"stock": row["stock"],
"variantes": json.loads(row["variantes"]) if row["variantes"] else {},
"codigo": row["codigo"],
"activo": bool(row["activo"]),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
@app.post("/venta/confirmar")
async def confirmar_venta(req: ConfirmarVentaRequest):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM productos WHERE id = ? AND activo = 1", (req.producto_id,)
)
producto = cursor.fetchone()
if not producto:
conn.close()
raise HTTPException(status_code=404, detail="Producto no encontrado")
if producto[4] < req.cantidad:
conn.close()
raise HTTPException(
status_code=400,
detail=f"Stock insuficiente. Stock actual: {producto[4]}, solicitado: {req.cantidad}",
)
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(req.producto_id, req.cantidad, req.precio_vendido, req.vendedor),
)
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(req.cantidad, req.producto_id),
)
conn.commit()
cursor.execute("SELECT stock FROM productos WHERE id = ?", (req.producto_id,))
nuevo_stock = cursor.fetchone()[0]
venta_id = cursor.lastrowid
conn.close()
return {
"venta_id": venta_id,
"producto": producto[1],
"cantidad": req.cantidad,
"precio_vendido": req.precio_vendido,
"total": req.cantidad * req.precio_vendido,
"stock_anterior": producto[4],
"stock_nuevo": nuevo_stock,
"timestamp": datetime.now().isoformat(),
}
# ============================================================================
# TOOLS PARA PICOCLAW
# ============================================================================
class BuscarProductosTool(BaseModel):
query: str
class ConfirmarVentaTool(BaseModel):
producto_nombre: str
cantidad: int
@app.get("/api/tools/listar_productos")
async def tool_listar_productos():
"""Tool para PicoClaw: Lista todos los productos disponibles"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND stock > 0
ORDER BY categoria, nombre
""")
productos = cursor.fetchall()
conn.close()
result = []
for p in productos:
result.append(
{
"id": p["id"],
"nombre": p["nombre"],
"marca": p["marca"] or "",
"categoria": p["categoria"],
"precio": p["precio"],
"stock": p["stock"],
}
)
return {"productos": result, "total": len(result)}
@app.post("/api/tools/buscar_productos")
async def tool_buscar_productos(req: BuscarProductosTool):
"""Tool para PicoClaw: Busca productos por nombre o descripción"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Normalizar búsqueda (quitar tildes)
query_normalized = normalize_text(req.query.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT id, nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(marca), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(categoria), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
ORDER BY stock DESC
LIMIT 10
""",
(search_term, search_term, search_term),
)
productos = cursor.fetchall()
conn.close()
if not productos:
return {
"productos": [],
"mensaje": f"No se encontraron productos para: {req.query}",
}
result = []
for p in productos:
result.append(
{
"id": p["id"],
"nombre": p["nombre"],
"marca": p["marca"] or "",
"categoria": p["categoria"],
"precio": p["precio"],
"stock": p["stock"],
}
)
return {"productos": result, "query": req.query}
@app.post("/api/tools/confirmar_venta")
async def tool_confirmar_venta(req: ConfirmarVentaTool):
"""Tool para PicoClaw: Registra una venta y descuenta stock"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Buscar producto por nombre
query_normalized = normalize_text(req.producto_nombre.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
LIMIT 1
""",
(search_term,),
)
producto = cursor.fetchone()
if not producto:
conn.close()
return {"error": f"Producto no encontrado: {req.producto_nombre}"}
if producto["stock"] < req.cantidad:
conn.close()
return {
"error": f"Stock insuficiente. Stock actual: {producto['stock']}, solicitado: {req.cantidad}",
"producto": producto["nombre"],
"stock_disponible": producto["stock"],
}
# Registrar venta
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(producto["id"], req.cantidad, producto["precio"], "PicoClaw Bot"),
)
# Actualizar stock
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(req.cantidad, producto["id"]),
)
conn.commit()
# Obtener stock actualizado
cursor.execute("SELECT stock FROM productos WHERE id = ?", (producto["id"],))
nuevo_stock = cursor.fetchone()["stock"]
conn.close()
return {
"success": True,
"producto": producto["nombre"],
"cantidad": req.cantidad,
"precio_unitario": producto["precio"],
"total": req.cantidad * producto["precio"],
"stock_anterior": producto["stock"],
"stock_nuevo": nuevo_stock,
}
# ============================================================================
# FIN TOOLS PICOCLAW
# ============================================================================
@app.get("/stats/ventas")
async def ventas_stats(periodo: str = "hoy"):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
if periodo == "hoy":
cursor.execute("""
SELECT COUNT(*), SUM(cantidad * precio_vendido), AVG(cantidad * precio_vendido)
FROM ventas
WHERE date(timestamp) = date('now')
""")
elif periodo == "semana":
cursor.execute("""
SELECT COUNT(*), SUM(cantidad * precio_vendido), AVG(cantidad * precio_vendido)
FROM ventas
WHERE timestamp >= datetime('now', '-7 days')
""")
else:
cursor.execute("""
SELECT COUNT(*), SUM(cantidad * precio_vendido), AVG(cantidad * precio_vendido)
FROM ventas
WHERE timestamp >= datetime('now', '-30 days')
""")
row = cursor.fetchone()
conn.close()
return {
"periodo": periodo,
"total_ventas": row[0] or 0,
"total_pesos": row[1] or 0,
"ticket_promedio": row[2] or 0,
}
@app.get("/")
async def root(request: Request):
if templates:
return templates.TemplateResponse(
"chat.html", {"request": request, "nombre_negocio": "Demo Librería"}
)
return HTMLResponse("""
<html>
<head>
<title>PymesBot - Demo</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #f5f5f5; }
.container { max-width: 800px; margin: 0 auto; padding: 20px; }
.chat-box { background: white; border-radius: 12px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); height: 70vh; display: flex; flex-direction: column; }
.chat-header { padding: 16px 20px; border-bottom: 1px solid #eee; display: flex; align-items: center; gap: 12px; }
.chat-header h1 { font-size: 18px; color: #333; }
.chat-messages { flex: 1; overflow-y: auto; padding: 20px; }
.message { margin-bottom: 16px; max-width: 80%; }
.message.bot { margin-right: auto; }
.message.user { margin-left: auto; text-align: right; }
.message .bubble { display: inline-block; padding: 12px 16px; border-radius: 18px; }
.message.bot .bubble { background: #f0f0f0; color: #333; }
.message.user .bubble { background: #007bff; color: white; }
.typing { color: #888; font-style: italic; padding: 8px; }
.chat-input { padding: 16px 20px; border-top: 1px solid #eee; display: flex; gap: 12px; }
.chat-input input { flex: 1; padding: 12px 16px; border: 1px solid #ddd; border-radius: 24px; outline: none; font-size: 16px; }
.chat-input button { padding: 12px 24px; background: #007bff; color: white; border: none; border-radius: 24px; cursor: pointer; font-size: 16px; }
.chat-input button:hover { background: #0056b3; }
.sidebar { background: white; border-radius: 12px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
.sidebar h2 { font-size: 16px; margin-bottom: 12px; color: #333; }
.stat { display: flex; justify-content: space-between; padding: 8px 0; border-bottom: 1px solid #eee; }
.stat-value { font-weight: 600; color: #007bff; }
</style>
</head>
<body>
<div class="container">
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
<div class="chat-box">
<div class="chat-header">
<h1>🛒 Demo Librería</h1>
</div>
<div class="chat-messages" id="messages">
<div class="message bot">
<div class="bubble">¡Hola! Soy el asistente de ventas de Demo Librería. ¿Qué estás buscando?</div>
</div>
</div>
<div class="chat-input">
<input type="text" id="msgInput" placeholder="Escribí tu consulta..." autocomplete="off">
<button onclick="sendMessage()">Enviar</button>
</div>
</div>
<div>
<div class="sidebar">
<h2>📊 Hoy</h2>
<div class="stat">
<span>Ventas</span>
<span class="stat-value" id="ventas-hoy">0</span>
</div>
<div class="stat">
<span>Total</span>
<span class="stat-value" id="total-hoy">$0</span>
</div>
</div>
<div class="sidebar">
<h2>⚠️ Stock bajo</h2>
<div id="alertas-stock"></div>
</div>
</div>
</div>
</div>
<script>
let ws = null;
const sessionId = Math.random().toString(36).substring(7);
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(`${protocol}//${window.location.host}/chat/ws/${sessionId}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.msg) {
addMessage(data.msg, 'bot');
}
if (data.stats) {
document.getElementById('ventas-hoy').textContent = data.stats.total_ventas;
document.getElementById('total-hoy').textContent = '$' + data.stats.total_pesos.toLocaleString();
}
};
}
function addMessage(text, type) {
const div = document.createElement('div');
div.className = `message ${type}`;
div.innerHTML = `<div class="bubble">${text}</div>`;
document.getElementById('messages').appendChild(div);
document.getElementById('messages').scrollTop = document.getElementById('messages').scrollHeight;
}
function sendMessage() {
const input = document.getElementById('msgInput');
const text = input.value.trim();
if (!text || !ws) return;
addMessage(text, 'user');
ws.send(JSON.stringify({ msg: text, session_id: sessionId }));
input.value = '';
}
document.getElementById('msgInput').addEventListener('keypress', (e) => {
if (e.key === 'Enter') sendMessage();
});
connect();
fetch('/stats/ventas?periodo=hoy')
.then(r => r.json())
.then(data => {
document.getElementById('ventas-hoy').textContent = data.total_ventas;
document.getElementById('total-hoy').textContent = '$' + data.total_pesos.toLocaleString();
});
</script>
</body>
</html>
""")
chat_sessions = {}
@app.websocket("/chat/ws/{session_id}")
async def websocket_chat(websocket: WebSocket, session_id: str):
await websocket.accept()
logger.info(f"[WS] Accepted connection for session: {session_id}")
if session_id not in chat_sessions:
chat_sessions[session_id] = []
try:
while True:
data = await websocket.receive_json()
logger.info(f"[WS] Received: {data}")
mensaje = data.get("msg", "")
session_id = data.get("session_id", session_id)
await websocket.send_json({"typing": True})
logger.info(f"[WS] Processing message: {mensaje}")
try:
# Usar AI con tools para respuestas inteligentes
logger.info(f"[WS] Processing: {mensaje}")
respuesta = await chat_with_ai(mensaje, session_id)
chat_sessions[session_id].append({"role": "user", "content": mensaje})
chat_sessions[session_id].append(
{"role": "assistant", "content": respuesta}
)
await websocket.send_json({"typing": False})
await websocket.send_json({"msg": respuesta})
logger.info("[WS] Response sent")
except Exception as e:
logger.error(f"[WS] Error: {e}")
await websocket.send_json({"typing": False})
await websocket.send_json(
{"msg": "Ups, algo salió mal. ¿Podés repetir la consulta?"}
)
except WebSocketDisconnect:
pass
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)