Merge pull request #23 from yinwm/fix/gateway-cron-tool

feat: Cron tool and agent integration
This commit is contained in:
lxowalle
2026-02-11 21:25:06 +08:00
committed by GitHub
14 changed files with 833 additions and 288 deletions

16
.gitignore vendored
View File

@@ -1,3 +1,4 @@
# Binaries
bin/ bin/
*.exe *.exe
*.dll *.dll
@@ -5,12 +6,21 @@ bin/
*.dylib *.dylib
*.test *.test
*.out *.out
/picoclaw
/picoclaw-test
# Picoclaw specific
.picoclaw/ .picoclaw/
config.json config.json
sessions/ sessions/
build/
# Coverage
coverage.txt coverage.txt
coverage.html coverage.html
.DS_Store
build
picoclaw # OS
.DS_Store
# Ralph workspace
ralph/

View File

@@ -333,6 +333,23 @@ picoclaw gateway
Config file: `~/.picoclaw/config.json` Config file: `~/.picoclaw/config.json`
### Workspace Layout
PicoClaw stores data in your configured workspace (default: `~/.picoclaw/workspace`):
```
~/.picoclaw/workspace/
├── sessions/ # Conversation sessions and history
├── memory/ # Long-term memory (MEMORY.md)
├── cron/ # Scheduled jobs database
├── skills/ # Custom skills
├── AGENTS.md # Agent behavior guide
├── IDENTITY.md # Agent identity
├── SOUL.md # Agent soul
├── TOOLS.md # Tool descriptions
└── USER.md # User preferences
```
### Providers ### Providers
> [!NOTE] > [!NOTE]
@@ -452,6 +469,18 @@ picoclaw agent -m "Hello"
| `picoclaw agent` | Interactive chat mode | | `picoclaw agent` | Interactive chat mode |
| `picoclaw gateway` | Start the gateway | | `picoclaw gateway` | Start the gateway |
| `picoclaw status` | Show status | | `picoclaw status` | Show status |
| `picoclaw cron list` | List all scheduled jobs |
| `picoclaw cron add ...` | Add a scheduled job |
### Scheduled Tasks / Reminders
PicoClaw supports scheduled reminders and recurring tasks through the `cron` tool:
- **One-time reminders**: "Remind me in 10 minutes" → triggers once after 10min
- **Recurring tasks**: "Remind me every 2 hours" → triggers every 2 hours
- **Cron expressions**: "Remind me at 9am daily" → uses cron expression
Jobs are stored in `~/.picoclaw/workspace/cron/` and processed automatically.
## 🤝 Contribute & Roadmap ## 🤝 Contribute & Roadmap

View File

