Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 106 additions & 18 deletions home/home.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"mu/internal/event"
"mu/news"
"mu/social"
"mu/stream"
"mu/markets"
"mu/reminder"
"mu/user"
"mu/video"
"mu/weather"
)
Expand Down Expand Up @@ -316,28 +316,38 @@ var e='';var d=(w.desc||'').toLowerCase();for(var k in emoji){if(d.indexOf(k)>=0
document.getElementById('home-date-weather').textContent=w.temp+'°C '+(e||'');
})()</script>`)

// Status card content (will be prepended to left column).
// Built by user.RenderStatusStream so the fragment endpoint and the
// home card share one code path. The #home-status-wrap element is
// polled every ~10 seconds for near-real-time updates, and the
// compose form submits via fetch so the stream refreshes in place.
// View toggle — Console (stream) or Cards (dashboard)
var viewerID string
if sess, _ := auth.TrySession(r); sess != nil {
viewerID = sess.Account
}
statusInner := user.RenderStatusStream(viewerID)
statusCardBody := `<div id="home-status-wrap">` + statusInner + `</div>` + statusCardScript
statusCardHTML := fmt.Sprintf(
app.CardTemplate,
"status", "status", "Status",
statusCardBody,
)

// Feed section — existing home cards below the agent
var leftHTML []string
if statusCardHTML != "" {
leftHTML = append(leftHTML, statusCardHTML)
b.WriteString(`<div id="home-tabs" style="display:flex;gap:6px;margin-bottom:14px">`)
for _, t := range []struct{ id, label string }{{"console", "Console"}, {"cards", "Cards"}} {
b.WriteString(fmt.Sprintf(`<a href="#" data-tab="%s" class="home-tab" style="padding:4px 14px;border-radius:14px;font-size:13px;text-decoration:none;color:#555">%s</a>`, t.id, t.label))
}
b.WriteString(`</div>`)

// ── Console view (stream) ──
consoleEvents := stream.Recent(stream.StreamLimit, viewerID)
consoleEvents = stream.DedupeAdjacent(consoleEvents)
b.WriteString(`<div id="home-console" style="display:none">`)
// Compose box (logged-in only).
if viewerID != "" {
b.WriteString(fmt.Sprintf(`<form id="stream-form" method="POST" action="/stream" style="margin-bottom:12px;display:flex;gap:8px">
<input type="text" name="content" id="stream-input" placeholder="Ask @micro anything or post an update..." maxlength="%d" autocomplete="off" style="flex:1;padding:8px 12px;border:1px solid #ddd;border-radius:6px;font-size:14px">
<button type="submit" style="padding:8px 16px;background:#000;color:#fff;border:none;border-radius:6px;cursor:pointer;font-size:14px">Send</button>
</form>`, stream.MaxContentLength))
}
b.WriteString(`<div id="stream-events" style="max-height:min(70vh,600px);overflow-y:auto;-webkit-overflow-scrolling:touch">`)
b.WriteString(stream.RenderEventList(consoleEvents, viewerID))
b.WriteString(`</div>`)
b.WriteString(consoleScript)
b.WriteString(`</div>`)

// ── Cards view (dashboard) ──
b.WriteString(`<div id="home-cards">`)

var leftHTML []string
var rightHTML []string

tooltips := map[string]string{
Expand Down Expand Up @@ -375,6 +385,26 @@ document.getElementById('home-date-weather').textContent=w.temp+'°C '+(e||'');
strings.Join(rightHTML, "\n")))
}

b.WriteString(`</div>`) // close #home-cards

// Tab toggle JS — persists choice in localStorage.
b.WriteString(`<script>
(function(){
var tabs=document.querySelectorAll('.home-tab');
var console=document.getElementById('home-console');
var cards=document.getElementById('home-cards');
var key='mu_home_view';
function show(id){
console.style.display=id==='console'?'block':'none';
cards.style.display=id==='cards'?'block':'none';
tabs.forEach(function(t){t.style.background=t.dataset.tab===id?'#000':'';t.style.color=t.dataset.tab===id?'#fff':'#555'});
try{localStorage.setItem(key,id)}catch(e){}
}
tabs.forEach(function(t){t.addEventListener('click',function(e){e.preventDefault();show(t.dataset.tab)})});
show(localStorage.getItem(key)||'console');
})();
</script>`)

// Auto-refresh: poll every 2 minutes, update card content in-place
displayMode := r.URL.Query().Get("mode") == "display"
refreshInterval := 120000 // 2 minutes
Expand Down Expand Up @@ -443,6 +473,64 @@ func htmlEsc(s string) string {
//
// The script is defensive: if anything throws, the form still falls
// back to its native POST + redirect behaviour.
// consoleScript handles polling + form submit for the console stream
// embedded on the home page.
const consoleScript = `<script>
(function(){
var eventsEl = document.getElementById('stream-events');
var formEl = document.getElementById('stream-form');
if (!eventsEl) return;
var pollInterval = 10000;
var inflight = false;

function csrfToken() {
var m = document.cookie.match(/(?:^|; )csrf_token=([^;]+)/);
return m ? decodeURIComponent(m[1]) : '';
}

function refresh(clear) {
if (inflight) return;
inflight = true;
fetch('/stream/fragment', { credentials: 'same-origin', cache: 'no-store' })
.then(function(r){ return r.ok ? r.text() : null; })
.then(function(html){
if (html == null) return;
var scroll = eventsEl.scrollTop;
eventsEl.innerHTML = html;
if (!clear) eventsEl.scrollTop = scroll;
})
.catch(function(){})
.then(function(){ inflight = false; });
}

if (formEl) {
formEl.addEventListener('submit', function(ev){
ev.preventDefault();
var input = document.getElementById('stream-input');
if (!input) return;
var text = input.value.trim();
if (!text) return;
var body = new URLSearchParams();
body.set('content', text);
var headers = { 'Content-Type': 'application/x-www-form-urlencoded' };
var tok = csrfToken();
if (tok) headers['X-CSRF-Token'] = tok;
input.value = '';
fetch('/stream', {
method: 'POST',
credentials: 'same-origin',
headers: headers,
body: body.toString()
}).then(function(){ refresh(true); })
.catch(function(){ formEl.submit(); });
});
}

setInterval(function(){ if (!document.hidden) refresh(); }, pollInterval);
document.addEventListener('visibilitychange', function(){ if (!document.hidden) refresh(); });
})();
</script>`

const statusCardScript = `<script>
(function(){
var wrap = document.getElementById('home-status-wrap');
Expand Down
17 changes: 17 additions & 0 deletions internal/api/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,23 @@ var tools = []Tool{
{Name: "cost", Type: "number", Description: "Budget in credits — max spend for agent (required for tasks)", Required: false},
},
},
// Stream (console)
{
Name: "stream",
Description: "Read the platform event stream — user messages, agent responses, system events (markets, news, reminders)",
Method: "GET",
Path: "/stream",
},
{
Name: "stream_post",
Description: "Post a message to the stream. Mention @micro to get an AI response. Costs 1 credit.",
Method: "POST",
Path: "/stream",
WalletOp: "social_post",
Params: []ToolParam{
{Name: "content", Type: "string", Description: "Message text (max 1024 chars). Use @micro to invoke the AI agent.", Required: true},
},
},
// Content controls
{
Name: "flag",
Expand Down
44 changes: 37 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"mu/places"
"mu/search"
"mu/social"
"mu/stream"
"mu/user"
"mu/video"
"mu/wallet"
Expand Down Expand Up @@ -173,6 +174,9 @@ func main() {
// load user presence tracking
user.Load()

// Load the stream (platform event timeline).
stream.Load()

// Wire user → blog callback (avoids direct import between building blocks)
user.GetUserPosts = func(authorName string) []user.UserPost {
posts := blog.GetPostsByAuthor(authorName)
Expand Down Expand Up @@ -224,6 +228,29 @@ func main() {
app.Log("status", "failed to post @micro reply: %v", err)
}
}
// Wire stream @micro replies — same agent, posts into the stream
// instead of the status profile.
stream.AIReplyHook = func(askerID, prompt string) {
if auth.IsBanned(askerID) {
return
}
answer, err := agent.Query(askerID, prompt)
if err != nil {
app.Log("stream", "@micro agent error for %s: %v", askerID, err)
stream.PostAgent("I couldn't answer that one — try again in a moment.")
return
}
answer = strings.TrimSpace(answer)
if answer == "" {
return
}
if !user.ModerateAIResponse(askerID, answer) {
app.Log("stream", "AI response for %s blocked by moderation", askerID)
return
}
stream.PostAgent(answer)
}

user.GetUserApps = func(authorID string) []user.UserApp {
appList := apps.GetAppsByAuthor(authorID)
result := make([]user.UserApp, len(appList))
Expand Down Expand Up @@ -792,6 +819,10 @@ func main() {
http.HandleFunc("/user/status", user.StatusHandler)
http.HandleFunc("/user/status/stream", user.StatusStreamHandler)

// Stream (console) routes
http.HandleFunc("/stream", stream.Handler)
http.HandleFunc("/stream/fragment", stream.FragmentHandler)

// redirect /reminder to reminder.dev
http.HandleFunc("/reminder", reminder.Handler)

Expand Down Expand Up @@ -1212,18 +1243,14 @@ func updatesHandler(w http.ResponseWriter, r *http.Request) {
result["mail"] = 0
}

// Status — new entries since last poll.
if since.IsZero() {
result["status"] = 0
} else {
result["status"] = user.StatusCountSince(since, viewerID)
}

// Social — new messages since last poll.
if since.IsZero() {
result["social"] = 0
result["stream"] = 0
} else {
result["status"] = user.StatusCountSince(since, viewerID)
result["social"] = social.CountSince(since)
result["stream"] = stream.CountSince(since)
}

w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -1262,6 +1289,9 @@ func chargedWriteOp(r *http.Request) string {
// Work
case path == "/work/post":
return wallet.OpSocialPost
// Stream (console)
case path == "/stream":
return wallet.OpSocialPost
}
return ""
}
Expand Down
54 changes: 54 additions & 0 deletions markets/markets.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"sync"
"time"

"math"

"mu/internal/app"
"mu/internal/data"
"mu/stream"

"github.com/piquette/finance-go/future"
"github.com/piquette/finance-go/quote"
Expand Down Expand Up @@ -102,6 +105,12 @@ func refreshMarkets() {
for {
prices, priceData := fetchPrices()
if prices != nil {
// Detect significant price moves BEFORE overwriting the cache.
marketsMutex.RLock()
oldPrices := cachedPrices
marketsMutex.RUnlock()
surfaceMarketMoves(oldPrices, prices, priceData)

marketsMutex.Lock()
cachedPrices = prices
cachedPriceData = priceData
Expand Down Expand Up @@ -294,6 +303,51 @@ func generateMarketsCardHTML(prices map[string]float64) string {
return sb.String()
}

// surfaceMarketMoves posts to the stream when a tracked asset moves
// more than the threshold since the last refresh. Only fires for a
// small set of key tickers to avoid flooding the stream.
func surfaceMarketMoves(oldPrices, newPrices map[string]float64, pd map[string]PriceData) {
if len(oldPrices) == 0 {
return // first fetch, no comparison
}
// Only surface moves for these tickers.
tracked := []string{"BTC", "ETH", "SOL", "GOLD", "OIL"}
threshold := 0.02 // 2%

for _, ticker := range tracked {
oldP, okOld := oldPrices[ticker]
newP, okNew := newPrices[ticker]
if !okOld || !okNew || oldP == 0 {
continue
}
pctChange := (newP - oldP) / oldP
if math.Abs(pctChange) < threshold {
continue
}
direction := "+"
if pctChange < 0 {
direction = ""
}
content := fmt.Sprintf("%s %s%.1f%% ($%s)", ticker, direction, pctChange*100, formatPrice(newP))
change24h := float64(0)
if d, ok := pd[ticker]; ok {
change24h = d.Change24h
}
stream.Publish(&stream.Event{
Type: stream.TypeMarket,
AuthorID: app.SystemUserID,
Content: content,
Metadata: map[string]any{
"ticker": ticker,
"price": newP,
"change_pct": pctChange * 100,
"change_24h": change24h,
},
})
app.Log("markets", "Stream: %s", content)
}
}

func indexMarketPrices(prices map[string]float64) {
app.Log("markets", "Indexing %d prices", len(prices))
timestamp := time.Now().Format(time.RFC3339)
Expand Down
9 changes: 9 additions & 0 deletions reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"mu/internal/app"
"mu/internal/data"
"mu/internal/event"
"mu/stream"
)

var (
Expand Down Expand Up @@ -82,6 +83,14 @@ func fetchReminder() {
reminderMutex.Unlock()
event.Publish(event.Event{Type: "reminder_updated"})

// Post the verse to the platform stream so it appears in the console.
stream.Publish(&stream.Event{
Type: stream.TypeReminder,
AuthorID: app.SystemUserID,
Content: strings.TrimSpace(verseText),
Metadata: map[string]any{"url": moreURL},
})

// Extract message and updated for indexing
message := ""
if m, ok := val["message"]; ok {
Expand Down
9 changes: 9 additions & 0 deletions social/social.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"mu/internal/event"
"mu/internal/flag"
"mu/news"
"mu/stream"
"mu/wallet"
)

Expand Down Expand Up @@ -221,6 +222,14 @@ func SurfaceBreaking(category, title, link string) {
Content: content,
PostedAt: time.Now(),
})

// Also post to the platform stream so it appears in the console.
stream.Publish(&stream.Event{
Type: stream.TypeNews,
AuthorID: app.SystemUserID,
Content: title,
Metadata: map[string]any{"url": link, "category": category},
})
}

func save() error {
Expand Down
Loading
Loading