diff --git a/README.md b/README.md index 1cf7173..70e06ac 100644 --- a/README.md +++ b/README.md @@ -333,6 +333,23 @@ picoclaw gateway Config file: `~/.picoclaw/config.json` +### Workspace Layout + +PicoClaw stores data in your configured workspace (default: `~/.picoclaw/workspace`): + +``` +~/.picoclaw/workspace/ +├── sessions/ # Conversation sessions and history +├── memory/ # Long-term memory (MEMORY.md) +├── cron/ # Scheduled jobs database +├── skills/ # Custom skills +├── AGENTS.md # Agent behavior guide +├── IDENTITY.md # Agent identity +├── SOUL.md # Agent soul +├── TOOLS.md # Tool descriptions +└── USER.md # User preferences +``` + ### Providers > [!NOTE] @@ -452,6 +469,18 @@ picoclaw agent -m "Hello" | `picoclaw agent` | Interactive chat mode | | `picoclaw gateway` | Start the gateway | | `picoclaw status` | Show status | +| `picoclaw cron list` | List all scheduled jobs | +| `picoclaw cron add ...` | Add a scheduled job | + +### Scheduled Tasks / Reminders + +PicoClaw supports scheduled reminders and recurring tasks through the `cron` tool: + +- **One-time reminders**: "Remind me in 10 minutes" → triggers once after 10min +- **Recurring tasks**: "Remind me every 2 hours" → triggers every 2 hours +- **Cron expressions**: "Remind me at 9am daily" → uses cron expression + +Jobs are stored in `~/.picoclaw/workspace/cron/` and processed automatically. ## 🤝 Contribute & Roadmap diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 5cdd6a7..40c9ba7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -22,6 +22,7 @@ import ( "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" + "github.com/sipeed/picoclaw/pkg/utils" ) type AgentLoop struct { @@ -38,6 +39,17 @@ type AgentLoop struct { summarizing sync.Map // Tracks which sessions are currently being summarized } +// processOptions configures how a message is processed +type processOptions struct { + SessionKey string // Session identifier for history/context + Channel string // Target channel for tool execution + ChatID string // Target chat ID for tool execution + UserMessage string // User message content (may include prefix) + DefaultResponse string // Response when LLM returns empty + EnableSummary bool // Whether to trigger summarization + SendResponse bool // Whether to send response via bus +} + func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { workspace := cfg.WorkspacePath() os.MkdirAll(workspace, 0755) @@ -151,7 +163,7 @@ func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sess func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { // Add message preview to log - preview := truncate(msg.Content, 80) + preview := utils.Truncate(msg.Content, 80) logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview), map[string]interface{}{ "channel": msg.Channel, @@ -165,193 +177,16 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return al.processSystemMessage(ctx, msg) } - // Update tool contexts - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(msg.Channel, msg.ChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(msg.Channel, msg.ChatID) - } - } - - history := al.sessions.GetHistory(msg.SessionKey) - summary := al.sessions.GetSummary(msg.SessionKey) - - messages := al.contextBuilder.BuildMessages( - history, - summary, - msg.Content, - nil, - msg.Channel, - msg.ChatID, - ) - - // Save user message to session - al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) - - iteration := 0 - var finalContent string - - for iteration < al.maxIterations { - iteration++ - - logger.DebugCF("agent", "LLM iteration", - map[string]interface{}{ - "iteration": iteration, - "max": al.maxIterations, - }) - - toolDefs := al.tools.GetDefinitions() - providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) - for _, td := range toolDefs { - providerToolDefs = append(providerToolDefs, providers.ToolDefinition{ - Type: td["type"].(string), - Function: providers.ToolFunctionDefinition{ - Name: td["function"].(map[string]interface{})["name"].(string), - Description: td["function"].(map[string]interface{})["description"].(string), - Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}), - }, - }) - } - - // Log LLM request details - logger.DebugCF("agent", "LLM request", - map[string]interface{}{ - "iteration": iteration, - "model": al.model, - "messages_count": len(messages), - "tools_count": len(providerToolDefs), - "max_tokens": 8192, - "temperature": 0.7, - "system_prompt_len": len(messages[0].Content), - }) - - // Log full messages (detailed) - logger.DebugCF("agent", "Full LLM request", - map[string]interface{}{ - "iteration": iteration, - "messages_json": formatMessagesForLog(messages), - "tools_json": formatToolsForLog(providerToolDefs), - }) - - response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ - "max_tokens": 8192, - "temperature": 0.7, - }) - - if err != nil { - logger.ErrorCF("agent", "LLM call failed", - map[string]interface{}{ - "iteration": iteration, - "error": err.Error(), - }) - return "", fmt.Errorf("LLM call failed: %w", err) - } - - if len(response.ToolCalls) == 0 { - finalContent = response.Content - logger.InfoCF("agent", "LLM response without tool calls (direct answer)", - map[string]interface{}{ - "iteration": iteration, - "content_chars": len(finalContent), - }) - break - } - - toolNames := make([]string, 0, len(response.ToolCalls)) - for _, tc := range response.ToolCalls { - toolNames = append(toolNames, tc.Name) - } - logger.InfoCF("agent", "LLM requested tool calls", - map[string]interface{}{ - "tools": toolNames, - "count": len(toolNames), - "iteration": iteration, - }) - - assistantMsg := providers.Message{ - Role: "assistant", - Content: response.Content, - } - - for _, tc := range response.ToolCalls { - argumentsJSON, _ := json.Marshal(tc.Arguments) - assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ - ID: tc.ID, - Type: "function", - Function: &providers.FunctionCall{ - Name: tc.Name, - Arguments: string(argumentsJSON), - }, - }) - } - messages = append(messages, assistantMsg) - - // Save assistant message with tool calls to session - al.sessions.AddFullMessage(msg.SessionKey, assistantMsg) - - for _, tc := range response.ToolCalls { - // Log tool call with arguments preview - argsJSON, _ := json.Marshal(tc.Arguments) - argsPreview := truncate(string(argsJSON), 200) - logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), - map[string]interface{}{ - "tool": tc.Name, - "iteration": iteration, - }) - - result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) - if err != nil { - result = fmt.Sprintf("Error: %v", err) - } - - toolResultMsg := providers.Message{ - Role: "tool", - Content: result, - ToolCallID: tc.ID, - } - messages = append(messages, toolResultMsg) - - // Save tool result message to session - al.sessions.AddFullMessage(msg.SessionKey, toolResultMsg) - } - } - - if finalContent == "" { - finalContent = "I've completed processing but have no response to give." - } - - // Save final assistant message to session - al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent) - al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) - - // Context compression: Check if we need to summarize - // Trigger if history > 20 messages OR estimated tokens > 75% of context window - newHistory := al.sessions.GetHistory(msg.SessionKey) - tokenEstimate := al.estimateTokens(newHistory) - threshold := al.contextWindow * 75 / 100 - - if len(newHistory) > 20 || tokenEstimate > threshold { - if _, loading := al.summarizing.LoadOrStore(msg.SessionKey, true); !loading { - go func() { - defer al.summarizing.Delete(msg.SessionKey) - al.summarizeSession(msg.SessionKey) - }() - } - } - - // Log response preview - responsePreview := truncate(finalContent, 120) - logger.InfoCF("agent", fmt.Sprintf("Response to %s:%s: %s", msg.Channel, msg.SenderID, responsePreview), - map[string]interface{}{ - "iterations": iteration, - "final_length": len(finalContent), - }) - - return finalContent, nil + // Process as user message + return al.runAgentLoop(ctx, processOptions{ + SessionKey: msg.SessionKey, + Channel: msg.Channel, + ChatID: msg.ChatID, + UserMessage: msg.Content, + DefaultResponse: "I've completed processing but have no response to give.", + EnableSummary: true, + SendResponse: false, + }) } func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { @@ -380,39 +215,96 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Use the origin session for context sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID) - // Update tool contexts to original channel/chatID - if tool, ok := al.tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - mt.SetContext(originChannel, originChatID) - } - } - if tool, ok := al.tools.Get("spawn"); ok { - if st, ok := tool.(*tools.SpawnTool); ok { - st.SetContext(originChannel, originChatID) - } - } + // Process as system message with routing back to origin + return al.runAgentLoop(ctx, processOptions{ + SessionKey: sessionKey, + Channel: originChannel, + ChatID: originChatID, + UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content), + DefaultResponse: "Background task completed.", + EnableSummary: false, + SendResponse: true, // Send response back to original channel + }) +} - // Build messages with the announce content - history := al.sessions.GetHistory(sessionKey) - summary := al.sessions.GetSummary(sessionKey) +// runAgentLoop is the core message processing logic. +// It handles context building, LLM calls, tool execution, and response handling. +func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) { + // 1. Update tool contexts + al.updateToolContexts(opts.Channel, opts.ChatID) + + // 2. Build messages + history := al.sessions.GetHistory(opts.SessionKey) + summary := al.sessions.GetSummary(opts.SessionKey) messages := al.contextBuilder.BuildMessages( history, summary, - msg.Content, + opts.UserMessage, nil, - originChannel, - originChatID, + opts.Channel, + opts.ChatID, ) - // Save user message to session with system message marker - al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) + // 3. Save user message to session + al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage) + // 4. Run LLM iteration loop + finalContent, iteration, err := al.runLLMIteration(ctx, messages, opts) + if err != nil { + return "", err + } + + // 5. Handle empty response + if finalContent == "" { + finalContent = opts.DefaultResponse + } + + // 6. Save final assistant message to session + al.sessions.AddMessage(opts.SessionKey, "assistant", finalContent) + al.sessions.Save(al.sessions.GetOrCreate(opts.SessionKey)) + + // 7. Optional: summarization + if opts.EnableSummary { + al.maybeSummarize(opts.SessionKey) + } + + // 8. Optional: send response via bus + if opts.SendResponse { + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: finalContent, + }) + } + + // 9. Log response + responsePreview := utils.Truncate(finalContent, 120) + logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview), + map[string]interface{}{ + "session_key": opts.SessionKey, + "iterations": iteration, + "final_length": len(finalContent), + }) + + return finalContent, nil +} + +// runLLMIteration executes the LLM call loop with tool handling. +// Returns the final content, iteration count, and any error. +func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.Message, opts processOptions) (string, int, error) { iteration := 0 var finalContent string for iteration < al.maxIterations { iteration++ + logger.DebugCF("agent", "LLM iteration", + map[string]interface{}{ + "iteration": iteration, + "max": al.maxIterations, + }) + + // Build tool definitions toolDefs := al.tools.GetDefinitions() providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) for _, td := range toolDefs { @@ -429,12 +321,12 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Log LLM request details logger.DebugCF("agent", "LLM request", map[string]interface{}{ - "iteration": iteration, - "model": al.model, - "messages_count": len(messages), - "tools_count": len(providerToolDefs), - "max_tokens": 8192, - "temperature": 0.7, + "iteration": iteration, + "model": al.model, + "messages_count": len(messages), + "tools_count": len(providerToolDefs), + "max_tokens": 8192, + "temperature": 0.7, "system_prompt_len": len(messages[0].Content), }) @@ -446,30 +338,49 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe "tools_json": formatToolsForLog(providerToolDefs), }) + // Call LLM response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ "max_tokens": 8192, "temperature": 0.7, }) if err != nil { - logger.ErrorCF("agent", "LLM call failed in system message", + logger.ErrorCF("agent", "LLM call failed", map[string]interface{}{ "iteration": iteration, "error": err.Error(), }) - return "", fmt.Errorf("LLM call failed: %w", err) + return "", iteration, fmt.Errorf("LLM call failed: %w", err) } + // Check if no tool calls - we're done if len(response.ToolCalls) == 0 { finalContent = response.Content + logger.InfoCF("agent", "LLM response without tool calls (direct answer)", + map[string]interface{}{ + "iteration": iteration, + "content_chars": len(finalContent), + }) break } + // Log tool calls + toolNames := make([]string, 0, len(response.ToolCalls)) + for _, tc := range response.ToolCalls { + toolNames = append(toolNames, tc.Name) + } + logger.InfoCF("agent", "LLM requested tool calls", + map[string]interface{}{ + "tools": toolNames, + "count": len(toolNames), + "iteration": iteration, + }) + + // Build assistant message with tool calls assistantMsg := providers.Message{ Role: "assistant", Content: response.Content, } - for _, tc := range response.ToolCalls { argumentsJSON, _ := json.Marshal(tc.Arguments) assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ @@ -484,10 +395,20 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe messages = append(messages, assistantMsg) // Save assistant message with tool calls to session - al.sessions.AddFullMessage(sessionKey, assistantMsg) + al.sessions.AddFullMessage(opts.SessionKey, assistantMsg) + // Execute tool calls for _, tc := range response.ToolCalls { - result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, msg.Channel, msg.ChatID) + // Log tool call with arguments preview + argsJSON, _ := json.Marshal(tc.Arguments) + argsPreview := utils.Truncate(string(argsJSON), 200) + logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), + map[string]interface{}{ + "tool": tc.Name, + "iteration": iteration, + }) + + result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID) if err != nil { result = fmt.Sprintf("Error: %v", err) } @@ -500,46 +421,41 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe messages = append(messages, toolResultMsg) // Save tool result message to session - al.sessions.AddFullMessage(sessionKey, toolResultMsg) + al.sessions.AddFullMessage(opts.SessionKey, toolResultMsg) } } - if finalContent == "" { - finalContent = "Background task completed." - } - - // Save final assistant message to session - al.sessions.AddMessage(sessionKey, "assistant", finalContent) - al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) - - logger.InfoCF("agent", "System message processing completed", - map[string]interface{}{ - "iterations": iteration, - "final_length": len(finalContent), - }) - - // Send response back to the original channel - al.bus.PublishOutbound(bus.OutboundMessage{ - Channel: originChannel, - ChatID: originChatID, - Content: finalContent, - }) - - return finalContent, nil + return finalContent, iteration, nil } -// truncate returns a truncated version of s with at most maxLen characters. -// If the string is truncated, "..." is appended to indicate truncation. -// If the string fits within maxLen, it is returned unchanged. -func truncate(s string, maxLen int) string { - if len(s) <= maxLen { - return s +// updateToolContexts updates the context for tools that need channel/chatID info. +func (al *AgentLoop) updateToolContexts(channel, chatID string) { + if tool, ok := al.tools.Get("message"); ok { + if mt, ok := tool.(*tools.MessageTool); ok { + mt.SetContext(channel, chatID) + } } - // Reserve 3 chars for "..." - if maxLen <= 3 { - return s[:maxLen] + if tool, ok := al.tools.Get("spawn"); ok { + if st, ok := tool.(*tools.SpawnTool); ok { + st.SetContext(channel, chatID) + } + } +} + +// maybeSummarize triggers summarization if the session history exceeds thresholds. +func (al *AgentLoop) maybeSummarize(sessionKey string) { + newHistory := al.sessions.GetHistory(sessionKey) + tokenEstimate := al.estimateTokens(newHistory) + threshold := al.contextWindow * 75 / 100 + + if len(newHistory) > 20 || tokenEstimate > threshold { + if _, loading := al.summarizing.LoadOrStore(sessionKey, true); !loading { + go func() { + defer al.summarizing.Delete(sessionKey) + al.summarizeSession(sessionKey) + }() + } } - return s[:maxLen-3] + "..." } // GetStartupInfo returns information about loaded tools and skills for logging. @@ -574,12 +490,12 @@ func formatMessagesForLog(messages []providers.Message) string { for _, tc := range msg.ToolCalls { result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name) if tc.Function != nil { - result += fmt.Sprintf(" Arguments: %s\n", truncateString(tc.Function.Arguments, 200)) + result += fmt.Sprintf(" Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200)) } } } if msg.Content != "" { - content := truncateString(msg.Content, 200) + content := utils.Truncate(msg.Content, 200) result += fmt.Sprintf(" Content: %s\n", content) } if msg.ToolCallID != "" { @@ -603,24 +519,13 @@ func formatToolsForLog(tools []providers.ToolDefinition) string { result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name) result += fmt.Sprintf(" Description: %s\n", tool.Function.Description) if len(tool.Function.Parameters) > 0 { - result += fmt.Sprintf(" Parameters: %s\n", truncateString(fmt.Sprintf("%v", tool.Function.Parameters), 200)) + result += fmt.Sprintf(" Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200)) } } result += "]" return result } -// truncateString truncates a string to max length -func truncateString(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - if maxLen <= 3 { - return s[:maxLen] - } - return s[:maxLen-3] + "..." -} - // summarizeSession summarizes the conversation history for a session. func (al *AgentLoop) summarizeSession(sessionKey string) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 87aaf35..53570a3 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -8,15 +8,9 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/cron" + "github.com/sipeed/picoclaw/pkg/utils" ) -func truncateString(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - return s[:maxLen] -} - // JobExecutor is the interface for executing cron jobs through the agent type JobExecutor interface { ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) @@ -171,8 +165,11 @@ func (t *CronTool) addJob(args map[string]interface{}) (string, error) { deliver = d } + // Truncate message for job name (max 30 chars) + messagePreview := utils.Truncate(message, 30) + job, err := t.cronService.AddJob( - truncateString(message, 30), + messagePreview, schedule, message, deliver,