@@ -27,6 +27,7 @@ import (
"github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/skills" "github.com/sipeed/picoclaw/pkg/skills"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/voice" "github.com/sipeed/picoclaw/pkg/voice"
) )
@@ -550,8 +551,8 @@ func gatewayCmd() {
"skills_available": skillsInfo["available"], "skills_available": skillsInfo["available"],
}) })
cronStorePath := filepath.Join(filepath.Dir(getConfigPath()), "cron", "jobs.json") // Setup cron tool and service
cronService := cron.NewCronService(cronStorePath, nil) cronService := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath())
heartbeatService := heartbeat.NewHeartbeatService( heartbeatService := heartbeat.NewHeartbeatService(
cfg.WorkspacePath(), cfg.WorkspacePath(),
@@ -689,6 +690,25 @@ func getConfigPath() string {
return filepath.Join(home, ".picoclaw", "config.json") return filepath.Join(home, ".picoclaw", "config.json")
} }
func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string) *cron.CronService {
cronStorePath := filepath.Join(workspace, "cron", "jobs.json")
// Create cron service
cronService := cron.NewCronService(cronStorePath, nil)
// Create and register CronTool
cronTool := tools.NewCronTool(cronService, agentLoop, msgBus)
agentLoop.RegisterTool(cronTool)
// Set the onJob handler
cronService.SetOnJob(func(job *cron.CronJob) (string, error) {
result := cronTool.ExecuteJob(context.Background(), job)
return result, nil
})
return cronService
}
func loadConfig() (*config.Config, error) { func loadConfig() (*config.Config, error) {
return config.LoadConfig(getConfigPath()) return config.LoadConfig(getConfigPath())
} }
@@ -701,8 +721,14 @@ func cronCmd() {
subcommand := os.Args[2] subcommand := os.Args[2]
dataDir := filepath.Join(filepath.Dir(getConfigPath()), "cron") // Load config to get workspace path
cronStorePath := filepath.Join(dataDir, "jobs.json") cfg, err := loadConfig()
if err != nil {
fmt.Printf("Error loading config: %v\n", err)
return
}
cronStorePath := filepath.Join(cfg.WorkspacePath(), "cron", "jobs.json")
switch subcommand { switch subcommand {
case "list": case "list":
@@ -745,7 +771,7 @@ func cronHelp() {
func cronListCmd(storePath string) { func cronListCmd(storePath string) {
cs := cron.NewCronService(storePath, nil) cs := cron.NewCronService(storePath, nil)
jobs := cs.ListJobs(false) jobs := cs.ListJobs(true) // Show all jobs, including disabled
if len(jobs) == 0 { if len(jobs) == 0 {
fmt.Println("No scheduled jobs.") fmt.Println("No scheduled jobs.")

1
go.mod
View File

@@ -3,6 +3,7 @@ module github.com/sipeed/picoclaw
go 1.24.0 go 1.24.0
require ( require (
github.com/adhocore/gronx v1.19.6
github.com/bwmarrin/discordgo v0.29.0 github.com/bwmarrin/discordgo v0.29.0
github.com/caarlos0/env/v11 v11.3.1 github.com/caarlos0/env/v11 v11.3.1
github.com/chzyer/readline v1.5.1 github.com/chzyer/readline v1.5.1

2
go.sum
View File

@@ -1,4 +1,6 @@
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
github.com/adhocore/gronx v1.19.6 h1:5KNVcoR9ACgL9HhEqCm5QXsab/gI4QDIybTAWcXDKDc=
github.com/adhocore/gronx v1.19.6/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/bwmarrin/discordgo v0.29.0 h1:FmWeXFaKUwrcL3Cx65c20bTRW+vOb6k8AnaP+EgjDno= github.com/bwmarrin/discordgo v0.29.0 h1:FmWeXFaKUwrcL3Cx65c20bTRW+vOb6k8AnaP+EgjDno=
github.com/bwmarrin/discordgo v0.29.0/go.mod h1:NJZpH+1AfhIcyQsPeuBKsUtYrRnjkyu0kIVMCHkZtRY= github.com/bwmarrin/discordgo v0.29.0/go.mod h1:NJZpH+1AfhIcyQsPeuBKsUtYrRnjkyu0kIVMCHkZtRY=
github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA=

View File

@@ -11,13 +11,14 @@ import (
"github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/skills" "github.com/sipeed/picoclaw/pkg/skills"
"github.com/sipeed/picoclaw/pkg/tools"
) )
type ContextBuilder struct { type ContextBuilder struct {
workspace string workspace string
skillsLoader *skills.SkillsLoader skillsLoader *skills.SkillsLoader
memory *MemoryStore memory *MemoryStore
toolsSummary func() []string // Function to get tool summaries dynamically tools *tools.ToolRegistry // Direct reference to tool registry
} }
func getGlobalConfigDir() string { func getGlobalConfigDir() string {
@@ -28,9 +29,9 @@ func getGlobalConfigDir() string {
return filepath.Join(home, ".picoclaw") return filepath.Join(home, ".picoclaw")
} }
func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *ContextBuilder { func NewContextBuilder(workspace string) *ContextBuilder {
// builtin skills: 当前项目的 skills 目录 // builtin skills: skills directory in current project
// 使用当前工作目录下的 skills/ 目录 // Use the skills/ directory under the current working directory
wd, _ := os.Getwd() wd, _ := os.Getwd()
builtinSkillsDir := filepath.Join(wd, "skills") builtinSkillsDir := filepath.Join(wd, "skills")
globalSkillsDir := filepath.Join(getGlobalConfigDir(), "skills") globalSkillsDir := filepath.Join(getGlobalConfigDir(), "skills")
@@ -39,10 +40,14 @@ func NewContextBuilder(workspace string, toolsSummaryFunc func() []string) *Cont
workspace: workspace, workspace: workspace,
skillsLoader: skills.NewSkillsLoader(workspace, globalSkillsDir, builtinSkillsDir), skillsLoader: skills.NewSkillsLoader(workspace, globalSkillsDir, builtinSkillsDir),
memory: NewMemoryStore(workspace), memory: NewMemoryStore(workspace),
toolsSummary: toolsSummaryFunc,
} }
} }
// SetToolsRegistry sets the tools registry for dynamic tool summary generation.
func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) {
cb.tools = registry
}
func (cb *ContextBuilder) getIdentity() string { func (cb *ContextBuilder) getIdentity() string {
now := time.Now().Format("2006-01-02 15:04 (Monday)") now := time.Now().Format("2006-01-02 15:04 (Monday)")
workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace)) workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace))
@@ -69,23 +74,29 @@ Your workspace is at: %s
%s %s
Always be helpful, accurate, and concise. When using tools, explain what you're doing. ## Important Rules
When remembering something, write to %s/memory/MEMORY.md`,
1. **ALWAYS use tools** - When you need to perform an action (schedule reminders, send messages, execute commands, etc.), you MUST call the appropriate tool. Do NOT just say you'll do it or pretend to do it.
2. **Be helpful and accurate** - When using tools, briefly explain what you're doing.
3. **Memory** - When remembering something, write to %s/memory/MEMORY.md`,
now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath) now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath)
} }
func (cb *ContextBuilder) buildToolsSection() string { func (cb *ContextBuilder) buildToolsSection() string {
if cb.toolsSummary == nil { if cb.tools == nil {
return "" return ""
} }
summaries := cb.toolsSummary() summaries := cb.tools.GetSummaries()
if len(summaries) == 0 { if len(summaries) == 0 {
return "" return ""
} }
var sb strings.Builder var sb strings.Builder
sb.WriteString("## Available Tools\n\n") sb.WriteString("## Available Tools\n\n")
sb.WriteString("**CRITICAL**: You MUST use tools to perform actions. Do NOT pretend to execute commands or schedule tasks.\n\n")
sb.WriteString("You have access to the following tools:\n\n") sb.WriteString("You have access to the following tools:\n\n")
for _, s := range summaries { for _, s := range summaries {
sb.WriteString(s) sb.WriteString(s)

View File

@@ -13,6 +13,8 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/config"
@@ -20,6 +22,7 @@ import (
"github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/tools" "github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/utils"
) )
type AgentLoop struct { type AgentLoop struct {
@@ -27,11 +30,24 @@ type AgentLoop struct {
provider providers.LLMProvider provider providers.LLMProvider
workspace string workspace string
model string model string
contextWindow int // Maximum context window size in tokens
maxIterations int maxIterations int
sessions *session.SessionManager sessions *session.SessionManager
contextBuilder *ContextBuilder contextBuilder *ContextBuilder
tools *tools.ToolRegistry tools *tools.ToolRegistry
running bool running bool
summarizing sync.Map // Tracks which sessions are currently being summarized
}
// processOptions configures how a message is processed
type processOptions struct {
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
UserMessage string // User message content (may include prefix)
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
} }
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop { func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
@@ -69,18 +85,24 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
editFileTool := tools.NewEditFileTool(workspace) editFileTool := tools.NewEditFileTool(workspace)
toolsRegistry.Register(editFileTool) toolsRegistry.Register(editFileTool)
sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions"))
// Create context builder and set tools registry
contextBuilder := NewContextBuilder(workspace)
contextBuilder.SetToolsRegistry(toolsRegistry)
return &AgentLoop{ return &AgentLoop{
bus: msgBus, bus: msgBus,
provider: provider, provider: provider,
workspace: workspace, workspace: workspace,
model: cfg.Agents.Defaults.Model, model: cfg.Agents.Defaults.Model,
contextWindow: cfg.Agents.Defaults.MaxTokens, // Restore context window for summarization
maxIterations: cfg.Agents.Defaults.MaxToolIterations, maxIterations: cfg.Agents.Defaults.MaxToolIterations,
sessions: sessionsManager, sessions: sessionsManager,
contextBuilder: NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }), contextBuilder: contextBuilder,
tools: toolsRegistry, tools: toolsRegistry,
running: false, running: false,
summarizing: sync.Map{},
} }
} }
@@ -119,11 +141,19 @@ func (al *AgentLoop) Stop() {
al.running = false al.running = false
} }
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
al.tools.Register(tool)
}
func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) { func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) {
return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct")
}
func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) {
msg := bus.InboundMessage{ msg := bus.InboundMessage{
Channel: "cli", Channel: channel,
SenderID: "user", SenderID: "cron",
ChatID: "direct", ChatID: chatID,
Content: content, Content: content,
SessionKey: sessionKey, SessionKey: sessionKey,
} }
@@ -133,7 +163,7 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
// Add message preview to log // Add message preview to log
preview := truncate(msg.Content, 80) preview := utils.Truncate(msg.Content, 80)
logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview), logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview),
map[string]interface{}{ map[string]interface{}{
"channel": msg.Channel, "channel": msg.Channel,
@@ -147,169 +177,16 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
return al.processSystemMessage(ctx, msg) return al.processSystemMessage(ctx, msg)
} }
// Update tool contexts // Process as user message
if tool, ok := al.tools.Get("message"); ok { return al.runAgentLoop(ctx, processOptions{
if mt, ok := tool.(*tools.MessageTool); ok { SessionKey: msg.SessionKey,
mt.SetContext(msg.Channel, msg.ChatID) Channel: msg.Channel,
} ChatID: msg.ChatID,
} UserMessage: msg.Content,
if tool, ok := al.tools.Get("spawn"); ok { DefaultResponse: "I've completed processing but have no response to give.",
if st, ok := tool.(*tools.SpawnTool); ok { EnableSummary: true,
st.SetContext(msg.Channel, msg.ChatID) SendResponse: false,
}
}
history := al.sessions.GetHistory(msg.SessionKey)
summary := al.sessions.GetSummary(msg.SessionKey)
messages := al.contextBuilder.BuildMessages(
history,
summary,
msg.Content,
nil,
msg.Channel,
msg.ChatID,
)
iteration := 0
var finalContent string
for iteration < al.maxIterations {
iteration++
logger.DebugCF("agent", "LLM iteration",
map[string]interface{}{
"iteration": iteration,
"max": al.maxIterations,
}) })
toolDefs := al.tools.GetDefinitions()
providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs))
for _, td := range toolDefs {
providerToolDefs = append(providerToolDefs, providers.ToolDefinition{
Type: td["type"].(string),
Function: providers.ToolFunctionDefinition{
Name: td["function"].(map[string]interface{})["name"].(string),
Description: td["function"].(map[string]interface{})["description"].(string),
Parameters: td["function"].(map[string]interface{})["parameters"].(map[string]interface{}),
},
})
}
// Log LLM request details
logger.DebugCF("agent", "LLM request",
map[string]interface{}{
"iteration": iteration,
"model": al.model,
"messages_count": len(messages),
"tools_count": len(providerToolDefs),
"max_tokens": 8192,
"temperature": 0.7,
"system_prompt_len": len(messages[0].Content),
})
// Log full messages (detailed)
logger.DebugCF("agent", "Full LLM request",
map[string]interface{}{
"iteration": iteration,
"messages_json": formatMessagesForLog(messages),
"tools_json": formatToolsForLog(providerToolDefs),
})
response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{
"max_tokens": 8192,
"temperature": 0.7,
})
if err != nil {
logger.ErrorCF("agent", "LLM call failed",
map[string]interface{}{
"iteration": iteration,
"error": err.Error(),
})
return "", fmt.Errorf("LLM call failed: %w", err)
}
if len(response.ToolCalls) == 0 {
finalContent = response.Content
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
map[string]interface{}{
"iteration": iteration,
"content_chars": len(finalContent),
})
break
}
toolNames := make([]string, 0, len(response.ToolCalls))
for _, tc := range response.ToolCalls {
toolNames = append(toolNames, tc.Name)
}
logger.InfoCF("agent", "LLM requested tool calls",
map[string]interface{}{
"tools": toolNames,
"count": len(toolNames),
"iteration": iteration,
})
assistantMsg := providers.Message{
Role: "assistant",
Content: response.Content,
}
for _, tc := range response.ToolCalls {
argumentsJSON, _ := json.Marshal(tc.Arguments)
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
ID: tc.ID,
Type: "function",
Function: &providers.FunctionCall{
Name: tc.Name,
Arguments: string(argumentsJSON),
},
})
}
messages = append(messages, assistantMsg)
for _, tc := range response.ToolCalls {
// Log tool call with arguments preview
argsJSON, _ := json.Marshal(tc.Arguments)
argsPreview := truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
map[string]interface{}{
"tool": tc.Name,
"iteration": iteration,
})
result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments)
if err != nil {
result = fmt.Sprintf("Error: %v", err)
}
toolResultMsg := providers.Message{
Role: "tool",
Content: result,
ToolCallID: tc.ID,
}
messages = append(messages, toolResultMsg)
}
}
if finalContent == "" {
finalContent = "I've completed processing but have no response to give."
}
al.sessions.AddMessage(msg.SessionKey, "user", msg.Content)
al.sessions.AddMessage(msg.SessionKey, "assistant", finalContent)
al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey))
// Log response preview
responsePreview := truncate(finalContent, 120)
logger.InfoCF("agent", fmt.Sprintf("Response to %s:%s: %s", msg.Channel, msg.SenderID, responsePreview),
map[string]interface{}{
"iterations": iteration,
"final_length": len(finalContent),
})
return finalContent, nil
} }
func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
@@ -338,36 +215,96 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
// Use the origin session for context // Use the origin session for context
sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID) sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID)
// Update tool contexts to original channel/chatID // Process as system message with routing back to origin
if tool, ok := al.tools.Get("message"); ok { return al.runAgentLoop(ctx, processOptions{
if mt, ok := tool.(*tools.MessageTool); ok { SessionKey: sessionKey,
mt.SetContext(originChannel, originChatID) Channel: originChannel,
} ChatID: originChatID,
} UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content),
if tool, ok := al.tools.Get("spawn"); ok { DefaultResponse: "Background task completed.",
if st, ok := tool.(*tools.SpawnTool); ok { EnableSummary: false,
st.SetContext(originChannel, originChatID) SendResponse: true, // Send response back to original channel
} })
} }
// Build messages with the announce content // runAgentLoop is the core message processing logic.
history := al.sessions.GetHistory(sessionKey) // It handles context building, LLM calls, tool execution, and response handling.
summary := al.sessions.GetSummary(sessionKey) func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) {
// 1. Update tool contexts
al.updateToolContexts(opts.Channel, opts.ChatID)
// 2. Build messages
history := al.sessions.GetHistory(opts.SessionKey)
summary := al.sessions.GetSummary(opts.SessionKey)
messages := al.contextBuilder.BuildMessages( messages := al.contextBuilder.BuildMessages(
history, history,
summary, summary,
msg.Content, opts.UserMessage,
nil, nil,
originChannel, opts.Channel,
originChatID, opts.ChatID,
) )
// 3. Save user message to session
al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
// 4. Run LLM iteration loop
finalContent, iteration, err := al.runLLMIteration(ctx, messages, opts)
if err != nil {
return "", err
}
// 5. Handle empty response
if finalContent == "" {
finalContent = opts.DefaultResponse
}
// 6. Save final assistant message to session
al.sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
al.sessions.Save(al.sessions.GetOrCreate(opts.SessionKey))
// 7. Optional: summarization
if opts.EnableSummary {
al.maybeSummarize(opts.SessionKey)
}
// 8. Optional: send response via bus
if opts.SendResponse {
al.bus.PublishOutbound(bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: finalContent,
})
}
// 9. Log response
responsePreview := utils.Truncate(finalContent, 120)
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
map[string]interface{}{
"session_key": opts.SessionKey,
"iterations": iteration,
"final_length": len(finalContent),
})
return finalContent, nil
}
// runLLMIteration executes the LLM call loop with tool handling.
// Returns the final content, iteration count, and any error.
func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.Message, opts processOptions) (string, int, error) {
iteration := 0 iteration := 0
var finalContent string var finalContent string
for iteration < al.maxIterations { for iteration < al.maxIterations {
iteration++ iteration++
logger.DebugCF("agent", "LLM iteration",
map[string]interface{}{
"iteration": iteration,
"max": al.maxIterations,
})
// Build tool definitions
toolDefs := al.tools.GetDefinitions() toolDefs := al.tools.GetDefinitions()
providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs)) providerToolDefs := make([]providers.ToolDefinition, 0, len(toolDefs))
for _, td := range toolDefs { for _, td := range toolDefs {
@@ -401,30 +338,49 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
"tools_json": formatToolsForLog(providerToolDefs), "tools_json": formatToolsForLog(providerToolDefs),
}) })
// Call LLM
response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{ response, err := al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{
"max_tokens": 8192, "max_tokens": 8192,
"temperature": 0.7, "temperature": 0.7,
}) })
if err != nil { if err != nil {
logger.ErrorCF("agent", "LLM call failed in system message", logger.ErrorCF("agent", "LLM call failed",
map[string]interface{}{ map[string]interface{}{
"iteration": iteration, "iteration": iteration,
"error": err.Error(), "error": err.Error(),
}) })
return "", fmt.Errorf("LLM call failed: %w", err) return "", iteration, fmt.Errorf("LLM call failed: %w", err)
} }
// Check if no tool calls - we're done
if len(response.ToolCalls) == 0 { if len(response.ToolCalls) == 0 {
finalContent = response.Content finalContent = response.Content
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
map[string]interface{}{
"iteration": iteration,
"content_chars": len(finalContent),
})
break break
} }
// Log tool calls
toolNames := make([]string, 0, len(response.ToolCalls))
for _, tc := range response.ToolCalls {
toolNames = append(toolNames, tc.Name)
}
logger.InfoCF("agent", "LLM requested tool calls",
map[string]interface{}{
"tools": toolNames,
"count": len(toolNames),
"iteration": iteration,
})
// Build assistant message with tool calls
assistantMsg := providers.Message{ assistantMsg := providers.Message{
Role: "assistant", Role: "assistant",
Content: response.Content, Content: response.Content,
} }
for _, tc := range response.ToolCalls { for _, tc := range response.ToolCalls {
argumentsJSON, _ := json.Marshal(tc.Arguments) argumentsJSON, _ := json.Marshal(tc.Arguments)
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{ assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
@@ -438,8 +394,21 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
} }
messages = append(messages, assistantMsg) messages = append(messages, assistantMsg)
// Save assistant message with tool calls to session
al.sessions.AddFullMessage(opts.SessionKey, assistantMsg)
// Execute tool calls
for _, tc := range response.ToolCalls { for _, tc := range response.ToolCalls {
result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) // Log tool call with arguments preview
argsJSON, _ := json.Marshal(tc.Arguments)
argsPreview := utils.Truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
map[string]interface{}{
"tool": tc.Name,
"iteration": iteration,
})
result, err := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID)
if err != nil { if err != nil {
result = fmt.Sprintf("Error: %v", err) result = fmt.Sprintf("Error: %v", err)
} }
@@ -450,39 +419,43 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
ToolCallID: tc.ID, ToolCallID: tc.ID,
} }
messages = append(messages, toolResultMsg) messages = append(messages, toolResultMsg)
// Save tool result message to session
al.sessions.AddFullMessage(opts.SessionKey, toolResultMsg)
} }
} }
if finalContent == "" { return finalContent, iteration, nil
finalContent = "Background task completed."
} }
// Save to session with system message marker // updateToolContexts updates the context for tools that need channel/chatID info.
al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) func (al *AgentLoop) updateToolContexts(channel, chatID string) {
al.sessions.AddMessage(sessionKey, "assistant", finalContent) if tool, ok := al.tools.Get("message"); ok {
al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) if mt, ok := tool.(*tools.MessageTool); ok {
mt.SetContext(channel, chatID)
logger.InfoCF("agent", "System message processing completed", }
map[string]interface{}{ }
"iterations": iteration, if tool, ok := al.tools.Get("spawn"); ok {
"final_length": len(finalContent), if st, ok := tool.(*tools.SpawnTool); ok {
}) st.SetContext(channel, chatID)
}
return finalContent, nil }
} }
// truncate returns a truncated version of s with at most maxLen characters. // maybeSummarize triggers summarization if the session history exceeds thresholds.
// If the string is truncated, "..." is appended to indicate truncation. func (al *AgentLoop) maybeSummarize(sessionKey string) {
// If the string fits within maxLen, it is returned unchanged. newHistory := al.sessions.GetHistory(sessionKey)
func truncate(s string, maxLen int) string { tokenEstimate := al.estimateTokens(newHistory)
if len(s) <= maxLen { threshold := al.contextWindow * 75 / 100
return s
if len(newHistory) > 20 || tokenEstimate > threshold {
if _, loading := al.summarizing.LoadOrStore(sessionKey, true); !loading {
go func() {
defer al.summarizing.Delete(sessionKey)
al.summarizeSession(sessionKey)
}()
} }
// Reserve 3 chars for "..."
if maxLen <= 3 {
return s[:maxLen]
} }
return s[:maxLen-3] + "..."
} }
// GetStartupInfo returns information about loaded tools and skills for logging. // GetStartupInfo returns information about loaded tools and skills for logging.
@@ -517,12 +490,12 @@ func formatMessagesForLog(messages []providers.Message) string {
for _, tc := range msg.ToolCalls { for _, tc := range msg.ToolCalls {
result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name) result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name)
if tc.Function != nil { if tc.Function != nil {
result += fmt.Sprintf(" Arguments: %s\n", truncateString(tc.Function.Arguments, 200)) result += fmt.Sprintf(" Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200))
} }
} }
} }
if msg.Content != "" { if msg.Content != "" {
content := truncateString(msg.Content, 200) content := utils.Truncate(msg.Content, 200)
result += fmt.Sprintf(" Content: %s\n", content) result += fmt.Sprintf(" Content: %s\n", content)
} }
if msg.ToolCallID != "" { if msg.ToolCallID != "" {
@@ -546,20 +519,114 @@ func formatToolsForLog(tools []providers.ToolDefinition) string {
result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name) result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name)
result += fmt.Sprintf(" Description: %s\n", tool.Function.Description) result += fmt.Sprintf(" Description: %s\n", tool.Function.Description)
if len(tool.Function.Parameters) > 0 { if len(tool.Function.Parameters) > 0 {
result += fmt.Sprintf(" Parameters: %s\n", truncateString(fmt.Sprintf("%v", tool.Function.Parameters), 200)) result += fmt.Sprintf(" Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200))
} }
} }
result += "]" result += "]"
return result return result
} }
// truncateString truncates a string to max length // summarizeSession summarizes the conversation history for a session.
func truncateString(s string, maxLen int) string { func (al *AgentLoop) summarizeSession(sessionKey string) {
if len(s) <= maxLen { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
return s defer cancel()
history := al.sessions.GetHistory(sessionKey)
summary := al.sessions.GetSummary(sessionKey)
// Keep last 4 messages for continuity
if len(history) <= 4 {
return
} }
if maxLen <= 3 {
return s[:maxLen] toSummarize := history[:len(history)-4]
// Oversized Message Guard
// Skip messages larger than 50% of context window to prevent summarizer overflow
maxMessageTokens := al.contextWindow / 2
validMessages := make([]providers.Message, 0)
omitted := false
for _, m := range toSummarize {
if m.Role != "user" && m.Role != "assistant" {
continue
} }
return s[:maxLen-3] + "..." // Estimate tokens for this message
msgTokens := len(m.Content) / 4
if msgTokens > maxMessageTokens {
omitted = true
continue
}
validMessages = append(validMessages, m)
}
if len(validMessages) == 0 {
return
}
// Multi-Part Summarization
// Split into two parts if history is significant
var finalSummary string
if len(validMessages) > 10 {
mid := len(validMessages) / 2
part1 := validMessages[:mid]
part2 := validMessages[mid:]
s1, _ := al.summarizeBatch(ctx, part1, "")
s2, _ := al.summarizeBatch(ctx, part2, "")
// Merge them
mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2)
resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err == nil {
finalSummary = resp.Content
} else {
finalSummary = s1 + " " + s2
}
} else {
finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary)
}
if omitted && finalSummary != "" {
finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
}
if finalSummary != "" {
al.sessions.SetSummary(sessionKey, finalSummary)
al.sessions.TruncateHistory(sessionKey, 4)
al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
}
}
// summarizeBatch summarizes a batch of messages.
func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) {
prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n"
if existingSummary != "" {
prompt += "Existing context: " + existingSummary + "\n"
}
prompt += "\nCONVERSATION:\n"
for _, m := range batch {
prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
}
response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{
"max_tokens": 1024,
"temperature": 0.3,
})
if err != nil {
return "", err
}
return response.Content, nil
}
// estimateTokens estimates the number of tokens in a message list.
func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
total := 0
for _, m := range messages {
total += len(m.Content) / 4 // Simple heuristic: 4 chars per token
}
return total
} }

