package main

import (
	"bytes"
	"context"
	"database/sql"
	"embed"
	"encoding/json"
	"errors"
	"fmt"
	"html/template"
	"io/fs"
	"log"
	"math"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/fsnotify/fsnotify"
	_ "github.com/mattn/go-sqlite3"
	"github.com/robinson/gos7"
	"gopkg.in/yaml.v3"
)

// staticFiles embeds the contents of the "static" directory into the binary.
//
//go:embed static
var staticFiles embed.FS

// version is the application version string.
const version = "0.9.2"

// ---------------------------------------------------------------------------
// Config structs
// ---------------------------------------------------------------------------

// Config is the root of the YAML configuration; each field maps to one
// top-level section of the config file.
type Config struct {
	Server     ServerConfig     `yaml:"server"`
	PLC        PLCConfig        `yaml:"plc"`
	Thresholds ThresholdsConfig `yaml:"thresholds"`
	Trend      TrendConfig      `yaml:"trend"`
	Press      PressConfig      `yaml:"press"`
	UI         UIConfig         `yaml:"ui"`
	Modules    ModulesConfig    `yaml:"modules"`
	DB         DBConfig         `yaml:"db"`
	MQTT       MQTTConfig       `yaml:"mqtt"`
}

// ServerConfig holds the HTTP listen address.
type ServerConfig struct {
	ListenAddr string `yaml:"listen_addr"`
}

// PLCConfig holds the PLC connection and polling settings.
// normalizeConfig replaces any non-positive integer field with its default.
type PLCConfig struct {
	IP                string `yaml:"ip"`
	DBNum             int    `yaml:"db_num"`
	Rack              int    `yaml:"rack"`
	Slot              int    `yaml:"slot"`
	PollMs            int    `yaml:"poll_ms"`
	ConnectTimeoutSec int    `yaml:"connect_timeout_sec"`
	IdleTimeoutSec    int    `yaml:"idle_timeout_sec"`
	ReconnectDelaySec int    `yaml:"reconnect_delay_sec"`
}

// ThresholdsConfig holds warning/critical limits expressed in percent.
// The Legacy* fields accept the older *_kn keys; normalizeConfig copies them
// into the corresponding percent field when that field is <= 0.
type ThresholdsConfig struct {
	WarningPercent           float64 `yaml:"warning_percent"`
	CriticalPercent          float64 `yaml:"critical_percent"`
	GaugeMaxPercent          float64 `yaml:"gauge_max_percent"`
	ImbalanceWarningPercent  float64 `yaml:"imbalance_warning_percent"`
	ImbalanceCriticalPercent float64 `yaml:"imbalance_critical_percent"`
	LegacyWarningKn          float64 `yaml:"warning_kn,omitempty"`
	LegacyCriticalKn         float64 `yaml:"critical_kn,omitempty"`
	LegacyMaxKn              float64 `yaml:"max_kn,omitempty"`
}

// TrendConfig holds the default trend window length in minutes.
type TrendConfig struct {
	Minutes int `yaml:"minutes"`
}

// PressConfig holds the press capacity used to convert percent to force.
// LegacyMaxTonnage accepts the lower-case key and is used by normalizeConfig
// only when MaxTonnage is <= 0.
type PressConfig struct {
	MaxTonnage       float64 `yaml:"MAX_TONNAGE"`
	LegacyMaxTonnage float64 `yaml:"max_tonnage,omitempty"`
}

// UIConfig holds the text labels and unit strings rendered into the page.
type UIConfig struct {
	Title      string `yaml:"title"`
	Subtitle   string `yaml:"subtitle"`
	LeftLabel  string `yaml:"left_label"`
	RightLabel string `yaml:"right_label"`
	UnitForce  string `yaml:"unit_force"`
	UnitPct    string `yaml:"unit_percent"`
}

// ModulesConfig toggles individual UI modules. Fields are pointers so that
// an absent key (nil) can be told apart from an explicit false; see
// setIfNilBool and boolValue.
type ModulesConfig struct {
	ShowHeaderControls *bool `yaml:"show_header_controls,omitempty"`
	ShowVerdict        *bool `yaml:"show_verdict,omitempty"`
	ShowSummaryBar     *bool `yaml:"show_summary_bar,omitempty"`
	ShowOverview       *bool `yaml:"show_overview,omitempty"`
	ShowIntelligence   *bool `yaml:"show_intelligence,omitempty"`
	ShowAlarmTimeline  *bool `yaml:"show_alarm_timeline,omitempty"`
	ShowGauges         *bool `yaml:"show_gauges,omitempty"`
	ShowGaugeDigital   *bool `yaml:"show_gauge_digital,omitempty"`
	ShowTrendChart     *bool `yaml:"show_trend_chart,omitempty"`
}

// DBConfig holds the database path and the writer/retention tuning values.
// normalizeConfig replaces any non-positive integer field with its default.
type DBConfig struct {
	Path              string `yaml:"path"`
	BusyTimeoutMs     int    `yaml:"busy_timeout_ms"`
	BatchSize         int    `yaml:"batch_size"`
	FlushIntervalMs   int    `yaml:"flush_interval_ms"`
	RetentionDays     int    `yaml:"retention_days"`
	MaxChartPoints    int    `yaml:"max_chart_points"`
	WriterQueueSize   int    `yaml:"writer_queue_size"`
	AlarmQueueSize    int    `yaml:"alarm_queue_size"`
	CheckpointPages   int    `yaml:"checkpoint_pages"`
	CleanupIntervalHr int    `yaml:"cleanup_interval_hours"`
}

// MQTTConfig holds all MQTT broker and publishing settings.
// Requires restart when changed — not hot-reloadable.
type MQTTConfig struct {
	Enabled           bool   `yaml:"enabled"`
	Broker            string `yaml:"broker"`              // e.g. "tcp://192.168.1.10:1883"
	ClientID          string `yaml:"client_id"`           // unique client identifier
	Username          string `yaml:"username"`            // leave blank if no auth
	Password          string `yaml:"password"`            // leave blank if no auth
	TopicPrefix       string `yaml:"topic_prefix"`        // e.g. "plant1/press3"
	QoS               int    `yaml:"qos"`                 // 0, 1, or 2
	Retain            bool   `yaml:"retain"`              // retain auto-published state msgs
	AutoPublish       bool   `yaml:"auto_publish"`        // publish PLC state on timer
	PublishIntervalMs int    `yaml:"publish_interval_ms"` // how often to auto-publish
	ConnectTimeoutSec int    `yaml:"connect_timeout_sec"`
	ReconnectDelaySec int    `yaml:"reconnect_delay_sec"`
}

// ---------------------------------------------------------------------------
// Config helpers
// ---------------------------------------------------------------------------

// boolPtr returns a pointer to a fresh copy of v.
func boolPtr(v bool) *bool { return &v }

// boolValue dereferences v, returning def when v is nil.
func boolValue(v *bool, def bool) bool {
	if v == nil {
		return def
	}
	return *v
}

// defaultConfig returns the built-in configuration. It is the starting point
// for decoding a config file and the source of fallback values in
// normalizeConfig.
func defaultConfig() Config {
	return Config{
		Server: ServerConfig{ListenAddr: ":8080"},
		PLC: PLCConfig{
			IP:                "192.168.0.1",
			DBNum:             1001,
			Rack:              0,
			Slot:              1,
			PollMs:            500,
			ConnectTimeoutSec: 5,
			IdleTimeoutSec:    5,
			ReconnectDelaySec: 5,
		},
		Thresholds: ThresholdsConfig{
			WarningPercent:           80,
			CriticalPercent:          95,
			GaugeMaxPercent:          130,
			ImbalanceWarningPercent:  10,
			ImbalanceCriticalPercent: 20,
		},
		Trend: TrendConfig{Minutes: 5},
		Press: PressConfig{MaxTonnage: 64},
		UI: UIConfig{
			Title:      "Force Monitor",
			Subtitle:   "Siemens S7-1215C • Piezo peak/stroke input • PLC values in % • kN calculated from MAX_TONNAGE",
			LeftLabel:  "LEVI STEBER",
			RightLabel: "DESNI STEBER",
			UnitForce:  "kN",
			UnitPct:    "%",
		},
		Modules: ModulesConfig{
			ShowHeaderControls: boolPtr(true),
			ShowVerdict:        boolPtr(true),
			ShowSummaryBar:     boolPtr(true),
			ShowOverview:       boolPtr(true),
			ShowIntelligence:   boolPtr(true),
			ShowAlarmTimeline:  boolPtr(true),
			ShowGauges:         boolPtr(true),
			ShowGaugeDigital:   boolPtr(false),
			ShowTrendChart:     boolPtr(true),
		},
		DB: DBConfig{
			Path:              "force_monitor.db",
			BusyTimeoutMs:     5000,
			BatchSize:         32,
			FlushIntervalMs:   1000,
			RetentionDays:     30,
			MaxChartPoints:    2000,
			WriterQueueSize:   4096,
			AlarmQueueSize:    512,
			CheckpointPages:   1000,
			CleanupIntervalHr: 6,
		},
		MQTT: MQTTConfig{
			Enabled:           false,
			Broker:            "tcp://localhost:1883",
			ClientID:          "force_monitor",
			TopicPrefix:       "force_monitor",
			QoS:               1,
			Retain:            false,
			AutoPublish:       true,
			PublishIntervalMs: 1000,
			ConnectTimeoutSec: 10,
			ReconnectDelaySec: 5,
		},
	}
}

// setIfZeroF replaces *dst with def when *dst <= 0 (despite the name,
// negative values are replaced as well).
func setIfZeroF(dst *float64, def float64) {
	if *dst <= 0 {
		*dst = def
	}
}

// setIfZeroI replaces *dst with def when *dst <= 0.
// Note: this means config values of 0 are treated as "not set".
// For PLC Rack (valid at 0), the default is also 0, so this is a safe no-op.
// For PLC Slot, a value of 0 would be overwritten with default 1.
func setIfZeroI(dst *int, def int) {
	if *dst <= 0 {
		*dst = def
	}
}

// setIfEmpty replaces *dst with def when *dst is empty or whitespace-only.
func setIfEmpty(dst *string, def string) {
	if strings.TrimSpace(*dst) == "" {
		*dst = def
	}
}

