Spaces:
Configuration error
Configuration error
| package model | |
| import ( | |
| "sync" | |
| "time" | |
| process "github.com/mudler/go-processmanager" | |
| "github.com/rs/zerolog/log" | |
| ) | |
| // WatchDog tracks all the requests from GRPC clients. | |
| // All GRPC Clients created by ModelLoader should have an associated injected | |
| // watchdog that will keep track of the state of each backend (busy or not) | |
| // and for how much time it has been busy. | |
| // If a backend is busy for too long, the watchdog will kill the process and | |
| // force a reload of the model | |
| // The watchdog runs as a separate go routine, | |
| // and the GRPC client talks to it via a channel to send status updates | |
| type WatchDog struct { | |
| sync.Mutex | |
| timetable map[string]time.Time | |
| idleTime map[string]time.Time | |
| timeout, idletimeout time.Duration | |
| addressMap map[string]*process.Process | |
| addressModelMap map[string]string | |
| pm ProcessManager | |
| stop chan bool | |
| busyCheck, idleCheck bool | |
| } | |
// ProcessManager is the dependency the watchdog uses to stop a model whose
// backend has exceeded its busy or idle limit.
type ProcessManager interface {
	// ShutdownModel is asked to shut down the model named modelName. The
	// watchdog only logs a returned error; it does not retry.
	ShutdownModel(modelName string) error
}
| func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool) *WatchDog { | |
| return &WatchDog{ | |
| timeout: timeoutBusy, | |
| idletimeout: timeoutIdle, | |
| pm: pm, | |
| timetable: make(map[string]time.Time), | |
| idleTime: make(map[string]time.Time), | |
| addressMap: make(map[string]*process.Process), | |
| busyCheck: busy, | |
| idleCheck: idle, | |
| addressModelMap: make(map[string]string), | |
| } | |
| } | |
| func (wd *WatchDog) Shutdown() { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| wd.stop <- true | |
| } | |
| func (wd *WatchDog) AddAddressModelMap(address string, model string) { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| wd.addressModelMap[address] = model | |
| } | |
| func (wd *WatchDog) Add(address string, p *process.Process) { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| wd.addressMap[address] = p | |
| } | |
| func (wd *WatchDog) Mark(address string) { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| wd.timetable[address] = time.Now() | |
| delete(wd.idleTime, address) | |
| } | |
| func (wd *WatchDog) UnMark(ModelAddress string) { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| delete(wd.timetable, ModelAddress) | |
| wd.idleTime[ModelAddress] = time.Now() | |
| } | |
| func (wd *WatchDog) Run() { | |
| log.Info().Msg("[WatchDog] starting watchdog") | |
| for { | |
| select { | |
| case <-wd.stop: | |
| log.Info().Msg("[WatchDog] Stopping watchdog") | |
| return | |
| case <-time.After(30 * time.Second): | |
| if !wd.busyCheck && !wd.idleCheck { | |
| log.Info().Msg("[WatchDog] No checks enabled, stopping watchdog") | |
| return | |
| } | |
| if wd.busyCheck { | |
| wd.checkBusy() | |
| } | |
| if wd.idleCheck { | |
| wd.checkIdle() | |
| } | |
| } | |
| } | |
| } | |
| func (wd *WatchDog) checkIdle() { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| log.Debug().Msg("[WatchDog] Watchdog checks for idle connections") | |
| for address, t := range wd.idleTime { | |
| log.Debug().Msgf("[WatchDog] %s: idle connection", address) | |
| if time.Since(t) > wd.idletimeout { | |
| log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address) | |
| model, ok := wd.addressModelMap[address] | |
| if ok { | |
| if err := wd.pm.ShutdownModel(model); err != nil { | |
| log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model") | |
| } | |
| log.Debug().Msgf("[WatchDog] model shut down: %s", address) | |
| delete(wd.idleTime, address) | |
| delete(wd.addressModelMap, address) | |
| delete(wd.addressMap, address) | |
| } else { | |
| log.Warn().Msgf("[WatchDog] Address %s unresolvable", address) | |
| delete(wd.idleTime, address) | |
| } | |
| } | |
| } | |
| } | |
| func (wd *WatchDog) checkBusy() { | |
| wd.Lock() | |
| defer wd.Unlock() | |
| log.Debug().Msg("[WatchDog] Watchdog checks for busy connections") | |
| for address, t := range wd.timetable { | |
| log.Debug().Msgf("[WatchDog] %s: active connection", address) | |
| if time.Since(t) > wd.timeout { | |
| model, ok := wd.addressModelMap[address] | |
| if ok { | |
| log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model) | |
| if err := wd.pm.ShutdownModel(model); err != nil { | |
| log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model") | |
| } | |
| log.Debug().Msgf("[WatchDog] model shut down: %s", address) | |
| delete(wd.timetable, address) | |
| delete(wd.addressModelMap, address) | |
| delete(wd.addressMap, address) | |
| } else { | |
| log.Warn().Msgf("[WatchDog] Address %s unresolvable", address) | |
| delete(wd.timetable, address) | |
| } | |
| } | |
| } | |
| } | |