View File

@@ -61,7 +61,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
return return
} }
// 生成 SessionKey: channel:chatID // Build session key: channel:chatID
sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) sessionKey := fmt.Sprintf("%s:%s", c.name, chatID)
msg := bus.InboundMessage{ msg := bus.InboundMessage{
@@ -70,8 +70,8 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
ChatID: chatID, ChatID: chatID,
Content: content, Content: content,
Media: media, Media: media,
Metadata: metadata,
SessionKey: sessionKey, SessionKey: sessionKey,
Metadata: metadata,
} }
c.bus.PublishInbound(msg) c.bus.PublishInbound(msg)

View File

@@ -1,12 +1,17 @@
package cron package cron
import ( import (
"crypto/rand"
"encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time" "time"
"github.com/adhocore/gronx"
) )
type CronSchedule struct { type CronSchedule struct {
@@ -58,6 +63,7 @@ type CronService struct {
mu sync.RWMutex mu sync.RWMutex
running bool running bool
stopChan chan struct{} stopChan chan struct{}
gronx *gronx.Gronx
} }
func NewCronService(storePath string, onJob JobHandler) *CronService { func NewCronService(storePath string, onJob JobHandler) *CronService {
@@ -65,7 +71,9 @@ func NewCronService(storePath string, onJob JobHandler) *CronService {
storePath: storePath, storePath: storePath,
onJob: onJob, onJob: onJob,
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
gronx: gronx.New(),
} }
// Initialize and load store on creation
cs.loadStore() cs.loadStore()
return cs return cs
} }
@@ -83,7 +91,7 @@ func (cs *CronService) Start() error {
} }
cs.recomputeNextRuns() cs.recomputeNextRuns()
if err := cs.saveStore(); err != nil { if err := cs.saveStoreUnsafe(); err != nil {
return fmt.Errorf("failed to save store: %w", err) return fmt.Errorf("failed to save store: %w", err)
} }
@@ -120,30 +128,49 @@ func (cs *CronService) runLoop() {
} }
func (cs *CronService) checkJobs() { func (cs *CronService) checkJobs() {
cs.mu.RLock() cs.mu.Lock()
if !cs.running { if !cs.running {
cs.mu.RUnlock() cs.mu.Unlock()
return return
} }
now := time.Now().UnixMilli() now := time.Now().UnixMilli()
var dueJobs []*CronJob var dueJobs []*CronJob
// Collect jobs that are due (we need to copy them to execute outside lock)
for i := range cs.store.Jobs { for i := range cs.store.Jobs {
job := &cs.store.Jobs[i] job := &cs.store.Jobs[i]
if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now { if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now {
dueJobs = append(dueJobs, job) // Create a shallow copy of the job for execution
jobCopy := *job
dueJobs = append(dueJobs, &jobCopy)
} }
} }
cs.mu.RUnlock()
// Update next run times for due jobs immediately (before executing)
// Use map for O(n) lookup instead of O(n²) nested loop
dueMap := make(map[string]bool, len(dueJobs))
for _, job := range dueJobs {
dueMap[job.ID] = true
}
for i := range cs.store.Jobs {
if dueMap[cs.store.Jobs[i].ID] {
// Reset NextRunAtMS temporarily so we don't re-execute
cs.store.Jobs[i].State.NextRunAtMS = nil
}
}
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store: %v", err)
}
cs.mu.Unlock()
// Execute jobs outside the lock
for _, job := range dueJobs { for _, job := range dueJobs {
cs.executeJob(job) cs.executeJob(job)
} }
cs.mu.Lock()
defer cs.mu.Unlock()
cs.saveStore()
} }
func (cs *CronService) executeJob(job *CronJob) { func (cs *CronService) executeJob(job *CronJob) {
@@ -154,30 +181,42 @@ func (cs *CronService) executeJob(job *CronJob) {
_, err = cs.onJob(job) _, err = cs.onJob(job)
} }
// Now acquire lock to update state
cs.mu.Lock() cs.mu.Lock()
defer cs.mu.Unlock() defer cs.mu.Unlock()
job.State.LastRunAtMS = &startTime // Find the job in store and update it
job.UpdatedAtMS = time.Now().UnixMilli() for i := range cs.store.Jobs {
if cs.store.Jobs[i].ID == job.ID {
cs.store.Jobs[i].State.LastRunAtMS = &startTime
cs.store.Jobs[i].UpdatedAtMS = time.Now().UnixMilli()
if err != nil { if err != nil {
job.State.LastStatus = "error" cs.store.Jobs[i].State.LastStatus = "error"
job.State.LastError = err.Error() cs.store.Jobs[i].State.LastError = err.Error()
} else { } else {
job.State.LastStatus = "ok" cs.store.Jobs[i].State.LastStatus = "ok"
job.State.LastError = "" cs.store.Jobs[i].State.LastError = ""
} }
if job.Schedule.Kind == "at" { // Compute next run time
if job.DeleteAfterRun { if cs.store.Jobs[i].Schedule.Kind == "at" {
if cs.store.Jobs[i].DeleteAfterRun {
cs.removeJobUnsafe(job.ID) cs.removeJobUnsafe(job.ID)
} else { } else {
job.Enabled = false cs.store.Jobs[i].Enabled = false
job.State.NextRunAtMS = nil cs.store.Jobs[i].State.NextRunAtMS = nil
} }
} else { } else {
nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) nextRun := cs.computeNextRun(&cs.store.Jobs[i].Schedule, time.Now().UnixMilli())
job.State.NextRunAtMS = nextRun cs.store.Jobs[i].State.NextRunAtMS = nextRun
}
break
}
}
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store: %v", err)
} }
} }
@@ -197,6 +236,23 @@ func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int6
return &next return &next
} }
if schedule.Kind == "cron" {
if schedule.Expr == "" {
return nil
}
// Use gronx to calculate next run time
now := time.UnixMilli(nowMS)
nextTime, err := gronx.NextTickAfter(schedule.Expr, now, false)
if err != nil {
log.Printf("[cron] failed to compute next run for expr '%s': %v", schedule.Expr, err)
return nil
}
nextMS := nextTime.UnixMilli()
return &nextMS
}
return nil return nil
} }
@@ -223,9 +279,17 @@ func (cs *CronService) getNextWakeMS() *int64 {
} }
func (cs *CronService) Load() error { func (cs *CronService) Load() error {
cs.mu.Lock()
defer cs.mu.Unlock()
return cs.loadStore() return cs.loadStore()
} }
func (cs *CronService) SetOnJob(handler JobHandler) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.onJob = handler
}
func (cs *CronService) loadStore() error { func (cs *CronService) loadStore() error {
cs.store = &CronStore{ cs.store = &CronStore{
Version: 1, Version: 1,
@@ -243,7 +307,7 @@ func (cs *CronService) loadStore() error {
return json.Unmarshal(data, cs.store) return json.Unmarshal(data, cs.store)
} }
func (cs *CronService) saveStore() error { func (cs *CronService) saveStoreUnsafe() error {
dir := filepath.Dir(cs.storePath) dir := filepath.Dir(cs.storePath)
if err := os.MkdirAll(dir, 0755); err != nil { if err := os.MkdirAll(dir, 0755); err != nil {
return err return err
@@ -263,6 +327,9 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string
now := time.Now().UnixMilli() now := time.Now().UnixMilli()
// One-time tasks (at) should be deleted after execution
deleteAfterRun := (schedule.Kind == "at")
job := CronJob{ job := CronJob{
ID: generateID(), ID: generateID(),
Name: name, Name: name,
@@ -280,11 +347,11 @@ func (cs *CronService) AddJob(name string, schedule CronSchedule, message string
}, },
CreatedAtMS: now, CreatedAtMS: now,
UpdatedAtMS: now, UpdatedAtMS: now,
DeleteAfterRun: false, DeleteAfterRun: deleteAfterRun,
} }
cs.store.Jobs = append(cs.store.Jobs, job) cs.store.Jobs = append(cs.store.Jobs, job)
if err := cs.saveStore(); err != nil { if err := cs.saveStoreUnsafe(); err != nil {
return nil, err return nil, err
} }
@@ -310,7 +377,9 @@ func (cs *CronService) removeJobUnsafe(jobID string) bool {
removed := len(cs.store.Jobs) < before removed := len(cs.store.Jobs) < before
if removed { if removed {
cs.saveStore() if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store after remove: %v", err)
}
} }
return removed return removed
@@ -332,7 +401,9 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob {
job.State.NextRunAtMS = nil job.State.NextRunAtMS = nil
} }
cs.saveStore() if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store after enable: %v", err)
}
return job return job
} }
} }
@@ -377,5 +448,11 @@ func (cs *CronService) Status() map[string]interface{} {
} }
func generateID() string { func generateID() string {
// Use crypto/rand for better uniqueness under concurrent access
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
// Fallback to time-based if crypto/rand fails
return fmt.Sprintf("%d", time.Now().UnixNano()) return fmt.Sprintf("%d", time.Now().UnixNano())
} }
return hex.EncodeToString(b)
}