// setIfNilBool points *dst at a fresh copy of def when *dst is nil.
func setIfNilBool(dst **bool, def bool) {
	if *dst == nil {
		v := def
		*dst = &v
	}
}

// normalizeConfig fills every unset or out-of-range field of cfg in place
// with the value from defaultConfig, migrates the legacy keys, and clamps
// the imbalance critical limit so it is never below the warning limit.
// MQTT fields are only normalized when MQTT is enabled.
func normalizeConfig(cfg *Config) {
	def := defaultConfig()
	setIfEmpty(&cfg.Server.ListenAddr, def.Server.ListenAddr)
	setIfEmpty(&cfg.PLC.IP, def.PLC.IP)
	setIfZeroI(&cfg.PLC.DBNum, def.PLC.DBNum)
	// Rack: default is 0, so for rack == 0 this is a no-op (a negative rack
	// is reset to 0); kept for symmetry.
	setIfZeroI(&cfg.PLC.Rack, def.PLC.Rack)
	setIfZeroI(&cfg.PLC.Slot, def.PLC.Slot)
	setIfZeroI(&cfg.PLC.PollMs, def.PLC.PollMs)
	setIfZeroI(&cfg.PLC.ConnectTimeoutSec, def.PLC.ConnectTimeoutSec)
	setIfZeroI(&cfg.PLC.IdleTimeoutSec, def.PLC.IdleTimeoutSec)
	setIfZeroI(&cfg.PLC.ReconnectDelaySec, def.PLC.ReconnectDelaySec)
	// Migrate legacy kN fields to percent fields if new fields are absent.
	// The legacy value is copied as-is; no kN-to-percent conversion is done.
	if cfg.Thresholds.WarningPercent <= 0 && cfg.Thresholds.LegacyWarningKn > 0 {
		cfg.Thresholds.WarningPercent = cfg.Thresholds.LegacyWarningKn
	}
	if cfg.Thresholds.CriticalPercent <= 0 && cfg.Thresholds.LegacyCriticalKn > 0 {
		cfg.Thresholds.CriticalPercent = cfg.Thresholds.LegacyCriticalKn
	}
	if cfg.Thresholds.GaugeMaxPercent <= 0 && cfg.Thresholds.LegacyMaxKn > 0 {
		cfg.Thresholds.GaugeMaxPercent = cfg.Thresholds.LegacyMaxKn
	}
	setIfZeroF(&cfg.Thresholds.WarningPercent, def.Thresholds.WarningPercent)
	setIfZeroF(&cfg.Thresholds.CriticalPercent, def.Thresholds.CriticalPercent)
	setIfZeroF(&cfg.Thresholds.GaugeMaxPercent, def.Thresholds.GaugeMaxPercent)
	setIfZeroF(&cfg.Thresholds.ImbalanceWarningPercent, def.Thresholds.ImbalanceWarningPercent)
	setIfZeroF(&cfg.Thresholds.ImbalanceCriticalPercent, def.Thresholds.ImbalanceCriticalPercent)
	// Keep the imbalance limits ordered instead of failing validation.
	if cfg.Thresholds.ImbalanceCriticalPercent < cfg.Thresholds.ImbalanceWarningPercent {
		cfg.Thresholds.ImbalanceCriticalPercent = cfg.Thresholds.ImbalanceWarningPercent
	}
	setIfZeroI(&cfg.Trend.Minutes, def.Trend.Minutes)
	if cfg.Press.MaxTonnage <= 0 && cfg.Press.LegacyMaxTonnage > 0 {
		cfg.Press.MaxTonnage = cfg.Press.LegacyMaxTonnage
	}
	setIfZeroF(&cfg.Press.MaxTonnage, def.Press.MaxTonnage)
	setIfEmpty(&cfg.UI.Title, def.UI.Title)
	setIfEmpty(&cfg.UI.Subtitle, def.UI.Subtitle)
	setIfEmpty(&cfg.UI.LeftLabel, def.UI.LeftLabel)
	setIfEmpty(&cfg.UI.RightLabel, def.UI.RightLabel)
	setIfEmpty(&cfg.UI.UnitForce, def.UI.UnitForce)
	setIfEmpty(&cfg.UI.UnitPct, def.UI.UnitPct)
	setIfNilBool(&cfg.Modules.ShowHeaderControls, boolValue(def.Modules.ShowHeaderControls, true))
	setIfNilBool(&cfg.Modules.ShowVerdict, boolValue(def.Modules.ShowVerdict, true))
	setIfNilBool(&cfg.Modules.ShowSummaryBar, boolValue(def.Modules.ShowSummaryBar, true))
	setIfNilBool(&cfg.Modules.ShowOverview, boolValue(def.Modules.ShowOverview, true))
	setIfNilBool(&cfg.Modules.ShowIntelligence, boolValue(def.Modules.ShowIntelligence, true))
	setIfNilBool(&cfg.Modules.ShowAlarmTimeline, boolValue(def.Modules.ShowAlarmTimeline, true))
	setIfNilBool(&cfg.Modules.ShowGauges, boolValue(def.Modules.ShowGauges, true))
	setIfNilBool(&cfg.Modules.ShowGaugeDigital, boolValue(def.Modules.ShowGaugeDigital, false))
	setIfNilBool(&cfg.Modules.ShowTrendChart, boolValue(def.Modules.ShowTrendChart, true))
	setIfEmpty(&cfg.DB.Path, def.DB.Path)
	setIfZeroI(&cfg.DB.BusyTimeoutMs, def.DB.BusyTimeoutMs)
	setIfZeroI(&cfg.DB.BatchSize, def.DB.BatchSize)
	setIfZeroI(&cfg.DB.FlushIntervalMs, def.DB.FlushIntervalMs)
	setIfZeroI(&cfg.DB.RetentionDays, def.DB.RetentionDays)
	setIfZeroI(&cfg.DB.MaxChartPoints, def.DB.MaxChartPoints)
	setIfZeroI(&cfg.DB.WriterQueueSize, def.DB.WriterQueueSize)
	setIfZeroI(&cfg.DB.AlarmQueueSize, def.DB.AlarmQueueSize)
	setIfZeroI(&cfg.DB.CheckpointPages, def.DB.CheckpointPages)
	setIfZeroI(&cfg.DB.CleanupIntervalHr, def.DB.CleanupIntervalHr)
	// MQTT normalization (only when enabled to avoid noisy defaults)
	if cfg.MQTT.Enabled {
		setIfEmpty(&cfg.MQTT.Broker, def.MQTT.Broker)
		setIfEmpty(&cfg.MQTT.ClientID, def.MQTT.ClientID)
		setIfEmpty(&cfg.MQTT.TopicPrefix, def.MQTT.TopicPrefix)
		if cfg.MQTT.QoS < 0 || cfg.MQTT.QoS > 2 {
			cfg.MQTT.QoS = def.MQTT.QoS
		}
		setIfZeroI(&cfg.MQTT.PublishIntervalMs, def.MQTT.PublishIntervalMs)
		setIfZeroI(&cfg.MQTT.ConnectTimeoutSec, def.MQTT.ConnectTimeoutSec)
		setIfZeroI(&cfg.MQTT.ReconnectDelaySec, def.MQTT.ReconnectDelaySec)
	}
}

// loadConfigStrict reads configPath, decodes it on top of defaultConfig with
// KnownFields(true) enabled, and normalizes the result.
// On error the returned Config is the (possibly partially decoded) default
// and should not be used.
//
// NOTE(review): because decoding starts from the defaults, the percent and
// MAX_TONNAGE fields are already positive when a key is simply omitted, so
// the legacy migration in normalizeConfig only triggers when the new key is
// explicitly set to a value <= 0 — confirm this is intended.
func loadConfigStrict(configPath string) (Config, error) {
	cfg := defaultConfig()
	data, err := os.ReadFile(configPath)
	if err != nil {
		return cfg, fmt.Errorf("failed to read config file: %w", err)
	}
	dec := yaml.NewDecoder(bytes.NewReader(data))
	dec.KnownFields(true)
	if err := dec.Decode(&cfg); err != nil {
		return cfg, fmt.Errorf("failed to parse config file: %w", err)
	}
	normalizeConfig(&cfg)
	return cfg, nil
}

// loadOrCreateConfig loads configPath via loadConfigStrict. When the file
// does not exist it writes the default configuration there (mode 0644) and
// returns those defaults instead.
func loadOrCreateConfig(configPath string) (Config, error) {
	cfg := defaultConfig()
	_, err := os.Stat(configPath)
	if errors.Is(err, os.ErrNotExist) {
		data, marshalErr := yaml.Marshal(&cfg)
		if marshalErr != nil {
			return cfg, fmt.Errorf("failed to marshal default config: %w", marshalErr)
		}
		if writeErr := os.WriteFile(configPath, data, 0644); writeErr != nil {
			return cfg, fmt.Errorf("failed to create config file: %w", writeErr)
		}
		log.Printf("config file not found, created default config: %s", configPath)
		return cfg, nil
	}
	if err != nil {
		return cfg, fmt.Errorf("failed to stat config file: %w", err)
	}
	return loadConfigStrict(configPath)
}

// validateConfig checks the cross-field invariants of cfg and returns an
// error describing the first rule that is violated, or nil when all hold.
func validateConfig(cfg Config) error {
	if cfg.Thresholds.WarningPercent <= 0 {
		return fmt.Errorf("thresholds.warning_percent must be > 0")
	}
	if cfg.Thresholds.CriticalPercent < cfg.Thresholds.WarningPercent {
		return fmt.Errorf("thresholds.critical_percent must be >= thresholds.warning_percent")
	}
	if cfg.Thresholds.GaugeMaxPercent < cfg.Thresholds.CriticalPercent {
		return fmt.Errorf("thresholds.gauge_max_percent must be >= thresholds.critical_percent")
	}
	if cfg.Thresholds.ImbalanceWarningPercent <= 0 {
		return fmt.Errorf("thresholds.imbalance_warning_percent must be > 0")
	}
	if cfg.Thresholds.ImbalanceCriticalPercent < cfg.Thresholds.ImbalanceWarningPercent {
		return fmt.Errorf("thresholds.imbalance_critical_percent must be >= thresholds.imbalance_warning_percent")
	}
	if cfg.Trend.Minutes <= 0 {
		return fmt.Errorf("trend.minutes must be > 0")
	}
	if cfg.Press.MaxTonnage <= 0 {
		return fmt.Errorf("press.MAX_TONNAGE must be > 0")
	}
	if cfg.MQTT.Enabled && strings.TrimSpace(cfg.MQTT.Broker) == "" {
		return fmt.Errorf("mqtt.broker must be set when mqtt.enabled is true")
	}
	if cfg.MQTT.Enabled && strings.TrimSpace(cfg.MQTT.ClientID) == "" {
		return fmt.Errorf("mqtt.client_id must be set when mqtt.enabled is true")
	}
	return nil
}

