Files
picoclaw/pkg/cron/service.go
yinwm 6d4d2bc61e feat: add cron tool integration with agent
- Add adhocore/gronx dependency for cron expression parsing
- Fix CronService race conditions and add cron expression support
- Add CronTool with add/list/remove/enable/disable actions
- Add ContextualTool interface for tools needing channel/chatID context
- Add ProcessDirectWithChannel to AgentLoop for cron job execution
- Register CronTool in gateway and wire up onJob handler
- Fix slice bounds panic in addJob for short messages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 20:28:46 +08:00

454 lines
9.4 KiB
Go

package cron
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
"github.com/adhocore/gronx"
)
type CronSchedule struct {
Kind string `json:"kind"`
AtMS *int64 `json:"atMs,omitempty"`
EveryMS *int64 `json:"everyMs,omitempty"`
Expr string `json:"expr,omitempty"`
TZ string `json:"tz,omitempty"`
}
type CronPayload struct {
Kind string `json:"kind"`
Message string `json:"message"`
Deliver bool `json:"deliver"`
Channel string `json:"channel,omitempty"`
To string `json:"to,omitempty"`
}
type CronJobState struct {
NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"`
LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"`
LastStatus string `json:"lastStatus,omitempty"`
LastError string `json:"lastError,omitempty"`
}
type CronJob struct {
ID string `json:"id"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
Schedule CronSchedule `json:"schedule"`
Payload CronPayload `json:"payload"`
State CronJobState `json:"state"`
CreatedAtMS int64 `json:"createdAtMs"`
UpdatedAtMS int64 `json:"updatedAtMs"`
DeleteAfterRun bool `json:"deleteAfterRun"`
}
type CronStore struct {
Version int `json:"version"`
Jobs []CronJob `json:"jobs"`
}
type JobHandler func(job *CronJob) (string, error)
type CronService struct {
storePath string
store *CronStore
onJob JobHandler
mu sync.RWMutex
running bool
stopChan chan struct{}
gronx *gronx.Gronx
}
func NewCronService(storePath string, onJob JobHandler) *CronService {
cs := &CronService{
storePath: storePath,
onJob: onJob,
stopChan: make(chan struct{}),
gronx: gronx.New(),
}
// Initialize and load store on creation
cs.loadStore()
return cs
}
func (cs *CronService) Start() error {
cs.mu.Lock()
defer cs.mu.Unlock()
if cs.running {
return nil
}
if err := cs.loadStore(); err != nil {
return fmt.Errorf("failed to load store: %w", err)
}
cs.recomputeNextRuns()
if err := cs.saveStoreUnsafe(); err != nil {
return fmt.Errorf("failed to save store: %w", err)
}
cs.running = true
go cs.runLoop()
return nil
}
func (cs *CronService) Stop() {
cs.mu.Lock()
defer cs.mu.Unlock()
if !cs.running {
return
}
cs.running = false
close(cs.stopChan)
}
func (cs *CronService) runLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-cs.stopChan:
return
case <-ticker.C:
cs.checkJobs()
}
}
}
func (cs *CronService) checkJobs() {
cs.mu.Lock()
if !cs.running {
cs.mu.Unlock()
return
}
now := time.Now().UnixMilli()
var dueJobs []*CronJob
// Collect jobs that are due (we need to copy them to execute outside lock)
for i := range cs.store.Jobs {
job := &cs.store.Jobs[i]
if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now {
// Create a shallow copy of the job for execution
jobCopy := *job
dueJobs = append(dueJobs, &jobCopy)
}
}
// Update next run times for due jobs immediately (before executing)
for i := range cs.store.Jobs {
for _, dueJob := range dueJobs {
if cs.store.Jobs[i].ID == dueJob.ID {
// Reset NextRunAtMS temporarily so we don't re-execute
cs.store.Jobs[i].State.NextRunAtMS = nil
break
}
}
}
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store: %v", err)
}
cs.mu.Unlock()
// Execute jobs outside the lock
for _, job := range dueJobs {
cs.executeJob(job)
}
}
func (cs *CronService) executeJob(job *CronJob) {
startTime := time.Now().UnixMilli()
var err error
if cs.onJob != nil {
_, err = cs.onJob(job)
}
// Now acquire lock to update state
cs.mu.Lock()
defer cs.mu.Unlock()
// Find the job in store and update it
for i := range cs.store.Jobs {
if cs.store.Jobs[i].ID == job.ID {
cs.store.Jobs[i].State.LastRunAtMS = &startTime
cs.store.Jobs[i].UpdatedAtMS = time.Now().UnixMilli()
if err != nil {
cs.store.Jobs[i].State.LastStatus = "error"
cs.store.Jobs[i].State.LastError = err.Error()
} else {
cs.store.Jobs[i].State.LastStatus = "ok"
cs.store.Jobs[i].State.LastError = ""
}
// Compute next run time
if cs.store.Jobs[i].Schedule.Kind == "at" {
if cs.store.Jobs[i].DeleteAfterRun {
cs.removeJobUnsafe(job.ID)
} else {
cs.store.Jobs[i].Enabled = false
cs.store.Jobs[i].State.NextRunAtMS = nil
}
} else {
nextRun := cs.computeNextRun(&cs.store.Jobs[i].Schedule, time.Now().UnixMilli())
cs.store.Jobs[i].State.NextRunAtMS = nextRun
}
break
}
}
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store: %v", err)
}
}
func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int64 {
if schedule.Kind == "at" {
if schedule.AtMS != nil && *schedule.AtMS > nowMS {
return schedule.AtMS
}
return nil
}
if schedule.Kind == "every" {
if schedule.EveryMS == nil || *schedule.EveryMS <= 0 {
return nil
}
next := nowMS + *schedule.EveryMS
return &next
}
if schedule.Kind == "cron" {
if schedule.Expr == "" {
return nil
}
// Use gronx to calculate next run time
now := time.UnixMilli(nowMS)
nextTime, err := gronx.NextTickAfter(schedule.Expr, now, false)
if err != nil {
log.Printf("[cron] failed to compute next run for expr '%s': %v", schedule.Expr, err)
return nil
}
nextMS := nextTime.UnixMilli()
return &nextMS
}
return nil
}
func (cs *CronService) recomputeNextRuns() {
now := time.Now().UnixMilli()
for i := range cs.store.Jobs {
job := &cs.store.Jobs[i]
if job.Enabled {
job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now)
}
}
}
func (cs *CronService) getNextWakeMS() *int64 {
var nextWake *int64
for _, job := range cs.store.Jobs {
if job.Enabled && job.State.NextRunAtMS != nil {
if nextWake == nil || *job.State.NextRunAtMS < *nextWake {
nextWake = job.State.NextRunAtMS
}
}
}
return nextWake
}
func (cs *CronService) Load() error {
cs.mu.Lock()
defer cs.mu.Unlock()
return cs.loadStore()
}
func (cs *CronService) SetOnJob(handler JobHandler) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.onJob = handler
}
func (cs *CronService) loadStore() error {
cs.store = &CronStore{
Version: 1,
Jobs: []CronJob{},
}
data, err := os.ReadFile(cs.storePath)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return json.Unmarshal(data, cs.store)
}
func (cs *CronService) saveStoreUnsafe() error {
dir := filepath.Dir(cs.storePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
data, err := json.MarshalIndent(cs.store, "", " ")
if err != nil {
return err
}
return os.WriteFile(cs.storePath, data, 0644)
}
func (cs *CronService) AddJob(name string, schedule CronSchedule, message string, deliver bool, channel, to string) (*CronJob, error) {
cs.mu.Lock()
defer cs.mu.Unlock()
now := time.Now().UnixMilli()
job := CronJob{
ID: generateID(),
Name: name,
Enabled: true,
Schedule: schedule,
Payload: CronPayload{
Kind: "agent_turn",
Message: message,
Deliver: deliver,
Channel: channel,
To: to,
},
State: CronJobState{
NextRunAtMS: cs.computeNextRun(&schedule, now),
},
CreatedAtMS: now,
UpdatedAtMS: now,
DeleteAfterRun: false,
}
cs.store.Jobs = append(cs.store.Jobs, job)
if err := cs.saveStoreUnsafe(); err != nil {
return nil, err
}
return &job, nil
}
func (cs *CronService) RemoveJob(jobID string) bool {
cs.mu.Lock()
defer cs.mu.Unlock()
return cs.removeJobUnsafe(jobID)
}
func (cs *CronService) removeJobUnsafe(jobID string) bool {
before := len(cs.store.Jobs)
var jobs []CronJob
for _, job := range cs.store.Jobs {
if job.ID != jobID {
jobs = append(jobs, job)
}
}
cs.store.Jobs = jobs
removed := len(cs.store.Jobs) < before
if removed {
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store after remove: %v", err)
}
}
return removed
}
func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob {
cs.mu.Lock()
defer cs.mu.Unlock()
for i := range cs.store.Jobs {
job := &cs.store.Jobs[i]
if job.ID == jobID {
job.Enabled = enabled
job.UpdatedAtMS = time.Now().UnixMilli()
if enabled {
job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, time.Now().UnixMilli())
} else {
job.State.NextRunAtMS = nil
}
if err := cs.saveStoreUnsafe(); err != nil {
log.Printf("[cron] failed to save store after enable: %v", err)
}
return job
}
}
return nil
}
func (cs *CronService) ListJobs(includeDisabled bool) []CronJob {
cs.mu.RLock()
defer cs.mu.RUnlock()
if includeDisabled {
return cs.store.Jobs
}
var enabled []CronJob
for _, job := range cs.store.Jobs {
if job.Enabled {
enabled = append(enabled, job)
}
}
return enabled
}
func (cs *CronService) Status() map[string]interface{} {
cs.mu.RLock()
defer cs.mu.RUnlock()
var enabledCount int
for _, job := range cs.store.Jobs {
if job.Enabled {
enabledCount++
}
}
return map[string]interface{}{
"enabled": cs.running,
"jobs": len(cs.store.Jobs),
"nextWakeAtMS": cs.getNextWakeMS(),
}
}
func generateID() string {
// Use crypto/rand for better uniqueness under concurrent access
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
// Fallback to time-based if crypto/rand fails
return fmt.Sprintf("%d", time.Now().UnixNano())
}
return hex.EncodeToString(b)
}