diff --git a/amqtt/broker.py b/amqtt/broker.py index 48cd2e41..91f0b334 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -755,7 +755,7 @@ async def _handle_message_delivery( f"[MQTT-3.3.2-2] - {client_session.client_id} invalid TOPIC sent in PUBLISH message, closing connection", ) return False - if app_message.topic.startswith("$"): + if app_message.topic.startswith("$") and not self.config.get("allow_dollar_topics", False): self.logger.warning( f"[MQTT-4.7.2-1] - {client_session.client_id} cannot use a topic with a leading $ character." ) @@ -1115,7 +1115,7 @@ async def _publish_retained_messages_for_subscription(self, subscription: tuple[ ) def _matches(self, topic: str, a_filter: str) -> bool: - if topic.startswith("$") and (a_filter.startswith(("+", "#"))): + if topic.startswith("$") and (a_filter.startswith(("+", "#"))) and not self.config.get("allow_dollar_topics", False): self.logger.debug("[MQTT-4.7.2-1] - ignoring broadcasting $ topic to subscriptions starting with + or #") return False diff --git a/amqtt/contexts.py b/amqtt/contexts.py index 4da2fa14..8d5143bc 100644 --- a/amqtt/contexts.py +++ b/amqtt/contexts.py @@ -180,6 +180,8 @@ class BrokerConfig(Dictable): """*Deprecated field used to config EntryPoint-loaded plugins. See [`TopicTabooPlugin`](../plugins/packaged_plugins.md#taboo-topic-plugin) and [`TopicACLPlugin`](../plugins/packaged_plugins.md#acl-topic-plugin) for recommended configuration method.*""" + allow_dollar_topics: bool | None = False + """Controls whether the broker accepts client-sent topics starting with a dollar sign (`$`).""" plugins: dict[str, Any] | list[str | dict[str, Any]] | None = field(default_factory=default_broker_plugins) """The dictionary has a key of the dotted-module path of a class derived from `BasePlugin`, `BaseAuthPlugin` or `BaseTopicPlugin`; the value is a dictionary of configuration options for that plugin. See diff --git a/tests/test_dollar_topics.py b/tests/test_dollar_topics.py index 374f548b..0289051e 100644 --- a/tests/test_dollar_topics.py +++ b/tests/test_dollar_topics.py @@ -41,6 +41,40 @@ async def test_publish_to_dollar_sign_topics(): await asyncio.sleep(0.1) await b.shutdown() + +@pytest.mark.asyncio +async def test_publish_to_dollar_sign_topics_if_allowed(): + """Applications can use a topic with a leading $ character for their own purposes if it is allowed.""" + + cfg = { + 'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}}, + 'allow_dollar_topics': True, + 'plugins': {'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True}}, + } + + b = Broker(config=cfg) + await b.start() + await asyncio.sleep(0.1) + c = MQTTClient(config={'auto_reconnect': False}) + await c.connect() + await asyncio.sleep(0.1) + await c.subscribe( + [('$#', QOS_0), + ('#', QOS_0)] + ) + await asyncio.sleep(0.1) + await c.publish('$MY', b'message should not be blocked') + await asyncio.sleep(0.1) + + msg = await c.deliver_message() + assert msg.topic == '$MY' + assert msg.data == b'message should not be blocked' + + await c.disconnect() + await asyncio.sleep(0.1) + await b.shutdown() + + @pytest.mark.asyncio async def test_hash_will_not_receive_dollar(): """A subscription to “#” will not receive any messages published to a topic beginning with a $ [MQTT-4.7.2-1].""" @@ -73,6 +107,39 @@ async def test_hash_will_not_receive_dollar(): await b.shutdown() +@pytest.mark.asyncio +async def test_hash_will_receive_dollar_if_allowed(): + """A subscription to “#” will not receive messages published to a topic beginning with a $ if it is allowed.""" + + cfg = { + 'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}}, + 'allow_dollar_topics': True, + 'plugins': { + 'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True}, + 'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 2} + } + } + + b = Broker(config=cfg) + await b.start() + await asyncio.sleep(0.1) + c = MQTTClient(config={'auto_reconnect': False}) + await c.connect() + await asyncio.sleep(0.1) + await c.subscribe( + [('#', QOS_0)] + ) + await asyncio.sleep(0.1) + + msg = await c.deliver_message() + assert msg.topic == '$SYS/broker/version' + assert b"aMQTT" in msg.data + + await c.disconnect() + await asyncio.sleep(0.1) + await b.shutdown() + + @pytest.mark.asyncio async def test_plus_will_not_receive_dollar(): """A subscription to “+/monitor/Clients” will not receive any messages published to “$SYS/monitor/Clients [MQTT-4.7.2-1]""" @@ -108,3 +175,42 @@ async def test_plus_will_not_receive_dollar(): await c.disconnect() await asyncio.sleep(0.1) await b.shutdown() + + +@pytest.mark.asyncio +async def test_plus_will_receive_dollar_if_allowed(): + """A subscription to “+/monitor/Clients” will receive any messages published to “$SYS/monitor/Clients if it is allowed.""" + # BrokerSysPlugin doesn't use $SYS/monitor/Clients, so this is an equivalent test with $SYS/broker topics + + cfg = { + 'listeners': {'default': {'type': 'tcp', 'bind': '127.0.0.1'}}, + 'allow_dollar_topics': True, + 'plugins': { + 'amqtt.plugins.authentication.AnonymousAuthPlugin': {"allow_anonymous": True}, + 'amqtt.plugins.sys.broker.BrokerSysPlugin': {"sys_interval": 2} + } + } + + b = Broker(config=cfg) + await b.start() + await asyncio.sleep(0.1) + c = MQTTClient(config={'auto_reconnect': False}) + await c.connect() + await asyncio.sleep(0.1) + await c.subscribe( + [('+/broker/#', QOS_0), + ('+/broker/time', QOS_0), + ('+/broker/clients/#', QOS_0), + ('+/broker/+/maximum', QOS_0) + ] + ) + await asyncio.sleep(0.1) + + + msg = await c.deliver_message() + assert msg.topic == '$SYS/broker/version' + assert b"aMQTT" in msg.data + + await c.disconnect() + await asyncio.sleep(0.1) + await b.shutdown()