diff --git a/go.mod b/go.mod index 448ef39..98c5bb8 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,18 @@ module tonnage-app-imco -go 1.21 +go 1.24.0 require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/fsnotify/fsnotify v1.9.0 github.com/mattn/go-sqlite3 v1.14.42 github.com/robinson/gos7 v0.0.0-20241205073040-7ea1d6fb9d20 gopkg.in/yaml.v3 v3.0.1 ) -require golang.org/x/sys v0.13.0 // indirect +require ( + github.com/gorilla/websocket v1.5.3 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.36.0 // indirect +) diff --git a/go.sum b/go.sum index 8cfe082..dbb5e51 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,19 @@ +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/mattn/go-sqlite3 v1.14.42 h1:MigqEP4ZmHw3aIdIT7T+9TLa90Z6smwcthx+Azv4Cgo= github.com/mattn/go-sqlite3 v1.14.42/go.mod h1:pjEuOr8IwzLJP2MfGeTb0A35jauH+C2kbHKBr7yXKVQ= github.com/robinson/gos7 v0.0.0-20241205073040-7ea1d6fb9d20 h1:HjGiMRQ3pKwKH3p0mmLtY62bwd973txhzV9FfpdGo7U= github.com/robinson/gos7 v0.0.0-20241205073040-7ea1d6fb9d20/go.mod h1:AMHIeh1KJ7Xa2RVOMHdv9jXKrpw0D4EWGGQMHLb2doc= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/main.go b/main.go index 3a9af1e..9188cf1 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( "syscall" "time" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/fsnotify/fsnotify" _ "github.com/mattn/go-sqlite3" "github.com/robinson/gos7" @@ -33,7 +34,7 @@ import ( //go:embed static var staticFiles embed.FS -const version = "0.9.1" +const version = "0.9.2" // --------------------------------------------------------------------------- // Config structs @@ -48,6 +49,7 @@ type Config struct { UI UIConfig `yaml:"ui"` Modules ModulesConfig `yaml:"modules"` DB DBConfig `yaml:"db"` + MQTT MQTTConfig `yaml:"mqtt"` } type ServerConfig struct { @@ -120,10 +122,29 @@ type DBConfig struct { CleanupIntervalHr int `yaml:"cleanup_interval_hours"` } -func boolPtr(v bool) *bool { - return &v +// MQTTConfig holds all MQTT broker and publishing settings. +// Requires restart when changed — not hot-reloadable. +type MQTTConfig struct { + Enabled bool `yaml:"enabled"` + Broker string `yaml:"broker"` // e.g. "tcp://192.168.1.10:1883" + ClientID string `yaml:"client_id"` // unique client identifier + Username string `yaml:"username"` // leave blank if no auth + Password string `yaml:"password"` // leave blank if no auth + TopicPrefix string `yaml:"topic_prefix"` // e.g. "plant1/press3" + QoS int `yaml:"qos"` // 0, 1, or 2 + Retain bool `yaml:"retain"` // retain auto-published state msgs + AutoPublish bool `yaml:"auto_publish"` // publish PLC state on timer + PublishIntervalMs int `yaml:"publish_interval_ms"` // how often to auto-publish + ConnectTimeoutSec int `yaml:"connect_timeout_sec"` + ReconnectDelaySec int `yaml:"reconnect_delay_sec"` } +// --------------------------------------------------------------------------- +// Config helpers +// --------------------------------------------------------------------------- + +func boolPtr(v bool) *bool { return &v } + func boolValue(v *bool, def bool) bool { if v == nil { return def @@ -133,9 +154,7 @@ func boolValue(v *bool, def bool) bool { func defaultConfig() Config { return Config{ - Server: ServerConfig{ - ListenAddr: ":8080", - }, + Server: ServerConfig{ListenAddr: ":8080"}, PLC: PLCConfig{ IP: "192.168.0.1", DBNum: 1001, @@ -153,12 +172,8 @@ func defaultConfig() Config { ImbalanceWarningPercent: 10, ImbalanceCriticalPercent: 20, }, - Trend: TrendConfig{ - Minutes: 5, - }, - Press: PressConfig{ - MaxTonnage: 64, - }, + Trend: TrendConfig{Minutes: 5}, + Press: PressConfig{MaxTonnage: 64}, UI: UIConfig{ Title: "Force Monitor", Subtitle: "Siemens S7-1215C • Piezo peak/stroke input • PLC values in % • kN calculated from MAX_TONNAGE", @@ -190,19 +205,31 @@ func defaultConfig() Config { CheckpointPages: 1000, CleanupIntervalHr: 6, }, + MQTT: MQTTConfig{ + Enabled: false, + Broker: "tcp://localhost:1883", + ClientID: "force_monitor", + TopicPrefix: "force_monitor", + QoS: 1, + Retain: false, + AutoPublish: true, + PublishIntervalMs: 1000, + ConnectTimeoutSec: 10, + ReconnectDelaySec: 5, + }, } } -// --------------------------------------------------------------------------- -// Config normalisation helpers -// --------------------------------------------------------------------------- - func setIfZeroF(dst *float64, def float64) { if *dst <= 0 { *dst = def } } +// setIfZeroI replaces *dst with def when *dst <= 0. +// Note: this means config values of 0 are treated as "not set". +// For PLC Rack (valid at 0), the default is also 0, so this is a safe no-op. +// For PLC Slot, a value of 0 would be overwritten with default 1. func setIfZeroI(dst *int, def int) { if *dst <= 0 { *dst = def @@ -229,6 +256,7 @@ func normalizeConfig(cfg *Config) { setIfEmpty(&cfg.PLC.IP, def.PLC.IP) setIfZeroI(&cfg.PLC.DBNum, def.PLC.DBNum) + // Rack: default is 0 so setIfZeroI is a no-op; kept for symmetry. setIfZeroI(&cfg.PLC.Rack, def.PLC.Rack) setIfZeroI(&cfg.PLC.Slot, def.PLC.Slot) setIfZeroI(&cfg.PLC.PollMs, def.PLC.PollMs) @@ -236,6 +264,7 @@ func normalizeConfig(cfg *Config) { setIfZeroI(&cfg.PLC.IdleTimeoutSec, def.PLC.IdleTimeoutSec) setIfZeroI(&cfg.PLC.ReconnectDelaySec, def.PLC.ReconnectDelaySec) + // Migrate legacy kN fields to percent fields if new fields are absent. if cfg.Thresholds.WarningPercent <= 0 && cfg.Thresholds.LegacyWarningKn > 0 { cfg.Thresholds.WarningPercent = cfg.Thresholds.LegacyWarningKn } @@ -290,6 +319,19 @@ func normalizeConfig(cfg *Config) { setIfZeroI(&cfg.DB.AlarmQueueSize, def.DB.AlarmQueueSize) setIfZeroI(&cfg.DB.CheckpointPages, def.DB.CheckpointPages) setIfZeroI(&cfg.DB.CleanupIntervalHr, def.DB.CleanupIntervalHr) + + // MQTT normalization (only when enabled to avoid noisy defaults) + if cfg.MQTT.Enabled { + setIfEmpty(&cfg.MQTT.Broker, def.MQTT.Broker) + setIfEmpty(&cfg.MQTT.ClientID, def.MQTT.ClientID) + setIfEmpty(&cfg.MQTT.TopicPrefix, def.MQTT.TopicPrefix) + if cfg.MQTT.QoS < 0 || cfg.MQTT.QoS > 2 { + cfg.MQTT.QoS = def.MQTT.QoS + } + setIfZeroI(&cfg.MQTT.PublishIntervalMs, def.MQTT.PublishIntervalMs) + setIfZeroI(&cfg.MQTT.ConnectTimeoutSec, def.MQTT.ConnectTimeoutSec) + setIfZeroI(&cfg.MQTT.ReconnectDelaySec, def.MQTT.ReconnectDelaySec) + } } func loadConfigStrict(configPath string) (Config, error) { @@ -354,6 +396,12 @@ func validateConfig(cfg Config) error { if cfg.Press.MaxTonnage <= 0 { return fmt.Errorf("press.MAX_TONNAGE must be > 0") } + if cfg.MQTT.Enabled && strings.TrimSpace(cfg.MQTT.Broker) == "" { + return fmt.Errorf("mqtt.broker must be set when mqtt.enabled is true") + } + if cfg.MQTT.Enabled && strings.TrimSpace(cfg.MQTT.ClientID) == "" { + return fmt.Errorf("mqtt.client_id must be set when mqtt.enabled is true") + } return nil } @@ -516,6 +564,58 @@ type AlarmTracker struct { ImbZone string } +// --------------------------------------------------------------------------- +// MQTT types +// --------------------------------------------------------------------------- + +// MQTTReceivedMsg holds a single inbound MQTT message stored in history. +type MQTTReceivedMsg struct { + Topic string `json:"topic"` + Payload string `json:"payload"` + Retained bool `json:"retained"` + Time string `json:"time"` +} + +// MQTTPublishRequest is the body for POST /api/mqtt/publish. +type MQTTPublishRequest struct { + Topic string `json:"topic"` + Payload string `json:"payload"` + QoS int `json:"qos"` + Retain bool `json:"retain"` +} + +// MQTTSubscribeRequest is the body for POST /api/mqtt/subscribe. +type MQTTSubscribeRequest struct { + Topic string `json:"topic"` + QoS int `json:"qos"` +} + +// MQTTStatusResponse is returned by GET /api/mqtt/status. +type MQTTStatusResponse struct { + Enabled bool `json:"enabled"` + Connected bool `json:"connected"` + Broker string `json:"broker"` + ClientID string `json:"client_id"` + TopicPrefix string `json:"topic_prefix"` + Subscribed []string `json:"subscribed"` + LastError string `json:"last_error,omitempty"` +} + +// mqttManager wraps the paho MQTT client and tracks connection state, +// inbound message history, and active subscriptions. +type mqttManager struct { + mu sync.RWMutex + client mqtt.Client + connected bool + lastErr string + broker string + clientID string + prefix string + msgHistory []MQTTReceivedMsg + msgMax int + subs map[string]byte // topic -> qos currently subscribed +} + // --------------------------------------------------------------------------- // Package-level singletons // --------------------------------------------------------------------------- @@ -531,8 +631,307 @@ var ( uiTemplate = template.Must(template.New("ui").Parse(uiHTML)) cachedUI []byte uiRevision uint64 = 1 + + // mqttMgr is nil when MQTT is disabled. + mqttMgr *mqttManager ) +// --------------------------------------------------------------------------- +// MQTT manager +// --------------------------------------------------------------------------- + +func newMQTTManager(mcfg MQTTConfig) *mqttManager { + m := &mqttManager{ + broker: mcfg.Broker, + clientID: mcfg.ClientID, + prefix: mcfg.TopicPrefix, + msgMax: 500, + subs: make(map[string]byte), + } + + opts := mqtt.NewClientOptions() + opts.AddBroker(mcfg.Broker) + opts.SetClientID(mcfg.ClientID) + if mcfg.Username != "" { + opts.SetUsername(mcfg.Username) + opts.SetPassword(mcfg.Password) + } + opts.SetAutoReconnect(true) + opts.SetMaxReconnectInterval(time.Duration(mcfg.ReconnectDelaySec*6) * time.Second) + opts.SetConnectTimeout(time.Duration(mcfg.ConnectTimeoutSec) * time.Second) + opts.SetCleanSession(true) + opts.SetKeepAlive(30 * time.Second) + + opts.SetOnConnectHandler(func(c mqtt.Client) { + m.mu.Lock() + m.connected = true + m.lastErr = "" + // Snapshot current subs before releasing lock. + resubTopics := make(map[string]byte, len(m.subs)) + for t, q := range m.subs { + resubTopics[t] = q + } + m.mu.Unlock() + + log.Printf("MQTT connected to %s", mcfg.Broker) + + // Re-subscribe on reconnect. + for topic, qos := range resubTopics { + tok := c.Subscribe(topic, qos, m.messageHandler) + if tok.Wait() && tok.Error() != nil { + log.Printf("MQTT re-subscribe %s failed: %v", topic, tok.Error()) + } + } + }) + + opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) { + m.mu.Lock() + m.connected = false + if err != nil { + m.lastErr = err.Error() + } + m.mu.Unlock() + log.Printf("MQTT connection lost: %v", err) + }) + + m.client = mqtt.NewClient(opts) + return m +} + +// connect dials the broker; paho handles all subsequent reconnections. +func (m *mqttManager) connect() error { + tok := m.client.Connect() + if !tok.WaitTimeout(30 * time.Second) { + return fmt.Errorf("MQTT connect timeout after 30s") + } + if err := tok.Error(); err != nil { + m.mu.Lock() + m.lastErr = err.Error() + m.mu.Unlock() + return err + } + return nil +} + +// disconnect cleanly disconnects from the broker. +func (m *mqttManager) disconnect() { + if m.client != nil { + m.client.Disconnect(500) + } +} + +// messageHandler is the default callback for all subscriptions. +func (m *mqttManager) messageHandler(_ mqtt.Client, msg mqtt.Message) { + entry := MQTTReceivedMsg{ + Topic: msg.Topic(), + Payload: string(msg.Payload()), + Retained: msg.Retained(), + Time: time.Now().UTC().Format(time.RFC3339Nano), + } + m.mu.Lock() + m.msgHistory = append(m.msgHistory, entry) + if len(m.msgHistory) > m.msgMax { + // Trim oldest half to avoid O(n) shift on every overflow. + half := m.msgMax / 2 + copy(m.msgHistory, m.msgHistory[half:]) + m.msgHistory = m.msgHistory[:m.msgMax-half] + } + m.mu.Unlock() +} + +// publish sends a message; returns an error if not connected or publish fails. +func (m *mqttManager) publish(topic, payload string, qos byte, retain bool) error { + if topic == "" { + return fmt.Errorf("topic must not be empty") + } + if qos > 2 { + qos = 2 + } + m.mu.RLock() + ok := m.connected + m.mu.RUnlock() + if !ok { + return fmt.Errorf("MQTT not connected") + } + tok := m.client.Publish(topic, qos, retain, payload) + if qos > 0 { + if !tok.WaitTimeout(5 * time.Second) { + return fmt.Errorf("MQTT publish timeout") + } + return tok.Error() + } + return nil +} + +// subscribe adds a topic subscription; resubscribed automatically on reconnect. +func (m *mqttManager) subscribe(topic string, qos byte) error { + if topic == "" { + return fmt.Errorf("topic must not be empty") + } + if qos > 2 { + qos = 2 + } + m.mu.Lock() + m.subs[topic] = qos + m.mu.Unlock() + + m.mu.RLock() + ok := m.connected + m.mu.RUnlock() + if !ok { + // Will be subscribed upon reconnect via OnConnectHandler. + return nil + } + + tok := m.client.Subscribe(topic, qos, m.messageHandler) + if tok.Wait() && tok.Error() != nil { + return tok.Error() + } + return nil +} + +// unsubscribe removes a topic subscription. +func (m *mqttManager) unsubscribe(topic string) error { + m.mu.Lock() + delete(m.subs, topic) + m.mu.Unlock() + + m.mu.RLock() + ok := m.connected + m.mu.RUnlock() + if !ok { + return nil + } + tok := m.client.Unsubscribe(topic) + if tok.Wait() && tok.Error() != nil { + return tok.Error() + } + return nil +} + +// status returns a snapshot of connection state for the API. +func (m *mqttManager) status() MQTTStatusResponse { + m.mu.RLock() + defer m.mu.RUnlock() + + subs := make([]string, 0, len(m.subs)) + for t := range m.subs { + subs = append(subs, t) + } + return MQTTStatusResponse{ + Enabled: true, + Connected: m.connected, + Broker: m.broker, + ClientID: m.clientID, + TopicPrefix: m.prefix, + Subscribed: subs, + LastError: m.lastErr, + } +} + +// getMessages returns up to limit of the most recently received messages. +func (m *mqttManager) getMessages(limit int) []MQTTReceivedMsg { + if limit <= 0 { + limit = 50 + } + if limit > 500 { + limit = 500 + } + m.mu.RLock() + defer m.mu.RUnlock() + + total := len(m.msgHistory) + if total == 0 { + return []MQTTReceivedMsg{} + } + start := 0 + if total > limit { + start = total - limit + } + out := make([]MQTTReceivedMsg, total-start) + copy(out, m.msgHistory[start:]) + return out +} + +// startMQTTPublisher periodically publishes the current PLC state to MQTT. +// Topics published: +// +// {prefix}/data – full JSON APIState (same as /api/data) +// {prefix}/force/left – left column force in % +// {prefix}/force/right – right column force in % +// {prefix}/force/sum_kn – total kN +// {prefix}/connected – PLC connection boolean +func startMQTTPublisher(ctx context.Context) { + if mqttMgr == nil { + return + } + mcfg := getConfigSnapshot().MQTT + if !mcfg.AutoPublish { + return + } + + interval := time.Duration(mcfg.PublishIntervalMs) * time.Millisecond + ticker := time.NewTicker(interval) + defer ticker.Stop() + + prefix := mcfg.TopicPrefix + qos := byte(mcfg.QoS) + retain := mcfg.Retain + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s := snapshotState() + + full, err := json.Marshal(s) + if err == nil { + _ = mqttMgr.publish(prefix+"/data", string(full), qos, retain) + } + + _ = mqttMgr.publish(prefix+"/force/left", fmt.Sprintf("%.2f", s.SilaL), qos, retain) + _ = mqttMgr.publish(prefix+"/force/right", fmt.Sprintf("%.2f", s.SilaR), qos, retain) + _ = mqttMgr.publish(prefix+"/force/sum_kn", fmt.Sprintf("%.2f", s.SumkN), qos, retain) + _ = mqttMgr.publish(prefix+"/force/imbalance", fmt.Sprintf("%.2f", s.ImbalancePercent), qos, retain) + + connStr := "false" + if s.Connected { + connStr = "true" + } + _ = mqttMgr.publish(prefix+"/connected", connStr, qos, retain) + } + } +} + +// mqttPublishAlarm forwards a single alarm event to MQTT non-blocking. +// Called from enqueueAlarm; errors are silently discarded to avoid +// blocking the alarm pipeline. +func mqttPublishAlarm(a AlarmEvent) { + if mqttMgr == nil { + return + } + mcfg := getConfigSnapshot().MQTT + if !mcfg.Enabled { + return + } + + payload, err := json.Marshal(map[string]interface{}{ + "time": a.TS.UTC().Format(time.RFC3339), + "severity": a.Severity, + "source": a.Source, + "code": a.Code, + "state": a.State, + "message": a.Message, + "value": a.Value, + "limit": a.Limit, + }) + if err != nil { + return + } + _ = mqttMgr.publish(mcfg.TopicPrefix+"/alarm", string(payload), byte(mcfg.QoS), false) +} + // --------------------------------------------------------------------------- // Force calculation // --------------------------------------------------------------------------- @@ -598,7 +997,6 @@ func initCachedUI() { if err != nil { log.Fatalf("failed to pre-render UI template: %v", err) } - cfgMu.Lock() cachedUI = payload cfgMu.Unlock() @@ -628,7 +1026,6 @@ func configSectionChanges(oldCfg, newCfg Config) (hotSections []string, restartS if !reflect.DeepEqual(oldCfg.Modules, newCfg.Modules) { hotSections = append(hotSections, "modules") } - if !reflect.DeepEqual(oldCfg.Server, newCfg.Server) { restartSections = append(restartSections, "server") } @@ -638,7 +1035,9 @@ func configSectionChanges(oldCfg, newCfg Config) (hotSections []string, restartS if !reflect.DeepEqual(oldCfg.DB, newCfg.DB) { restartSections = append(restartSections, "db") } - + if !reflect.DeepEqual(oldCfg.MQTT, newCfg.MQTT) { + restartSections = append(restartSections, "mqtt") + } return hotSections, restartSections } @@ -666,7 +1065,6 @@ func reloadConfigSafely(configPath string) { log.Printf("config reload rejected: failed to rebuild UI: %v", err) return } - cfgMu.Lock() cfg = updatedCfg cachedUI = payload @@ -769,7 +1167,6 @@ func snapshotState() APIState { if !state.LastUpdate.IsZero() { lastUpdate = state.LastUpdate.Format(time.RFC3339Nano) } - return APIState{ Connected: state.Connected, SilaL: state.SilaL, @@ -811,6 +1208,8 @@ func enqueueAlarm(a AlarmEvent) { state.DroppedEvents++ state.Unlock() } + // Forward to MQTT non-blocking; ignore errors. + go mqttPublishAlarm(a) } // --------------------------------------------------------------------------- @@ -831,7 +1230,6 @@ func ensureColumn(database *sql.DB, tableName, columnName, definition string) er var notNull int var dfltValue sql.NullString var pk int - if err := rows.Scan(&cid, &name, &ctype, ¬Null, &dfltValue, &pk); err != nil { return err } @@ -846,13 +1244,12 @@ func ensureColumn(database *sql.DB, tableName, columnName, definition string) er if found { return nil } - _, err = database.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", tableName, columnName, definition)) return err } -func initDatabase(dbPath string) (*sql.DB, error) { - dsn := fmt.Sprintf("file:%s?_busy_timeout=%d&_foreign_keys=on", filepath.ToSlash(dbPath), cfg.DB.BusyTimeoutMs) +func initDatabase(dbPath string, dbCfg DBConfig) (*sql.DB, error) { + dsn := fmt.Sprintf("file:%s?_busy_timeout=%d&_foreign_keys=on", filepath.ToSlash(dbPath), dbCfg.BusyTimeoutMs) database, err := sql.Open("sqlite3", dsn) if err != nil { return nil, fmt.Errorf("open sqlite: %w", err) @@ -865,11 +1262,10 @@ func initDatabase(dbPath string) (*sql.DB, error) { pragmas := []string{ "PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;", - fmt.Sprintf("PRAGMA wal_autocheckpoint=%d;", cfg.DB.CheckpointPages), - fmt.Sprintf("PRAGMA busy_timeout=%d;", cfg.DB.BusyTimeoutMs), + fmt.Sprintf("PRAGMA wal_autocheckpoint=%d;", dbCfg.CheckpointPages), + fmt.Sprintf("PRAGMA busy_timeout=%d;", dbCfg.BusyTimeoutMs), "PRAGMA temp_store=MEMORY;", } - for _, q := range pragmas { if _, err := database.Exec(q); err != nil { _ = database.Close() @@ -913,38 +1309,32 @@ CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_n return nil, fmt.Errorf("create schema: %w", err) } - if err := ensureColumn(database, "samples", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"); err != nil { - _ = database.Close() - return nil, fmt.Errorf("ensure ts_unix_ns column: %w", err) + // Schema migrations: add missing columns to support older databases. + migrations := []struct{ table, col, def string }{ + {"samples", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"}, + {"samples", "imbalance_pct", "REAL NOT NULL DEFAULT 0"}, + {"samples", "bias_pct", "REAL NOT NULL DEFAULT 0"}, + {"alarm_events", "ts_unix_ns", "INTEGER NOT NULL DEFAULT 0"}, } - if err := ensureColumn(database, "samples", "imbalance_pct", "REAL NOT NULL DEFAULT 0"); err != nil { - _ = database.Close() - return nil, fmt.Errorf("ensure imbalance_pct column: %w", err) - } - if err := ensureColumn(database, "samples", "bias_pct", "REAL NOT NULL DEFAULT 0"); err != nil { - _ = database.Close() - return nil, fmt.Errorf("ensure bias_pct column: %w", err) + for _, m := range migrations { + if err := ensureColumn(database, m.table, m.col, m.def); err != nil { + _ = database.Close() + return nil, fmt.Errorf("migration (%s.%s): %w", m.table, m.col, err) + } } + // Ensure the ts_unix_ns index exists (may have been missed pre-migration). if _, err := database.Exec(`CREATE INDEX IF NOT EXISTS idx_samples_ts_unix_ns ON samples(ts_unix_ns)`); err != nil { _ = database.Close() return nil, fmt.Errorf("create ts_unix_ns index: %w", err) } - if _, err := database.Exec(` - UPDATE samples - SET ts_unix_ns = CAST(strftime('%s', ts) AS INTEGER) * 1000000000 - WHERE ts_unix_ns = 0 AND ts IS NOT NULL - `); err != nil { - log.Printf("warning: ts_unix_ns backfill failed: %v", err) - } - - if _, err := database.Exec(` - UPDATE alarm_events - SET ts_unix_ns = CAST(strftime('%s', ts) AS INTEGER) * 1000000000 - WHERE ts_unix_ns = 0 AND ts IS NOT NULL - `); err != nil { - log.Printf("warning: alarm ts_unix_ns backfill failed: %v", err) + // Backfill ts_unix_ns for rows that pre-date the column. + for _, tbl := range []string{"samples", "alarm_events"} { + q := fmt.Sprintf(`UPDATE %s SET ts_unix_ns = CAST(strftime('%%s', ts) AS INTEGER) * 1000000000 WHERE ts_unix_ns = 0 AND ts IS NOT NULL`, tbl) + if _, err := database.Exec(q); err != nil { + log.Printf("warning: ts_unix_ns backfill for %s failed: %v", tbl, err) + } } return database, nil @@ -954,23 +1344,21 @@ CREATE INDEX IF NOT EXISTS idx_alarm_events_ts_unix_ns ON alarm_events(ts_unix_n // DB writer goroutines // --------------------------------------------------------------------------- -func startDBWriter(ctx context.Context, database *sql.DB) { - ticker := time.NewTicker(time.Duration(cfg.DB.FlushIntervalMs) * time.Millisecond) +func startDBWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int) { + ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond) defer ticker.Stop() - batch := make([]Sample, 0, cfg.DB.BatchSize) + batch := make([]Sample, 0, batchSize) flush := func() { if len(batch) == 0 { return } - tx, err := database.Begin() if err != nil { log.Printf("db begin failed: %v", err) return } - stmt, err := tx.Prepare(` INSERT INTO samples ( ts, ts_unix_ns, sila_l_pct, sila_r_pct, sila_l_kn, sila_r_kn, @@ -982,30 +1370,19 @@ func startDBWriter(ctx context.Context, database *sql.DB) { log.Printf("db prepare failed: %v", err) return } - ok := true for _, s := range batch { - _, err := stmt.Exec( - s.TS.UTC().Format(time.RFC3339Nano), - s.TS.UTC().UnixNano(), - s.SilaLPct, - s.SilaRPct, - s.SilaLKN, - s.SilaRKN, - s.SumPercent, - s.SumKN, - s.ImbalancePercent, - s.BiasPercent, - ) - if err != nil { + if _, err := stmt.Exec( + s.TS.UTC().Format(time.RFC3339Nano), s.TS.UTC().UnixNano(), + s.SilaLPct, s.SilaRPct, s.SilaLKN, s.SilaRKN, + s.SumPercent, s.SumKN, s.ImbalancePercent, s.BiasPercent, + ); err != nil { ok = false log.Printf("db insert failed: %v", err) break } } - _ = stmt.Close() - if !ok { _ = tx.Rollback() return @@ -1014,13 +1391,13 @@ func startDBWriter(ctx context.Context, database *sql.DB) { log.Printf("db commit failed: %v", err) return } - batch = batch[:0] } for { select { case <-ctx.Done(): + // Drain remaining samples before exit. for { select { case s := <-sampleCh: @@ -1032,7 +1409,7 @@ func startDBWriter(ctx context.Context, database *sql.DB) { } case s := <-sampleCh: batch = append(batch, s) - if len(batch) >= cfg.DB.BatchSize { + if len(batch) >= batchSize { flush() } case <-ticker.C: @@ -1041,23 +1418,21 @@ func startDBWriter(ctx context.Context, database *sql.DB) { } } -func startAlarmWriter(ctx context.Context, database *sql.DB) { - ticker := time.NewTicker(time.Duration(cfg.DB.FlushIntervalMs) * time.Millisecond) +func startAlarmWriter(ctx context.Context, database *sql.DB, batchSize, flushMs int) { + ticker := time.NewTicker(time.Duration(flushMs) * time.Millisecond) defer ticker.Stop() - batch := make([]AlarmEvent, 0, cfg.DB.BatchSize) + batch := make([]AlarmEvent, 0, batchSize) flush := func() { if len(batch) == 0 { return } - tx, err := database.Begin() if err != nil { log.Printf("alarm db begin failed: %v", err) return } - stmt, err := tx.Prepare(` INSERT INTO alarm_events ( ts, ts_unix_ns, severity, source, code, state, message, value, limit_value @@ -1068,29 +1443,18 @@ func startAlarmWriter(ctx context.Context, database *sql.DB) { log.Printf("alarm db prepare failed: %v", err) return } - ok := true for _, a := range batch { - _, err := stmt.Exec( - a.TS.UTC().Format(time.RFC3339Nano), - a.TS.UTC().UnixNano(), - a.Severity, - a.Source, - a.Code, - a.State, - a.Message, - a.Value, - a.Limit, - ) - if err != nil { + if _, err := stmt.Exec( + a.TS.UTC().Format(time.RFC3339Nano), a.TS.UTC().UnixNano(), + a.Severity, a.Source, a.Code, a.State, a.Message, a.Value, a.Limit, + ); err != nil { ok = false log.Printf("alarm db insert failed: %v", err) break } } - _ = stmt.Close() - if !ok { _ = tx.Rollback() return @@ -1099,7 +1463,6 @@ func startAlarmWriter(ctx context.Context, database *sql.DB) { log.Printf("alarm db commit failed: %v", err) return } - batch = batch[:0] } @@ -1117,7 +1480,7 @@ func startAlarmWriter(ctx context.Context, database *sql.DB) { } case a := <-alarmCh: batch = append(batch, a) - if len(batch) >= cfg.DB.BatchSize { + if len(batch) >= batchSize { flush() } case <-ticker.C: @@ -1126,26 +1489,27 @@ func startAlarmWriter(ctx context.Context, database *sql.DB) { } } -func startDBCleanup(ctx context.Context, database *sql.DB) { - if cfg.DB.RetentionDays <= 0 { +func startDBCleanup(ctx context.Context, database *sql.DB, retentionDays, intervalHr int) { + if retentionDays <= 0 { return } - ticker := time.NewTicker(time.Duration(cfg.DB.CleanupIntervalHr) * time.Hour) + ticker := time.NewTicker(time.Duration(intervalHr) * time.Hour) defer ticker.Stop() cleanup := func() { - cutoffNs := time.Now().AddDate(0, 0, -cfg.DB.RetentionDays).UTC().UnixNano() - if _, err := database.Exec(`DELETE FROM samples WHERE ts_unix_ns > 0 AND ts_unix_ns < ?`, cutoffNs); err != nil { - log.Printf("db cleanup samples failed: %v", err) - } - if _, err := database.Exec(`DELETE FROM alarm_events WHERE ts_unix_ns > 0 AND ts_unix_ns < ?`, cutoffNs); err != nil { - log.Printf("db cleanup alarms failed: %v", err) + cutoffNs := time.Now().AddDate(0, 0, -retentionDays).UTC().UnixNano() + for _, tbl := range []string{"samples", "alarm_events"} { + if _, err := database.Exec( + fmt.Sprintf(`DELETE FROM %s WHERE ts_unix_ns > 0 AND ts_unix_ns < ?`, tbl), + cutoffNs, + ); err != nil { + log.Printf("db cleanup %s failed: %v", tbl, err) + } } } cleanup() - for { select { case <-ctx.Done(): @@ -1160,7 +1524,7 @@ func startDBCleanup(ctx context.Context, database *sql.DB) { // Alarm zone helpers // --------------------------------------------------------------------------- -func zoneFromValue(value float64, warn, crit float64) string { +func zoneFromValue(value, warn, crit float64) string { if value >= crit { return "critical" } @@ -1210,14 +1574,11 @@ func maybeLogZoneChange(source, prev, curr string, value float64) { if prev == curr { return } - name := sourceName(source) now := time.Now() - if prev == "" && curr == "ok" { return } - switch curr { case "ok": enqueueAlarm(AlarmEvent{ @@ -1271,11 +1632,9 @@ func evaluateProcessAlarms(s Sample) { alarmTracker.Lock() defer alarmTracker.Unlock() - maybeLogZoneChange("force_left", alarmTracker.LeftZone, leftZone, float64(s.SilaLPct)) maybeLogZoneChange("force_right", alarmTracker.RightZone, rightZone, float64(s.SilaRPct)) maybeLogZoneChange("imbalance", alarmTracker.ImbZone, imbZone, float64(s.ImbalancePercent)) - alarmTracker.LeftZone = leftZone alarmTracker.RightZone = rightZone alarmTracker.ImbZone = imbZone @@ -1284,34 +1643,22 @@ func evaluateProcessAlarms(s Sample) { func maybeLogPLCConnected() { alarmTracker.Lock() defer alarmTracker.Unlock() - if !alarmTracker.PLCKnown { alarmTracker.PLCKnown = true alarmTracker.PLCConnected = true enqueueAlarm(AlarmEvent{ - TS: time.Now(), - Severity: "info", - Source: "plc", - Code: "plc_connected", - State: "info", - Message: "PLC connection established", - Value: 1, - Limit: 0, + TS: time.Now(), Severity: "info", Source: "plc", + Code: "plc_connected", State: "info", + Message: "PLC connection established", Value: 1, }) return } - if !alarmTracker.PLCConnected { alarmTracker.PLCConnected = true enqueueAlarm(AlarmEvent{ - TS: time.Now(), - Severity: "info", - Source: "plc", - Code: "plc_restored", - State: "info", - Message: "PLC connection restored", - Value: 1, - Limit: 0, + TS: time.Now(), Severity: "info", Source: "plc", + Code: "plc_restored", State: "info", + Message: "PLC connection restored", Value: 1, }) } } @@ -1319,25 +1666,17 @@ func maybeLogPLCConnected() { func maybeLogPLCDisconnected(reason string) { alarmTracker.Lock() defer alarmTracker.Unlock() - if !alarmTracker.PLCKnown || !alarmTracker.PLCConnected { return } - alarmTracker.PLCConnected = false alarmTracker.LeftZone = "" alarmTracker.RightZone = "" alarmTracker.ImbZone = "" - enqueueAlarm(AlarmEvent{ - TS: time.Now(), - Severity: "critical", - Source: "plc", - Code: "plc_disconnected", - State: "active", - Message: "PLC connection lost: " + reason, - Value: 0, - Limit: 0, + TS: time.Now(), Severity: "critical", Source: "plc", + Code: "plc_disconnected", State: "active", + Message: "PLC connection lost: " + reason, Value: 0, }) } @@ -1417,15 +1756,10 @@ func startPLCPoller(ctx context.Context) { state.Unlock() sample := Sample{ - TS: now, - SilaLPct: silaL, - SilaRPct: silaR, - SilaLKN: leftKN, - SilaRKN: rightKN, - SumPercent: sumPercent, - SumKN: sumKN, - ImbalancePercent: imbalance, - BiasPercent: bias, + TS: now, SilaLPct: silaL, SilaRPct: silaR, + SilaLKN: leftKN, SilaRKN: rightKN, + SumPercent: sumPercent, SumKN: sumKN, + ImbalancePercent: imbalance, BiasPercent: bias, } evaluateProcessAlarms(sample) @@ -1450,16 +1784,13 @@ func parseWindow(raw string) (time.Duration, string, error) { if s == "" { s = fmt.Sprintf("%dm", getConfigSnapshot().Trend.Minutes) } - if strings.HasSuffix(s, "d") { n, err := strconv.Atoi(strings.TrimSuffix(s, "d")) if err != nil || n <= 0 { return 0, "", fmt.Errorf("invalid day window") } - d := time.Duration(n) * 24 * time.Hour - return d, s, nil + return time.Duration(n) * 24 * time.Hour, s, nil } - d, err := time.ParseDuration(s) if err != nil || d <= 0 { return 0, "", fmt.Errorf("invalid window") @@ -1505,10 +1836,11 @@ func queryHistory(window time.Duration) ([]HistoryPoint, error) { return nil, err } - if len(points) <= cfg.DB.MaxChartPoints { + maxPts := getConfigSnapshot().DB.MaxChartPoints + if len(points) <= maxPts { return points, nil } - return downsamplePoints(points, cfg.DB.MaxChartPoints), nil + return downsamplePoints(points, maxPts), nil } func downsamplePoints(points []HistoryPoint, max int) []HistoryPoint { @@ -1535,6 +1867,7 @@ func downsamplePoints(points []HistoryPoint, max int) []HistoryPoint { if len(out) == 0 { return points } + // Always ensure last point is the most recent. out[len(out)-1] = points[len(points)-1] return out } @@ -1555,7 +1888,6 @@ func queryNumericStats(field string, fromNs, toNs int64) (NumericStats, error) { if err != nil { return NumericStats{}, err } - query := fmt.Sprintf(` SELECT COALESCE(AVG(%[1]s), 0), @@ -1596,7 +1928,6 @@ func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sa if sampleCount < 8 { return "insufficient_data", "Too few samples in selected trend window" } - config := getConfigSnapshot() if forceStd >= 6.0 || math.Abs(forceDelta) >= 8.0 || avgImb5m >= config.Thresholds.ImbalanceCriticalPercent || imbStd >= 4.0 { @@ -1611,7 +1942,6 @@ func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sa } return "unstable", "Imbalance variation is too high" } - if forceStd >= 3.0 || math.Abs(forceDelta) >= 3.0 || avgImb5m >= config.Thresholds.ImbalanceWarningPercent || imbStd >= 2.0 { if avgImb5m >= config.Thresholds.ImbalanceWarningPercent { return "caution", "Imbalance is trending above warning region" @@ -1624,7 +1954,6 @@ func classifyProcessStability(forceStd, imbStd, forceDelta, avgImb5m float64, sa } return "caution", "Imbalance repeatability is degrading" } - return "stable", "Process variation is low" } @@ -1650,7 +1979,6 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro if err != nil { return TrendResponse{}, err } - forceOld, err := queryNumericStats("sum_pct", startNs, midNs) if err != nil { return TrendResponse{}, err @@ -1684,11 +2012,8 @@ func buildTrendResponse(window time.Duration, label string) (TrendResponse, erro } stability, reason := classifyProcessStability( - fullWindowForce.StdDev(), - fullWindowImb.StdDev(), - forceDelta, - imb5m.Avg, - fullWindowForce.Count, + fullWindowForce.StdDev(), fullWindowImb.StdDev(), + forceDelta, imb5m.Avg, fullWindowForce.Count, ) return TrendResponse{ @@ -1718,7 +2043,6 @@ func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) { if limit > 100 { limit = 100 } - rows, err := db.Query(` SELECT ts, severity, source, state, message, value, limit_value FROM alarm_events @@ -1734,113 +2058,124 @@ func queryAlarmEvents(limit int) ([]AlarmEventAPI, error) { for rows.Next() { var ts, severity, source, state, message string var value, limitValue float64 - if err := rows.Scan(&ts, &severity, &source, &state, &message, &value, &limitValue); err != nil { return nil, err } - - t, err := time.Parse(time.RFC3339Nano, ts) displayTime := ts - if err == nil { + if t, err := time.Parse(time.RFC3339Nano, ts); err == nil { displayTime = t.Local().Format("02.01.2006 15:04:05") } - events = append(events, AlarmEventAPI{ - Time: displayTime, - Severity: severity, - Source: source, - State: state, - Message: message, - Value: value, - Limit: limitValue, + Time: displayTime, Severity: severity, Source: source, + State: state, Message: message, Value: value, Limit: limitValue, }) } - - if err := rows.Err(); err != nil { - return nil, err - } - return events, nil + return events, rows.Err() } // --------------------------------------------------------------------------- -// HTTP handlers +// HTTP helpers +// --------------------------------------------------------------------------- + +// writeJSON writes v as JSON with correct headers and CORS. +func writeJSON(w http.ResponseWriter, status int, v any) { + h := w.Header() + h.Set("Content-Type", "application/json") + h.Set("Cache-Control", "no-store") + h.Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +// allowMethod checks r.Method and handles CORS preflight. +// Returns false if the caller should stop processing. +func allowMethod(w http.ResponseWriter, r *http.Request, method string) bool { + if r.Method == http.MethodOptions { + h := w.Header() + h.Set("Access-Control-Allow-Origin", "*") + h.Set("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS") + h.Set("Access-Control-Allow-Headers", "Content-Type") + w.WriteHeader(http.StatusNoContent) + return false + } + if r.Method != method { + http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed) + return false + } + return true +} + +// --------------------------------------------------------------------------- +// HTTP handlers — core // --------------------------------------------------------------------------- func apiData(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Cache-Control", "no-store") - _ = json.NewEncoder(w).Encode(snapshotState()) + if !allowMethod(w, r, http.MethodGet) { + return + } + writeJSON(w, http.StatusOK, snapshotState()) } func apiUIRevision(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Cache-Control", "no-store") - _ = json.NewEncoder(w).Encode(map[string]uint64{ - "revision": atomic.LoadUint64(&uiRevision), - }) + if !allowMethod(w, r, http.MethodGet) { + return + } + writeJSON(w, http.StatusOK, map[string]uint64{"revision": atomic.LoadUint64(&uiRevision)}) } func apiHistory(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodGet) { + return + } window, label, err := parseWindow(r.URL.Query().Get("window")) if err != nil { - http.Error(w, "invalid window", http.StatusBadRequest) + http.Error(w, `{"error":"invalid window"}`, http.StatusBadRequest) return } - points, err := queryHistory(window) if err != nil { - http.Error(w, "history query failed", http.StatusInternalServerError) log.Printf("history query failed: %v", err) + http.Error(w, `{"error":"history query failed"}`, http.StatusInternalServerError) return } - - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Cache-Control", "no-store") - _ = json.NewEncoder(w).Encode(HistoryResponse{ - Window: label, - Points: points, - }) + writeJSON(w, http.StatusOK, HistoryResponse{Window: label, Points: points}) } func apiTrend(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodGet) { + return + } window, label, err := parseWindow(r.URL.Query().Get("window")) if err != nil { - http.Error(w, "invalid trend window", http.StatusBadRequest) + http.Error(w, `{"error":"invalid trend window"}`, http.StatusBadRequest) return } - resp, err := buildTrendResponse(window, label) if err != nil { - http.Error(w, "trend query failed", http.StatusInternalServerError) log.Printf("trend query failed: %v", err) + http.Error(w, `{"error":"trend query failed"}`, http.StatusInternalServerError) return } - - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Cache-Control", "no-store") - _ = json.NewEncoder(w).Encode(resp) + writeJSON(w, http.StatusOK, resp) } func apiAlarms(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodGet) { + return + } limit := 20 if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" { if n, err := strconv.Atoi(raw); err == nil && n > 0 { limit = n } } - events, err := queryAlarmEvents(limit) if err != nil { - http.Error(w, "alarm query failed", http.StatusInternalServerError) log.Printf("alarm query failed: %v", err) + http.Error(w, `{"error":"alarm query failed"}`, http.StatusInternalServerError) return } - - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Cache-Control", "no-store") - _ = json.NewEncoder(w).Encode(AlarmResponse{ - Events: events, - }) + writeJSON(w, http.StatusOK, AlarmResponse{Events: events}) } func serveUI(w http.ResponseWriter, r *http.Request) { @@ -1852,6 +2187,144 @@ func serveUI(w http.ResponseWriter, r *http.Request) { _, _ = w.Write(payload) } +// --------------------------------------------------------------------------- +// HTTP handlers — MQTT REST API +// --------------------------------------------------------------------------- + +// GET /api/mqtt/status +// Returns MQTT connection state, broker info, and active subscriptions. +func apiMQTTStatus(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodGet) { + return + } + if mqttMgr == nil { + writeJSON(w, http.StatusOK, MQTTStatusResponse{ + Enabled: false, + Connected: false, + Broker: "", + Subscribed: []string{}, + }) + return + } + writeJSON(w, http.StatusOK, mqttMgr.status()) +} + +// POST /api/mqtt/publish +// Body: {"topic":"plant/press/cmd","payload":"reset","qos":1,"retain":false} +// Publishes an arbitrary message to the MQTT broker. +func apiMQTTPublish(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodPost) { + return + } + if mqttMgr == nil { + http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable) + return + } + + var req MQTTPublishRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) + return + } + if strings.TrimSpace(req.Topic) == "" { + http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest) + return + } + if req.QoS < 0 || req.QoS > 2 { + http.Error(w, `{"error":"qos must be 0, 1, or 2"}`, http.StatusBadRequest) + return + } + + if err := mqttMgr.publish(req.Topic, req.Payload, byte(req.QoS), req.Retain); err != nil { + writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "published", "topic": req.Topic}) +} + +// GET /api/mqtt/messages?limit=50 +// Returns the most recently received MQTT messages (across all subscribed topics). +func apiMQTTMessages(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodGet) { + return + } + if mqttMgr == nil { + writeJSON(w, http.StatusOK, map[string]any{"messages": []MQTTReceivedMsg{}, "enabled": false}) + return + } + limit := 50 + if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" { + if n, err := strconv.Atoi(raw); err == nil && n > 0 { + limit = n + } + } + msgs := mqttMgr.getMessages(limit) + writeJSON(w, http.StatusOK, map[string]any{"messages": msgs, "count": len(msgs)}) +} + +// POST /api/mqtt/subscribe +// Body: {"topic":"plant/press/#","qos":1} +// Subscribes to a topic filter. Messages are stored in the ring buffer +// and accessible via GET /api/mqtt/messages. +func apiMQTTSubscribe(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodPost) { + return + } + if mqttMgr == nil { + http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable) + return + } + + var req MQTTSubscribeRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) + return + } + if strings.TrimSpace(req.Topic) == "" { + http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest) + return + } + if req.QoS < 0 || req.QoS > 2 { + http.Error(w, `{"error":"qos must be 0, 1, or 2"}`, http.StatusBadRequest) + return + } + + if err := mqttMgr.subscribe(req.Topic, byte(req.QoS)); err != nil { + writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "subscribed", "topic": req.Topic}) +} + +// DELETE /api/mqtt/subscribe +// Body: {"topic":"plant/press/#"} +// Unsubscribes from a topic filter. +func apiMQTTUnsubscribe(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, http.MethodDelete) { + return + } + if mqttMgr == nil { + http.Error(w, `{"error":"MQTT not enabled"}`, http.StatusServiceUnavailable) + return + } + + var req MQTTSubscribeRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) + return + } + if strings.TrimSpace(req.Topic) == "" { + http.Error(w, `{"error":"topic required"}`, http.StatusBadRequest) + return + } + + if err := mqttMgr.unsubscribe(req.Topic); err != nil { + writeJSON(w, http.StatusBadGateway, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "unsubscribed", "topic": req.Topic}) +} + // --------------------------------------------------------------------------- // main // --------------------------------------------------------------------------- @@ -1876,7 +2349,8 @@ func main() { dbPath = filepath.Join(wd, dbPath) } - db, err = initDatabase(dbPath) + // Pass DB config explicitly so initDatabase doesn't touch the global. + db, err = initDatabase(dbPath, cfg.DB) if err != nil { log.Fatalf("failed to init database: %v", err) } @@ -1894,6 +2368,19 @@ func main() { cfg.PLC.IP, cfg.PLC.DBNum, cfg.PLC.Rack, cfg.PLC.Slot, cfg.PLC.PollMs) log.Printf("Press: MAX_TONNAGE=%.2f %s", cfg.Press.MaxTonnage, cfg.UI.UnitForce) + // Initialise MQTT if enabled. + if cfg.MQTT.Enabled { + mqttMgr = newMQTTManager(cfg.MQTT) + if err := mqttMgr.connect(); err != nil { + // Non-fatal: paho will reconnect automatically. + log.Printf("MQTT initial connect failed (will retry): %v", err) + } else { + log.Printf("MQTT: connected to %s (prefix=%s)", cfg.MQTT.Broker, cfg.MQTT.TopicPrefix) + } + } else { + log.Printf("MQTT: disabled (set mqtt.enabled: true in config to enable)") + } + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -1903,13 +2390,21 @@ func main() { log.Printf("Config watcher enabled for %s", configPath) } + // Snapshot DB config values once (DB is not hot-reloadable). + dbCfg := cfg.DB + var wg sync.WaitGroup wg.Add(4) - go func() { defer wg.Done(); startDBWriter(ctx, db) }() - go func() { defer wg.Done(); startAlarmWriter(ctx, db) }() - go func() { defer wg.Done(); startDBCleanup(ctx, db) }() + go func() { defer wg.Done(); startDBWriter(ctx, db, dbCfg.BatchSize, dbCfg.FlushIntervalMs) }() + go func() { defer wg.Done(); startAlarmWriter(ctx, db, dbCfg.BatchSize, dbCfg.FlushIntervalMs) }() + go func() { defer wg.Done(); startDBCleanup(ctx, db, dbCfg.RetentionDays, dbCfg.CleanupIntervalHr) }() go func() { defer wg.Done(); startPLCPoller(ctx) }() + if cfg.MQTT.Enabled { + wg.Add(1) + go func() { defer wg.Done(); startMQTTPublisher(ctx) }() + } + staticRoot, err := fs.Sub(staticFiles, "static") if err != nil { log.Fatalf("failed to mount embedded static files: %v", err) @@ -1918,18 +2413,37 @@ func main() { mux := http.NewServeMux() mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(staticRoot)))) mux.HandleFunc("/", serveUI) + + // Core data API mux.HandleFunc("/api/data", apiData) mux.HandleFunc("/api/ui-revision", apiUIRevision) mux.HandleFunc("/api/history", apiHistory) mux.HandleFunc("/api/trend", apiTrend) mux.HandleFunc("/api/alarms", apiAlarms) + // MQTT REST API + // + // GET /api/mqtt/status → connection status, broker, subscriptions + // POST /api/mqtt/publish → publish message to any topic + // GET /api/mqtt/messages[?limit=N] → last N received messages + // POST /api/mqtt/subscribe → subscribe to topic filter + // DELETE /api/mqtt/subscribe → unsubscribe from topic filter + mux.HandleFunc("/api/mqtt/status", apiMQTTStatus) + mux.HandleFunc("/api/mqtt/publish", apiMQTTPublish) + mux.HandleFunc("/api/mqtt/messages", apiMQTTMessages) + mux.HandleFunc("/api/mqtt/subscribe", apiMQTTSubscribe) + mux.HandleFunc("/api/mqtt/unsubscribe", apiMQTTUnsubscribe) + srv := &http.Server{ - Addr: cfg.Server.ListenAddr, - Handler: mux, + Addr: cfg.Server.ListenAddr, + Handler: mux, + ReadTimeout: 15 * time.Second, + WriteTimeout: 15 * time.Second, + IdleTimeout: 60 * time.Second, } log.Printf("Listening on http://localhost%s", cfg.Server.ListenAddr) + log.Printf("MQTT API: GET /api/mqtt/status | POST /api/mqtt/publish | GET /api/mqtt/messages") go func() { if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { @@ -1946,6 +2460,11 @@ func main() { log.Printf("HTTP server shutdown error: %v", err) } + if mqttMgr != nil { + mqttMgr.disconnect() + log.Println("MQTT disconnected") + } + wg.Wait() log.Println("Shutdown complete") } @@ -1970,10 +2489,7 @@ const uiHTML = ` } * { box-sizing: border-box; } - - html, body { - min-height: 100%; - } + html, body { min-height: 100%; } body { font-family: 'Segoe UI', system-ui, -apple-system, sans-serif; @@ -2024,56 +2540,21 @@ const uiHTML = ` } body[data-theme="light"] .text-zinc-100, - body[data-theme="light"] .text-zinc-200 { - color: #0f172a !important; - } - - body[data-theme="light"] .text-zinc-300 { - color: #1e293b !important; - } - + body[data-theme="light"] .text-zinc-200 { color: #0f172a !important; } + body[data-theme="light"] .text-zinc-300 { color: #1e293b !important; } body[data-theme="light"] .text-zinc-400, - body[data-theme="light"] .text-zinc-500 { - color: #475569 !important; - } - - body[data-theme="light"] .text-emerald-300 { - color: #059669 !important; - } - - body[data-theme="light"] .text-emerald-400 { - color: #10b981 !important; - } - + body[data-theme="light"] .text-zinc-500 { color: #475569 !important; } + body[data-theme="light"] .text-emerald-300 { color: #059669 !important; } + body[data-theme="light"] .text-emerald-400 { color: #10b981 !important; } body[data-theme="light"] .text-sky-100, - body[data-theme="light"] .text-sky-200 { - color: #0369a1 !important; - } - + body[data-theme="light"] .text-sky-200 { color: #0369a1 !important; } body[data-theme="light"] .text-violet-100, - body[data-theme="light"] .text-violet-200 { - color: #7c3aed !important; - } - - body[data-theme="light"] .text-amber-200 { - color: #b45309 !important; - } - - body[data-theme="light"] .text-sky-400 { - color: #0284c7 !important; - } - - body[data-theme="light"] .text-violet-400 { - color: #7c3aed !important; - } - - body[data-theme="light"] .text-red-400 { - color: #dc2626 !important; - } - - body[data-theme="light"] .text-yellow-400 { - color: #b45309 !important; - } + body[data-theme="light"] .text-violet-200 { color: #7c3aed !important; } + body[data-theme="light"] .text-amber-200 { color: #b45309 !important; } + body[data-theme="light"] .text-sky-400 { color: #0284c7 !important; } + body[data-theme="light"] .text-violet-400 { color: #7c3aed !important; } + body[data-theme="light"] .text-red-400 { color: #dc2626 !important; } + body[data-theme="light"] .text-yellow-400 { color: #b45309 !important; } .control-btn { display: inline-flex; @@ -2131,14 +2612,9 @@ const uiHTML = ` justify-content: center; } - .gauge-head-copy.with-digital { - align-items: flex-start; - } + .gauge-head-copy.with-digital { align-items: flex-start; } - .gauge-digital { - text-align: right; - flex-shrink: 0; - } + .gauge-digital { text-align: right; flex-shrink: 0; } .gauge-container { position: relative; @@ -2147,22 +2623,12 @@ const uiHTML = ` margin: 0 auto; } - .gauge-container.no-digital { - height: clamp(430px, 48vw, 560px); - } + .gauge-container.no-digital { height: clamp(430px, 48vw, 560px); } + .gauge-container.with-digital { height: clamp(360px, 42vw, 500px); } - .gauge-container.with-digital { - height: clamp(360px, 42vw, 500px); - } + .gauge-canvas { width: 100%; height: 100%; display: block; } - .gauge-canvas { - width: 100%; - height: 100%; - display: block; - } - - .window-btn.active, - .trend-window-btn.active { + .window-btn.active, .trend-window-btn.active { border-color: rgba(34,211,238,0.9); color: white; background: rgba(34,211,238,0.14); @@ -2175,14 +2641,9 @@ const uiHTML = ` background: rgba(14,165,233,0.12); } - .chart-wrap { - width: min(92vw, 1800px); - margin: 0 auto; - } + .chart-wrap { width: min(92vw, 1800px); margin: 0 auto; } - .summary-card, - .intel-card, - .verdict-card { + .summary-card, .intel-card, .verdict-card { display: flex; align-items: center; justify-content: space-between; @@ -2194,64 +2655,30 @@ const uiHTML = ` transition: 180ms ease; } - .intel-card { - min-height: 126px; - align-items: flex-start; - } + .intel-card { min-height: 126px; align-items: flex-start; } - .summary-card.ok, - .intel-card.ok, - .verdict-card.ok { + .summary-card.ok, .intel-card.ok, .verdict-card.ok { border-color: rgba(34,197,94,0.35); box-shadow: 0 0 0 1px rgba(34,197,94,0.08) inset, 0 0 26px rgba(34,197,94,0.06); } - - .summary-card.warning, - .intel-card.warning, - .verdict-card.warning { + .summary-card.warning, .intel-card.warning, .verdict-card.warning { border-color: rgba(234,179,8,0.35); box-shadow: 0 0 0 1px rgba(234,179,8,0.08) inset, 0 0 26px rgba(234,179,8,0.06); } - - .summary-card.critical, - .intel-card.critical, - .verdict-card.critical { + .summary-card.critical, .intel-card.critical, .verdict-card.critical { border-color: rgba(239,68,68,0.35); box-shadow: 0 0 0 1px rgba(239,68,68,0.08) inset, 0 0 26px rgba(239,68,68,0.06); } - - .summary-card.neutral, - .intel-card.neutral, - .verdict-card.neutral { + .summary-card.neutral, .intel-card.neutral, .verdict-card.neutral { border-color: rgba(113,113,122,0.35); box-shadow: 0 0 0 1px rgba(113,113,122,0.08) inset; } - .summary-dot { - width: 14px; - height: 14px; - border-radius: 999px; - } - - .summary-dot.ok { - background: #10b981; - box-shadow: 0 0 14px rgba(16,185,129,0.55); - } - - .summary-dot.warning { - background: #f59e0b; - box-shadow: 0 0 14px rgba(245,158,11,0.55); - } - - .summary-dot.critical { - background: #ef4444; - box-shadow: 0 0 14px rgba(239,68,68,0.55); - } - - .summary-dot.neutral { - background: #71717a; - box-shadow: 0 0 12px rgba(113,113,122,0.35); - } + .summary-dot { width: 14px; height: 14px; border-radius: 999px; } + .summary-dot.ok { background: #10b981; box-shadow: 0 0 14px rgba(16,185,129,0.55); } + .summary-dot.warning { background: #f59e0b; box-shadow: 0 0 14px rgba(245,158,11,0.55); } + .summary-dot.critical { background: #ef4444; box-shadow: 0 0 14px rgba(239,68,68,0.55); } + .summary-dot.neutral { background: #71717a; box-shadow: 0 0 12px rgba(113,113,122,0.35); } .summary-status.ok { color: #34d399; } .summary-status.warning { color: #facc15; } @@ -2266,16 +2693,9 @@ const uiHTML = ` color: #f4f4f5; } - body[data-theme="light"] .intel-value { - color: #0f172a; - } + body[data-theme="light"] .intel-value { color: #0f172a; } - .intel-sub { - font-size: 0.83rem; - color: #a1a1aa; - margin-top: 10px; - line-height: 1.35; - } + .intel-sub { font-size: 0.83rem; color: #a1a1aa; margin-top: 10px; line-height: 1.35; } .intel-kpi { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; @@ -2287,10 +2707,7 @@ const uiHTML = ` .dir-down { color: #34d399; } .dir-flat { color: #a1a1aa; } .dir-bad { color: #f87171; } - - .mini-mono { - font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; - } + .mini-mono { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; } .severity-pill { display: inline-flex; @@ -2304,48 +2721,17 @@ const uiHTML = ` letter-spacing: 0.04em; } - .severity-pill.info { - background: rgba(59,130,246,0.12); - color: #93c5fd; - border: 1px solid rgba(59,130,246,0.22); - } + .severity-pill.info { background: rgba(59,130,246,0.12); color: #93c5fd; border: 1px solid rgba(59,130,246,0.22); } + .severity-pill.warning { background: rgba(245,158,11,0.12); color: #fde68a; border: 1px solid rgba(245,158,11,0.22); } + .severity-pill.critical { background: rgba(239,68,68,0.12); color: #fca5a5; border: 1px solid rgba(239,68,68,0.22); } + body[data-theme="light"] .severity-pill.info { color: #1d4ed8; } + body[data-theme="light"] .severity-pill.warning { color: #b45309; } + body[data-theme="light"] .severity-pill.critical { color: #dc2626; } - .severity-pill.warning { - background: rgba(245,158,11,0.12); - color: #fde68a; - border: 1px solid rgba(245,158,11,0.22); - } + .alarm-table tbody tr:hover { background: rgba(255,255,255,0.03); } + body[data-theme="light"] .alarm-table tbody tr:hover { background: rgba(15,23,42,0.03); } - .severity-pill.critical { - background: rgba(239,68,68,0.12); - color: #fca5a5; - border: 1px solid rgba(239,68,68,0.22); - } - - body[data-theme="light"] .severity-pill.info { - color: #1d4ed8; - } - - body[data-theme="light"] .severity-pill.warning { - color: #b45309; - } - - body[data-theme="light"] .severity-pill.critical { - color: #dc2626; - } - - .alarm-table tbody tr:hover { - background: rgba(255,255,255,0.03); - } - - body[data-theme="light"] .alarm-table tbody tr:hover { - background: rgba(15,23,42,0.03); - } - - .limit-line-note { - font-size: 0.8rem; - color: #a1a1aa; - } + .limit-line-note { font-size: 0.8rem; color: #a1a1aa; } .process-offline { opacity: 0.35; @@ -2465,19 +2851,16 @@ const uiHTML = `
TOTAL %
0.0 {{.UnitPct}}
-
IMBALANCE
0.0 {{.UnitPct}}
abs(L - R)
-
BIAS
0.0 {{.UnitPct}}
L - R
-
LIMITS
Force W {{printf "%.0f" .WarningPercent}} / C {{printf "%.0f" .CriticalPercent}}
@@ -2495,7 +2878,6 @@ const uiHTML = `

Drift / Deterioration Intelligence

Averages, drift direction, imbalance deterioration and process stability
-
@@ -2507,47 +2889,32 @@ const uiHTML = `
-
-
-
-
AVG PEAK 5 MIN
-
--
-
No data
-
-
- -
-
-
AVG PEAK 1 HOUR
-
--
-
No data
-
-
- -
-
-
FORCE TREND
-
--
-
No data
-
-
- -
-
-
IMBALANCE TREND
-
--
-
No data
-
-
- -
-
-
PROCESS STABILITY
-
--
-
No data
-
-
+
+
AVG PEAK 5 MIN
+
--
+
No data
+
+
+
AVG PEAK 1 HOUR
+
--
+
No data
+
+
+
FORCE TREND
+
--
+
No data
+
+
+
IMBALANCE TREND
+
--
+
No data
+
+
+
PROCESS STABILITY
+
--
+
No data
+
{{end}} @@ -2561,7 +2928,6 @@ const uiHTML = `
Newest events first • clear events included
-
@@ -2575,9 +2941,7 @@ const uiHTML = ` - - - +
No events yet
No events yet
@@ -2596,7 +2960,6 @@ const uiHTML = `
NORMAL
-
0.0
{{.UnitPct}}
@@ -2612,7 +2975,6 @@ const uiHTML = `
{{end}} -
@@ -2628,7 +2990,6 @@ const uiHTML = `
NORMAL
-
0.0
{{.UnitPct}}
@@ -2644,7 +3005,6 @@ const uiHTML = `
{{end}} -
@@ -2660,7 +3020,6 @@ const uiHTML = `

Peak Trend

Piezo peak/stroke history from SQLite with visible warning and critical limits
-
@@ -2675,13 +3034,13 @@ const uiHTML = `
-
{{end}} +