diff --git a/.gitignore b/.gitignore index dacb665..6ad4d78 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +# Binaries bin/ *.exe *.dll @@ -5,12 +6,21 @@ bin/ *.dylib *.test *.out +/picoclaw +/picoclaw-test + +# Picoclaw specific .picoclaw/ config.json sessions/ +build/ + +# Coverage coverage.txt coverage.html -.DS_Store -build -picoclaw +# OS +.DS_Store + +# Ralph workspace +ralph/ diff --git a/README.md b/README.md index 534e5aa..9778918 100644 --- a/README.md +++ b/README.md @@ -333,6 +333,23 @@ picoclaw gateway Config file: `~/.picoclaw/config.json` +### Workspace Layout + +PicoClaw stores data in your configured workspace (default: `~/.picoclaw/workspace`): + +``` +~/.picoclaw/workspace/ +├── sessions/ # Conversation sessions and history +├── memory/ # Long-term memory (MEMORY.md) +├── cron/ # Scheduled jobs database +├── skills/ # Custom skills +├── AGENTS.md # Agent behavior guide +├── IDENTITY.md # Agent identity +├── SOUL.md # Agent soul +├── TOOLS.md # Tool descriptions +└── USER.md # User preferences +``` + ### Providers > [!NOTE] @@ -452,6 +469,18 @@ picoclaw agent -m "Hello" | `picoclaw agent` | Interactive chat mode | | `picoclaw gateway` | Start the gateway | | `picoclaw status` | Show status | +| `picoclaw cron list` | List all scheduled jobs | +| `picoclaw cron add ...` | Add a scheduled job | + +### Scheduled Tasks / Reminders + +PicoClaw supports scheduled reminders and recurring tasks through the `cron` tool: + +- **One-time reminders**: "Remind me in 10 minutes" → triggers once after 10min +- **Recurring tasks**: "Remind me every 2 hours" → triggers every 2 hours +- **Cron expressions**: "Remind me at 9am daily" → uses cron expression + +Jobs are stored in `~/.picoclaw/workspace/cron/` and processed automatically. ## 🤝 Contribute & Roadmap diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 751cdda..c14ec58 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -27,6 +27,7 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/skills" + "github.com/sipeed/picoclaw/pkg/tools" "github.com/sipeed/picoclaw/pkg/voice" ) @@ -550,8 +551,8 @@ func gatewayCmd() { "skills_available": skillsInfo["available"], }) - cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") - cronService := cron.NewCronService(cronStorePath, nil) + // Setup cron tool and service + cronService := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath()) heartbeatService := heartbeat.NewHeartbeatService( cfg.WorkspacePath(), @@ -689,6 +690,25 @@ func getConfigPath() string { return filepath.Join(home, ".picoclaw", "config.json") } +func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) *cron.CronService { + cronStorePath := filepath.Join(workspace, "cron", "jobs.json") + + // Create cron service + cronService := cron.NewCronService(cronStorePath, nil) + + // Create and register CronTool + cronTool := tools.NewCronTool(cronService, agentLoop, msgBus) + agentLoop.RegisterTool(cronTool) + + // Set the onJob handler + cronService.SetOnJob(func(job *cron.CronJob) (string, error) { + result := cronTool.ExecuteJob(context.Background(), job) + return result, nil + }) + + return cronService +} + func loadConfig() (*config.Config, error) { return config.LoadConfig(getConfigPath()) } @@ -701,8 +721,14 @@ func cronCmd() { subcommand := os.Args[2] - dataDir := filepath.Join(filepath.Dir(getConfigPath()), "cron") - cronStorePath := filepath.Join(dataDir, "jobs.json") + // Load config to get workspace path + cfg, err := loadConfig() + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return + } + + cronStorePath := filepath.Join(cfg.WorkspacePath(), "cron", "jobs.json") switch subcommand { case "list": @@ -745,7 +771,7 @@ func cronHelp() { func cronListCmd(storePath string) { cs := cron.NewCronService(storePath, nil) - jobs := cs.ListJobs(false) + jobs := cs.ListJobs(true) // Show all jobs, including disabled if len(jobs) == 0 { fmt.Println("No scheduled jobs.") diff --git a/go.mod b/go.mod index 23cfa0e..832f1e8 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/sipeed/picoclaw go 1.24.0 require ( + github.com/adhocore/gronx v1.19.6 github.com/bwmarrin/discordgo v0.29.0 github.com/caarlos0/env/v11 v11.3.1 github.com/chzyer/readline v1.5.1 diff --git a/go.sum b/go.sum index 2f9d5be..f1ce926 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +github.com/adhocore/gronx v1.19.6 h1:5KNVcoR9ACgL9HhEqCm5QXsab/gI4QDIybTAWcXDKDc= +github.com/adhocore/gronx v1.19.6/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/bwmarrin/discordgo v0.29.0 h1:FmWeXFaKUwrcL3Cx65c20bTRW+vOb6k8AnaP+EgjDno= github.com/bwmarrin/discordgo v0.29.0/go.mod h1:NJZpH+1AfhIcyQsPeuBKsUtYrRnjkyu0kIVMCHkZtRY= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 7e8612e..e737fbd 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -11,13 +11,14 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/skills" + "github.com/sipeed/picoclaw/pkg/tools" ) type ContextBuilder struct { workspace string skillsLoader *skills.SkillsLoader memory *MemoryStore - toolsSummary func() []string // Function to get tool summaries dynamically + tools *tools.ToolRegistry // Direct reference to tool registry } func getGlobalConfigDir() string { @@ -28,9 +29,9 @@ func getGlobalConfigDir() string { return filepath.Join(home, ".picoclaw") } -func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *ContextBuilder { - // builtin skills: 当前项目的 skills 目录 - // 使用当前工作目录下的 skills/ 目录 +func NewContextBuilder(workspace string) *ContextBuilder { + // builtin skills: skills directory in current project + // Use the skills/ directory under the current working directory wd, _ := os.Getwd() builtinSkillsDir := filepath.Join(wd, "skills") globalSkillsDir := filepath.Join(getGlobalConfigDir(), "skills") @@ -39,10 +40,14 @@ func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *Cont workspace: workspace, skillsLoader: skills.NewSkillsLoader(workspace, globalSkillsDir, builtinSkillsDir), memory: NewMemoryStore(workspace), - toolsSummary: toolsSummaryFunc, } } +// SetToolsRegistry sets the tools registry for dynamic tool summary generation. +func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) { + cb.tools = registry +} + func (cb *ContextBuilder) getIdentity() string { now := time.Now().Format("2006-01-02 15:04 (Monday)") workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace)) @@ -69,23 +74,29 @@ Your workspace is at: %s %s -Always be helpful, accurate, and concise. When using tools, explain what you're doing. -When remembering something, write to %s/memory/MEMORY.md`, +## Important Rules + +1. **ALWAYS use tools** - When you need to perform an action (schedule reminders, send messages, execute commands, etc.), you MUST call the appropriate tool. Do NOT just say you'll do it or pretend to do it. + +2. **Be helpful and accurate** - When using tools, briefly explain what you're doing. + +3. **Memory** - When remembering something, write to %s/memory/MEMORY.md`, now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) } func (cb *ContextBuilder) buildToolsSection() string { - if cb.toolsSummary == nil { + if cb.tools == nil { return "" } - summaries := cb.toolsSummary() + summaries := cb.tools.GetSummaries() if len(summaries) == 0 { return "" } var sb strings.Builder sb.WriteString("## Available Tools\n\n") + sb.WriteString("**CRITICAL**: You MUST use tools to perform actions. Do NOT pretend to execute commands or schedule tasks.\n\n") sb.WriteString("You have access to the following tools:\n\n") for _, s := range summaries { sb.WriteString(s) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index d38848b..40c9ba7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -13,6 +13,8 @@ import ( "os" "path/filepath" "strings" + "sync" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" @@ -20,6 +22,7 @@ import ( "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" + "github.com/sipeed/picoclaw/pkg/utils" ) type AgentLoop struct { @@ -27,11 +30,24 @@ type AgentLoop struct { provider providers.LLMProvider workspace string model string + contextWindow int // Maximum context window size in tokens maxIterations int sessions *session.SessionManager contextBuilder *ContextBuilder tools *tools.ToolRegistry running bool + summarizing sync.Map // Tracks which sessions are currently being summarized +} + +// processOptions configures how a message is processed +type processOptions struct { + SessionKey string // Session identifier for history/context + Channel string // Target channel for tool execution + ChatID string // Target chat ID for tool execution + UserMessage string // User message content (may include prefix) + DefaultResponse string // Response when LLM returns empty + EnableSummary bool // Whether to trigger summarization + SendResponse bool // Whether to send response via bus } func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { @@ -69,18 +85,24 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers editFileTool := tools.NewEditFileTool(workspace) toolsRegistry.Register(editFileTool) - sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) + sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions")) + + // Create context builder and set tools registry + contextBuilder := NewContextBuilder(workspace) + contextBuilder.SetToolsRegistry(toolsRegistry) return &AgentLoop{ bus: msgBus, provider: provider, workspace: workspace, model: cfg.Agents.Defaults.Model, + contextWindow: cfg.Agents.Defaults.MaxTokens, // Restore context window for summarization maxIterations: cfg.Agents.Defaults.MaxToolIterations, sessions: sessionsManager, - contextBuilder: NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }), + contextBuilder: contextBuilder, tools: toolsRegistry, running: false, + summarizing: sync.Map{}, } } @@ -119,11 +141,19 @@ func (al *AgentLoop) Stop() { al.running = false } +func (al *AgentLoop) RegisterTool(tool tools.Tool) { + al.tools.Register(tool) +} + func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) { + return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct") +} + +func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) { msg := bus.InboundMessage{ - Channel: "cli", - SenderID: "user", - ChatID: "direct", + Channel: channel, + SenderID: "cron", + ChatID: chatID, Content: content, SessionKey: sessionKey, } @@ -133,7 +163,7 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { // Add message preview to log - preview := truncate(msg.Content, 80) + preview := utils.Truncate(msg.Content, 80) logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview), map[string]interface{}{ "channel": msg.Channel, @@ -147,169 +177,16 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return al.processSystemMessage(ctx, msg) } - // Update tool contexts - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(msg.Channel, msg.ChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(msg.Channel, msg.ChatID) - } - } - - history := al.sessions.GetHistory(msg.SessionKey) - summary := al.sessions.GetSummary(msg.SessionKey) - - messages := al.contextBuilder.BuildMessages( - history, - summary, - msg.Content, - nil, - msg.Channel, - msg.ChatID, - ) - - iteration := 0 - var finalContent string - - for iteration < al.maxIterations { - iteration++ - - logger.DebugCF("agent", "LLM iteration", - map[string]interface{}{ - "iteration": iteration, - "max": al.maxIterations, - }) - - toolDefs := al.tools.GetDefinitions() - providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) - for _, td := range toolDefs { - providerToolDefs = append(providerToolDefs, providers.ToolDefinition{ - Type: td["type"].(string), - Function: providers.ToolFunctionDefinition{ - Name: td["function"].(map[string]interface{})["name"].(string), - Description: td["function"].(map[string]interface{})["description"].(string), - Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}), - }, - }) - } - - // Log LLM request details - logger.DebugCF("agent", "LLM request", - map[string]interface{}{ - "iteration": iteration, - "model": al.model, - "messages_count": len(messages), - "tools_count": len(providerToolDefs), - "max_tokens": 8192, - "temperature": 0.7, - "system_prompt_len": len(messages[0].Content), - }) - - // Log full messages (detailed) - logger.DebugCF("agent", "Full LLM request", - map[string]interface{}{ - "iteration": iteration, - "messages_json": formatMessagesForLog(messages), - "tools_json": formatToolsForLog(providerToolDefs), - }) - - response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ - "max_tokens": 8192, - "temperature": 0.7, - }) - - if err != nil { - logger.ErrorCF("agent", "LLM call failed", - map[string]interface{}{ - "iteration": iteration, - "error": err.Error(), - }) - return "", fmt.Errorf("LLM call failed: %w", err) - } - - if len(response.ToolCalls) == 0 { - finalContent = response.Content - logger.InfoCF("agent", "LLM response without tool calls (direct answer)", - map[string]interface{}{ - "iteration": iteration, - "content_chars": len(finalContent), - }) - break - } - - toolNames := make([]string, 0, len(response.ToolCalls)) - for _, tc := range response.ToolCalls { - toolNames = append(toolNames, tc.Name) - } - logger.InfoCF("agent", "LLM requested tool calls", - map[string]interface{}{ - "tools": toolNames, - "count": len(toolNames), - "iteration": iteration, - }) - - assistantMsg := providers.Message{ - Role: "assistant", - Content: response.Content, - } - - for _, tc := range response.ToolCalls { - argumentsJSON, _ := json.Marshal(tc.Arguments) - assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ - ID: tc.ID, - Type: "function", - Function: &providers.FunctionCall{ - Name: tc.Name, - Arguments: string(argumentsJSON), - }, - }) - } - messages = append(messages, assistantMsg) - - for _, tc := range response.ToolCalls { - // Log tool call with arguments preview - argsJSON, _ := json.Marshal(tc.Arguments) - argsPreview := truncate(string(argsJSON), 200) - logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), - map[string]interface{}{ - "tool": tc.Name, - "iteration": iteration, - }) - - result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) - if err != nil { - result = fmt.Sprintf("Error: %v", err) - } - - toolResultMsg := providers.Message{ - Role: "tool", - Content: result, - ToolCallID: tc.ID, - } - messages = append(messages, toolResultMsg) - } - } - - if finalContent == "" { - finalContent = "I've completed processing but have no response to give." - } - - al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) - al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) - al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) - - // Log response preview - responsePreview := truncate(finalContent, 120) - logger.InfoCF("agent", fmt.Sprintf("Response to %s:%s: %s", msg.Channel, msg.SenderID, responsePreview), - map[string]interface{}{ - "iterations": iteration, - "final_length": len(finalContent), - }) - - return finalContent, nil + // Process as user message + return al.runAgentLoop(ctx, processOptions{ + SessionKey: msg.SessionKey, + Channel: msg.Channel, + ChatID: msg.ChatID, + UserMessage: msg.Content, + DefaultResponse: "I've completed processing but have no response to give.", + EnableSummary: true, + SendResponse: false, + }) } func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { @@ -338,36 +215,96 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Use the origin session for context sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID) - // Update tool contexts to original channel/chatID - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(originChannel, originChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(originChannel, originChatID) - } - } + // Process as system message with routing back to origin + return al.runAgentLoop(ctx, processOptions{ + SessionKey: sessionKey, + Channel: originChannel, + ChatID: originChatID, + UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content), + DefaultResponse: "Background task completed.", + EnableSummary: false, + SendResponse: true, // Send response back to original channel + }) +} - // Build messages with the announce content - history := al.sessions.GetHistory(sessionKey) - summary := al.sessions.GetSummary(sessionKey) +// runAgentLoop is the core message processing logic. +// It handles context building, LLM calls, tool execution, and response handling. +func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) { + // 1. Update tool contexts + al.updateToolContexts(opts.Channel, opts.ChatID) + + // 2. Build messages + history := al.sessions.GetHistory(opts.SessionKey) + summary := al.sessions.GetSummary(opts.SessionKey) messages := al.contextBuilder.BuildMessages( history, summary, - msg.Content, + opts.UserMessage, nil, - originChannel, - originChatID, + opts.Channel, + opts.ChatID, ) + // 3. Save user message to session + al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage) + + // 4. Run LLM iteration loop + finalContent, iteration, err := al.runLLMIteration(ctx, messages, opts) + if err != nil { + return "", err + } + + // 5. Handle empty response + if finalContent == "" { + finalContent = opts.DefaultResponse + } + + // 6. Save final assistant message to session + al.sessions.AddMessage(opts.SessionKey, "assistant", finalContent) + al.sessions.Save(al.sessions.GetOrCreate(opts.SessionKey)) + + // 7. Optional: summarization + if opts.EnableSummary { + al.maybeSummarize(opts.SessionKey) + } + + // 8. Optional: send response via bus + if opts.SendResponse { + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: finalContent, + }) + } + + // 9. Log response + responsePreview := utils.Truncate(finalContent, 120) + logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview), + map[string]interface{}{ + "session_key": opts.SessionKey, + "iterations": iteration, + "final_length": len(finalContent), + }) + + return finalContent, nil +} + +// runLLMIteration executes the LLM call loop with tool handling. +// Returns the final content, iteration count, and any error. +func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.Message, opts processOptions) (string, int, error) { iteration := 0 var finalContent string for iteration < al.maxIterations { iteration++ + logger.DebugCF("agent", "LLM iteration", + map[string]interface{}{ + "iteration": iteration, + "max": al.maxIterations, + }) + + // Build tool definitions toolDefs := al.tools.GetDefinitions() providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) for _, td := range toolDefs { @@ -384,12 +321,12 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Log LLM request details logger.DebugCF("agent", "LLM request", map[string]interface{}{ - "iteration": iteration, - "model": al.model, - "messages_count": len(messages), - "tools_count": len(providerToolDefs), - "max_tokens": 8192, - "temperature": 0.7, + "iteration": iteration, + "model": al.model, + "messages_count": len(messages), + "tools_count": len(providerToolDefs), + "max_tokens": 8192, + "temperature": 0.7, "system_prompt_len": len(messages[0].Content), }) @@ -401,30 +338,49 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe "tools_json": formatToolsForLog(providerToolDefs), }) + // Call LLM response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ "max_tokens": 8192, "temperature": 0.7, }) if err != nil { - logger.ErrorCF("agent", "LLM call failed in system message", + logger.ErrorCF("agent", "LLM call failed", map[string]interface{}{ "iteration": iteration, "error": err.Error(), }) - return "", fmt.Errorf("LLM call failed: %w", err) + return "", iteration, fmt.Errorf("LLM call failed: %w", err) } + // Check if no tool calls - we're done if len(response.ToolCalls) == 0 { finalContent = response.Content + logger.InfoCF("agent", "LLM response without tool calls (direct answer)", + map[string]interface{}{ + "iteration": iteration, + "content_chars": len(finalContent), + }) break } + // Log tool calls + toolNames := make([]string, 0, len(response.ToolCalls)) + for _, tc := range response.ToolCalls { + toolNames = append(toolNames, tc.Name) + } + logger.InfoCF("agent", "LLM requested tool calls", + map[string]interface{}{ + "tools": toolNames, + "count": len(toolNames), + "iteration": iteration, + }) + + // Build assistant message with tool calls assistantMsg := providers.Message{ Role: "assistant", Content: response.Content, } - for _, tc := range response.ToolCalls { argumentsJSON, _ := json.Marshal(tc.Arguments) assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ @@ -438,8 +394,21 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe } messages = append(messages, assistantMsg) + // Save assistant message with tool calls to session + al.sessions.AddFullMessage(opts.SessionKey, assistantMsg) + + // Execute tool calls for _, tc := range response.ToolCalls { - result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) + // Log tool call with arguments preview + argsJSON, _ := json.Marshal(tc.Arguments) + argsPreview := utils.Truncate(string(argsJSON), 200) + logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), + map[string]interface{}{ + "tool": tc.Name, + "iteration": iteration, + }) + + result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID) if err != nil { result = fmt.Sprintf("Error: %v", err) } @@ -450,39 +419,43 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + + // Save tool result message to session + al.sessions.AddFullMessage(opts.SessionKey, toolResultMsg) } } - if finalContent == "" { - finalContent = "Background task completed." - } - - // Save to session with system message marker - al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) - al.sessions.AddMessage(sessionKey, "assistant", finalContent) - al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) - - logger.InfoCF("agent", "System message processing completed", - map[string]interface{}{ - "iterations": iteration, - "final_length": len(finalContent), - }) - - return finalContent, nil + return finalContent, iteration, nil } -// truncate returns a truncated version of s with at most maxLen characters. -// If the string is truncated, "..." is appended to indicate truncation. -// If the string fits within maxLen, it is returned unchanged. -func truncate(s string, maxLen int) string { - if len(s) <= maxLen { - return s +// updateToolContexts updates the context for tools that need channel/chatID info. +func (al *AgentLoop) updateToolContexts(channel, chatID string) { + if tool, ok := al.tools.Get("message"); ok { + if mt, ok := tool.(*tools.MessageTool); ok { + mt.SetContext(channel, chatID) + } } - // Reserve 3 chars for "..." - if maxLen <= 3 { - return s[:maxLen] + if tool, ok := al.tools.Get("spawn"); ok { + if st, ok := tool.(*tools.SpawnTool); ok { + st.SetContext(channel, chatID) + } + } +} + +// maybeSummarize triggers summarization if the session history exceeds thresholds. +func (al *AgentLoop) maybeSummarize(sessionKey string) { + newHistory := al.sessions.GetHistory(sessionKey) + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { + if _, loading := al.summarizing.LoadOrStore(sessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(sessionKey) + al.summarizeSession(sessionKey) + }() + } } - return s[:maxLen-3] + "..." } // GetStartupInfo returns information about loaded tools and skills for logging. @@ -517,12 +490,12 @@ func formatMessagesForLog(messages []providers.Message) string { for _, tc := range msg.ToolCalls { result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name) if tc.Function != nil { - result += fmt.Sprintf(" Arguments: %s\n", truncateString(tc.Function.Arguments, 200)) + result += fmt.Sprintf(" Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200)) } } } if msg.Content != "" { - content := truncateString(msg.Content, 200) + content := utils.Truncate(msg.Content, 200) result += fmt.Sprintf(" Content: %s\n", content) } if msg.ToolCallID != "" { @@ -546,20 +519,114 @@ func formatToolsForLog(tools []providers.ToolDefinition) string { result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name) result += fmt.Sprintf(" Description: %s\n", tool.Function.Description) if len(tool.Function.Parameters) > 0 { - result += fmt.Sprintf(" Parameters: %s\n", truncateString(fmt.Sprintf("%v", tool.Function.Parameters), 200)) + result += fmt.Sprintf(" Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200)) } } result += "]" return result } -// truncateString truncates a string to max length -func truncateString(s string, maxLen int) string { - if len(s) <= maxLen { - return s +// summarizeSession summarizes the conversation history for a session. +func (al *AgentLoop) summarizeSession(sessionKey string) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + history := al.sessions.GetHistory(sessionKey) + summary := al.sessions.GetSummary(sessionKey) + + // Keep last 4 messages for continuity + if len(history) <= 4 { + return } - if maxLen <= 3 { - return s[:maxLen] + + toSummarize := history[:len(history)-4] + + // Oversized Message Guard + // Skip messages larger than 50% of context window to prevent summarizer overflow + maxMessageTokens := al.contextWindow / 2 + validMessages := make([]providers.Message, 0) + omitted := false + + for _, m := range toSummarize { + if m.Role != "user" && m.Role != "assistant" { + continue + } + // Estimate tokens for this message + msgTokens := len(m.Content) / 4 + if msgTokens > maxMessageTokens { + omitted = true + continue + } + validMessages = append(validMessages, m) + } + + if len(validMessages) == 0 { + return + } + + // Multi-Part Summarization + // Split into two parts if history is significant + var finalSummary string + if len(validMessages) > 10 { + mid := len(validMessages) / 2 + part1 := validMessages[:mid] + part2 := validMessages[mid:] + + s1, _ := al.summarizeBatch(ctx, part1, "") + s2, _ := al.summarizeBatch(ctx, part2, "") + + // Merge them + mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2) + resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err == nil { + finalSummary = resp.Content + } else { + finalSummary = s1 + " " + s2 + } + } else { + finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary) + } + + if omitted && finalSummary != "" { + finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]" + } + + if finalSummary != "" { + al.sessions.SetSummary(sessionKey, finalSummary) + al.sessions.TruncateHistory(sessionKey, 4) + al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) } - return s[:maxLen-3] + "..." +} + +// summarizeBatch summarizes a batch of messages. +func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) { + prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n" + if existingSummary != "" { + prompt += "Existing context: " + existingSummary + "\n" + } + prompt += "\nCONVERSATION:\n" + for _, m := range batch { + prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content) + } + + response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{ + "max_tokens": 1024, + "temperature": 0.3, + }) + if err != nil { + return "", err + } + return response.Content, nil +} + +// estimateTokens estimates the number of tokens in a message list. +func (al *AgentLoop) estimateTokens(messages []providers.Message) int { + total := 0 + for _, m := range messages { + total += len(m.Content) / 4 // Simple heuristic: 4 chars per token + } + return total } diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 5361191..3ade400 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -61,7 +61,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st return } - // 生成 SessionKey: channel:chatID + // Build session key: channel:chatID sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) msg := bus.InboundMessage{ @@ -70,8 +70,8 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st ChatID: chatID, Content: content, Media: media, - Metadata: metadata, SessionKey: sessionKey, + Metadata: metadata, } c.bus.PublishInbound(msg) diff --git a/pkg/cron/service.go b/pkg/cron/service.go index 54f9dcc..9434ed8 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -1,12 +1,17 @@ package cron import ( + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" + "log" "os" "path/filepath" "sync" "time" + + "github.com/adhocore/gronx" ) type CronSchedule struct { @@ -58,6 +63,7 @@ type CronService struct { mu sync.RWMutex running bool stopChan chan struct{} + gronx *gronx.Gronx } func NewCronService(storePath string, onJob JobHandler) *CronService { @@ -65,7 +71,9 @@ func NewCronService(storePath string, onJob JobHandler) *CronService { storePath: storePath, onJob: onJob, stopChan: make(chan struct{}), + gronx: gronx.New(), } + // Initialize and load store on creation cs.loadStore() return cs } @@ -83,7 +91,7 @@ func (cs *CronService) Start() error { } cs.recomputeNextRuns() - if err := cs.saveStore(); err != nil { + if err := cs.saveStoreUnsafe(); err != nil { return fmt.Errorf("failed to save store: %w", err) } @@ -120,30 +128,49 @@ func (cs *CronService) runLoop() { } func (cs *CronService) checkJobs() { - cs.mu.RLock() + cs.mu.Lock() + if !cs.running { - cs.mu.RUnlock() + cs.mu.Unlock() return } now := time.Now().UnixMilli() var dueJobs []*CronJob + // Collect jobs that are due (we need to copy them to execute outside lock) for i := range cs.store.Jobs { job := &cs.store.Jobs[i] if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now { - dueJobs = append(dueJobs, job) + // Create a shallow copy of the job for execution + jobCopy := *job + dueJobs = append(dueJobs, &jobCopy) } } - cs.mu.RUnlock() + // Update next run times for due jobs immediately (before executing) + // Use map for O(n) lookup instead of O(n²) nested loop + dueMap := make(map[string]bool, len(dueJobs)) + for _, job := range dueJobs { + dueMap[job.ID] = true + } + for i := range cs.store.Jobs { + if dueMap[cs.store.Jobs[i].ID] { + // Reset NextRunAtMS temporarily so we don't re-execute + cs.store.Jobs[i].State.NextRunAtMS = nil + } + } + + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store: %v", err) + } + + cs.mu.Unlock() + + // Execute jobs outside the lock for _, job := range dueJobs { cs.executeJob(job) } - - cs.mu.Lock() - defer cs.mu.Unlock() - cs.saveStore() } func (cs *CronService) executeJob(job *CronJob) { @@ -154,30 +181,42 @@ func (cs *CronService) executeJob(job *CronJob) { _, err = cs.onJob(job) } + // Now acquire lock to update state cs.mu.Lock() defer cs.mu.Unlock() - job.State.LastRunAtMS = &startTime - job.UpdatedAtMS = time.Now().UnixMilli() + // Find the job in store and update it + for i := range cs.store.Jobs { + if cs.store.Jobs[i].ID == job.ID { + cs.store.Jobs[i].State.LastRunAtMS = &startTime + cs.store.Jobs[i].UpdatedAtMS = time.Now().UnixMilli() - if err != nil { - job.State.LastStatus = "error" - job.State.LastError = err.Error() - } else { - job.State.LastStatus = "ok" - job.State.LastError = "" + if err != nil { + cs.store.Jobs[i].State.LastStatus = "error" + cs.store.Jobs[i].State.LastError = err.Error() + } else { + cs.store.Jobs[i].State.LastStatus = "ok" + cs.store.Jobs[i].State.LastError = "" + } + + // Compute next run time + if cs.store.Jobs[i].Schedule.Kind == "at" { + if cs.store.Jobs[i].DeleteAfterRun { + cs.removeJobUnsafe(job.ID) + } else { + cs.store.Jobs[i].Enabled = false + cs.store.Jobs[i].State.NextRunAtMS = nil + } + } else { + nextRun := cs.computeNextRun(&cs.store.Jobs[i].Schedule, time.Now().UnixMilli()) + cs.store.Jobs[i].State.NextRunAtMS = nextRun + } + break + } } - if job.Schedule.Kind == "at" { - if job.DeleteAfterRun { - cs.removeJobUnsafe(job.ID) - } else { - job.Enabled = false - job.State.NextRunAtMS = nil - } - } else { - nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) - job.State.NextRunAtMS = nextRun + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store: %v", err) } } @@ -197,6 +236,23 @@ func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int6 return &next } + if schedule.Kind == "cron" { + if schedule.Expr == "" { + return nil + } + + // Use gronx to calculate next run time + now := time.UnixMilli(nowMS) + nextTime, err := gronx.NextTickAfter(schedule.Expr, now, false) + if err != nil { + log.Printf("[cron] failed to compute next run for expr '%s': %v", schedule.Expr, err) + return nil + } + + nextMS := nextTime.UnixMilli() + return &nextMS + } + return nil } @@ -223,9 +279,17 @@ func (cs *CronService) getNextWakeMS() *int64 { } func (cs *CronService) Load() error { + cs.mu.Lock() + defer cs.mu.Unlock() return cs.loadStore() } +func (cs *CronService) SetOnJob(handler JobHandler) { + cs.mu.Lock() + defer cs.mu.Unlock() + cs.onJob = handler +} + func (cs *CronService) loadStore() error { cs.store = &CronStore{ Version: 1, @@ -243,7 +307,7 @@ func (cs *CronService) loadStore() error { return json.Unmarshal(data, cs.store) } -func (cs *CronService) saveStore() error { +func (cs *CronService) saveStoreUnsafe() error { dir := filepath.Dir(cs.storePath) if err := os.MkdirAll(dir, 0755); err != nil { return err @@ -263,6 +327,9 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string now := time.Now().UnixMilli() + // One-time tasks (at) should be deleted after execution + deleteAfterRun := (schedule.Kind == "at") + job := CronJob{ ID: generateID(), Name: name, @@ -280,11 +347,11 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string }, CreatedAtMS: now, UpdatedAtMS: now, - DeleteAfterRun: false, + DeleteAfterRun: deleteAfterRun, } cs.store.Jobs = append(cs.store.Jobs, job) - if err := cs.saveStore(); err != nil { + if err := cs.saveStoreUnsafe(); err != nil { return nil, err } @@ -310,7 +377,9 @@ func (cs *CronService) removeJobUnsafe(jobID string) bool { removed := len(cs.store.Jobs) < before if removed { - cs.saveStore() + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store after remove: %v", err) + } } return removed @@ -332,7 +401,9 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { job.State.NextRunAtMS = nil } - cs.saveStore() + if err := cs.saveStoreUnsafe(); err != nil { + log.Printf("[cron] failed to save store after enable: %v", err) + } return job } } @@ -377,5 +448,11 @@ func (cs *CronService) Status() map[string]interface{} { } func generateID() string { - return fmt.Sprintf("%d", time.Now().UnixNano()) + // Use crypto/rand for better uniqueness under concurrent access + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { + // Fallback to time-based if crypto/rand fails + return fmt.Sprintf("%d", time.Now().UnixNano()) + } + return hex.EncodeToString(b) } diff --git a/pkg/session/manager.go b/pkg/session/manager.go index df86724..b4b8257 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -59,6 +59,15 @@ func (sm *SessionManager) GetOrCreate(key string) *Session { } func (sm *SessionManager) AddMessage(sessionKey, role, content string) { + sm.AddFullMessage(sessionKey, providers.Message{ + Role: role, + Content: content, + }) +} + +// AddFullMessage adds a complete message with tool calls and tool call ID to the session. +// This is used to save the full conversation flow including tool calls and tool results. +func (sm *SessionManager) AddFullMessage(sessionKey string, msg providers.Message) { sm.mu.Lock() defer sm.mu.Unlock() @@ -72,10 +81,7 @@ func (sm *SessionManager) AddMessage(sessionKey, role, content string) { sm.sessions[sessionKey] = session } - session.Messages = append(session.Messages, providers.Message{ - Role: role, - Content: content, - }) + session.Messages = append(session.Messages, msg) session.Updated = time.Now() } diff --git a/pkg/tools/base.go b/pkg/tools/base.go index 1bf53f7..095ac69 100644 --- a/pkg/tools/base.go +++ b/pkg/tools/base.go @@ -9,6 +9,13 @@ type Tool interface { Execute(ctx context.Context, args map[string]interface{}) (string, error) } +// ContextualTool is an optional interface that tools can implement +// to receive the current message context (channel, chatID) +type ContextualTool interface { + Tool + SetContext(channel, chatID string) +} + func ToolToSchema(tool Tool) map[string]interface{} { return map[string]interface{}{ "type": "function", diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go new file mode 100644 index 0000000..53570a3 --- /dev/null +++ b/pkg/tools/cron.go @@ -0,0 +1,284 @@ +package tools + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/cron" + "github.com/sipeed/picoclaw/pkg/utils" +) + +// JobExecutor is the interface for executing cron jobs through the agent +type JobExecutor interface { + ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) +} + +// CronTool provides scheduling capabilities for the agent +type CronTool struct { + cronService *cron.CronService + executor JobExecutor + msgBus *bus.MessageBus + channel string + chatID string + mu sync.RWMutex +} + +// NewCronTool creates a new CronTool +func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus) *CronTool { + return &CronTool{ + cronService: cronService, + executor: executor, + msgBus: msgBus, + } +} + +// Name returns the tool name +func (t *CronTool) Name() string { + return "cron" +} + +// Description returns the tool description +func (t *CronTool) Description() string { + return "Schedule reminders and tasks. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am)." +} + +// Parameters returns the tool parameters schema +func (t *CronTool) Parameters() map[string]interface{} { + return map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "action": map[string]interface{}{ + "type": "string", + "enum": []string{"add", "list", "remove", "enable", "disable"}, + "description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.", + }, + "message": map[string]interface{}{ + "type": "string", + "description": "The reminder/task message to display when triggered (required for add)", + }, + "at_seconds": map[string]interface{}{ + "type": "integer", + "description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.", + }, + "every_seconds": map[string]interface{}{ + "type": "integer", + "description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.", + }, + "cron_expr": map[string]interface{}{ + "type": "string", + "description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.", + }, + "job_id": map[string]interface{}{ + "type": "string", + "description": "Job ID (for remove/enable/disable)", + }, + "deliver": map[string]interface{}{ + "type": "boolean", + "description": "If true, send message directly to channel. If false, let agent process the message (for complex tasks). Default: true", + }, + }, + "required": []string{"action"}, + } +} + +// SetContext sets the current session context for job creation +func (t *CronTool) SetContext(channel, chatID string) { + t.mu.Lock() + defer t.mu.Unlock() + t.channel = channel + t.chatID = chatID +} + +// Execute runs the tool with given arguments +func (t *CronTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) { + action, ok := args["action"].(string) + if !ok { + return "", fmt.Errorf("action is required") + } + + switch action { + case "add": + return t.addJob(args) + case "list": + return t.listJobs() + case "remove": + return t.removeJob(args) + case "enable": + return t.enableJob(args, true) + case "disable": + return t.enableJob(args, false) + default: + return "", fmt.Errorf("unknown action: %s", action) + } +} + +func (t *CronTool) addJob(args map[string]interface{}) (string, error) { + t.mu.RLock() + channel := t.channel + chatID := t.chatID + t.mu.RUnlock() + + if channel == "" || chatID == "" { + return "Error: no session context (channel/chat_id not set). Use this tool in an active conversation.", nil + } + + message, ok := args["message"].(string) + if !ok || message == "" { + return "Error: message is required for add", nil + } + + var schedule cron.CronSchedule + + // Check for at_seconds (one-time), every_seconds (recurring), or cron_expr + atSeconds, hasAt := args["at_seconds"].(float64) + everySeconds, hasEvery := args["every_seconds"].(float64) + cronExpr, hasCron := args["cron_expr"].(string) + + // Priority: at_seconds > every_seconds > cron_expr + if hasAt { + atMS := time.Now().UnixMilli() + int64(atSeconds)*1000 + schedule = cron.CronSchedule{ + Kind: "at", + AtMS: &atMS, + } + } else if hasEvery { + everyMS := int64(everySeconds) * 1000 + schedule = cron.CronSchedule{ + Kind: "every", + EveryMS: &everyMS, + } + } else if hasCron { + schedule = cron.CronSchedule{ + Kind: "cron", + Expr: cronExpr, + } + } else { + return "Error: one of at_seconds, every_seconds, or cron_expr is required", nil + } + + // Read deliver parameter, default to true + deliver := true + if d, ok := args["deliver"].(bool); ok { + deliver = d + } + + // Truncate message for job name (max 30 chars) + messagePreview := utils.Truncate(message, 30) + + job, err := t.cronService.AddJob( + messagePreview, + schedule, + message, + deliver, + channel, + chatID, + ) + if err != nil { + return fmt.Sprintf("Error adding job: %v", err), nil + } + + return fmt.Sprintf("Created job '%s' (id: %s)", job.Name, job.ID), nil +} + +func (t *CronTool) listJobs() (string, error) { + jobs := t.cronService.ListJobs(false) + + if len(jobs) == 0 { + return "No scheduled jobs.", nil + } + + result := "Scheduled jobs:\n" + for _, j := range jobs { + var scheduleInfo string + if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil { + scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000) + } else if j.Schedule.Kind == "cron" { + scheduleInfo = j.Schedule.Expr + } else if j.Schedule.Kind == "at" { + scheduleInfo = "one-time" + } else { + scheduleInfo = "unknown" + } + result += fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo) + } + + return result, nil +} + +func (t *CronTool) removeJob(args map[string]interface{}) (string, error) { + jobID, ok := args["job_id"].(string) + if !ok || jobID == "" { + return "Error: job_id is required for remove", nil + } + + if t.cronService.RemoveJob(jobID) { + return fmt.Sprintf("Removed job %s", jobID), nil + } + return fmt.Sprintf("Job %s not found", jobID), nil +} + +func (t *CronTool) enableJob(args map[string]interface{}, enable bool) (string, error) { + jobID, ok := args["job_id"].(string) + if !ok || jobID == "" { + return "Error: job_id is required for enable/disable", nil + } + + job := t.cronService.EnableJob(jobID, enable) + if job == nil { + return fmt.Sprintf("Job %s not found", jobID), nil + } + + status := "enabled" + if !enable { + status = "disabled" + } + return fmt.Sprintf("Job '%s' %s", job.Name, status), nil +} + +// ExecuteJob executes a cron job through the agent +func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { + // Get channel/chatID from job payload + channel := job.Payload.Channel + chatID := job.Payload.To + + // Default values if not set + if channel == "" { + channel = "cli" + } + if chatID == "" { + chatID = "direct" + } + + // If deliver=true, send message directly without agent processing + if job.Payload.Deliver { + t.msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + Content: job.Payload.Message, + }) + return "ok" + } + + // For deliver=false, process through agent (for complex tasks) + sessionKey := fmt.Sprintf("cron-%s", job.ID) + + // Call agent with the job's message + response, err := t.executor.ProcessDirectWithChannel( + ctx, + job.Payload.Message, + sessionKey, + channel, + chatID, + ) + + if err != nil { + return fmt.Sprintf("Error: %v", err) + } + + // Response is automatically sent via MessageBus by AgentLoop + _ = response // Will be sent by AgentLoop + return "ok" +} diff --git a/pkg/tools/registry.go b/pkg/tools/registry.go index d181944..a769664 100644 --- a/pkg/tools/registry.go +++ b/pkg/tools/registry.go @@ -34,6 +34,10 @@ func (r *ToolRegistry) Get(name string) (Tool, bool) { } func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) (string, error) { + return r.ExecuteWithContext(ctx, name, args, "", "") +} + +func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string) (string, error) { logger.InfoCF("tool", "Tool execution started", map[string]interface{}{ "tool": name, @@ -49,6 +53,11 @@ func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string return "", fmt.Errorf("tool '%s' not found", name) } + // If tool implements ContextualTool, set context + if contextualTool, ok := tool.(ContextualTool); ok && channel != "" && chatID != "" { + contextualTool.SetContext(channel, chatID) + } + start := time.Now() result, err := tool.Execute(ctx, args) duration := time.Since(start) diff --git a/pkg/utils/string.go b/pkg/utils/string.go new file mode 100644 index 0000000..0d9837c --- /dev/null +++ b/pkg/utils/string.go @@ -0,0 +1,16 @@ +package utils + +// Truncate returns a truncated version of s with at most maxLen runes. +// Handles multi-byte Unicode characters properly. +// If the string is truncated, "..." is appended to indicate truncation. +func Truncate(s string, maxLen int) string { + runes := []rune(s) + if len(runes) <= maxLen { + return s + } + // Reserve 3 chars for "..." + if maxLen <= 3 { + return string(runes[:maxLen]) + } + return string(runes[:maxLen-3]) + "..." +}