Files
picoclaw/pkg/cron/service.go
2026-02-09 19:20:19 +08:00

382 lines
7.3 KiB
Go

package cron
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// CronSchedule describes when a job should run. Within this file,
// computeNextRun only acts on the kinds "at" and "every"; any other Kind
// produces no next run time.
type CronSchedule struct {
// Kind selects the schedule type, e.g. "at" or "every".
Kind string `json:"kind"`
// AtMS is the target time of an "at" schedule. It is compared against
// time.Now().UnixMilli(), i.e. Unix milliseconds.
AtMS *int64 `json:"atMs,omitempty"`
// EveryMS is the repeat interval of an "every" schedule in milliseconds.
// A nil or non-positive value yields no next run.
EveryMS *int64 `json:"everyMs,omitempty"`
// Expr is persisted but not interpreted by the code visible in this file
// (presumably a cron expression; TODO confirm).
Expr string `json:"expr,omitempty"`
// TZ is persisted but not interpreted by the code visible in this file
// (presumably a time zone name that applies to Expr; TODO confirm).
TZ string `json:"tz,omitempty"`
}
// CronPayload is the data attached to a job. The job handler can read it
// through the CronJob it receives.
type CronPayload struct {
// Kind identifies the payload type. AddJob always sets it to "agent_turn".
Kind string `json:"kind"`
// Message is the text passed to AddJob.
Message string `json:"message"`
// Deliver is stored as passed to AddJob and is not interpreted in this file
// (presumably tells the handler whether to deliver its output; TODO confirm).
Deliver bool `json:"deliver"`
// Channel is stored as passed to AddJob and is not interpreted in this file
// (presumably the delivery channel; TODO confirm).
Channel string `json:"channel,omitempty"`
// To is stored as passed to AddJob and is not interpreted in this file
// (presumably the delivery recipient; TODO confirm).
To string `json:"to,omitempty"`
}
// CronJobState holds the scheduling and last-execution bookkeeping of a job.
type CronJobState struct {
// NextRunAtMS is the time in Unix milliseconds at or after which the job is
// due. nil means the job is not scheduled.
NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"`
// LastRunAtMS is the Unix-millisecond time at which the last execution
// started.
LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"`
// LastStatus is "ok" or "error" after an execution.
LastStatus string `json:"lastStatus,omitempty"`
// LastError is the handler's error text when LastStatus is "error" and is
// reset to the empty string after a successful run.
LastError string `json:"lastError,omitempty"`
}
// CronJob is one scheduled job as it is kept in memory and serialized to the
// store file.
type CronJob struct {
// ID is the identifier used by RemoveJob and EnableJob to find the job.
ID string `json:"id"`
// Name is the caller-supplied name passed to AddJob.
Name string `json:"name"`
// Enabled gates execution; disabled jobs are skipped by the scheduler.
Enabled bool `json:"enabled"`
// Schedule defines when the job runs.
Schedule CronSchedule `json:"schedule"`
// Payload is the data made available to the job handler.
Payload CronPayload `json:"payload"`
// State tracks the next run time and the outcome of the last run.
State CronJobState `json:"state"`
// CreatedAtMS is the creation time in Unix milliseconds.
CreatedAtMS int64 `json:"createdAtMs"`
// UpdatedAtMS is refreshed (Unix milliseconds) when the job is executed,
// enabled or disabled.
UpdatedAtMS int64 `json:"updatedAtMs"`
// DeleteAfterRun makes an "at" job remove itself after it has executed;
// otherwise an executed "at" job is disabled instead.
DeleteAfterRun bool `json:"deleteAfterRun"`
}
// CronStore is the top-level document persisted as JSON at the service's
// store path.
type CronStore struct {
// Version is the store format version; loadStore initializes it to 1.
Version int `json:"version"`
// Jobs is the list of all known jobs, enabled or not.
Jobs []CronJob `json:"jobs"`
}
// JobHandler is the callback invoked for each due job. executeJob discards
// the returned string and records a non-nil error in the job's state as
// LastStatus "error" together with the error text.
type JobHandler func(job *CronJob) (string, error)
// CronService keeps a list of jobs in a JSON file and, once started, checks
// once per second for jobs that are due and hands them to the job handler.
type CronService struct {
// storePath is the JSON file the store is read from and written to.
storePath string
// store is the in-memory job list; loadStore replaces it.
store *CronStore
// onJob is called for every due job. It may be nil, in which case a due job
// is recorded as "ok" without anything being called.
onJob JobHandler
// mu is locked around accesses to store and running by Start, Stop,
// checkJobs, executeJob, AddJob, RemoveJob, EnableJob, ListJobs and Status.
mu sync.RWMutex
// running is set by Start and cleared by Stop.
running bool
// stopChan is closed by Stop to make the scheduler loop return.
stopChan chan struct{}
}
// NewCronService creates a service that persists its jobs at storePath and
// calls onJob for every due job. The store is loaded immediately; a load
// failure is not reported here, and Start loads the store again and returns
// any error from that load.
func NewCronService(storePath string, onJob JobHandler) *CronService {
	svc := new(CronService)
	svc.storePath = storePath
	svc.onJob = onJob
	svc.stopChan = make(chan struct{})

	// Best-effort initial load: the constructor has no error return.
	_ = svc.loadStore()
	return svc
}
// Start loads the store from disk, recomputes the next run time of every
// enabled job, persists the result and launches the background scheduler
// loop. Calling Start on a service that is already running is a no-op that
// returns nil.
//
// Every run gets its own stop channel. This lets the service be started
// again after Stop, and guarantees that a later Stop never closes a channel
// that an earlier Stop already closed (which would panic).
func (cs *CronService) Start() error {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	if cs.running {
		return nil
	}
	if err := cs.loadStore(); err != nil {
		return fmt.Errorf("failed to load store: %w", err)
	}
	cs.recomputeNextRuns()
	if err := cs.saveStore(); err != nil {
		return fmt.Errorf("failed to save store: %w", err)
	}

	stop := make(chan struct{})
	cs.stopChan = stop
	cs.running = true
	// Hand the channel to the goroutine directly so the loop never has to
	// re-read cs.stopChan, which a later Start may replace.
	go cs.runLoopUntil(stop)
	return nil
}