View File

@@ -59,6 +59,15 @@ func (sm *SessionManager) GetOrCreate(key string) *Session {
} }
func (sm *SessionManager) AddMessage(sessionKey, role, content string) { func (sm *SessionManager) AddMessage(sessionKey, role, content string) {
sm.AddFullMessage(sessionKey, providers.Message{
Role: role,
Content: content,
})
}
// AddFullMessage adds a complete message with tool calls and tool call ID to the session.
// This is used to save the full conversation flow including tool calls and tool results.
func (sm *SessionManager) AddFullMessage(sessionKey string, msg providers.Message) {
sm.mu.Lock() sm.mu.Lock()
defer sm.mu.Unlock() defer sm.mu.Unlock()
@@ -72,10 +81,7 @@ func (sm *SessionManager) AddMessage(sessionKey, role, content string) {
sm.sessions[sessionKey] = session sm.sessions[sessionKey] = session
} }
session.Messages = append(session.Messages, providers.Message{ session.Messages = append(session.Messages, msg)
Role: role,
Content: content,
})
session.Updated = time.Now() session.Updated = time.Now()
} }

View File

@@ -9,6 +9,13 @@ type Tool interface {
Execute(ctx context.Context, args map[string]interface{}) (string, error) Execute(ctx context.Context, args map[string]interface{}) (string, error)
} }
// ContextualTool is an optional interface that tools can implement
// to receive the current message context (channel, chatID)
type ContextualTool interface {
Tool
SetContext(channel, chatID string)
}
func ToolToSchema(tool Tool) map[string]interface{} { func ToolToSchema(tool Tool) map[string]interface{} {
return map[string]interface{}{ return map[string]interface{}{
"type": "function", "type": "function",

284
pkg/tools/cron.go Normal file
View File

@@ -0,0 +1,284 @@
package tools
import (
"context"
"fmt"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/cron"
"github.com/sipeed/picoclaw/pkg/utils"
)
// JobExecutor is the interface for executing cron jobs through the agent
type JobExecutor interface {
ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error)
}
// CronTool provides scheduling capabilities for the agent
type CronTool struct {
cronService *cron.CronService
executor JobExecutor
msgBus *bus.MessageBus
channel string
chatID string
mu sync.RWMutex
}
// NewCronTool creates a new CronTool
func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus) *CronTool {
return &CronTool{
cronService: cronService,
executor: executor,
msgBus: msgBus,
}
}
// Name returns the tool name
func (t *CronTool) Name() string {
return "cron"
}
// Description returns the tool description
func (t *CronTool) Description() string {
return "Schedule reminders and tasks. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am)."
}
// Parameters returns the tool parameters schema
func (t *CronTool) Parameters() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"action": map[string]interface{}{
"type": "string",
"enum": []string{"add", "list", "remove", "enable", "disable"},
"description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.",
},
"message": map[string]interface{}{
"type": "string",
"description": "The reminder/task message to display when triggered (required for add)",
},
"at_seconds": map[string]interface{}{
"type": "integer",
"description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.",
},
"every_seconds": map[string]interface{}{
"type": "integer",
"description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.",
},
"cron_expr": map[string]interface{}{
"type": "string",
"description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.",
},
"job_id": map[string]interface{}{
"type": "string",
"description": "Job ID (for remove/enable/disable)",
},
"deliver": map[string]interface{}{
"type": "boolean",
"description": "If true, send message directly to channel. If false, let agent process the message (for complex tasks). Default: true",
},
},
"required": []string{"action"},
}
}
// SetContext sets the current session context for job creation
func (t *CronTool) SetContext(channel, chatID string) {
t.mu.Lock()
defer t.mu.Unlock()
t.channel = channel
t.chatID = chatID
}
// Execute runs the tool with given arguments
func (t *CronTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) {
action, ok := args["action"].(string)
if !ok {
return "", fmt.Errorf("action is required")
}
switch action {
case "add":
return t.addJob(args)
case "list":
return t.listJobs()
case "remove":
return t.removeJob(args)
case "enable":
return t.enableJob(args, true)
case "disable":
return t.enableJob(args, false)
default:
return "", fmt.Errorf("unknown action: %s", action)
}
}
func (t *CronTool) addJob(args map[string]interface{}) (string, error) {
t.mu.RLock()
channel := t.channel
chatID := t.chatID
t.mu.RUnlock()
if channel == "" || chatID == "" {
return "Error: no session context (channel/chat_id not set). Use this tool in an active conversation.", nil
}
message, ok := args["message"].(string)
if !ok || message == "" {
return "Error: message is required for add", nil
}
var schedule cron.CronSchedule
// Check for at_seconds (one-time), every_seconds (recurring), or cron_expr
atSeconds, hasAt := args["at_seconds"].(float64)
everySeconds, hasEvery := args["every_seconds"].(float64)
cronExpr, hasCron := args["cron_expr"].(string)
// Priority: at_seconds > every_seconds > cron_expr
if hasAt {
atMS := time.Now().UnixMilli() + int64(atSeconds)*1000
schedule = cron.CronSchedule{
Kind: "at",
AtMS: &atMS,
}
} else if hasEvery {
everyMS := int64(everySeconds) * 1000
schedule = cron.CronSchedule{
Kind: "every",
EveryMS: &everyMS,
}
} else if hasCron {
schedule = cron.CronSchedule{
Kind: "cron",
Expr: cronExpr,
}
} else {
return "Error: one of at_seconds, every_seconds, or cron_expr is required", nil
}
// Read deliver parameter, default to true
deliver := true
if d, ok := args["deliver"].(bool); ok {
deliver = d
}
// Truncate message for job name (max 30 chars)
messagePreview := utils.Truncate(message, 30)
job, err := t.cronService.AddJob(
messagePreview,
schedule,
message,
deliver,
channel,
chatID,
)
if err != nil {
return fmt.Sprintf("Error adding job: %v", err), nil
}
return fmt.Sprintf("Created job '%s' (id: %s)", job.Name, job.ID), nil
}
func (t *CronTool) listJobs() (string, error) {
jobs := t.cronService.ListJobs(false)
if len(jobs) == 0 {
return "No scheduled jobs.", nil
}
result := "Scheduled jobs:\n"
for _, j := range jobs {
var scheduleInfo string
if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil {
scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000)
} else if j.Schedule.Kind == "cron" {
scheduleInfo = j.Schedule.Expr
} else if j.Schedule.Kind == "at" {
scheduleInfo = "one-time"
} else {
scheduleInfo = "unknown"
}
result += fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo)
}
return result, nil
}
func (t *CronTool) removeJob(args map[string]interface{}) (string, error) {
jobID, ok := args["job_id"].(string)
if !ok || jobID == "" {
return "Error: job_id is required for remove", nil
}
if t.cronService.RemoveJob(jobID) {
return fmt.Sprintf("Removed job %s", jobID), nil
}
return fmt.Sprintf("Job %s not found", jobID), nil
}
func (t *CronTool) enableJob(args map[string]interface{}, enable bool) (string, error) {
jobID, ok := args["job_id"].(string)
if !ok || jobID == "" {
return "Error: job_id is required for enable/disable", nil
}
job := t.cronService.EnableJob(jobID, enable)
if job == nil {
return fmt.Sprintf("Job %s not found", jobID), nil
}
status := "enabled"
if !enable {
status = "disabled"
}
return fmt.Sprintf("Job '%s' %s", job.Name, status), nil
}
// ExecuteJob executes a cron job through the agent
func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
// Get channel/chatID from job payload
channel := job.Payload.Channel
chatID := job.Payload.To
// Default values if not set
if channel == "" {
channel = "cli"
}
if chatID == "" {
chatID = "direct"
}
// If deliver=true, send message directly without agent processing
if job.Payload.Deliver {
t.msgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: job.Payload.Message,
})
return "ok"
}
// For deliver=false, process through agent (for complex tasks)
sessionKey := fmt.Sprintf("cron-%s", job.ID)
// Call agent with the job's message
response, err := t.executor.ProcessDirectWithChannel(
ctx,
job.Payload.Message,
sessionKey,
channel,
chatID,
)
if err != nil {
return fmt.Sprintf("Error: %v", err)
}
// Response is automatically sent via MessageBus by AgentLoop
_ = response // Will be sent by AgentLoop
return "ok"
}

