package tools import ( "context" "fmt" "sync" "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/cron" "github.com/sipeed/picoclaw/pkg/utils" ) // JobExecutor is the interface for executing cron jobs through the agent type JobExecutor interface { ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) } // CronTool provides scheduling capabilities for the agent type CronTool struct { cronService *cron.CronService executor JobExecutor msgBus *bus.MessageBus execTool *ExecTool channel string chatID string mu sync.RWMutex } // NewCronTool creates a new CronTool func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus, workspace string, restrict bool) *CronTool { return &CronTool{ cronService: cronService, executor: executor, msgBus: msgBus, execTool: NewExecTool(workspace, restrict), } } // Name returns the tool name func (t *CronTool) Name() string { return "cron" } // Description returns the tool description func (t *CronTool) Description() string { return "Schedule reminders, tasks, or system commands. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules. Use 'command' to execute shell commands directly." } // Parameters returns the tool parameters schema func (t *CronTool) Parameters() map[string]interface{} { return map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ "action": map[string]interface{}{ "type": "string", "enum": []string{"add", "list", "remove", "enable", "disable"}, "description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.", }, "message": map[string]interface{}{ "type": "string", "description": "The reminder/task message to display when triggered. If 'command' is used, this describes what the command does.", }, "command": map[string]interface{}{ "type": "string", "description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message. 'deliver' will be forced to false for commands.", }, "at_seconds": map[string]interface{}{ "type": "integer", "description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.", }, "every_seconds": map[string]interface{}{ "type": "integer", "description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.", }, "cron_expr": map[string]interface{}{ "type": "string", "description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.", }, "job_id": map[string]interface{}{ "type": "string", "description": "Job ID (for remove/enable/disable)", }, "deliver": map[string]interface{}{ "type": "boolean", "description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: true", }, }, "required": []string{"action"}, } } // SetContext sets the current session context for job creation func (t *CronTool) SetContext(channel, chatID string) { t.mu.Lock() defer t.mu.Unlock() t.channel = channel t.chatID = chatID } // Execute runs the tool with the given arguments func (t *CronTool) Execute(ctx context.Context, args map[string]interface{}) *ToolResult { action, ok := args["action"].(string) if !ok { return ErrorResult("action is required") } switch action { case "add": return t.addJob(args) case "list": return t.listJobs() case "remove": return t.removeJob(args) case "enable": return t.enableJob(args, true) case "disable": return t.enableJob(args, false) default: return ErrorResult(fmt.Sprintf("unknown action: %s", action)) } } func (t *CronTool) addJob(args map[string]interface{}) *ToolResult { t.mu.RLock() channel := t.channel chatID := t.chatID t.mu.RUnlock() if channel == "" || chatID == "" { return ErrorResult("no session context (channel/chat_id not set). Use this tool in an active conversation.") } message, ok := args["message"].(string) if !ok || message == "" { return ErrorResult("message is required for add") } var schedule cron.CronSchedule // Check for at_seconds (one-time), every_seconds (recurring), or cron_expr atSeconds, hasAt := args["at_seconds"].(float64) everySeconds, hasEvery := args["every_seconds"].(float64) cronExpr, hasCron := args["cron_expr"].(string) // Priority: at_seconds > every_seconds > cron_expr if hasAt { atMS := time.Now().UnixMilli() + int64(atSeconds)*1000 schedule = cron.CronSchedule{ Kind: "at", AtMS: &atMS, } } else if hasEvery { everyMS := int64(everySeconds) * 1000 schedule = cron.CronSchedule{ Kind: "every", EveryMS: &everyMS, } } else if hasCron { schedule = cron.CronSchedule{ Kind: "cron", Expr: cronExpr, } } else { return ErrorResult("one of at_seconds, every_seconds, or cron_expr is required") } // Read deliver parameter, default to true deliver := true if d, ok := args["deliver"].(bool); ok { deliver = d } command, _ := args["command"].(string) if command != "" { // Commands must be processed by agent/exec tool, so deliver must be false (or handled specifically) // Actually, let's keep deliver=false to let the system know it's not a simple chat message // But for our new logic in ExecuteJob, we can handle it regardless of deliver flag if Payload.Command is set. // However, logically, it's not "delivered" to chat directly as is. deliver = false } // Truncate message for job name (max 30 chars) messagePreview := utils.Truncate(message, 30) job, err := t.cronService.AddJob( messagePreview, schedule, message, deliver, channel, chatID, ) if err != nil { return ErrorResult(fmt.Sprintf("Error adding job: %v", err)) } if command != "" { job.Payload.Command = command // Need to save the updated payload t.cronService.UpdateJob(job) } return SilentResult(fmt.Sprintf("Cron job added: %s (id: %s)", job.Name, job.ID)) } func (t *CronTool) listJobs() *ToolResult { jobs := t.cronService.ListJobs(false) if len(jobs) == 0 { return SilentResult("No scheduled jobs") } result := "Scheduled jobs:\n" for _, j := range jobs { var scheduleInfo string if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil { scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000) } else if j.Schedule.Kind == "cron" { scheduleInfo = j.Schedule.Expr } else if j.Schedule.Kind == "at" { scheduleInfo = "one-time" } else { scheduleInfo = "unknown" } result += fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo) } return SilentResult(result) } func (t *CronTool) removeJob(args map[string]interface{}) *ToolResult { jobID, ok := args["job_id"].(string) if !ok || jobID == "" { return ErrorResult("job_id is required for remove") } if t.cronService.RemoveJob(jobID) { return SilentResult(fmt.Sprintf("Cron job removed: %s", jobID)) } return ErrorResult(fmt.Sprintf("Job %s not found", jobID)) } func (t *CronTool) enableJob(args map[string]interface{}, enable bool) *ToolResult { jobID, ok := args["job_id"].(string) if !ok || jobID == "" { return ErrorResult("job_id is required for enable/disable") } job := t.cronService.EnableJob(jobID, enable) if job == nil { return ErrorResult(fmt.Sprintf("Job %s not found", jobID)) } status := "enabled" if !enable { status = "disabled" } return SilentResult(fmt.Sprintf("Cron job '%s' %s", job.Name, status)) } // ExecuteJob executes a cron job through the agent func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { // Get channel/chatID from job payload channel := job.Payload.Channel chatID := job.Payload.To // Default values if not set if channel == "" { channel = "cli" } if chatID == "" { chatID = "direct" } // Execute command if present if job.Payload.Command != "" { args := map[string]interface{}{ "command": job.Payload.Command, } result := t.execTool.Execute(ctx, args) var output string if result.IsError { output = fmt.Sprintf("Error executing scheduled command: %s", result.ForLLM) } else { output = fmt.Sprintf("Scheduled command '%s' executed:\n%s", job.Payload.Command, result.ForLLM) } t.msgBus.PublishOutbound(bus.OutboundMessage{ Channel: channel, ChatID: chatID, Content: output, }) return "ok" } // If deliver=true, send message directly without agent processing if job.Payload.Deliver { t.msgBus.PublishOutbound(bus.OutboundMessage{ Channel: channel, ChatID: chatID, Content: job.Payload.Message, }) return "ok" } // For deliver=false, process through agent (for complex tasks) sessionKey := fmt.Sprintf("cron-%s", job.ID) // Call agent with job's message response, err := t.executor.ProcessDirectWithChannel( ctx, job.Payload.Message, sessionKey, channel, chatID, ) if err != nil { return fmt.Sprintf("Error: %v", err) } // Response is automatically sent via MessageBus by AgentLoop _ = response // Will be sent by AgentLoop return "ok" }