diff --git a/home/home.go b/home/home.go index 28b5d105..cbe2c610 100644 --- a/home/home.go +++ b/home/home.go @@ -19,9 +19,9 @@ import ( "mu/internal/event" "mu/news" "mu/social" + "mu/stream" "mu/markets" "mu/reminder" - "mu/user" "mu/video" "mu/weather" ) @@ -316,28 +316,38 @@ var e='';var d=(w.desc||'').toLowerCase();for(var k in emoji){if(d.indexOf(k)>=0 document.getElementById('home-date-weather').textContent=w.temp+'°C '+(e||''); })()`) - // Status card content (will be prepended to left column). - // Built by user.RenderStatusStream so the fragment endpoint and the - // home card share one code path. The #home-status-wrap element is - // polled every ~10 seconds for near-real-time updates, and the - // compose form submits via fetch so the stream refreshes in place. + // View toggle — Console (stream) or Cards (dashboard) var viewerID string if sess, _ := auth.TrySession(r); sess != nil { viewerID = sess.Account } - statusInner := user.RenderStatusStream(viewerID) - statusCardBody := `
` + statusInner + `
` + statusCardScript - statusCardHTML := fmt.Sprintf( - app.CardTemplate, - "status", "status", "Status", - statusCardBody, - ) - - // Feed section — existing home cards below the agent - var leftHTML []string - if statusCardHTML != "" { - leftHTML = append(leftHTML, statusCardHTML) + b.WriteString(`
`) + for _, t := range []struct{ id, label string }{{"console", "Console"}, {"cards", "Cards"}} { + b.WriteString(fmt.Sprintf(`%s`, t.id, t.label)) + } + b.WriteString(`
`) + + // ── Console view (stream) ── + consoleEvents := stream.Recent(stream.StreamLimit, viewerID) + consoleEvents = stream.DedupeAdjacent(consoleEvents) + b.WriteString(``) + + // ── Cards view (dashboard) ── + b.WriteString(`
`) + + var leftHTML []string var rightHTML []string tooltips := map[string]string{ @@ -375,6 +385,26 @@ document.getElementById('home-date-weather').textContent=w.temp+'°C '+(e||''); strings.Join(rightHTML, "\n"))) } + b.WriteString(`
`) // close #home-cards + + // Tab toggle JS — persists choice in localStorage. + b.WriteString(``) + // Auto-refresh: poll every 2 minutes, update card content in-place displayMode := r.URL.Query().Get("mode") == "display" refreshInterval := 120000 // 2 minutes @@ -443,6 +473,64 @@ func htmlEsc(s string) string { // // The script is defensive: if anything throws, the form still falls // back to its native POST + redirect behaviour. +// consoleScript handles polling + form submit for the console stream +// embedded on the home page. +const consoleScript = `` + const statusCardScript = `` diff --git a/stream/stream.go b/stream/stream.go new file mode 100644 index 00000000..7115ff8b --- /dev/null +++ b/stream/stream.go @@ -0,0 +1,286 @@ +// Package stream is the platform-level event stream. Every building +// block publishes to it. Users interact with it via the console. +// The agent responds in it. It is the operational surface of Mu. +// +// This is NOT status updates (profile feature) and NOT social +// (threaded forum). It's a single append-only timeline of typed +// events that powers the home console. +package stream + +import ( + "encoding/json" + "fmt" + "sort" + "strings" + "sync" + "time" + + "mu/internal/app" + "mu/internal/auth" + "mu/internal/data" +) + +// Event types. +const ( + TypeUser = "user" // human typed in the console + TypeAgent = "agent" // @micro response + TypeSystem = "system" // mail notification, account event + TypeMarket = "market" // price movement + TypeNews = "news" // breaking headline + TypeReminder = "reminder" // daily reminder +) + +// Event is a single entry in the stream. +type Event struct { + ID string `json:"id"` + Type string `json:"type"` + AuthorID string `json:"author_id"` + Author string `json:"author"` + Content string `json:"content"` + Metadata map[string]any `json:"metadata,omitempty"` + CreatedAt time.Time `json:"created_at"` +} + +// MaxEvents is the number of events kept in memory / on disk. +const MaxEvents = 500 + +// MaxContentLength caps event content. +const MaxContentLength = 1024 + +var ( + mu sync.RWMutex + events []*Event // newest first +) + +func init() { + b, err := data.LoadFile("stream.json") + if err != nil { + return + } + var loaded []*Event + if json.Unmarshal(b, &loaded) == nil { + mu.Lock() + events = loaded + mu.Unlock() + } +} + +// Load initialises the stream package. +func Load() { + app.Log("stream", "Loaded %d events", len(events)) +} + +func save() { + data.SaveJSON("stream.json", events) +} + +// Publish appends an event to the stream. This is the single entry +// point — every publisher (user, agent, system, markets, news, +// reminder) calls this. +func Publish(e *Event) { + if e.Content == "" { + return + } + if len(e.Content) > MaxContentLength { + e.Content = e.Content[:MaxContentLength-1] + "…" + } + if e.ID == "" { + e.ID = fmt.Sprintf("%d", time.Now().UnixNano()) + } + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now() + } + if e.Author == "" && e.AuthorID != "" { + if acc, err := auth.GetAccount(e.AuthorID); err == nil { + e.Author = acc.Name + } else if e.AuthorID == app.SystemUserID { + e.Author = app.SystemUserName + } else { + e.Author = e.AuthorID + } + } + + mu.Lock() + events = append([]*Event{e}, events...) + if len(events) > MaxEvents { + events = events[:MaxEvents] + } + save() + mu.Unlock() +} + +// PostUser is a convenience for human messages from the console. +func PostUser(accountID, content string) *Event { + name := accountID + if acc, err := auth.GetAccount(accountID); err == nil { + name = acc.Name + } + e := &Event{ + Type: TypeUser, + AuthorID: accountID, + Author: name, + Content: content, + } + Publish(e) + return e +} + +// PostAgent is a convenience for @micro responses. +func PostAgent(content string) *Event { + e := &Event{ + Type: TypeAgent, + AuthorID: app.SystemUserID, + Author: app.SystemUserName, + Content: content, + } + Publish(e) + return e +} + +// PostSystem posts a system notification (mail, account events). +func PostSystem(content string, meta map[string]any) *Event { + e := &Event{ + Type: TypeSystem, + AuthorID: app.SystemUserID, + Author: app.SystemUserName, + Content: content, + Metadata: meta, + } + Publish(e) + return e +} + +// Recent returns the most recent events, newest first, up to max. +// viewerID is used to include banned users' own posts. +func Recent(max int, viewerID string) []*Event { + mu.RLock() + defer mu.RUnlock() + + var result []*Event + for _, e := range events { + // Banned users' events are hidden from everyone except themselves. + if e.Type == TypeUser && e.AuthorID != viewerID && auth.IsBanned(e.AuthorID) { + continue + } + result = append(result, e) + if len(result) >= max { + break + } + } + return result +} + +// Since returns events newer than the given time. +func Since(since time.Time) []*Event { + mu.RLock() + defer mu.RUnlock() + + var result []*Event + for _, e := range events { + if !e.CreatedAt.After(since) { + break // events are newest-first, so once we pass since we're done + } + result = append(result, e) + } + return result +} + +// CountSince returns the number of events newer than since. +func CountSince(since time.Time) int { + mu.RLock() + defer mu.RUnlock() + + count := 0 + for _, e := range events { + if !e.CreatedAt.After(since) { + break + } + count++ + } + return count +} + +// DedupeAdjacent removes consecutive identical user+content pairs +// from a slice (display-time, doesn't modify storage). +func DedupeAdjacent(events []*Event) []*Event { + if len(events) <= 1 { + return events + } + result := []*Event{events[0]} + for i := 1; i < len(events); i++ { + prev := result[len(result)-1] + cur := events[i] + if cur.AuthorID == prev.AuthorID && cur.Content == prev.Content { + continue + } + result = append(result, cur) + } + return result +} + +// Clear wipes all events. Admin use only. +func Clear() { + mu.Lock() + events = nil + save() + mu.Unlock() +} + +// ClearByAuthor removes all events from a specific author. +func ClearByAuthor(authorID string) { + mu.Lock() + var filtered []*Event + for _, e := range events { + if e.AuthorID != authorID { + filtered = append(filtered, e) + } + } + events = filtered + save() + mu.Unlock() +} + +// All returns a sorted copy of all events (for admin/export). +func All() []*Event { + mu.RLock() + defer mu.RUnlock() + result := make([]*Event, len(events)) + copy(result, events) + sort.Slice(result, func(i, j int) bool { + return result[i].CreatedAt.After(result[j].CreatedAt) + }) + return result +} + +// MicroMention is the trigger token for AI responses in the stream. +const MicroMention = "@micro" + +// ContainsMicro checks for @micro mention with word boundaries. +func ContainsMicro(text string) bool { + lower := strings.ToLower(text) + idx := 0 + for { + i := strings.Index(lower[idx:], MicroMention) + if i < 0 { + return false + } + pos := idx + i + if pos > 0 { + c := lower[pos-1] + if isWordChar(c) { + idx = pos + len(MicroMention) + continue + } + } + after := pos + len(MicroMention) + if after < len(lower) && isWordChar(lower[after]) { + idx = after + continue + } + return true + } +} + +func isWordChar(c byte) bool { + return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_' || c == '-' +}