// Stop signals the scheduler loop to exit. It does not wait for the loop or
// for a job handler that is currently executing to finish. Calling Stop on a
// service that is not running is a no-op.
func (cs *CronService) Stop() {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	if !cs.running {
		return
	}
	cs.running = false
	close(cs.stopChan)
}

// runLoop runs the scheduler loop until the stop channel that is current at
// the time of the call is closed. It must not be called while cs.mu is held
// by the calling goroutine.
func (cs *CronService) runLoop() {
	cs.mu.RLock()
	stop := cs.stopChan
	cs.mu.RUnlock()

	cs.runLoopUntil(stop)
}

// runLoopUntil checks for due jobs once per second until stop is closed.
func (cs *CronService) runLoopUntil(stop <-chan struct{}) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			cs.checkJobs()
		}
	}
}
// checkJobs executes every enabled job whose next run time has been reached
// and then persists the store. It does nothing when the service is not
// running.
//
// Handlers run without cs.mu held, so they may call back into the service.
// The store is only written when at least one job was executed; every other
// mutation path in this file saves on its own, so an idle tick has nothing
// to persist and no longer rewrites the store file once per second.
func (cs *CronService) checkJobs() {
	cs.mu.RLock()
	if !cs.running {
		cs.mu.RUnlock()
		return
	}
	now := time.Now().UnixMilli()
	var dueJobs []*CronJob
	for i := range cs.store.Jobs {
		job := &cs.store.Jobs[i]
		if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now {
			dueJobs = append(dueJobs, job)
		}
	}
	cs.mu.RUnlock()

	if len(dueJobs) == 0 {
		return
	}

	for _, job := range dueJobs {
		cs.executeJob(job)
	}

	cs.mu.Lock()
	defer cs.mu.Unlock()
	// Best effort: there is no caller to report the error to, and the next
	// successful save writes the same in-memory state again.
	_ = cs.saveStore()
}
// executeJob runs the handler for job and records the outcome.
//
// The handler is called without cs.mu held. While it runs, the store's job
// slice can be replaced (AddJob may reallocate it, removals build a new
// one), which would leave job pointing at a stale element. The outcome is
// therefore written to the job that currently carries the same ID in the
// store, looked up under the lock. If the job was removed in the meantime,
// nothing is recorded.
//
// After the run, an "at" job is removed when DeleteAfterRun is set and
// disabled otherwise; every other job gets its next run time recomputed.
func (cs *CronService) executeJob(job *CronJob) {
	startTime := time.Now().UnixMilli()
	jobID := job.ID

	var err error
	if cs.onJob != nil {
		_, err = cs.onJob(job)
	}

	cs.mu.Lock()
	defer cs.mu.Unlock()

	// Re-resolve the job: the pointer we were given may no longer refer to
	// the live store entry.
	var live *CronJob
	for i := range cs.store.Jobs {
		if cs.store.Jobs[i].ID == jobID {
			live = &cs.store.Jobs[i]
			break
		}
	}
	if live == nil {
		return
	}

	finishedMS := time.Now().UnixMilli()
	live.State.LastRunAtMS = &startTime
	live.UpdatedAtMS = finishedMS
	if err != nil {
		live.State.LastStatus = "error"
		live.State.LastError = err.Error()
	} else {
		live.State.LastStatus = "ok"
		live.State.LastError = ""
	}

	if live.Schedule.Kind != "at" {
		live.State.NextRunAtMS = cs.computeNextRun(&live.Schedule, finishedMS)
		return
	}
	if live.DeleteAfterRun {
		cs.removeJobUnsafe(jobID)
		return
	}
	live.Enabled = false
	live.State.NextRunAtMS = nil
}
// computeNextRun returns the next run time in Unix milliseconds for schedule
// relative to nowMS, or nil when there is none.
//
//   - "at": the schedule's AtMS pointer when it lies strictly after nowMS.
//   - "every": nowMS plus EveryMS when EveryMS is set and positive.
//   - any other kind: nil.
func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int64 {
	switch schedule.Kind {
	case "at":
		at := schedule.AtMS
		if at == nil || *at <= nowMS {
			return nil
		}
		return at
	case "every":
		interval := schedule.EveryMS
		if interval == nil || *interval <= 0 {
			return nil
		}
		nextMS := nowMS + *interval
		return &nextMS
	default:
		return nil
	}
}
// recomputeNextRuns resets the next run time of every enabled job relative
// to the current time. Disabled jobs are left untouched. The caller is
// expected to hold cs.mu.
func (cs *CronService) recomputeNextRuns() {
	nowMS := time.Now().UnixMilli()
	jobs := cs.store.Jobs
	for idx := 0; idx < len(jobs); idx++ {
		if !jobs[idx].Enabled {
			continue
		}
		jobs[idx].State.NextRunAtMS = cs.computeNextRun(&jobs[idx].Schedule, nowMS)
	}
}
// getNextWakeMS returns the earliest next run time among the enabled jobs,
// or nil when no enabled job is scheduled. The returned pointer is the one
// stored in the job's state, not a copy.
func (cs *CronService) getNextWakeMS() *int64 {
	var earliest *int64
	for i := range cs.store.Jobs {
		candidate := cs.store.Jobs[i].State.NextRunAtMS
		if !cs.store.Jobs[i].Enabled || candidate == nil {
			continue
		}
		if earliest != nil && *candidate >= *earliest {
			continue
		}
		earliest = candidate
	}
	return earliest
}
// Load re-reads the store from disk, replacing the in-memory job list.
//
// It takes the write lock, like every other exported method that touches
// the store, so it can be called while the scheduler loop is running.
func (cs *CronService) Load() error {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	return cs.loadStore()
}
// loadStore reads the store file into memory. A missing file is not an
// error and results in an empty store.
//
// The file is decoded into a fresh value that replaces cs.store only on
// success, so a read or parse failure never leaves a partially decoded job
// list behind. On failure the previous store is kept; if there is none yet,
// an empty store is installed so that cs.store is never nil afterwards.
// The caller is expected to hold cs.mu.
func (cs *CronService) loadStore() error {
	fresh := &CronStore{
		Version: 1,
		Jobs:    []CronJob{},
	}

	data, err := os.ReadFile(cs.storePath)
	if err == nil {
		err = json.Unmarshal(data, fresh)
	} else if os.IsNotExist(err) {
		err = nil
	}
	if err != nil {
		if cs.store == nil {
			cs.store = &CronStore{
				Version: 1,
				Jobs:    []CronJob{},
			}
		}
		return err
	}

	// A file containing "jobs": null decodes to a nil slice; normalize it so
	// the store keeps serializing as "jobs": [].
	if fresh.Jobs == nil {
		fresh.Jobs = []CronJob{}
	}
	cs.store = fresh
	return nil
}
// saveStore serializes the in-memory store as indented JSON to the store
// path, creating the parent directory if needed.
//
// The data is first written to a sibling temporary file and then renamed
// over the store file, so a write that fails part-way does not truncate the
// existing store. If the temporary file cannot be written or renamed (for
// example because the directory does not allow it), saveStore falls back to
// writing the store file directly. The caller is expected to hold cs.mu.
func (cs *CronService) saveStore() error {
	dir := filepath.Dir(cs.storePath)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return err
	}
	data, err := json.MarshalIndent(cs.store, "", " ")
	if err != nil {
		return err
	}

	tmpPath := cs.storePath + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err == nil {
		if err := os.Rename(tmpPath, cs.storePath); err == nil {
			return nil
		}
	}
	// Clean up whatever the failed attempt left behind; the error is
	// irrelevant because the file may not exist at all.
	_ = os.Remove(tmpPath)

	return os.WriteFile(cs.storePath, data, 0644)
}
// AddJob creates an enabled job with an "agent_turn" payload, appends it to
// the store and persists the store.
//
// The job's next run time is computed from schedule relative to the current
// time. The returned pointer refers to a copy of the stored job.
//
// If the store cannot be saved, the job is removed from the in-memory store
// again and the error is returned, so a failed AddJob does not leave a job
// scheduled that the caller was told does not exist.
func (cs *CronService) AddJob(name string, schedule CronSchedule, message string, deliver bool, channel, to string) (*CronJob, error) {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	now := time.Now().UnixMilli()
	job := CronJob{
		ID:       generateID(),
		Name:     name,
		Enabled:  true,
		Schedule: schedule,
		Payload: CronPayload{
			Kind:    "agent_turn",
			Message: message,
			Deliver: deliver,
			Channel: channel,
			To:      to,
		},
		State: CronJobState{
			NextRunAtMS: cs.computeNextRun(&schedule, now),
		},
		CreatedAtMS:    now,
		UpdatedAtMS:    now,
		DeleteAfterRun: false,
	}

	cs.store.Jobs = append(cs.store.Jobs, job)
	if err := cs.saveStore(); err != nil {
		// Roll back so memory and disk stay in agreement.
		cs.store.Jobs = cs.store.Jobs[:len(cs.store.Jobs)-1]
		return nil, err
	}
	return &job, nil
}
// RemoveJob deletes the job with the given ID under the write lock and
// reports whether a job was removed.
func (cs *CronService) RemoveJob(jobID string) bool {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	wasRemoved := cs.removeJobUnsafe(jobID)
	return wasRemoved
}
// removeJobUnsafe deletes every job with the given ID and, if any was
// removed, persists the store. It reports whether a job was removed. The
// caller must hold cs.mu for writing.
//
// The store's slice is only replaced when something was actually removed,
// and the replacement is a freshly allocated, non-nil slice: the existing
// backing array is never rewritten in place (other code may still hold
// pointers into it), and an emptied store keeps serializing as "jobs": []
// rather than "jobs": null.
func (cs *CronService) removeJobUnsafe(jobID string) bool {
	kept := make([]CronJob, 0, len(cs.store.Jobs))
	for _, job := range cs.store.Jobs {
		if job.ID != jobID {
			kept = append(kept, job)
		}
	}
	if len(kept) == len(cs.store.Jobs) {
		return false
	}

	cs.store.Jobs = kept
	// Best effort: the boolean result cannot carry an error, and the removal
	// has already taken effect in memory.
	_ = cs.saveStore()
	return true
}
// EnableJob enables or disables the job with the given ID and persists the
// store. Enabling recomputes the job's next run time relative to now;
// disabling clears it. It returns nil when no job has that ID.
//
// The returned pointer refers to a copy of the job (as with AddJob), so the
// caller can read it after the lock is released without touching the
// service's internal state.
func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	for i := range cs.store.Jobs {
		job := &cs.store.Jobs[i]
		if job.ID != jobID {
			continue
		}

		now := time.Now().UnixMilli()
		job.Enabled = enabled
		job.UpdatedAtMS = now
		if enabled {
			job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now)
		} else {
			job.State.NextRunAtMS = nil
		}
		// Best effort: the return type cannot carry an error, and the change
		// has already taken effect in memory.
		_ = cs.saveStore()

		snapshot := *job
		return &snapshot
	}
	return nil
}
// ListJobs returns the jobs in the store: all of them when includeDisabled
// is true, otherwise only the enabled ones (nil when there are none).
//
// The result is always a copy of the job values, so callers can keep and
// modify it after the lock is released without affecting, or racing with,
// the service's internal slice. The copy is shallow: pointer fields inside a
// job still refer to the same values as the stored job.
func (cs *CronService) ListJobs(includeDisabled bool) []CronJob {
	cs.mu.RLock()
	defer cs.mu.RUnlock()

	if includeDisabled {
		all := make([]CronJob, len(cs.store.Jobs))
		copy(all, cs.store.Jobs)
		return all
	}

	var enabled []CronJob
	for _, job := range cs.store.Jobs {
		if job.Enabled {
			enabled = append(enabled, job)
		}
	}
	return enabled
}
// Status returns a snapshot of the service state with three keys:
//
//   - "enabled": whether the scheduler loop is running (bool).
//   - "jobs": the total number of jobs in the store (int).
//   - "nextWakeAtMS": the earliest next run time among enabled jobs as a
//     *int64; the pointer is nil when nothing is scheduled, so the map value
//     is a typed nil pointer rather than a nil interface.
func (cs *CronService) Status() map[string]interface{} {
	cs.mu.RLock()
	defer cs.mu.RUnlock()

	return map[string]interface{}{
		"enabled":      cs.running,
		"jobs":         len(cs.store.Jobs),
		"nextWakeAtMS": cs.getNextWakeMS(),
	}
}
// idState remembers the last ID handed out by generateID so that IDs stay
// unique within the process.
var idState struct {
	mu   sync.Mutex
	last int64
}

// generateID returns a decimal job ID derived from the current time in Unix
// nanoseconds.
//
// Jobs are looked up and removed by ID, so two jobs must never share one.
// The raw clock value can repeat when two calls fall within the clock's
// resolution; in that case the ID is bumped past the previously issued one,
// which makes the IDs returned by this process strictly increasing.
func generateID() string {
	idState.mu.Lock()
	defer idState.mu.Unlock()

	id := time.Now().UnixNano()
	if id <= idState.last {
		id = idState.last + 1
	}
	idState.last = id
	return fmt.Sprintf("%d", id)
}