From 7666e97f439f88de8528950192ae8c37878e53f8 Mon Sep 17 00:00:00 2001 From: Akif-jpg Date: Fri, 9 Jan 2026 22:01:28 +0300 Subject: [PATCH] events: add Manager event system for inter-entity signaling - Add new package engine/systems/events with: - Manager, Channel, Subscription, EventData types - Async (Emit) and sync (EmitSync) delivery methods - Thread-safe subscription management using RWMutex - Utilities: CreateChannel, GetChannel, RemoveChannel, Clear - Add unit tests covering basic behavior, concurrency, types and benchmarks (src/engine/systems/events/event_manager_test.go) - Add examples demonstrating entity communication, host integration and channel management (src/engine/systems/events/event_manager_example_test.go). Examples use EmitSync where deterministic output is required. - Integrate event manager into Host: - add eventManager *events.Manager field - initialize in NewHost and expose via EventManager() getter - clear manager during Teardown - All new comments are in English and follow project style. This commit implements a publish-subscribe event system intended for lightweight, in-process signaling between entities and systems. --- src/engine/host.go | 10 + src/engine/systems/events/event_manager.go | 254 ++++++++++ .../events/event_manager_example_test.go | 212 +++++++++ .../systems/events/event_manager_test.go | 443 ++++++++++++++++++ 4 files changed, 919 insertions(+) create mode 100644 src/engine/systems/events/event_manager.go create mode 100644 src/engine/systems/events/event_manager_example_test.go create mode 100644 src/engine/systems/events/event_manager_test.go diff --git a/src/engine/host.go b/src/engine/host.go index e1e598aa5..1e8d7be47 100644 --- a/src/engine/host.go +++ b/src/engine/host.go @@ -131,6 +131,7 @@ type Host struct { LateUpdater Updater assetDatabase assets.Database physics StagePhysics + eventManager *events.Manager OnClose events.Event CloseSignal chan struct{} frameRateLimit *time.Ticker @@ -155,6 +156,7 @@ func NewHost(name string, logStream *logging.LogStream, assetDb assets.Database) LogStream: logStream, entityLookup: make(map[EntityId]*Entity), lighting: lighting.NewLightingInformation(rendering.MaxLocalLights), + eventManager: events.NewManager(), Cameras: hostCameras{ Primary: cameras.NewContainer(cameras.NewStandardCamera(w, h, w, h, matrix.Vec3Backward())), UI: cameras.NewContainer(cameras.NewStandardCameraOrthographic(w, h, w, h, matrix.Vec3{0, 0, 250})), @@ -314,6 +316,13 @@ func (host *Host) Audio() *audio.Audio { return host.audio } +// EventManager returns the event manager for the host. The event manager +// provides a centralized system for publish-subscribe event communication +// between entities and systems. +func (host *Host) EventManager() *events.Manager { + return host.eventManager +} + // ClearEntities will remove all entities from the host. This will remove all // entities from the standard entity pool only. The entities will be destroyed // using the standard destroy method, so they will take not be fully removed @@ -564,6 +573,7 @@ func (host *Host) Teardown() { host.shaderCache.Destroy() host.fontCache.Destroy() host.materialCache.Destroy() + host.eventManager.Clear() host.assetDatabase.Close() host.Window.Destroy() host.threads.Stop() diff --git a/src/engine/systems/events/event_manager.go b/src/engine/systems/events/event_manager.go new file mode 100644 index 000000000..502c6cdcf --- /dev/null +++ b/src/engine/systems/events/event_manager.go @@ -0,0 +1,254 @@ +/******************************************************************************/ +/* event_manager.go */ +/******************************************************************************/ +/* This file is part of */ +/* KAIJU ENGINE */ +/* https://kaijuengine.com/ */ +/******************************************************************************/ +/* MIT License */ +/* */ +/* Copyright (c) 2023-present Kaiju Engine authors (AUTHORS.md). */ +/* Copyright (c) 2015-present Brent Farris. */ +/* */ +/* May all those that this source may reach be blessed by the LORD and find */ +/* peace and joy in life. */ +/* Everyone who drinks of this water will be thirsty again; but whoever */ +/* drinks of the water that I will give him shall never thirst; John 4:13-14 */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining a */ +/* copy of this software and associated documentation files (the "Software"), */ +/* to deal in the Software without restriction, including without limitation */ +/* the rights to use, copy, modify, merge, publish, distribute, sublicense, */ +/* and/or sell copies of the Software, and to permit persons to whom the */ +/* Software is furnished to do so, subject to the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be included in */ +/* all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS */ +/* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT */ +/* OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE */ +/* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/******************************************************************************/ + +package events + +import ( + "fmt" + "sync" +) + +// EventData represents data carried with an event. It provides a flexible +// key-value structure for passing arbitrary data between event publishers +// and subscribers. +type EventData map[string]interface{} + +// CallbackFunc is the signature for event callback functions +type CallbackFunc func(EventData) + +// Subscription represents a single subscription to an event channel. +// It holds the subscription ID, channel name, and the callback function +// to be executed when events are emitted on the channel. +type Subscription struct { + id string + channel string + callback CallbackFunc +} + +// ID returns the unique identifier for this subscription +func (s *Subscription) ID() string { return s.id } + +// Channel returns the name of the channel this subscription is attached to +func (s *Subscription) Channel() string { return s.channel } + +// Channel represents an event channel that manages multiple subscriptions. +// Events emitted to a channel are delivered to all active subscribers. +// Channel operations are thread-safe through mutex locking. +type Channel struct { + name string + subscriptions map[string]*Subscription + mutex sync.RWMutex +} + +// newChannel creates a new event channel with the given name +func newChannel(name string) *Channel { + return &Channel{ + name: name, + subscriptions: make(map[string]*Subscription), + } +} + +// Name returns the name of this channel +func (c *Channel) Name() string { return c.name } + +// SubscriptionCount returns the number of active subscriptions on this channel +func (c *Channel) SubscriptionCount() int { + c.mutex.RLock() + defer c.mutex.RUnlock() + return len(c.subscriptions) +} + +// Emit sends an event to all subscribers on this channel asynchronously. +// Each callback is executed in its own goroutine to prevent blocking. +func (c *Channel) Emit(data EventData) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + for _, sub := range c.subscriptions { + // Execute each callback in a separate goroutine (non-blocking) + go sub.callback(data) + } +} + +// EmitSync sends an event to all subscribers on this channel synchronously. +// Callbacks are executed sequentially in the calling goroutine. +func (c *Channel) EmitSync(data EventData) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + for _, sub := range c.subscriptions { + sub.callback(data) + } +} + +// addSubscription adds a new subscription to this channel +func (c *Channel) addSubscription(sub *Subscription) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.subscriptions[sub.id] = sub +} + +// removeSubscription removes a subscription from this channel by ID +func (c *Channel) removeSubscription(subID string) { + c.mutex.Lock() + defer c.mutex.Unlock() + delete(c.subscriptions, subID) +} + +// Manager manages all event channels and subscriptions in the system. +// It provides a centralized way to create channels, subscribe to events, +// and emit events across different parts of the application. +type Manager struct { + channels map[string]*Channel + mutex sync.RWMutex + subscriptionID int +} + +// NewManager creates a new event manager instance +func NewManager() *Manager { + return &Manager{ + channels: make(map[string]*Channel), + } +} + +// CreateChannel creates a new event channel with the given name. +// If a channel with this name already exists, it returns the existing channel. +func (em *Manager) CreateChannel(name string) *Channel { + em.mutex.Lock() + defer em.mutex.Unlock() + + if ch, exists := em.channels[name]; exists { + return ch + } + + ch := newChannel(name) + em.channels[name] = ch + return ch +} + +// Subscribe subscribes a callback function to an event channel. +// If the channel does not exist, it will be created automatically. +// Returns a Subscription object that can be used to unsubscribe later. +// +// Example usage: +// +// sub := host.EventManager().Subscribe("collision", func(data EventData) { +// // Handle collision event +// }) +func (em *Manager) Subscribe(channelName string, callback CallbackFunc) *Subscription { + em.mutex.Lock() + em.subscriptionID++ + subID := fmt.Sprintf("sub_%d", em.subscriptionID) + em.mutex.Unlock() + + // Create channel if it doesn't exist + ch := em.CreateChannel(channelName) + + sub := &Subscription{ + id: subID, + channel: channelName, + callback: callback, + } + + ch.addSubscription(sub) + return sub +} + +// Unsubscribe cancels a subscription, removing it from its channel +func (em *Manager) Unsubscribe(sub *Subscription) { + em.mutex.RLock() + ch, exists := em.channels[sub.channel] + em.mutex.RUnlock() + + if exists { + ch.removeSubscription(sub.id) + } +} + +// Emit sends an event to all subscribers of the specified channel asynchronously. +// If the channel does not exist, this is a no-op. +func (em *Manager) Emit(channelName string, data EventData) { + em.mutex.RLock() + ch, exists := em.channels[channelName] + em.mutex.RUnlock() + + if exists { + ch.Emit(data) + } +} + +// EmitSync sends an event to all subscribers of the specified channel synchronously. +// If the channel does not exist, this is a no-op. +func (em *Manager) EmitSync(channelName string, data EventData) { + em.mutex.RLock() + ch, exists := em.channels[channelName] + em.mutex.RUnlock() + + if exists { + ch.EmitSync(data) + } +} + +// GetChannel retrieves an existing channel by name. +// Returns the channel and true if it exists, nil and false otherwise. +func (em *Manager) GetChannel(name string) (*Channel, bool) { + em.mutex.RLock() + defer em.mutex.RUnlock() + ch, exists := em.channels[name] + return ch, exists +} + +// RemoveChannel deletes a channel and all its subscriptions +func (em *Manager) RemoveChannel(name string) { + em.mutex.Lock() + defer em.mutex.Unlock() + delete(em.channels, name) +} + +// ChannelCount returns the number of active channels +func (em *Manager) ChannelCount() int { + em.mutex.RLock() + defer em.mutex.RUnlock() + return len(em.channels) +} + +// Clear removes all channels and subscriptions from the manager +func (em *Manager) Clear() { + em.mutex.Lock() + defer em.mutex.Unlock() + em.channels = make(map[string]*Channel) + em.subscriptionID = 0 +} diff --git a/src/engine/systems/events/event_manager_example_test.go b/src/engine/systems/events/event_manager_example_test.go new file mode 100644 index 000000000..6c38b5439 --- /dev/null +++ b/src/engine/systems/events/event_manager_example_test.go @@ -0,0 +1,212 @@ +/******************************************************************************/ +/* event_manager_example_test.go */ +/******************************************************************************/ +/* This file is part of */ +/* KAIJU ENGINE */ +/* https://kaijuengine.com/ */ +/******************************************************************************/ +/* MIT License */ +/* */ +/* Copyright (c) 2023-present Kaiju Engine authors (AUTHORS.md). */ +/* Copyright (c) 2015-present Brent Farris. */ +/* */ +/* May all those that this source may reach be blessed by the LORD and find */ +/* peace and joy in life. */ +/* Everyone who drinks of this water will be thirsty again; but whoever */ +/* drinks of the water that I will give him shall never thirst; John 4:13-14 */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining a */ +/* copy of this software and associated documentation files (the "Software"), */ +/* to deal in the Software without restriction, including without limitation */ +/* the rights to use, copy, modify, merge, publish, distribute, sublicense, */ +/* and/or sell copies of the Software, and to permit persons to whom the */ +/* Software is furnished to do so, subject to the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be included in */ +/* all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS */ +/* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT */ +/* OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE */ +/* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/******************************************************************************/ + +package events_test + +import ( + "fmt" + "kaiju/engine/systems/events" +) + +// Example demonstrates basic usage of the event manager system +func Example_basicUsage() { + // Create a new event manager + em := events.NewManager() + + // Subscribe to an event channel + sub := em.Subscribe("player_damaged", func(data events.EventData) { + playerID := data["playerID"].(string) + damage := data["damage"].(int) + fmt.Printf("Player %s took %d damage\n", playerID, damage) + }) + + // Emit an event synchronously + em.EmitSync("player_damaged", events.EventData{ + "playerID": "player1", + "damage": 25, + }) + + // Clean up subscription + em.Unsubscribe(sub) + + // Output: + // Player player1 took 25 damage +} + +// Example showing how to use the event manager with game entities +func Example_entityCommunication() { + // This example demonstrates how entities can communicate using events + em := events.NewManager() + + // Entity 1: Player subscribes to collision events + playerID := "player1" + playerHealth := 100 + + playerSub := em.Subscribe("collision", func(data events.EventData) { + if targetID, ok := data["targetID"].(string); ok && targetID == playerID { + if damage, ok := data["damage"].(int); ok { + playerHealth -= damage + fmt.Printf("Player health: %d\n", playerHealth) + + // Emit death event if health depletes (synchronous to ensure deterministic output) + if playerHealth <= 0 { + em.EmitSync("entity_death", events.EventData{ + "entityID": playerID, + }) + } + } + } + }) + + // UI System subscribes to death events + uiSub := em.Subscribe("entity_death", func(data events.EventData) { + entityID := data["entityID"].(string) + fmt.Printf("Entity died: %s\n", entityID) + }) + + // Simulate collision events + em.EmitSync("collision", events.EventData{ + "targetID": playerID, + "damage": 30, + }) + + em.EmitSync("collision", events.EventData{ + "targetID": playerID, + "damage": 80, + }) + + // Clean up + em.Unsubscribe(playerSub) + em.Unsubscribe(uiSub) + + // Output: + // Player health: 70 + // Player health: -10 + // Entity died: player1 +} + +// Example showing multiple subscribers to the same event +func Example_multipleSubscribers() { + em := events.NewManager() + + // Achievement system subscribes to score events + em.Subscribe("score_changed", func(data events.EventData) { + score := data["score"].(int) + if score >= 1000 { + fmt.Println("Achievement unlocked!") + } + }) + + // UI system subscribes to score events + em.Subscribe("score_changed", func(data events.EventData) { + score := data["score"].(int) + fmt.Printf("Score: %d\n", score) + }) + + // Analytics system subscribes to score events + em.Subscribe("score_changed", func(data events.EventData) { + score := data["score"].(int) + fmt.Printf("Logged score: %d\n", score) + }) + + // Emit score change - all three subscribers will receive it + em.EmitSync("score_changed", events.EventData{ + "score": 1500, + }) + + // Output: + // Achievement unlocked! + // Score: 1500 + // Logged score: 1500 +} + +// Example showing channel management +func Example_channelManagement() { + em := events.NewManager() + + // Create channels explicitly + gameChannel := em.CreateChannel("game_events") + uiChannel := em.CreateChannel("ui_events") + + fmt.Printf("Game channel: %s\n", gameChannel.Name()) + fmt.Printf("UI channel: %s\n", uiChannel.Name()) + fmt.Printf("Total channels: %d\n", em.ChannelCount()) + + // Subscribe to channels + em.Subscribe("game_events", func(data events.EventData) { + fmt.Println("Game event received") + }) + + // Get channel information + if ch, exists := em.GetChannel("game_events"); exists { + fmt.Printf("Subscribers on game_events: %d\n", ch.SubscriptionCount()) + } + + // Remove a channel + em.RemoveChannel("ui_events") + fmt.Printf("Channels after removal: %d\n", em.ChannelCount()) + + // Output: + // Game channel: game_events + // UI channel: ui_events + // Total channels: 2 + // Subscribers on game_events: 1 + // Channels after removal: 1 +} + +// Example showing how to integrate with Host +func Example_hostIntegration() { + // This is a conceptual example showing how to use EventManager with Host + // In actual code, you would access it through host.EventManager() + + em := events.NewManager() + + // Example: Collision detection system emits events + em.Subscribe("collision_detected", func(data events.EventData) { + entityA := data["entityA"].(string) + entityB := data["entityB"].(string) + fmt.Printf("Collision between %s and %s\n", entityA, entityB) + }) + + // Physics system can emit collision events (synchronous for deterministic example output) + em.EmitSync("collision_detected", events.EventData{ + "entityA": "player", + "entityB": "wall", + }) + + // Output: + // Collision between player and wall +} diff --git a/src/engine/systems/events/event_manager_test.go b/src/engine/systems/events/event_manager_test.go new file mode 100644 index 000000000..4fc2f38e9 --- /dev/null +++ b/src/engine/systems/events/event_manager_test.go @@ -0,0 +1,443 @@ +/******************************************************************************/ +/* event_manager_test.go */ +/******************************************************************************/ +/* This file is part of */ +/* KAIJU ENGINE */ +/* https://kaijuengine.com/ */ +/******************************************************************************/ +/* MIT License */ +/* */ +/* Copyright (c) 2023-present Kaiju Engine authors (AUTHORS.md). */ +/* Copyright (c) 2015-present Brent Farris. */ +/* */ +/* May all those that this source may reach be blessed by the LORD and find */ +/* peace and joy in life. */ +/* Everyone who drinks of this water will be thirsty again; but whoever */ +/* drinks of the water that I will give him shall never thirst; John 4:13-14 */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining a */ +/* copy of this software and associated documentation files (the "Software"), */ +/* to deal in the Software without restriction, including without limitation */ +/* the rights to use, copy, modify, merge, publish, distribute, sublicense, */ +/* and/or sell copies of the Software, and to permit persons to whom the */ +/* Software is furnished to do so, subject to the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be included in */ +/* all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS */ +/* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT */ +/* OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE */ +/* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/******************************************************************************/ + +package events + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestNewManager(t *testing.T) { + em := NewManager() + if em == nil { + t.Fatal("NewManager should return a non-nil manager") + } + if em.ChannelCount() != 0 { + t.Errorf("New manager should have 0 channels, got %d", em.ChannelCount()) + } +} + +func TestCreateChannel(t *testing.T) { + em := NewManager() + ch := em.CreateChannel("test-channel") + + if ch == nil { + t.Fatal("CreateChannel should return a non-nil channel") + } + if ch.Name() != "test-channel" { + t.Errorf("Channel name should be 'test-channel', got '%s'", ch.Name()) + } + if em.ChannelCount() != 1 { + t.Errorf("Manager should have 1 channel, got %d", em.ChannelCount()) + } + + // Creating the same channel again should return the existing one + ch2 := em.CreateChannel("test-channel") + if ch != ch2 { + t.Error("CreateChannel should return the same channel instance for the same name") + } + if em.ChannelCount() != 1 { + t.Errorf("Manager should still have 1 channel, got %d", em.ChannelCount()) + } +} + +func TestSubscribe(t *testing.T) { + em := NewManager() + called := false + + sub := em.Subscribe("test-channel", func(data EventData) { + called = true + }) + + if sub == nil { + t.Fatal("Subscribe should return a non-nil subscription") + } + if sub.Channel() != "test-channel" { + t.Errorf("Subscription channel should be 'test-channel', got '%s'", sub.Channel()) + } + if sub.ID() == "" { + t.Error("Subscription ID should not be empty") + } + + ch, exists := em.GetChannel("test-channel") + if !exists { + t.Fatal("Channel should exist after subscription") + } + if ch.SubscriptionCount() != 1 { + t.Errorf("Channel should have 1 subscription, got %d", ch.SubscriptionCount()) + } + + // Verify callback wasn't called yet + if called { + t.Error("Callback should not be called until event is emitted") + } +} + +func TestEmitSync(t *testing.T) { + em := NewManager() + callCount := 0 + + em.Subscribe("test-channel", func(data EventData) { + callCount++ + if val, ok := data["test"]; !ok || val != "value" { + t.Error("Event data should contain 'test' key with value 'value'") + } + }) + + em.EmitSync("test-channel", EventData{"test": "value"}) + + if callCount != 1 { + t.Errorf("Callback should be called exactly once, got %d", callCount) + } +} + +func TestEmitAsync(t *testing.T) { + em := NewManager() + var callCount atomic.Int32 + var wg sync.WaitGroup + + wg.Add(1) + em.Subscribe("test-channel", func(data EventData) { + callCount.Add(1) + wg.Done() + }) + + em.Emit("test-channel", EventData{"test": "async"}) + + // Wait for async callback to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + if callCount.Load() != 1 { + t.Errorf("Callback should be called exactly once, got %d", callCount.Load()) + } + case <-time.After(time.Second): + t.Fatal("Async callback did not complete within timeout") + } +} + +func TestMultipleSubscribers(t *testing.T) { + em := NewManager() + var callCount atomic.Int32 + var wg sync.WaitGroup + + // Subscribe 5 callbacks to the same channel + for i := 0; i < 5; i++ { + wg.Add(1) + em.Subscribe("test-channel", func(data EventData) { + callCount.Add(1) + wg.Done() + }) + } + + em.Emit("test-channel", EventData{"test": "multiple"}) + + // Wait for all async callbacks to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + if callCount.Load() != 5 { + t.Errorf("All 5 callbacks should be called, got %d", callCount.Load()) + } + case <-time.After(time.Second): + t.Fatal("Async callbacks did not complete within timeout") + } +} + +func TestUnsubscribe(t *testing.T) { + em := NewManager() + callCount := 0 + + sub := em.Subscribe("test-channel", func(data EventData) { + callCount++ + }) + + // First emit - callback should be called + em.EmitSync("test-channel", EventData{"test": "first"}) + if callCount != 1 { + t.Errorf("Callback should be called once, got %d", callCount) + } + + // Unsubscribe + em.Unsubscribe(sub) + + ch, _ := em.GetChannel("test-channel") + if ch.SubscriptionCount() != 0 { + t.Errorf("Channel should have 0 subscriptions after unsubscribe, got %d", ch.SubscriptionCount()) + } + + // Second emit - callback should not be called + em.EmitSync("test-channel", EventData{"test": "second"}) + if callCount != 1 { + t.Errorf("Callback should still be called only once after unsubscribe, got %d", callCount) + } +} + +func TestGetChannel(t *testing.T) { + em := NewManager() + + // Non-existent channel + _, exists := em.GetChannel("non-existent") + if exists { + t.Error("GetChannel should return false for non-existent channel") + } + + // Create a channel + em.CreateChannel("existing") + ch, exists := em.GetChannel("existing") + if !exists { + t.Error("GetChannel should return true for existing channel") + } + if ch.Name() != "existing" { + t.Errorf("Channel name should be 'existing', got '%s'", ch.Name()) + } +} + +func TestRemoveChannel(t *testing.T) { + em := NewManager() + em.CreateChannel("test-channel") + + if em.ChannelCount() != 1 { + t.Errorf("Manager should have 1 channel, got %d", em.ChannelCount()) + } + + em.RemoveChannel("test-channel") + + if em.ChannelCount() != 0 { + t.Errorf("Manager should have 0 channels after removal, got %d", em.ChannelCount()) + } + + _, exists := em.GetChannel("test-channel") + if exists { + t.Error("Channel should not exist after removal") + } +} + +func TestClear(t *testing.T) { + em := NewManager() + + // Create multiple channels with subscriptions + em.Subscribe("channel1", func(data EventData) {}) + em.Subscribe("channel2", func(data EventData) {}) + em.Subscribe("channel3", func(data EventData) {}) + + if em.ChannelCount() != 3 { + t.Errorf("Manager should have 3 channels, got %d", em.ChannelCount()) + } + + em.Clear() + + if em.ChannelCount() != 0 { + t.Errorf("Manager should have 0 channels after Clear, got %d", em.ChannelCount()) + } +} + +func TestEmitToNonExistentChannel(t *testing.T) { + em := NewManager() + + // Should not panic or error + em.Emit("non-existent", EventData{"test": "value"}) + em.EmitSync("non-existent", EventData{"test": "value"}) +} + +func TestConcurrentSubscribe(t *testing.T) { + em := NewManager() + var wg sync.WaitGroup + subscriberCount := 100 + + // Subscribe from multiple goroutines concurrently + for i := 0; i < subscriberCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + em.Subscribe("concurrent-channel", func(data EventData) {}) + }() + } + + wg.Wait() + + ch, exists := em.GetChannel("concurrent-channel") + if !exists { + t.Fatal("Channel should exist after concurrent subscriptions") + } + + if ch.SubscriptionCount() != subscriberCount { + t.Errorf("Channel should have %d subscriptions, got %d", subscriberCount, ch.SubscriptionCount()) + } +} + +func TestConcurrentEmit(t *testing.T) { + em := NewManager() + var callCount atomic.Int32 + var wg sync.WaitGroup + + emitCount := 100 + wg.Add(emitCount) + + em.Subscribe("concurrent-emit", func(data EventData) { + callCount.Add(1) + wg.Done() + }) + + // Emit from multiple goroutines concurrently + for i := 0; i < emitCount; i++ { + go func(idx int) { + em.Emit("concurrent-emit", EventData{"index": idx}) + }(i) + } + + // Wait for all callbacks to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + if callCount.Load() != int32(emitCount) { + t.Errorf("Callback should be called %d times, got %d", emitCount, callCount.Load()) + } + case <-time.After(5 * time.Second): + t.Fatal("Concurrent emit callbacks did not complete within timeout") + } +} + +func TestEventDataTypes(t *testing.T) { + em := NewManager() + + em.Subscribe("test-types", func(data EventData) { + // Test string + if val, ok := data["string"].(string); !ok || val != "test" { + t.Error("String value should be retrievable") + } + + // Test int + if val, ok := data["int"].(int); !ok || val != 42 { + t.Error("Int value should be retrievable") + } + + // Test float + if val, ok := data["float"].(float64); !ok || val != 3.14 { + t.Error("Float value should be retrievable") + } + + // Test bool + if val, ok := data["bool"].(bool); !ok || val != true { + t.Error("Bool value should be retrievable") + } + + // Test nested map + if nested, ok := data["nested"].(map[string]string); !ok { + t.Error("Nested map should be retrievable") + } else if nested["key"] != "value" { + t.Error("Nested map value should be correct") + } + }) + + em.EmitSync("test-types", EventData{ + "string": "test", + "int": 42, + "float": 3.14, + "bool": true, + "nested": map[string]string{"key": "value"}, + }) +} + +func TestUniqueSubscriptionIDs(t *testing.T) { + em := NewManager() + ids := make(map[string]bool) + + // Create multiple subscriptions + for i := 0; i < 100; i++ { + sub := em.Subscribe("test-channel", func(data EventData) {}) + if ids[sub.ID()] { + t.Errorf("Duplicate subscription ID found: %s", sub.ID()) + } + ids[sub.ID()] = true + } +} + +func BenchmarkEmitSync(b *testing.B) { + em := NewManager() + em.Subscribe("bench-channel", func(data EventData) { + // Minimal work in callback + }) + + data := EventData{"test": "benchmark"} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + em.EmitSync("bench-channel", data) + } +} + +func BenchmarkEmitAsync(b *testing.B) { + em := NewManager() + em.Subscribe("bench-channel", func(data EventData) { + // Minimal work in callback + }) + + data := EventData{"test": "benchmark"} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + em.Emit("bench-channel", data) + } +} + +func BenchmarkSubscribe(b *testing.B) { + em := NewManager() + callback := func(data EventData) {} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + em.Subscribe("bench-channel", callback) + } +}