// ---------------------------------------------------------------------------
// Domain types
// ---------------------------------------------------------------------------

// Sample is one timestamped force reading with its derived values.
type Sample struct {
	TS               time.Time
	SilaLPct         float32
	SilaRPct         float32
	SilaLKN          float32
	SilaRKN          float32
	SumPercent       float32
	SumKN            float32
	ImbalancePercent float32
	BiasPercent      float32
}

// AlarmEvent is one alarm occurrence with the measured value and the limit
// it is compared against.
type AlarmEvent struct {
	TS       time.Time
	Severity string
	Source   string
	Code     string
	State    string
	Message  string
	Value    float64
	Limit    float64
}

// AppState is the latest live state. It embeds sync.RWMutex; presumably all
// field access is expected to happen under that lock — verify against callers.
type AppState struct {
	sync.RWMutex
	Connected        bool
	SilaL            float32
	SilaR            float32
	SilaLkN          float32
	SilaRkN          float32
	SumPercent       float32
	SumkN            float32
	ImbalancePercent float32
	BiasPercent      float32
	LastUpdate       time.Time
	DroppedSamples   uint64
	DroppedEvents    uint64
}

// APIState is the JSON form of the live state; LastUpdate is a string here
// rather than a time.Time.
type APIState struct {
	Connected        bool    `json:"connected"`
	SilaL            float32 `json:"sila_l"`
	SilaR            float32 `json:"sila_r"`
	SilaLkN          float32 `json:"sila_l_kn"`
	SilaRkN          float32 `json:"sila_r_kn"`
	SumPercent       float32 `json:"sum_percent"`
	SumkN            float32 `json:"sum_kn"`
	ImbalancePercent float32 `json:"imbalance_percent"`
	BiasPercent      float32 `json:"bias_percent"`
	LastUpdate       string  `json:"last_update"`
	DroppedSamples   uint64  `json:"dropped_samples"`
	DroppedEvents    uint64  `json:"dropped_events"`
}

// HistoryPoint is one chart point of left/right force.
type HistoryPoint struct {
	Time  string  `json:"time"`
	SilaL float32 `json:"sila_l"`
	SilaR float32 `json:"sila_r"`
}

// HistoryResponse is the JSON envelope for a series of history points.
type HistoryResponse struct {
	Window string         `json:"window"`
	Points []HistoryPoint `json:"points"`
}

// TrendResponse is the JSON form of the trend/stability analysis.
type TrendResponse struct {
	Window             string  `json:"window"`
	AvgPeak5m          float32 `json:"avg_peak_5m"`
	AvgPeak1h          float32 `json:"avg_peak_1h"`
	AvgImbalance5m     float32 `json:"avg_imbalance_5m"`
	AvgImbalance1h     float32 `json:"avg_imbalance_1h"`
	ForceDeltaPct      float32 `json:"force_delta_pct"`
	ImbalanceDeltaPct  float32 `json:"imbalance_delta_pct"`
	ForceDirection     string  `json:"force_direction"`
	ImbalanceDirection string  `json:"imbalance_direction"`
	ProcessStability   string  `json:"process_stability"`
	StabilityReason    string  `json:"stability_reason"`
	ForceStdDev        float32 `json:"force_stddev"`
	ImbalanceStdDev    float32 `json:"imbalance_stddev"`
	SampleCount        int     `json:"sample_count"`
	OldHalfCount       int     `json:"old_half_count"`
	NewHalfCount       int     `json:"new_half_count"`
}

// AlarmEventAPI is the JSON form of an alarm event. Unlike AlarmEvent it has
// no Code field and carries the time as a string.
type AlarmEventAPI struct {
	Time     string  `json:"time"`
	Severity string  `json:"severity"`
	Source   string  `json:"source"`
	State    string  `json:"state"`
	Message  string  `json:"message"`
	Value    float64 `json:"value"`
	Limit    float64 `json:"limit"`
}

// AlarmResponse is the JSON envelope for a list of alarm events.
type AlarmResponse struct {
	Events []AlarmEventAPI `json:"events"`
}

// PageData is the data passed to the UI template; buildCachedUI fills it
// from the configuration.
type PageData struct {
	Title                    string
	Subtitle                 string
	LeftLabel                string
	RightLabel               string
	UnitForce                string
	UnitPct                  string
	MaxTonnage               float64
	WarningPercent           float64
	CriticalPercent          float64
	GaugeMaxPercent          float64
	ImbalanceWarningPercent  float64
	ImbalanceCriticalPercent float64
	PollMs                   int
	DefaultWindow            string
	DefaultTrendWindow       string
	ShowHeaderControls       bool
	ShowVerdict              bool
	ShowSummaryBar           bool
	ShowOverview             bool
	ShowIntelligence         bool
	ShowAlarmTimeline        bool
	ShowGauges               bool
	ShowGaugeDigital         bool
	ShowTrendChart           bool
	UIRevision               uint64
}

// NumericStats holds aggregate statistics of a numeric series: the mean
// (Avg), the mean of squares (AvgSq), the extremes and the sample count.
type NumericStats struct {
	Avg   float64
	AvgSq float64
	Min   float64
	Max   float64
	Count int
}

// StdDev returns sqrt(AvgSq - Avg*Avg). It returns 0 when Count <= 1, and a
// negative variance (possible through rounding) is clamped to 0 first.
func (s NumericStats) StdDev() float64 {
	if s.Count <= 1 {
		return 0
	}
	v := s.AvgSq - (s.Avg * s.Avg)
	if v < 0 {
		v = 0
	}
	return math.Sqrt(v)
}

// AlarmTracker remembers the last known PLC connection state and alarm
// zones, guarded by the embedded mutex.
type AlarmTracker struct {
	sync.Mutex
	PLCKnown     bool
	PLCConnected bool
	LeftZone     string
	RightZone    string
	ImbZone      string
}

// ---------------------------------------------------------------------------
// MQTT types
// ---------------------------------------------------------------------------

// MQTTReceivedMsg holds a single inbound MQTT message stored in history.
type MQTTReceivedMsg struct {
	Topic    string `json:"topic"`
	Payload  string `json:"payload"`
	Retained bool   `json:"retained"`
	Time     string `json:"time"`
}

// MQTTPublishRequest is the body for POST /api/mqtt/publish.
type MQTTPublishRequest struct {
	Topic   string `json:"topic"`
	Payload string `json:"payload"`
	QoS     int    `json:"qos"`
	Retain  bool   `json:"retain"`
}

// MQTTSubscribeRequest is the body for POST /api/mqtt/subscribe.
type MQTTSubscribeRequest struct {
	Topic string `json:"topic"`
	QoS   int    `json:"qos"`
}

// MQTTStatusResponse is returned by GET /api/mqtt/status.
type MQTTStatusResponse struct { Enabled bool `json:"enabled"` Connected bool `json:"connected"` Broker string `json:"broker"` ClientID string `json:"client_id"` TopicPrefix string `json:"topic_prefix"` Subscribed []string `json:"subscribed"` LastError string `json:"last_error,omitempty"` } // mqttManager wraps the paho MQTT client and tracks connection state, // inbound message history, and active subscriptions. type mqttManager struct { mu sync.RWMutex client mqtt.Client connected bool lastErr string broker string clientID string prefix string msgHistory []MQTTReceivedMsg msgMax int subs map[string]byte // topic -> qos currently subscribed } // --------------------------------------------------------------------------- // Package-level singletons // --------------------------------------------------------------------------- var ( cfg Config cfgMu sync.RWMutex state AppState db *sql.DB sampleCh chan Sample alarmCh chan AlarmEvent alarmTracker AlarmTracker uiTemplate = template.Must(template.New("ui").Parse(uiHTML)) cachedUI []byte uiRevision uint64 = 1 // mqttMgr is nil when MQTT is disabled. 
mqttMgr *mqttManager ) // --------------------------------------------------------------------------- // MQTT manager // --------------------------------------------------------------------------- func newMQTTManager(mcfg MQTTConfig) *mqttManager { m := &mqttManager{ broker: mcfg.Broker, clientID: mcfg.ClientID, prefix: mcfg.TopicPrefix, msgMax: 500, subs: make(map[string]byte), } opts := mqtt.NewClientOptions() opts.AddBroker(mcfg.Broker) opts.SetClientID(mcfg.ClientID) if mcfg.Username != "" { opts.SetUsername(mcfg.Username) opts.SetPassword(mcfg.Password) } opts.SetAutoReconnect(true) opts.SetMaxReconnectInterval(time.Duration(mcfg.ReconnectDelaySec*6) * time.Second) opts.SetConnectTimeout(time.Duration(mcfg.ConnectTimeoutSec) * time.Second) opts.SetCleanSession(true) opts.SetKeepAlive(30 * time.Second) opts.SetOnConnectHandler(func(c mqtt.Client) { m.mu.Lock() m.connected = true m.lastErr = "" // Snapshot current subs before releasing lock. resubTopics := make(map[string]byte, len(m.subs)) for t, q := range m.subs { resubTopics[t] = q } m.mu.Unlock() log.Printf("MQTT connected to %s", mcfg.Broker) // Re-subscribe on reconnect. for topic, qos := range resubTopics { tok := c.Subscribe(topic, qos, m.messageHandler) if tok.Wait() && tok.Error() != nil { log.Printf("MQTT re-subscribe %s failed: %v", topic, tok.Error()) } } }) opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) { m.mu.Lock() m.connected = false if err != nil { m.lastErr = err.Error() } m.mu.Unlock() log.Printf("MQTT connection lost: %v", err) }) m.client = mqtt.NewClient(opts) return m } // connect dials the broker; paho handles all subsequent reconnections. func (m *mqttManager) connect() error { tok := m.client.Connect() if !tok.WaitTimeout(30 * time.Second) { return fmt.Errorf("MQTT connect timeout after 30s") } if err := tok.Error(); err != nil { m.mu.Lock() m.lastErr = err.Error() m.mu.Unlock() return err } return nil } // disconnect cleanly disconnects from the broker. 
// It is a no-op when no client was created. After the client has been told
// to disconnect, the manager is marked as not connected so that publish,
// subscribe and status stop treating the session as live.
func (m *mqttManager) disconnect() {
	if m.client == nil {
		return
	}
	// The argument is the quiesce time in milliseconds.
	m.client.Disconnect(500)

	m.mu.Lock()
	m.connected = false
	m.mu.Unlock()
}