View File

@@ -34,6 +34,10 @@ func (r *ToolRegistry) Get(name string) (Tool, bool) {
} }
func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) (string, error) { func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string]interface{}) (string, error) {
return r.ExecuteWithContext(ctx, name, args, "", "")
}
func (r *ToolRegistry) ExecuteWithContext(ctx context.Context, name string, args map[string]interface{}, channel, chatID string) (string, error) {
logger.InfoCF("tool", "Tool execution started", logger.InfoCF("tool", "Tool execution started",
map[string]interface{}{ map[string]interface{}{
"tool": name, "tool": name,
@@ -49,6 +53,11 @@ func (r *ToolRegistry) Execute(ctx context.Context, name string, args map[string
return "", fmt.Errorf("tool '%s' not found", name) return "", fmt.Errorf("tool '%s' not found", name)
} }
// If tool implements ContextualTool, set context
if contextualTool, ok := tool.(ContextualTool); ok && channel != "" && chatID != "" {
contextualTool.SetContext(channel, chatID)
}
start := time.Now() start := time.Now()
result, err := tool.Execute(ctx, args) result, err := tool.Execute(ctx, args)
duration := time.Since(start) duration := time.Since(start)

16
pkg/utils/string.go Normal file
View File

@@ -0,0 +1,16 @@
package utils
// Truncate returns a truncated version of s with at most maxLen runes.
// Handles multi-byte Unicode characters properly.
// If the string is truncated, "..." is appended to indicate truncation.
func Truncate(s string, maxLen int) string {
runes := []rune(s)
if len(runes) <= maxLen {
return s
}
// Reserve 3 chars for "..."
if maxLen <= 3 {
return string(runes[:maxLen])
}
return string(runes[:maxLen-3]) + "..."
}