Files
demo/pymesbot/backend/main.py
Renato 53b632c07c Agrega verificación de stock para ventas de kits múltiples
- Nueva tool verificar_stock_kit() para calcular cuántos kits se pueden armar
- Nueva función db_verificar_stock_kit() que verifica stock de todos los productos
- Calcula kits_posibles basado en el producto con menor stock relativo
- Actualiza skill armar_kits.md con proceso de verificación obligatoria
- Ahora advierte antes de vender: 'Solo podés armar X kits, pediste Y'
- Prevents ventas parciales no deseadas de kits
2026-02-15 22:48:48 +01:00

1333 lines
45 KiB
Python

import json
import os
import sqlite3
import logging
import unicodedata
from datetime import datetime
from pathlib import Path
from typing import Optional, List
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def normalize_text(text: str) -> str:
"""Remove accents and normalize text for searching"""
# Normalize to NFKD and remove combining characters (accents)
normalized = unicodedata.normalize("NFKD", text)
return "".join(c for c in normalized if not unicodedata.combining(c))
from fastapi import (
FastAPI,
WebSocket,
WebSocketDisconnect,
HTTPException,
Request,
)
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
import httpx
# Anthropic API Configuration (Z.ai)
ANTHROPIC_API_KEY = "6fef8efda3d24eb9ad3d718daf1ae9a1.RcFc7QPe5uZLr2mS"
ANTHROPIC_BASE_URL = "https://api.z.ai/api/anthropic"
# Skills directory
SKILLS_DIR = Path(__file__).parent.parent / "skills"
def load_skills() -> str:
"""Carga todos los skills disponibles y los combina en un string"""
skills_content = []
if SKILLS_DIR.exists():
for skill_file in sorted(SKILLS_DIR.glob("*.md")):
try:
with open(skill_file, "r", encoding="utf-8") as f:
content = f.read()
skills_content.append(
f"\n{'=' * 60}\nSKILL: {skill_file.stem}\n{'=' * 60}\n{content}"
)
except Exception as e:
logger.warning(f"Error cargando skill {skill_file}: {e}")
return "\n\n".join(skills_content) if skills_content else ""
# Base system prompt
BASE_SYSTEM_PROMPT = """Sos el asistente de ventas de Demo Librería, una librería escolar en Argentina.
## REGLA CRÍTICA - SIEMPRE USAR HERRAMIENTAS
**NUNCA respondas de memoria.** Antes de dar cualquier información sobre productos, precios o stock, DEBÉS usar una herramienta.
## HERRAMIENTAS DISPONIBLES
1. `buscar_productos` - Busca productos específicos por nombre
Úsala cuando el usuario mencione un producto específico: "cuadernos", "lápices", "biromes", etc.
2. `listar_todo_el_stock` - Muestra TODO el inventario completo
Úsala cuando el usuario quiera ver todo: "mostrame todo", "qué tienen", "stock"
3. `confirmar_venta` - Registra una venta
Úsala solo cuando el usuario confirme que se vendió algo
## INSTRUCCIONES IMPORTANTES
1. **NUNCA inventar precios.** Siempre usar herramientas primero.
2. **MÚLTIPLES PRODUCTOS**: Cuando pidan varios productos ("bloc, lápices y fibras"), buscar CADA UNO por separado.
3. Si no hay stock, ofrecer alternativas.
4. Ser conciso (máximo 3-4 líneas).
5. Usar español argentino: "vos", "tenés", "querés".
6. Preguntar siempre al final: "¿Se concretó la venta?"
## MAPEO DE SINÓNIMOS
- "fibras" → buscar "marcadores" o "colores"
- "bloc" → buscar "papel"
- "lapices de colores" → buscar "caja colores"
- "biromes" → buscar "birome"
## EJEMPLOS
Usuario: "bloc de hojas, lápices de 12 y fibras"
Acción:
1. buscar_productos("bloc")
2. buscar_productos("caja colores")
3. buscar_productos("fibras")
Respuesta: Resultados de los 3 productos"""
# Combinar base prompt con skills
SKILLS_CONTENT = load_skills()
SYSTEM_PROMPT = f"{BASE_SYSTEM_PROMPT}\n\n{SKILLS_CONTENT}"
def db_buscar_productos(query: str) -> list:
"""Busca productos en la base de datos"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query_normalized = normalize_text(query.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT id, nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(marca), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(categoria), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
ORDER BY stock DESC
LIMIT 10
""",
(search_term, search_term, search_term),
)
productos = cursor.fetchall()
conn.close()
return [
{
"id": p["id"],
"nombre": p["nombre"],
"marca": p["marca"] or "",
"categoria": p["categoria"],
"precio": p["precio"],
"stock": p["stock"],
}
for p in productos
]
def db_listar_todo_el_stock() -> dict:
"""Lista todo el inventario organizado por categorías"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND stock > 0
ORDER BY categoria, nombre
""")
productos = cursor.fetchall()
conn.close()
# Organizar por categorías
categorias = {}
for p in productos:
cat = p["categoria"]
if cat not in categorias:
categorias[cat] = []
categorias[cat].append(
{
"nombre": p["nombre"],
"marca": p["marca"] or "",
"precio": p["precio"],
"stock": p["stock"],
}
)
return {"categorias": categorias, "total_productos": len(productos)}
def db_confirmar_venta(producto_nombre: str, cantidad: int) -> dict:
"""Registra una venta en la base de datos"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query_normalized = normalize_text(producto_nombre.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
LIMIT 1
""",
(search_term,),
)
producto = cursor.fetchone()
if not producto:
conn.close()
return {"error": f"Producto no encontrado: {producto_nombre}"}
if producto["stock"] < cantidad:
conn.close()
return {
"error": f"Stock insuficiente. Stock actual: {producto['stock']}, solicitado: {cantidad}",
"producto": producto["nombre"],
"stock_disponible": producto["stock"],
}
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(producto["id"], cantidad, producto["precio"], "AI Bot"),
)
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(cantidad, producto["id"]),
)
conn.commit()
cursor.execute("SELECT stock FROM productos WHERE id = ?", (producto["id"],))
nuevo_stock = cursor.fetchone()["stock"]
conn.close()
return {
"success": True,
"producto": producto["nombre"],
"cantidad": cantidad,
"precio_unitario": producto["precio"],
"total": cantidad * producto["precio"],
"stock_nuevo": nuevo_stock,
}
def db_confirmar_venta_kit(items: list) -> dict:
"""Registra la venta de múltiples productos (un kit completo)"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
resultados = []
total_venta = 0
errores = []
for item in items:
producto_nombre = item.get("producto_nombre", "")
cantidad = item.get("cantidad", 1)
query_normalized = normalize_text(producto_nombre.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
LIMIT 1
""",
(search_term,),
)
producto = cursor.fetchone()
if not producto:
errores.append(f"Producto no encontrado: {producto_nombre}")
continue
if producto["stock"] < cantidad:
errores.append(
f"Stock insuficiente para {producto['nombre']}: tenemos {producto['stock']}, pidió {cantidad}"
)
continue
# Registrar venta
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(producto["id"], cantidad, producto["precio"], "AI Bot"),
)
# Actualizar stock
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(cantidad, producto["id"]),
)
subtotal = cantidad * producto["precio"]
total_venta += subtotal
resultados.append(
{
"producto": producto["nombre"],
"cantidad": cantidad,
"precio_unitario": producto["precio"],
"subtotal": subtotal,
}
)
conn.commit()
conn.close()
if errores and not resultados:
return {"error": "No se pudo completar la venta", "detalles": errores}
return {
"success": True,
"items_vendidos": len(resultados),
"total": total_venta,
"productos": resultados,
"errores": errores if errores else None,
}
def db_verificar_stock_kit(items: list, cantidad_kits: int) -> dict:
"""Verifica si hay stock suficiente para armar N kits completos"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
productos_info = []
kits_posibles = float("inf")
faltantes = []
for item in items:
producto_nombre = item.get("producto_nombre", "")
cantidad_por_kit = item.get("cantidad", 1)
query_normalized = normalize_text(producto_nombre.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT nombre, stock FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
LIMIT 1
""",
(search_term,),
)
producto = cursor.fetchone()
if not producto:
faltantes.append(f"Producto no encontrado: {producto_nombre}")
continue
stock_disponible = producto["stock"]
kits_con_este_producto = stock_disponible // cantidad_por_kit
productos_info.append(
{
"nombre": producto["nombre"],
"stock": stock_disponible,
"cantidad_por_kit": cantidad_por_kit,
"kits_posibles": kits_con_este_producto,
}
)
# El límite es el producto con menos kits posibles
if kits_con_este_producto < kits_posibles:
kits_posibles = kits_con_este_producto
conn.close()
# Si no hay productos válidos
if not productos_info:
return {
"puede_vender": False,
"kits_solicitados": cantidad_kits,
"kits_posibles": 0,
"mensaje": "No se encontraron los productos del kit",
"detalles": faltantes,
}
# Si puede vender todos los kits solicitados
if kits_posibles >= cantidad_kits:
return {
"puede_vender": True,
"kits_solicitados": cantidad_kits,
"kits_posibles": cantidad_kits,
"mensaje": f"✅ Stock suficiente para {cantidad_kits} kits",
"productos": productos_info,
}
# Si no alcanza el stock
return {
"puede_vender": False,
"kits_solicitados": cantidad_kits,
"kits_posibles": kits_posibles,
"mensaje": f"⚠️ Solo podés armar {kits_posibles} kits completos (pediste {cantidad_kits})",
"productos": productos_info,
"sugerencia": f"Con el stock actual solo alcanza para {kits_posibles} kits. ¿Vendés {kits_posibles} o preferís ver otros productos?",
}
async def chat_with_ai(message: str, session_id: str = "pymesbot") -> Optional[str]:
"""Send message to Anthropic API with tools"""
# Definir las herramientas disponibles
tools = [
{
"name": "buscar_productos",
"description": "Busca productos específicos en el inventario por nombre o descripción. Usar SIEMPRE antes de dar información sobre productos.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Nombre o descripción del producto a buscar (ej: 'cuaderno', 'birome', 'lápices')",
}
},
"required": ["query"],
},
},
{
"name": "listar_todo_el_stock",
"description": "Muestra TODO el inventario completo de la librería organizado por categorías. Usar cuando el usuario quiera ver todo el stock disponible.",
"input_schema": {
"type": "object",
"properties": {},
},
},
{
"name": "confirmar_venta",
"description": "Registra la venta de UN solo producto. Usar cuando el usuario confirme que vendió un producto específico.",
"input_schema": {
"type": "object",
"properties": {
"producto_nombre": {
"type": "string",
"description": "Nombre exacto o similar del producto vendido",
},
"cantidad": {
"type": "integer",
"description": "Cantidad de unidades vendidas",
},
},
"required": ["producto_nombre", "cantidad"],
},
},
{
"name": "confirmar_venta_kit",
"description": "Registra la venta de MÚLTIPLES productos a la vez (un kit completo). Usar cuando el usuario confirme que vendió un kit o varios productos juntos.",
"input_schema": {
"type": "object",
"properties": {
"items": {
"type": "array",
"description": "Lista de productos vendidos con sus cantidades",
"items": {
"type": "object",
"properties": {
"producto_nombre": {
"type": "string",
"description": "Nombre del producto",
},
"cantidad": {
"type": "integer",
"description": "Cantidad vendida",
},
},
"required": ["producto_nombre", "cantidad"],
},
},
},
"required": ["items"],
},
},
{
"name": "verificar_stock_kit",
"description": "Verifica si hay stock suficiente para armar N kits completos ANTES de vender. Usar SIEMPRE cuando el usuario quiera vender múltiples kits para calcular cuántos se pueden armar con el stock actual.",
"input_schema": {
"type": "object",
"properties": {
"items": {
"type": "array",
"description": "Lista de productos que componen un kit",
"items": {
"type": "object",
"properties": {
"producto_nombre": {
"type": "string",
"description": "Nombre del producto",
},
"cantidad": {
"type": "integer",
"description": "Cantidad que lleva el kit de este producto",
},
},
"required": ["producto_nombre", "cantidad"],
},
},
"cantidad_kits": {
"type": "integer",
"description": "Cantidad de kits que quiere vender",
},
},
"required": ["items", "cantidad_kits"],
},
},
]
messages = [{"role": "user", "content": message}]
max_iterations = 5
async with httpx.AsyncClient(timeout=60.0) as client:
for iteration in range(max_iterations):
try:
response = await client.post(
f"{ANTHROPIC_BASE_URL}/v1/messages",
headers={
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
},
json={
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1024,
"system": SYSTEM_PROMPT,
"messages": messages,
"tools": tools,
},
)
if response.status_code != 200:
logger.error(f"API error: {response.status_code} - {response.text}")
return (
f"Error al procesar la consulta. Por favor, intentá de nuevo."
)
result = response.json()
content = result.get("content", [])
# Verificar si hay tool calls
tool_calls = [
block for block in content if block.get("type") == "tool_use"
]
text_response = [
block for block in content if block.get("type") == "text"
]
if not tool_calls:
# No hay tool calls, devolver la respuesta de texto
if text_response:
return text_response[0].get(
"text", "No pude generar una respuesta."
)
return "No pude generar una respuesta."
# Procesar tool calls
tool_results = []
for tool_call in tool_calls:
tool_name = tool_call.get("name")
tool_input = tool_call.get("input", {})
if tool_name == "buscar_productos":
query = tool_input.get("query", "")
productos = db_buscar_productos(query)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps({"productos": productos}),
}
)
elif tool_name == "listar_todo_el_stock":
inventario = db_listar_todo_el_stock()
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps(inventario),
}
)
elif tool_name == "confirmar_venta":
producto_nombre = tool_input.get("producto_nombre", "")
cantidad = tool_input.get("cantidad", 1)
resultado = db_confirmar_venta(producto_nombre, cantidad)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps(resultado),
}
)
elif tool_name == "confirmar_venta_kit":
items = tool_input.get("items", [])
resultado = db_confirmar_venta_kit(items)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps(resultado),
}
)
elif tool_name == "verificar_stock_kit":
items = tool_input.get("items", [])
cantidad_kits = tool_input.get("cantidad_kits", 1)
resultado = db_verificar_stock_kit(items, cantidad_kits)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_call.get("id"),
"content": json.dumps(resultado),
}
)
# Agregar el tool use y los resultados a los mensajes
messages.append({"role": "assistant", "content": json.dumps(content)})
messages.append({"role": "user", "content": json.dumps(tool_results)})
except Exception as e:
logger.error(f"Error in chat_with_ai: {e}")
return f"Error al procesar la consulta: {str(e)}"
# Si llegamos aquí, se agotaron las iteraciones
return "Lo siento, no pude completar la consulta en el tiempo permitido."
# Alias para compatibilidad
chat_with_openclaw = chat_with_ai
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "stock.db"
TEMPLATES_DIR = Path(__file__).parent / "templates"
def get_db():
if not DB_PATH.exists():
init_db()
return DB_PATH
def init_db():
DATA_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS productos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT NOT NULL,
marca TEXT,
categoria TEXT NOT NULL DEFAULT 'general',
precio REAL NOT NULL,
stock INTEGER NOT NULL DEFAULT 0,
variantes TEXT,
codigo TEXT,
activo INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS ventas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
producto_id INTEGER NOT NULL,
cantidad INTEGER NOT NULL,
precio_vendido REAL NOT NULL,
vendedor TEXT,
notas TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (producto_id) REFERENCES productos(id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS config (
clave TEXT PRIMARY KEY,
valor TEXT NOT NULL
)
""")
default_config = [
("nombre_negocio", "Demo Librería"),
("moneda", "ARS"),
("moneda_simbolo", "$"),
("alerta_stock_minimo", "5"),
("vendedor_pin", "1234"),
("admin_password", "admin123"),
("rubro", "libreria"),
]
for clave, valor in default_config:
cursor.execute(
"INSERT OR IGNORE INTO config (clave, valor) VALUES (?, ?)", (clave, valor)
)
sample_products = [
(
"Birome Bic Cristal Azul",
"Bic",
"escritura",
850,
50,
'{"color": ["azul"]}',
"7501031311309",
),
(
"Birome Bic Cristal Rojo",
"Bic",
"escritura",
850,
30,
'{"color": ["rojo"]}',
"7501031311316",
),
(
"Birome Bic Cristal Negro",
"Bic",
"escritura",
850,
45,
'{"color": ["negro"]}',
"7501031311323",
),
(
"Cuaderno Rivadavia 48 Hojas",
"Rivadavia",
"cuadernos",
2500,
20,
'{"tipo": ["rayado", "blanco"]}',
None,
),
("Lápiz Faber Castell 2B", "Faber Castell", "escritura", 450, 100, None, None),
("Goma de borrar Staedtler", "Staedtler", "escritura", 320, 25, None, None),
("Regla 30cm", "Maped", "geometria", 650, 15, None, None),
("Compás Prisma", "Prisma", "geometria", 2500, 8, None, None),
("Caja de colores 12", "Maped", "colores", 3200, 18, None, None),
("Papel glasé x 20", "Laprida", "colores", 850, 40, None, None),
]
for nombre, marca, categoria, precio, stock, variantes, codigo in sample_products:
v = variantes if variantes else None
cursor.execute(
"""
INSERT INTO productos (nombre, marca, categoria, precio, stock, variantes, codigo)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(nombre, marca, categoria, precio, stock, v, codigo),
)
conn.commit()
conn.close()
class BuscarStockRequest(BaseModel):
query: str
limit: int = 5
class ConfirmarVentaRequest(BaseModel):
producto_id: int
cantidad: int
precio_vendido: float
vendedor: str = "vendedor"
class LoginRequest(BaseModel):
pin: str
app = FastAPI(title="PymesBot Backend")
if TEMPLATES_DIR.exists():
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
else:
templates = None
@app.on_event("startup")
async def startup():
init_db()
@app.get("/health")
async def health():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT valor FROM config WHERE clave = 'nombre_negocio'")
nombre = cursor.fetchone()
conn.close()
return {"status": "ok", "negocio": nombre[0] if nombre else "Demo"}
@app.get("/stock/search")
async def buscar_stock(q: str, limit: int = 5):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
q_lower = q.lower()
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
LOWER(nombre) LIKE ? OR
LOWER(marca) LIKE ? OR
LOWER(categoria) LIKE ?
)
ORDER BY
CASE
WHEN LOWER(nombre) LIKE ? THEN 1
ELSE 2
END,
stock DESC
LIMIT ?
""",
(f"%{q_lower}%", f"%{q_lower}%", f"%{q_lower}%", f"%{q_lower}%", limit),
)
rows = cursor.fetchall()
resultados = []
for row in rows:
resultados.append(
{
"id": row["id"],
"nombre": row["nombre"],
"marca": row["marca"],
"categoria": row["categoria"],
"precio": row["precio"],
"precio_formateado": f"${row['precio']:.0f}",
"stock": row["stock"],
"variantes": json.loads(row["variantes"]) if row["variantes"] else {},
"hay_stock": row["stock"] > 0,
"promo_activa": None,
}
)
conn.close()
return {"resultados": resultados, "total": len(resultados), "query": q}
@app.get("/stock/producto/{producto_id}")
async def get_producto(producto_id: int):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM productos WHERE id = ?", (producto_id,))
row = cursor.fetchone()
conn.close()
if not row:
raise HTTPException(
status_code=404, detail=f"Producto con id {producto_id} no encontrado"
)
return {
"id": row["id"],
"nombre": row["nombre"],
"marca": row["marca"],
"categoria": row["categoria"],
"precio": row["precio"],
"stock": row["stock"],
"variantes": json.loads(row["variantes"]) if row["variantes"] else {},
"codigo": row["codigo"],
"activo": bool(row["activo"]),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
@app.post("/venta/confirmar")
async def confirmar_venta(req: ConfirmarVentaRequest):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM productos WHERE id = ? AND activo = 1", (req.producto_id,)
)
producto = cursor.fetchone()
if not producto:
conn.close()
raise HTTPException(status_code=404, detail="Producto no encontrado")
if producto[4] < req.cantidad:
conn.close()
raise HTTPException(
status_code=400,
detail=f"Stock insuficiente. Stock actual: {producto[4]}, solicitado: {req.cantidad}",
)
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(req.producto_id, req.cantidad, req.precio_vendido, req.vendedor),
)
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(req.cantidad, req.producto_id),
)
conn.commit()
cursor.execute("SELECT stock FROM productos WHERE id = ?", (req.producto_id,))
nuevo_stock = cursor.fetchone()[0]
venta_id = cursor.lastrowid
conn.close()
return {
"venta_id": venta_id,
"producto": producto[1],
"cantidad": req.cantidad,
"precio_vendido": req.precio_vendido,
"total": req.cantidad * req.precio_vendido,
"stock_anterior": producto[4],
"stock_nuevo": nuevo_stock,
"timestamp": datetime.now().isoformat(),
}
# ============================================================================
# TOOLS PARA PICOCLAW
# ============================================================================
class BuscarProductosTool(BaseModel):
query: str
class ConfirmarVentaTool(BaseModel):
producto_nombre: str
cantidad: int
@app.get("/api/tools/listar_productos")
async def tool_listar_productos():
"""Tool para PicoClaw: Lista todos los productos disponibles"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND stock > 0
ORDER BY categoria, nombre
""")
productos = cursor.fetchall()
conn.close()
result = []
for p in productos:
result.append(
{
"id": p["id"],
"nombre": p["nombre"],
"marca": p["marca"] or "",
"categoria": p["categoria"],
"precio": p["precio"],
"stock": p["stock"],
}
)
return {"productos": result, "total": len(result)}
@app.post("/api/tools/buscar_productos")
async def tool_buscar_productos(req: BuscarProductosTool):
"""Tool para PicoClaw: Busca productos por nombre o descripción"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Normalizar búsqueda (quitar tildes)
query_normalized = normalize_text(req.query.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT id, nombre, marca, categoria, precio, stock
FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(marca), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ? OR
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(categoria), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
ORDER BY stock DESC
LIMIT 10
""",
(search_term, search_term, search_term),
)
productos = cursor.fetchall()
conn.close()
if not productos:
return {
"productos": [],
"mensaje": f"No se encontraron productos para: {req.query}",
}
result = []
for p in productos:
result.append(
{
"id": p["id"],
"nombre": p["nombre"],
"marca": p["marca"] or "",
"categoria": p["categoria"],
"precio": p["precio"],
"stock": p["stock"],
}
)
return {"productos": result, "query": req.query}
@app.post("/api/tools/confirmar_venta")
async def tool_confirmar_venta(req: ConfirmarVentaTool):
"""Tool para PicoClaw: Registra una venta y descuenta stock"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Buscar producto por nombre
query_normalized = normalize_text(req.producto_nombre.lower())
search_term = f"%{query_normalized}%"
cursor.execute(
"""
SELECT * FROM productos
WHERE activo = 1 AND (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER(nombre), 'á', 'a'), 'é', 'e'), 'í', 'i'), 'ó', 'o'), 'ú', 'u') LIKE ?
)
LIMIT 1
""",
(search_term,),
)
producto = cursor.fetchone()
if not producto:
conn.close()
return {"error": f"Producto no encontrado: {req.producto_nombre}"}
if producto["stock"] < req.cantidad:
conn.close()
return {
"error": f"Stock insuficiente. Stock actual: {producto['stock']}, solicitado: {req.cantidad}",
"producto": producto["nombre"],
"stock_disponible": producto["stock"],
}
# Registrar venta
cursor.execute(
"""
INSERT INTO ventas (producto_id, cantidad, precio_vendido, vendedor)
VALUES (?, ?, ?, ?)
""",
(producto["id"], req.cantidad, producto["precio"], "PicoClaw Bot"),
)
# Actualizar stock
cursor.execute(
"""
UPDATE productos SET stock = stock - ?, updated_at = datetime('now')
WHERE id = ?
""",
(req.cantidad, producto["id"]),
)
conn.commit()
# Obtener stock actualizado
cursor.execute("SELECT stock FROM productos WHERE id = ?", (producto["id"],))
nuevo_stock = cursor.fetchone()["stock"]
conn.close()
return {
"success": True,
"producto": producto["nombre"],
"cantidad": req.cantidad,
"precio_unitario": producto["precio"],
"total": req.cantidad * producto["precio"],
"stock_anterior": producto["stock"],
"stock_nuevo": nuevo_stock,
}
# ============================================================================
# FIN TOOLS PICOCLAW
# ============================================================================
@app.get("/stats/ventas")
async def ventas_stats(periodo: str = "hoy"):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
if periodo == "hoy":
cursor.execute("""
SELECT COUNT(*), SUM(cantidad * precio_vendido), AVG(cantidad * precio_vendido)
FROM ventas
WHERE date(timestamp) = date('now')
""")
elif periodo == "semana":
cursor.execute("""
SELECT COUNT(*), SUM(cantidad * precio_vendido), AVG(cantidad * precio_vendido)
FROM ventas
WHERE timestamp >= datetime('now', '-7 days')
""")
else:
cursor.execute("""
SELECT COUNT(*), SUM(cantidad * precio_vendido), AVG(cantidad * precio_vendido)
FROM ventas
WHERE timestamp >= datetime('now', '-30 days')
""")
row = cursor.fetchone()
conn.close()
return {
"periodo": periodo,
"total_ventas": row[0] or 0,
"total_pesos": row[1] or 0,
"ticket_promedio": row[2] or 0,
}
@app.get("/")
async def root(request: Request):
if templates:
return templates.TemplateResponse(
"chat.html", {"request": request, "nombre_negocio": "Demo Librería"}
)
return HTMLResponse("""
<html>
<head>
<title>PymesBot - Demo</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #f5f5f5; }
.container { max-width: 800px; margin: 0 auto; padding: 20px; }
.chat-box { background: white; border-radius: 12px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); height: 70vh; display: flex; flex-direction: column; }
.chat-header { padding: 16px 20px; border-bottom: 1px solid #eee; display: flex; align-items: center; gap: 12px; }
.chat-header h1 { font-size: 18px; color: #333; }
.chat-messages { flex: 1; overflow-y: auto; padding: 20px; }
.message { margin-bottom: 16px; max-width: 80%; }
.message.bot { margin-right: auto; }
.message.user { margin-left: auto; text-align: right; }
.message .bubble { display: inline-block; padding: 12px 16px; border-radius: 18px; }
.message.bot .bubble { background: #f0f0f0; color: #333; }
.message.user .bubble { background: #007bff; color: white; }
.typing { color: #888; font-style: italic; padding: 8px; }
.chat-input { padding: 16px 20px; border-top: 1px solid #eee; display: flex; gap: 12px; }
.chat-input input { flex: 1; padding: 12px 16px; border: 1px solid #ddd; border-radius: 24px; outline: none; font-size: 16px; }
.chat-input button { padding: 12px 24px; background: #007bff; color: white; border: none; border-radius: 24px; cursor: pointer; font-size: 16px; }
.chat-input button:hover { background: #0056b3; }
.sidebar { background: white; border-radius: 12px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
.sidebar h2 { font-size: 16px; margin-bottom: 12px; color: #333; }
.stat { display: flex; justify-content: space-between; padding: 8px 0; border-bottom: 1px solid #eee; }
.stat-value { font-weight: 600; color: #007bff; }
</style>
</head>
<body>
<div class="container">
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
<div class="chat-box">
<div class="chat-header">
<h1>🛒 Demo Librería</h1>
</div>
<div class="chat-messages" id="messages">
<div class="message bot">
<div class="bubble">¡Hola! Soy el asistente de ventas de Demo Librería. ¿Qué estás buscando?</div>
</div>
</div>
<div class="chat-input">
<input type="text" id="msgInput" placeholder="Escribí tu consulta..." autocomplete="off">
<button onclick="sendMessage()">Enviar</button>
</div>
</div>
<div>
<div class="sidebar">
<h2>📊 Hoy</h2>
<div class="stat">
<span>Ventas</span>
<span class="stat-value" id="ventas-hoy">0</span>
</div>
<div class="stat">
<span>Total</span>
<span class="stat-value" id="total-hoy">$0</span>
</div>
</div>
<div class="sidebar">
<h2>⚠️ Stock bajo</h2>
<div id="alertas-stock"></div>
</div>
</div>
</div>
</div>
<script>
let ws = null;
const sessionId = Math.random().toString(36).substring(7);
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(`${protocol}//${window.location.host}/chat/ws/${sessionId}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.msg) {
addMessage(data.msg, 'bot');
}
if (data.stats) {
document.getElementById('ventas-hoy').textContent = data.stats.total_ventas;
document.getElementById('total-hoy').textContent = '$' + data.stats.total_pesos.toLocaleString();
}
};
}
function addMessage(text, type) {
const div = document.createElement('div');
div.className = `message ${type}`;
div.innerHTML = `<div class="bubble">${text}</div>`;
document.getElementById('messages').appendChild(div);
document.getElementById('messages').scrollTop = document.getElementById('messages').scrollHeight;
}
function sendMessage() {
const input = document.getElementById('msgInput');
const text = input.value.trim();
if (!text || !ws) return;
addMessage(text, 'user');
ws.send(JSON.stringify({ msg: text, session_id: sessionId }));
input.value = '';
}
document.getElementById('msgInput').addEventListener('keypress', (e) => {
if (e.key === 'Enter') sendMessage();
});
connect();
fetch('/stats/ventas?periodo=hoy')
.then(r => r.json())
.then(data => {
document.getElementById('ventas-hoy').textContent = data.total_ventas;
document.getElementById('total-hoy').textContent = '$' + data.total_pesos.toLocaleString();
});
</script>
</body>
</html>
""")
chat_sessions = {}
@app.websocket("/chat/ws/{session_id}")
async def websocket_chat(websocket: WebSocket, session_id: str):
await websocket.accept()
logger.info(f"[WS] Accepted connection for session: {session_id}")
if session_id not in chat_sessions:
chat_sessions[session_id] = []
try:
while True:
data = await websocket.receive_json()
logger.info(f"[WS] Received: {data}")
mensaje = data.get("msg", "")
session_id = data.get("session_id", session_id)
await websocket.send_json({"typing": True})
logger.info(f"[WS] Processing message: {mensaje}")
try:
# Usar AI con tools para respuestas inteligentes
logger.info(f"[WS] Processing: {mensaje}")
respuesta = await chat_with_ai(mensaje, session_id)
chat_sessions[session_id].append({"role": "user", "content": mensaje})
chat_sessions[session_id].append(
{"role": "assistant", "content": respuesta}
)
await websocket.send_json({"typing": False})
await websocket.send_json({"msg": respuesta})
logger.info("[WS] Response sent")
except Exception as e:
logger.error(f"[WS] Error: {e}")
await websocket.send_json({"typing": False})
await websocket.send_json(
{"msg": "Ups, algo salió mal. ¿Podés repetir la consulta?"}
)
except WebSocketDisconnect:
pass
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)