// messageHandler is the default callback for all subscriptions.
// It appends the message to the in-memory history. When the history grows
// past msgMax, the oldest entries are dropped in one batch so that the
// newest msgMax - msgMax/2 messages remain — including the one just received.
func (m *mqttManager) messageHandler(_ mqtt.Client, msg mqtt.Message) {
	entry := MQTTReceivedMsg{
		Topic:    msg.Topic(),
		Payload:  string(msg.Payload()),
		Retained: msg.Retained(),
		Time:     time.Now().UTC().Format(time.RFC3339Nano),
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	m.msgHistory = append(m.msgHistory, entry)
	if len(m.msgHistory) <= m.msgMax {
		return
	}

	// Trim roughly the oldest half in one go to avoid an O(n) shift on every
	// overflow. The kept window is measured from the end of the slice so the
	// message that triggered the trim is never discarded.
	keep := m.msgMax - m.msgMax/2
	if keep < 0 {
		keep = 0
	}
	full := m.msgHistory
	n := copy(full, full[len(full)-keep:])
	// Zero the vacated tail so dropped payload strings can be collected.
	for i := n; i < len(full); i++ {
		full[i] = MQTTReceivedMsg{}
	}
	m.msgHistory = full[:n]
}

// publish sends a message; returns an error if not connected or publish fails.
// A qos above 2 is clamped to 2. For qos 0 the call returns as soon as the
// message is handed to the client; for qos 1 and 2 it waits up to 5 seconds
// for the publish token and reports a timeout or the token's error.
func (m *mqttManager) publish(topic, payload string, qos byte, retain bool) error {
	if topic == "" {
		return fmt.Errorf("topic must not be empty")
	}
	if qos > 2 {
		qos = 2
	}

	m.mu.RLock()
	online := m.connected
	m.mu.RUnlock()
	if !online {
		return fmt.Errorf("MQTT not connected")
	}

	tok := m.client.Publish(topic, qos, retain, payload)
	if qos == 0 {
		return nil
	}
	if !tok.WaitTimeout(5 * time.Second) {
		return fmt.Errorf("MQTT publish timeout")
	}
	return tok.Error()
}

// subscribe adds a topic subscription; resubscribed automatically on reconnect.
// The topic is recorded first, so a request made while disconnected succeeds
// and is carried out by the on-connect handler later. A qos above 2 is
// clamped to 2.
func (m *mqttManager) subscribe(topic string, qos byte) error {
	if topic == "" {
		return fmt.Errorf("topic must not be empty")
	}
	if qos > 2 {
		qos = 2
	}

	// Record the subscription and read the connection flag under one lock
	// acquisition so both reflect the same moment.
	m.mu.Lock()
	m.subs[topic] = qos
	online := m.connected
	m.mu.Unlock()

	if !online {
		// Will be subscribed upon reconnect via OnConnectHandler.
		return nil
	}

	tok := m.client.Subscribe(topic, qos, m.messageHandler)
	if tok.Wait() && tok.Error() != nil {
		return tok.Error()
	}
	return nil
}

// unsubscribe removes a topic subscription.
func (m *mqttManager) unsubscribe(topic string) error { m.mu.Lock() delete(m.subs, topic) m.mu.Unlock() m.mu.RLock() ok := m.connected m.mu.RUnlock() if !ok { return nil } tok := m.client.Unsubscribe(topic) if tok.Wait() && tok.Error() != nil { return tok.Error() } return nil } // status returns a snapshot of connection state for the API. func (m *mqttManager) status() MQTTStatusResponse { m.mu.RLock() defer m.mu.RUnlock() subs := make([]string, 0, len(m.subs)) for t := range m.subs { subs = append(subs, t) } return MQTTStatusResponse{ Enabled: true, Connected: m.connected, Broker: m.broker, ClientID: m.clientID, TopicPrefix: m.prefix, Subscribed: subs, LastError: m.lastErr, } } // getMessages returns up to limit of the most recently received messages. func (m *mqttManager) getMessages(limit int) []MQTTReceivedMsg { if limit <= 0 { limit = 50 } if limit > 500 { limit = 500 } m.mu.RLock() defer m.mu.RUnlock() total := len(m.msgHistory) if total == 0 { return []MQTTReceivedMsg{} } start := 0 if total > limit { start = total - limit } out := make([]MQTTReceivedMsg, total-start) copy(out, m.msgHistory[start:]) return out } // startMQTTPublisher periodically publishes the current PLC state to MQTT. 
// Topics published: // // {prefix}/data – full JSON APIState (same as /api/data) // {prefix}/force/left – left column force in % // {prefix}/force/right – right column force in % // {prefix}/force/sum_kn – total kN // {prefix}/connected – PLC connection boolean func startMQTTPublisher(ctx context.Context) { if mqttMgr == nil { return } mcfg := getConfigSnapshot().MQTT if !mcfg.AutoPublish { return } interval := time.Duration(mcfg.PublishIntervalMs) * time.Millisecond ticker := time.NewTicker(interval) defer ticker.Stop() prefix := mcfg.TopicPrefix qos := byte(mcfg.QoS) retain := mcfg.Retain for { select { case <-ctx.Done(): return case <-ticker.C: s := snapshotState() full, err := json.Marshal(s) if err == nil { _ = mqttMgr.publish(prefix+"/data", string(full), qos, retain) } _ = mqttMgr.publish(prefix+"/force/left", fmt.Sprintf("%.2f", s.SilaL), qos, retain) _ = mqttMgr.publish(prefix+"/force/right", fmt.Sprintf("%.2f", s.SilaR), qos, retain) _ = mqttMgr.publish(prefix+"/force/sum_kn", fmt.Sprintf("%.2f", s.SumkN), qos, retain) _ = mqttMgr.publish(prefix+"/force/imbalance", fmt.Sprintf("%.2f", s.ImbalancePercent), qos, retain) connStr := "false" if s.Connected { connStr = "true" } _ = mqttMgr.publish(prefix+"/connected", connStr, qos, retain) } } } // mqttPublishAlarm forwards a single alarm event to MQTT non-blocking. // Called from enqueueAlarm; errors are silently discarded to avoid // blocking the alarm pipeline. 
func mqttPublishAlarm(a AlarmEvent) { if mqttMgr == nil { return } mcfg := getConfigSnapshot().MQTT if !mcfg.Enabled { return } payload, err := json.Marshal(map[string]interface{}{ "time": a.TS.UTC().Format(time.RFC3339), "severity": a.Severity, "source": a.Source, "code": a.Code, "state": a.State, "message": a.Message, "value": a.Value, "limit": a.Limit, }) if err != nil { return } _ = mqttMgr.publish(mcfg.TopicPrefix+"/alarm", string(payload), byte(mcfg.QoS), false) } // --------------------------------------------------------------------------- // Force calculation // --------------------------------------------------------------------------- func calculateForces(leftPercent, rightPercent float32, maxTonnage float64) (leftKN, rightKN, sumPercent, sumKN float32) { lp := float64(leftPercent) rp := float64(rightPercent) sumPct := (lp + rp) / 2.0 left := (lp / 100.0) * (maxTonnage / 2.0) right := (rp / 100.0) * (maxTonnage / 2.0) total := (sumPct / 100.0) * maxTonnage return float32(left), float32(right), float32(sumPct), float32(total) } func getConfigSnapshot() Config { cfgMu.RLock() defer cfgMu.RUnlock() return cfg } func buildCachedUI(config Config, revision uint64) ([]byte, error) { data := PageData{ Title: config.UI.Title, Subtitle: config.UI.Subtitle, LeftLabel: config.UI.LeftLabel, RightLabel: config.UI.RightLabel, UnitForce: config.UI.UnitForce, UnitPct: config.UI.UnitPct, MaxTonnage: config.Press.MaxTonnage, WarningPercent: config.Thresholds.WarningPercent, CriticalPercent: config.Thresholds.CriticalPercent, GaugeMaxPercent: config.Thresholds.GaugeMaxPercent, ImbalanceWarningPercent: config.Thresholds.ImbalanceWarningPercent, ImbalanceCriticalPercent: config.Thresholds.ImbalanceCriticalPercent, PollMs: config.PLC.PollMs, DefaultWindow: fmt.Sprintf("%dm", config.Trend.Minutes), DefaultTrendWindow: "15m", ShowHeaderControls: boolValue(config.Modules.ShowHeaderControls, true), ShowVerdict: boolValue(config.Modules.ShowVerdict, true), ShowSummaryBar: 
boolValue(config.Modules.ShowSummaryBar, true), ShowOverview: boolValue(config.Modules.ShowOverview, true), ShowIntelligence: boolValue(config.Modules.ShowIntelligence, true), ShowAlarmTimeline: boolValue(config.Modules.ShowAlarmTimeline, true), ShowGauges: boolValue(config.Modules.ShowGauges, true), ShowGaugeDigital: boolValue(config.Modules.ShowGaugeDigital, false), ShowTrendChart: boolValue(config.Modules.ShowTrendChart, true), UIRevision: revision, } var buf bytes.Buffer if err := uiTemplate.Execute(&buf, data); err != nil { return nil, err } return buf.Bytes(), nil } func initCachedUI() { config := getConfigSnapshot() payload, err := buildCachedUI(config, atomic.LoadUint64(&uiRevision)) if err != nil { log.Fatalf("failed to pre-render UI template: %v", err) } cfgMu.Lock() cachedUI = payload cfgMu.Unlock() } func hotReloadSectionsLocked(dst *Config, src Config) { dst.Thresholds = src.Thresholds dst.Trend = src.Trend dst.Press = src.Press dst.UI = src.UI dst.Modules = src.Modules } func configSectionChanges(oldCfg, newCfg Config) (hotSections []string, restartSections []string) { if !reflect.DeepEqual(oldCfg.Thresholds, newCfg.Thresholds) { hotSections = append(hotSections, "thresholds") } if !reflect.DeepEqual(oldCfg.Trend, newCfg.Trend) { hotSections = append(hotSections, "trend") } if !reflect.DeepEqual(oldCfg.Press, newCfg.Press) { hotSections = append(hotSections, "press") } if !reflect.DeepEqual(oldCfg.UI, newCfg.UI) { hotSections = append(hotSections, "ui") } if !reflect.DeepEqual(oldCfg.Modules, newCfg.Modules) { hotSections = append(hotSections, "modules") } if !reflect.DeepEqual(oldCfg.Server, newCfg.Server) { restartSections = append(restartSections, "server") } if !reflect.DeepEqual(oldCfg.PLC, newCfg.PLC) { restartSections = append(restartSections, "plc") } if !reflect.DeepEqual(oldCfg.DB, newCfg.DB) { restartSections = append(restartSections, "db") } if !reflect.DeepEqual(oldCfg.MQTT, newCfg.MQTT) { restartSections = append(restartSections, "mqtt") 
} return hotSections, restartSections } func reloadConfigSafely(configPath string) { newCfg, err := loadConfigStrict(configPath) if err != nil { log.Printf("config reload rejected: %v", err) return } if err := validateConfig(newCfg); err != nil { log.Printf("config reload rejected: %v", err) return } oldCfg := getConfigSnapshot() hotSections, restartSections := configSectionChanges(oldCfg, newCfg) updatedCfg := oldCfg hotReloadSectionsLocked(&updatedCfg, newCfg) if len(hotSections) > 0 { nextUIRevision := atomic.LoadUint64(&uiRevision) + 1 payload, err := buildCachedUI(updatedCfg, nextUIRevision) if err != nil { log.Printf("config reload rejected: failed to rebuild UI: %v", err) return } cfgMu.Lock() cfg = updatedCfg cachedUI = payload atomic.StoreUint64(&uiRevision, nextUIRevision) cfgMu.Unlock() } if len(hotSections) == 0 && len(restartSections) == 0 { log.Printf("config reload checked: no effective changes") return } if len(hotSections) > 0 { log.Printf("config hot-reloaded safely: %s", strings.Join(hotSections, ", ")) } if len(restartSections) > 0 { log.Printf("config changes detected in %s; restart required before they take effect", strings.Join(restartSections, ", ")) } } func startConfigWatcher(ctx context.Context, configPath string) error { watcher, err := fsnotify.NewWatcher() if err != nil { return err } dir := filepath.Dir(configPath) target := filepath.Clean(configPath) if err := watcher.Add(dir); err != nil { _ = watcher.Close() return err } go func() { defer watcher.Close() var ( debounceTimer *time.Timer debounceC <-chan time.Time ) resetDebounce := func() { if debounceTimer == nil { debounceTimer = time.NewTimer(350 * time.Millisecond) } else { if !debounceTimer.Stop() { select { case <-debounceTimer.C: default: } } debounceTimer.Reset(350 * time.Millisecond) } debounceC = debounceTimer.C } for { select { case <-ctx.Done(): if debounceTimer != nil { debounceTimer.Stop() } return case event, ok := <-watcher.Events: if !ok { return } if 
filepath.Clean(event.Name) != target { continue } if event.Has(fsnotify.Chmod) { continue } if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) { resetDebounce() } case <-debounceC: debounceC = nil reloadConfigSafely(configPath) case err, ok := <-watcher.Errors: if !ok { return } log.Printf("config watcher error: %v", err) } } }() return nil } // --------------------------------------------------------------------------- // State helpers // --------------------------------------------------------------------------- func snapshotState() APIState { state.RLock() defer state.RUnlock() lastUpdate := "" if !state.LastUpdate.IsZero() { lastUpdate = state.LastUpdate.Format(time.RFC3339Nano) } return APIState{ Connected: state.Connected, SilaL: state.SilaL, SilaR: state.SilaR, SilaLkN: state.SilaLkN, SilaRkN: state.SilaRkN, SumPercent: state.SumPercent, SumkN: state.SumkN, ImbalancePercent: state.ImbalancePercent, BiasPercent: state.BiasPercent, LastUpdate: lastUpdate, DroppedSamples: state.DroppedSamples, DroppedEvents: state.DroppedEvents, } } func markDisconnected(reason string) { state.Lock() state.Connected = false state.Unlock() maybeLogPLCDisconnected(reason) } func enqueueSample(s Sample) { select { case sampleCh <- s: default: state.Lock() state.DroppedSamples++ state.Unlock() } } func enqueueAlarm(a AlarmEvent) { select { case alarmCh <- a: default: state.Lock() state.DroppedEvents++ state.Unlock() } // Forward to MQTT non-blocking; ignore errors. 
go mqttPublishAlarm(a) } // --------------------------------------------------------------------------- // Database initialisation // --------------------------------------------------------------------------- func ensureColumn(database *sql.DB, tableName, columnName, definition string) error { rows, err := database.Query(fmt.Sprintf("PRAGMA table_info(%s)", tableName)) if err != nil { return err } defer rows.Close() found := false for rows.Next() { var cid int var name, ctype string var notNull int var dfltValue sql.NullString var pk int if err := rows.Scan(&cid, &name, &ctype, ¬Null, &dfltValue, &pk); err != nil { return err } if name == columnName { found = true break } } if err := rows.Err(); err != nil { return err } if found { return nil } _, err = database.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", tableName, columnName, definition)) return err } func initDatabase(dbPath string, dbCfg DBConfig) (*sql.DB, error) { dsn := fmt.Sprintf("file:%s?_busy_timeout=%d&_foreign_keys=on", filepath.ToSlash(dbPath), dbCfg.BusyTimeoutMs) database, err := sql.Open("sqlite3", dsn) if err != nil { return nil, fmt.Errorf("open sqlite: %w", err) } database.SetMaxOpenConns(1) database.SetMaxIdleConns(1) database.SetConnMaxLifetime(0) pragmas := []string{ "PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;", fmt.Sprintf("PRAGMA wal_autocheckpoint=%d;", dbCfg.CheckpointPages), fmt.Sprintf("PRAGMA busy_timeout=%d;", dbCfg.BusyTimeoutMs), "PRAGMA temp_store=MEMORY;", } for _, q := range pragmas { if _, err := database.Exec(q); err != nil { _ = database.Close() return nil, fmt.Errorf("sqlite pragma failed (%s): %w", q, err) } } schema := ` CREATE TABLE IF NOT EXISTS samples ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts DATETIME NOT NULL, ts_unix_ns INTEGER NOT NULL DEFAULT 0, sila_l_pct REAL NOT NULL, sila_r_pct REAL NOT NULL, sila_l_kn REAL NOT NULL, sila_r_kn REAL NOT NULL, sum_pct REAL NOT NULL, sum_kn REAL NOT NULL, imbalance_pct REAL NOT NULL DEFAULT 0, bias_pct REAL 
NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_samples_ts ON samples(ts); CREATE INDEX IF NOT EXISTS idx_samples_ts_unix_ns ON samples(ts_unix_ns); CREATE TABLE IF NOT EXISTS alarm_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts DATETIME NOT NULL, ts_unix_ns INTEGER NOT NULL DEFAULT 0, severity TEXT NOT NULL, source TEXT NOT NULL, code TEXT NOT NULL, state TEXT NOT NULL, message TEXT NOT NULL, value REAL NOT NULL DEFAULT 0, limit_value REAL NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_ns DESC); ` if _, err := database.Exec(schema); err != nil { _ = database.Close() return nil, fmt.Errorf("create schema: %w", err) } // Schema migrations: add missing columns to support older databases. migrations := []struct{ table, col, def string }{ {"samples", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"}, {"samples", "imbalance_pct", "REAL NOT NULL DEFAULT 0"}, {"samples", "bias_pct", "REAL NOT NULL DEFAULT 0"}, {"alarm_events", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"}, } for _, m := range migrations { if err := ensureColumn(database, m.table, m.col, m.def); err != nil { _ = database.Close() return nil, fmt.Errorf("migration (%s.%s): %w", m.table, m.col, err) } } // Ensure the ts_unix_ns index exists (may have been missed pre-migration). if _, err := database.Exec(`CREATE INDEX IF NOT EXISTS idx_samples_ts_unix_ns ON samples(ts_unix_ns)`); err != nil { _ = database.Close() return nil, fmt.Errorf("create ts_unix_ns index: %w", err) } // Backfill ts_unix_ns for rows that pre-date the column. 
for _, tbl := range []string{"samples", "alarm_events"} { q := fmt.Sprintf(`UPDATE %s SET ts_unix_ns = CAST(strftime('%%s', ts) AS INTEGER) * 1000000000 WHERE ts_unix_ns = 0 AND ts IS NOT NULL`, tbl) if _, err := database.Exec(q); err != nil { log.Printf("warning: ts_unix_ns backfill for %s failed: %v", tbl, err) } } return database, nil } // --------------------------------------------------------------------------- // DB writer goroutines // --------------------------------------------------------------------------- func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int) { ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond) defer ticker.Stop() batch := make([]Sample, 0, batchSize) flush := func() { if len(batch) == 0 { return } tx, err := database.Begin() if err != nil { log.Printf("db begin failed: %v", err) return } stmt, err := tx.Prepare(` INSERT INTO samples ( ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn, sum_pct, sum_kn, imbalance_pct, bias_pct ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { _ = tx.Rollback() log.Printf("db prepare failed: %v", err) return } ok := true for _, s := range batch { if _, err := stmt.Exec( s.TS.UTC().Format(time.RFC3339Nano), s.TS.UTC().UnixNano(), s.SilaLPct, s.SilaRPct, s.SilaLKN, s.SilaRKN, s.SumPercent, s.SumKN, s.ImbalancePercent, s.BiasPercent, ); err != nil { ok = false log.Printf("db insert failed: %v", err) break } } _ = stmt.Close() if !ok { _ = tx.Rollback() return } if err := tx.Commit(); err != nil { log.Printf("db commit failed: %v", err) return } batch = batch[:0] } for { select { case <-ctx.Done(): // Drain remaining samples before exit. 
for { select { case s := <-sampleCh: batch = append(batch, s) default: flush() return } } case s := <-sampleCh: batch = append(batch, s) if len(batch) >= batchSize { flush() } case <-ticker.C: flush() } } } func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int) { ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond) defer ticker.Stop() batch := make([]AlarmEvent, 0, batchSize) flush := func() { if len(batch) == 0 { return } tx, err := database.Begin() if err != nil { log.Printf("alarm db begin failed: %v", err) return } stmt, err := tx.Prepare(` INSERT INTO alarm_events ( ts, ts_unix_ns, severity, source, code, state, message, value, limit_value ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { _ = tx.Rollback() log.Printf("alarm db prepare failed: %v", err) return } ok := true for _, a := range batch { if _, err := stmt.Exec( a.TS.UTC().Format(time.RFC3339Nano), a.TS.UTC().UnixNano(), a.Severity, a.Source, a.Code, a.State, a.Message, a.Value, a.Limit, ); err != nil { ok = false log.Printf("alarm db insert failed: %v", err) break } } _ = stmt.Close() if !ok { _ = tx.Rollback() return } if err := tx.Commit(); err != nil { log.Printf("alarm db commit failed: %v", err) return } batch = batch[:0] } for { select { case <-ctx.Done(): for { select { case a := <-alarmCh: batch = append(batch, a) default: flush() return } } case a := <-alarmCh: batch = append(batch, a) if len(batch) >= batchSize { flush() } case <-ticker.C: flush() } } } func startDBCleanup(ctx context.Context, database *sql.DB, retentionDays, intervalHr int) { if retentionDays <= 0 { return } ticker := time.NewTicker(time.Duration(intervalHr) * time.Hour) defer ticker.Stop() cleanup := func() { cutoffNs := time.Now().AddDate(0, 0, -retentionDays).UTC().UnixNano() for _, tbl := range []string{"samples", "alarm_events"} { if _, err := database.Exec( fmt.Sprintf(`DELETE FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ?`, tbl), cutoffNs, ); err != nil { 
log.Printf("db cleanup %s failed: %v", tbl, err) } } } cleanup() for { select { case <-ctx.Done(): return case <-ticker.C: cleanup() } } } // --------------------------------------------------------------------------- // Alarm zone helpers // --------------------------------------------------------------------------- func zoneFromValue(value, warn, crit float64) string { if value >= crit { return "critical" } if value >= warn { return "warning" } return "ok" } func sourceName(source string) string { switch source { case "force_left": return "Left force" case "force_right": return "Right force" case "imbalance": return "Imbalance" case "plc": return "PLC" default: return source } } func sourceLimit(source, zone string) float64 { config := getConfigSnapshot() switch source { case "imbalance": if zone == "critical" { return config.Thresholds.ImbalanceCriticalPercent } if zone == "warning" { return config.Thresholds.ImbalanceWarningPercent } default: if zone == "critical" { return config.Thresholds.CriticalPercent } if zone == "warning" { return config.Thresholds.WarningPercent } } return 0 } func maybeLogZoneChange(source, prev, curr string, value float64) { if prev == curr { return } name := sourceName(source) now := time.Now() if prev == "" && curr == "ok" { return } switch curr { case "ok": enqueueAlarm(AlarmEvent{ TS: now, Severity: "info", Source: source, Code: source + "_clear", State: "clear", Message: fmt.Sprintf("%s returned to OK", name), Value: value, Limit: 0, }) case "warning": msg := fmt.Sprintf("%s entered WARNING zone", name) if prev == "critical" { msg = fmt.Sprintf("%s downgraded from CRITICAL to WARNING", name) } enqueueAlarm(AlarmEvent{ TS: now, Severity: "warning", Source: source, Code: source + "_warning", State: "active", Message: msg, Value: value, Limit: sourceLimit(source, "warning"), }) case "critical": msg := fmt.Sprintf("%s entered CRITICAL zone", name) if prev == "warning" { msg = fmt.Sprintf("%s escalated from WARNING to CRITICAL", name) 
} enqueueAlarm(AlarmEvent{ TS: now, Severity: "critical", Source: source, Code: source + "_critical", State: "active", Message: msg, Value: value, Limit: sourceLimit(source, "critical"), }) } } func evaluateProcessAlarms(s Sample) { config := getConfigSnapshot() leftZone := zoneFromValue(float64(s.SilaLPct), config.Thresholds.WarningPercent, config.Thresholds.CriticalPercent) rightZone := zoneFromValue(float64(s.SilaRPct), config.Thresholds.WarningPercent, config.Thresholds.CriticalPercent) imbZone := zoneFromValue(float64(s.ImbalancePercent), config.Thresholds.ImbalanceWarningPercent, config.Thresholds.ImbalanceCriticalPercent) alarmTracker.Lock() defer alarmTracker.Unlock() maybeLogZoneChange("force_left", alarmTracker.LeftZone, leftZone, float64(s.SilaLPct)) maybeLogZoneChange("force_right", alarmTracker.RightZone, rightZone, float64(s.SilaRPct)) maybeLogZoneChange("imbalance", alarmTracker.ImbZone, imbZone, float64(s.ImbalancePercent)) alarmTracker.LeftZone = leftZone alarmTracker.RightZone = rightZone alarmTracker.ImbZone = imbZone } func maybeLogPLCConnected() { alarmTracker.Lock() defer alarmTracker.Unlock() if !alarmTracker.PLCKnown { alarmTracker.PLCKnown = true alarmTracker.PLCConnected = true enqueueAlarm(AlarmEvent{ TS: time.Now(), Severity: "info", Source: "plc", Code: "plc_connected", State: "info", Message: "PLC connection established", Value: 1, }) return } if !alarmTracker.PLCConnected { alarmTracker.PLCConnected = true enqueueAlarm(AlarmEvent{ TS: time.Now(), Severity: "info", Source: "plc", Code: "plc_restored", State: "info", Message: "PLC connection restored", Value: 1, }) } } func maybeLogPLCDisconnected(reason string) { alarmTracker.Lock() defer alarmTracker.Unlock() if !alarmTracker.PLCKnown || !alarmTracker.PLCConnected { return } alarmTracker.PLCConnected = false alarmTracker.LeftZone = "" alarmTracker.RightZone = "" alarmTracker.ImbZone = "" enqueueAlarm(AlarmEvent{ TS: time.Now(), Severity: "critical", Source: "plc", Code: 
"plc_disconnected", State: "active", Message: "PLC connection lost: " + reason, Value: 0, }) } // --------------------------------------------------------------------------- // PLC poller // --------------------------------------------------------------------------- func startPLCPoller(ctx context.Context) { bootCfg := getConfigSnapshot() pollInterval := time.Duration(bootCfg.PLC.PollMs) * time.Millisecond reconnectDelay := time.Duration(bootCfg.PLC.ReconnectDelaySec) * time.Second for { select { case <-ctx.Done(): return default: } handler := gos7.NewTCPClientHandler(bootCfg.PLC.IP, bootCfg.PLC.Rack, bootCfg.PLC.Slot) handler.Timeout = time.Duration(bootCfg.PLC.ConnectTimeoutSec) * time.Second handler.IdleTimeout = time.Duration(bootCfg.PLC.IdleTimeoutSec) * time.Second if err := handler.Connect(); err != nil { markDisconnected(err.Error()) log.Printf("PLC connect failed: %v - retrying in %ds...", err, bootCfg.PLC.ReconnectDelaySec) select { case <-ctx.Done(): return case <-time.After(reconnectDelay): } continue } maybeLogPLCConnected() client := gos7.NewClient(handler) log.Println("PLC connected successfully") buf := make([]byte, 8) for { select { case <-ctx.Done(): _ = handler.Close() return default: } if err := client.AGReadDB(bootCfg.PLC.DBNum, 0, 8, buf); err != nil { log.Printf("PLC read error: %v - reconnecting...", err) markDisconnected(err.Error()) _ = handler.Close() break } var helper gos7.Helper silaL := helper.GetRealAt(buf, 0) silaR := helper.GetRealAt(buf, 4) leftKN, rightKN, sumPercent, sumKN := calculateForces(silaL, silaR, getConfigSnapshot().Press.MaxTonnage) imbalance := float32(math.Abs(float64(silaL - silaR))) bias := silaL - silaR now := time.Now() state.Lock() state.Connected = true state.SilaL = silaL state.SilaR = silaR state.SilaLkN = leftKN state.SilaRkN = rightKN state.SumPercent = sumPercent state.SumkN = sumKN state.ImbalancePercent = imbalance state.BiasPercent = bias state.LastUpdate = now state.Unlock() sample := Sample{ TS: now, 
SilaLPct: silaL, SilaRPct: silaR, SilaLKN: leftKN, SilaRKN: rightKN, SumPercent: sumPercent, SumKN: sumKN, ImbalancePercent: imbalance, BiasPercent: bias, } evaluateProcessAlarms(sample) enqueueSample(sample) select { case <-ctx.Done(): _ = handler.Close() return case <-time.After(pollInterval): } } } } // --------------------------------------------------------------------------- // Query helpers // --------------------------------------------------------------------------- func parseWindow(raw string) (time.Duration, string, error) { s := strings.TrimSpace(strings.ToLower(raw)) if s == "" { s = fmt.Sprintf("%dm", getConfigSnapshot().Trend.Minutes) } if strings.HasSuffix(s, "d") { n, err := strconv.Atoi(strings.TrimSuffix(s, "d")) if err != nil || n <= 0 { return 0, "", fmt.Errorf("invalid day window") } return time.Duration(n) * 24 * time.Hour, s, nil } d, err := time.ParseDuration(s) if err != nil || d <= 0 { return 0, "", fmt.Errorf("invalid window") } return d, s, nil } func formatHistoryLabel(t time.Time, window time.Duration) string { local := t.Local() if window >= 12*time.Hour { return local.Format("02.01 15:04") } return local.Format("15:04:05.000") } func queryHistory(window time.Duration) ([]HistoryPoint, error) { cutoffNs := time.Now().Add(-window).UTC().UnixNano() rows, err := db.Query(`SELECT ts, sila_l_pct, sila_r_pct FROM samples WHERE ts_unix_ns >= ? 
ORDER BY ts_unix_ns ASC`, cutoffNs) if err != nil { return nil, err } defer rows.Close() points := make([]HistoryPoint, 0, 1024) for rows.Next() { var ts string var l, r float64 if err := rows.Scan(&ts, &l, &r); err != nil { return nil, err } t, err := time.Parse(time.RFC3339Nano, ts) if err != nil { continue } points = append(points, HistoryPoint{ Time: formatHistoryLabel(t, window), SilaL: float32(l), SilaR: float32(r), }) } if err := rows.Err(); err != nil { return nil, err } maxPts := getConfigSnapshot().DB.MaxChartPoints if len(points) <= maxPts { return points, nil } return downsamplePoints(points, maxPts), nil } func downsamplePoints(points []HistoryPoint, max int) []HistoryPoint { if len(points) <= max || max < 3 { return points } out := make([]HistoryPoint, 0, max) step := float64(len(points)-1) / float64(max-1) used := make(map[int]struct{}, max) for i := 0; i < max; i++ { idx := int(float64(i) * step) if idx >= len(points) { idx = len(points) - 1 } if _, ok := used[idx]; ok { continue } used[idx] = struct{}{} out = append(out, points[idx]) } if len(out) == 0 { return points } // Always ensure last point is the most recent. out[len(out)-1] = points[len(points)-1] return out } func validField(field string) (string, error) { switch field { case "sum_pct": return "sum_pct", nil case "imbalance_pct": return "imbalance_pct", nil default: return "", fmt.Errorf("invalid field") } } func queryNumericStats(field string, fromNs, toNs int64) (NumericStats, error) { safeField, err := validField(field) if err != nil { return NumericStats{}, err } query := fmt.Sprintf(` SELECT COALESCE(AVG(%[1]s), 0), COALESCE(AVG(%[1]s * %[1]s), 0), COALESCE(MIN(%[1]s), 0), COALESCE(MAX(%[1]s), 0), COUNT(*) FROM samples WHERE ts_unix_ns >= ? AND ts_unix_ns < ? 
`, safeField) var stats NumericStats err = db.QueryRow(query, fromNs, toNs).Scan(&stats.Avg, &stats.AvgSq, &stats.Min, &stats.Max, &stats.Count) if err != nil { return NumericStats{}, err } return stats, nil } // --------------------------------------------------------------------------- // Trend / stability classification // --------------------------------------------------------------------------- func classifyDirection(delta float64, oldCount, newCount int, stableThreshold float64, posLabel, negLabel string) string { if oldCount < 3 || newCount < 3 { return "insufficient_data" } if math.Abs(delta) < stableThreshold { return "stable" } if delta > 0 { return posLabel } return negLabel } func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sampleCount int) (string, string) { if sampleCount < 8 { return "insufficient_data", "Too few samples in selected trend window" } config := getConfigSnapshot() if forceStd >= 6.0 || math.Abs(forceDelta) >= 8.0 || avgImb5m >= config.Thresholds.ImbalanceCriticalPercent || imbStd >= 4.0 { if avgImb5m >= config.Thresholds.ImbalanceCriticalPercent { return "unstable", "High average imbalance in last 5 minutes" } if math.Abs(forceDelta) >= 8.0 { return "unstable", "Average peak force is drifting fast" } if forceStd >= 6.0 { return "unstable", "Force variation is too high" } return "unstable", "Imbalance variation is too high" } if forceStd >= 3.0 || math.Abs(forceDelta) >= 3.0 || avgImb5m >= config.Thresholds.ImbalanceWarningPercent || imbStd >= 2.0 { if avgImb5m >= config.Thresholds.ImbalanceWarningPercent { return "caution", "Imbalance is trending above warning region" } if math.Abs(forceDelta) >= 3.0 { return "caution", "Average force is drifting" } if forceStd >= 3.0 { return "caution", "Force is less repeatable than normal" } return "caution", "Imbalance repeatability is degrading" } return "stable", "Process variation is low" } func buildTrendResponse(window time.Duration, label string) (TrendResponse, 
error) { nowNs := time.Now().UTC().UnixNano() windowNs := window.Nanoseconds() startNs := nowNs - windowNs midNs := startNs + (windowNs / 2) force5m, err := queryNumericStats("sum_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } force1h, err := queryNumericStats("sum_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } imb5m, err := queryNumericStats("imbalance_pct", nowNs-(5*time.Minute).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } imb1h, err := queryNumericStats("imbalance_pct", nowNs-(1*time.Hour).Nanoseconds(), nowNs) if err != nil { return TrendResponse{}, err } forceOld, err := queryNumericStats("sum_pct", startNs, midNs) if err != nil { return TrendResponse{}, err } forceNew, err := queryNumericStats("sum_pct", midNs, nowNs) if err != nil { return TrendResponse{}, err } imbOld, err := queryNumericStats("imbalance_pct", startNs, midNs) if err != nil { return TrendResponse{}, err } imbNew, err := queryNumericStats("imbalance_pct", midNs, nowNs) if err != nil { return TrendResponse{}, err } forceDelta := forceNew.Avg - forceOld.Avg imbDelta := imbNew.Avg - imbOld.Avg forceDirection := classifyDirection(forceDelta, forceOld.Count, forceNew.Count, 1.0, "rising", "falling") imbDirection := classifyDirection(imbDelta, imbOld.Count, imbNew.Count, 0.5, "worsening", "improving") fullWindowForce, err := queryNumericStats("sum_pct", startNs, nowNs) if err != nil { return TrendResponse{}, err } fullWindowImb, err := queryNumericStats("imbalance_pct", startNs, nowNs) if err != nil { return TrendResponse{}, err } stability, reason := classifyProcessStability( fullWindowForce.StdDev(), fullWindowImb.StdDev(), forceDelta, imb5m.Avg, fullWindowForce.Count, ) return TrendResponse{ Window: label, AvgPeak5m: float32(force5m.Avg), AvgPeak1h: float32(force1h.Avg), AvgImbalance5m: float32(imb5m.Avg), AvgImbalance1h: float32(imb1h.Avg), ForceDeltaPct: float32(forceDelta), 
ImbalanceDeltaPct: float32(imbDelta), ForceDirection: forceDirection, ImbalanceDirection: imbDirection, ProcessStability: stability, StabilityReason: reason, ForceStdDev: float32(fullWindowForce.StdDev()), ImbalanceStdDev: float32(fullWindowImb.StdDev()), SampleCount: fullWindowForce.Count, OldHalfCount: forceOld.Count, NewHalfCount: forceNew.Count, }, nil } func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) { if limit <= 0 { limit = 20 } if limit > 100 { limit = 100 } rows, err := db.Query(` SELECT ts, severity, source, state, message, value, limit_value FROM alarm_events ORDER BY ts_unix_ns DESC LIMIT ? `, limit) if err != nil { return nil, err } defer rows.Close() events := make([]AlarmEventAPI, 0, limit) for rows.Next() { var ts, severity, source, state, message string var value, limitValue float64 if err := rows.Scan(&ts, &severity, &source, &state, &message, &value, &limitValue); err != nil { return nil, err } displayTime := ts if t, err := time.Parse(time.RFC3339Nano, ts); err == nil { displayTime = t.Local().Format("02.01.2006 15:04:05") } events = append(events, AlarmEventAPI{ Time: displayTime, Severity: severity, Source: source, State: state, Message: message, Value: value, Limit: limitValue, }) } return events, rows.Err() } // --------------------------------------------------------------------------- // HTTP helpers // --------------------------------------------------------------------------- // writeJSON writes v as JSON with correct headers and CORS. func writeJSON(w http.ResponseWriter, status int, v any) { h := w.Header() h.Set("Content-Type", "application/json") h.Set("Cache-Control", "no-store") h.Set("Access-Control-Allow-Origin", "*") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(v) } // allowMethod checks r.Method and handles CORS preflight. // Returns false if the caller should stop processing. 
func allowMethod(w http.ResponseWriter, r *http.Request, method string) bool { if r.Method == http.MethodOptions { h := w.Header() h.Set("Access-Control-Allow-Origin", "*") h.Set("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS") h.Set("Access-Control-Allow-Headers", "Content-Type") w.WriteHeader(http.StatusNoContent) return false } if r.Method != method { http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed) return false } return true } // --------------------------------------------------------------------------- // HTTP handlers — core // --------------------------------------------------------------------------- func apiData(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } writeJSON(w, http.StatusOK, snapshotState()) } func apiUIRevision(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } writeJSON(w, http.StatusOK, map[string]uint64{"revision": atomic.LoadUint64(&uiRevision)}) } func apiHistory(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } window, label, err := parseWindow(r.URL.Query().Get("window")) if err != nil { http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest) return } points, err := queryHistory(window) if err != nil { log.Printf("history query failed: %v", err) http.Error(w, `{"error":"history query failed"}`, http.StatusInternalServerError) return } writeJSON(w, http.StatusOK, HistoryResponse{Window: label, Points: points}) } func apiTrend(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } window, label, err := parseWindow(r.URL.Query().Get("window")) if err != nil { http.Error(w, `{"error":"invalid trend window"}`, http.StatusBadRequest) return } resp, err := buildTrendResponse(window, label) if err != nil { log.Printf("trend query failed: %v", err) http.Error(w, `{"error":"trend query failed"}`, http.StatusInternalServerError) return } 
writeJSON(w, http.StatusOK, resp) } func apiAlarms(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } limit := 20 if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" { if n, err := strconv.Atoi(raw); err == nil && n > 0 { limit = n } } events, err := queryAlarmEvents(limit) if err != nil { log.Printf("alarm query failed: %v", err) http.Error(w, `{"error":"alarm query failed"}`, http.StatusInternalServerError) return } writeJSON(w, http.StatusOK, AlarmResponse{Events: events}) } func serveUI(w http.ResponseWriter, r *http.Request) { cfgMu.RLock() payload := cachedUI cfgMu.RUnlock() w.Header().Set("Content-Type", "text/html; charset=utf-8") _, _ = w.Write(payload) } // --------------------------------------------------------------------------- // HTTP handlers — MQTT REST API // --------------------------------------------------------------------------- // GET /api/mqtt/status // Returns MQTT connection state, broker info, and active subscriptions. func apiMQTTStatus(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } if mqttMgr == nil { writeJSON(w, http.StatusOK, MQTTStatusResponse{ Enabled: false, Connected: false, Broker: "", Subscribed: []string{}, }) return } writeJSON(w, http.StatusOK, mqttMgr.status()) } // POST /api/mqtt/publish // Body: {"topic":"plant/press/cmd","payload":"reset","qos":1,"retain":false} // Publishes an arbitrary message to the MQTT broker. 
func apiMQTTPublish(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodPost) { return } if mqttMgr == nil { http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable) return } var req MQTTPublishRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) return } if strings.TrimSpace(req.Topic) == "" { http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest) return } if req.QoS < 0 || req.QoS > 2 { http.Error(w, `{"error":"qos must be 0, 1, or 2"}`, http.StatusBadRequest) return } if err := mqttMgr.publish(req.Topic, req.Payload, byte(req.QoS), req.Retain); err != nil { writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, map[string]string{"status": "published", "topic": req.Topic}) } // GET /api/mqtt/messages?limit=50 // Returns the most recently received MQTT messages (across all subscribed topics). func apiMQTTMessages(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodGet) { return } if mqttMgr == nil { writeJSON(w, http.StatusOK, map[string]any{"messages": []MQTTReceivedMsg{}, "enabled": false}) return } limit := 50 if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" { if n, err := strconv.Atoi(raw); err == nil && n > 0 { limit = n } } msgs := mqttMgr.getMessages(limit) writeJSON(w, http.StatusOK, map[string]any{"messages": msgs, "count": len(msgs)}) } // POST /api/mqtt/subscribe // Body: {"topic":"plant/press/#","qos":1} // Subscribes to a topic filter. Messages are stored in the ring buffer // and accessible via GET /api/mqtt/messages. 
func apiMQTTSubscribe(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodPost) { return } if mqttMgr == nil { http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable) return } var req MQTTSubscribeRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) return } if strings.TrimSpace(req.Topic) == "" { http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest) return } if req.QoS < 0 || req.QoS > 2 { http.Error(w, `{"error":"qos must be 0, 1, or 2"}`, http.StatusBadRequest) return } if err := mqttMgr.subscribe(req.Topic, byte(req.QoS)); err != nil { writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, map[string]string{"status": "subscribed", "topic": req.Topic}) } // DELETE /api/mqtt/subscribe // Body: {"topic":"plant/press/#"} // Unsubscribes from a topic filter. func apiMQTTUnsubscribe(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, http.MethodDelete) { return } if mqttMgr == nil { http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable) return } var req MQTTSubscribeRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) return } if strings.TrimSpace(req.Topic) == "" { http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest) return } if err := mqttMgr.unsubscribe(req.Topic); err != nil { writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, map[string]string{"status": "unsubscribed", "topic": req.Topic}) } // --------------------------------------------------------------------------- // main // --------------------------------------------------------------------------- func main() { wd, err := os.Getwd() if err != nil { log.Fatalf("failed to get working directory: %v", err) } 
configPath := filepath.Join(wd, "config.yaml") cfg, err = loadOrCreateConfig(configPath) if err != nil { log.Fatalf("failed to load config: %v", err) } if err := validateConfig(cfg); err != nil { log.Fatalf("invalid config: %v", err) } dbPath := cfg.DB.Path if !filepath.IsAbs(dbPath) { dbPath = filepath.Join(wd, dbPath) } // Pass DB config explicitly so initDatabase doesn't touch the global. db, err = initDatabase(dbPath, cfg.DB) if err != nil { log.Fatalf("failed to init database: %v", err) } defer db.Close() sampleCh = make(chan Sample, cfg.DB.WriterQueueSize) alarmCh = make(chan AlarmEvent, cfg.DB.AlarmQueueSize) initCachedUI() log.Printf("S7-1200 Force Monitor v%s", version) log.Printf("Config: %s", configPath) log.Printf("DB: %s", dbPath) log.Printf("PLC: ip=%s db=%d rack=%d slot=%d poll=%dms", cfg.PLC.IP, cfg.PLC.DBNum, cfg.PLC.Rack, cfg.PLC.Slot, cfg.PLC.PollMs) log.Printf("Press: MAX_TONNAGE=%.2f %s", cfg.Press.MaxTonnage, cfg.UI.UnitForce) // Initialise MQTT if enabled. if cfg.MQTT.Enabled { mqttMgr = newMQTTManager(cfg.MQTT) if err := mqttMgr.connect(); err != nil { // Non-fatal: paho will reconnect automatically. log.Printf("MQTT initial connect failed (will retry): %v", err) } else { log.Printf("MQTT: connected to %s (prefix=%s)", cfg.MQTT.Broker, cfg.MQTT.TopicPrefix) } } else { log.Printf("MQTT: disabled (set mqtt.enabled: true in config to enable)") } ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() if err := startConfigWatcher(ctx, configPath); err != nil { log.Printf("config watch disabled: %v", err) } else { log.Printf("Config watcher enabled for %s", configPath) } // Snapshot DB config values once (DB is not hot-reloadable). 
dbCfg := cfg.DB var wg sync.WaitGroup wg.Add(4) go func() { defer wg.Done(); startDBWriter(ctx, db, dbCfg.BatchSize, dbCfg.FlushIntervalMs) }() go func() { defer wg.Done(); startAlarmWriter(ctx, db, dbCfg.BatchSize, dbCfg.FlushIntervalMs) }() go func() { defer wg.Done(); startDBCleanup(ctx, db, dbCfg.RetentionDays, dbCfg.CleanupIntervalHr) }() go func() { defer wg.Done(); startPLCPoller(ctx) }() if cfg.MQTT.Enabled { wg.Add(1) go func() { defer wg.Done(); startMQTTPublisher(ctx) }() } staticRoot, err := fs.Sub(staticFiles, "static") if err != nil { log.Fatalf("failed to mount embedded static files: %v", err) } mux := http.NewServeMux() mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(staticRoot)))) mux.HandleFunc("/", serveUI) // Core data API mux.HandleFunc("/api/data", apiData) mux.HandleFunc("/api/ui-revision", apiUIRevision) mux.HandleFunc("/api/history", apiHistory) mux.HandleFunc("/api/trend", apiTrend) mux.HandleFunc("/api/alarms", apiAlarms) // MQTT REST API // // GET /api/mqtt/status → connection status, broker, subscriptions // POST /api/mqtt/publish → publish message to any topic // GET /api/mqtt/messages[?limit=N] → last N received messages // POST /api/mqtt/subscribe → subscribe to topic filter // DELETE /api/mqtt/subscribe → unsubscribe from topic filter mux.HandleFunc("/api/mqtt/status", apiMQTTStatus) mux.HandleFunc("/api/mqtt/publish", apiMQTTPublish) mux.HandleFunc("/api/mqtt/messages", apiMQTTMessages) mux.HandleFunc("/api/mqtt/subscribe", apiMQTTSubscribe) mux.HandleFunc("/api/mqtt/unsubscribe", apiMQTTUnsubscribe) srv := &http.Server{ Addr: cfg.Server.ListenAddr, Handler: mux, ReadTimeout: 15 * time.Second, WriteTimeout: 15 * time.Second, IdleTimeout: 60 * time.Second, } log.Printf("Listening on http://localhost%s", cfg.Server.ListenAddr) log.Printf("MQTT API: GET /api/mqtt/status | POST /api/mqtt/publish | GET /api/mqtt/messages") go func() { if err := srv.ListenAndServe(); err != nil && !errors.Is(err, 
http.ErrServerClosed) { log.Fatalf("HTTP server error: %v", err) } }() <-ctx.Done() log.Println("Shutting down — flushing DB writers...") shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Printf("HTTP server shutdown error: %v", err) } if mqttMgr != nil { mqttMgr.disconnect() log.Println("MQTT disconnected") } wg.Wait() log.Println("Shutdown complete") } const uiHTML = ` {{.Title}}

{{.Title}}

{{.Subtitle}}

MAX_TONNAGE = {{printf "%.1f" .MaxTonnage}} {{.UnitForce}}

{{if .ShowHeaderControls}}
{{end}}
Disconnected
Last update: --:--:--.---
Dropped S: 0 | E: 0
{{if .ShowVerdict}}
Machine verdict
NO DATA
Waiting for PLC data
{{end}} {{if .ShowSummaryBar}}
FORCE
NO DATA
--
IMBALANCE
NO DATA
--
PLC
OFFLINE
Disconnected
{{end}} {{if .ShowOverview}}
TOTAL PEAK FORCE
0.0
{{.UnitForce}}
TOTAL %
0.0 {{.UnitPct}}
IMBALANCE
0.0 {{.UnitPct}}
abs(L - R)
BIAS
0.0 {{.UnitPct}}
L - R
LIMITS
Force W {{printf "%.0f" .WarningPercent}} / C {{printf "%.0f" .CriticalPercent}}
Imb W {{printf "%.0f" .ImbalanceWarningPercent}} / C {{printf "%.0f" .ImbalanceCriticalPercent}}
{{end}} {{if .ShowIntelligence}}

Drift / Deterioration Intelligence

Averages, drift direction, imbalance deterioration and process stability
AVG PEAK 5 MIN
--
No data
AVG PEAK 1 HOUR
--
No data
FORCE TREND
--
No data
IMBALANCE TREND
--
No data
PROCESS STABILITY
--
No data
{{end}} {{if .ShowAlarmTimeline}}

Event / Alarm Timeline

Recent transitions show exactly when the process began drifting, overloading, losing balance, or losing PLC communication
Newest events first • clear events included
Time Severity Source Event Value Limit
No events yet
{{end}} {{if .ShowGauges}}
{{if .ShowGaugeDigital}}

{{.LeftLabel}}

NORMAL
0.0
{{.UnitPct}}
0.0 {{.UnitForce}}
{{else}}

{{.LeftLabel}}

NORMAL
{{end}}
{{if .ShowGaugeDigital}}

{{.RightLabel}}

NORMAL
0.0
{{.UnitPct}}
0.0 {{.UnitForce}}
{{else}}

{{.RightLabel}}

NORMAL
{{end}}
{{end}} {{if .ShowTrendChart}}

Peak Trend

Piezo peak/stroke history from SQLite with visible warning and critical limits
{{end}}
`