Skip to content

Commit eedbbd6

Browse files
authored
add support for Pub/Sub via HTTP (#1184)
1 parent 4b3e058 commit eedbbd6

File tree

6 files changed

+126
-4
lines changed

6 files changed

+126
-4
lines changed

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
templates_path = ['_templates']
6464

6565
# The suffix of source filenames.
66-
source_suffix = '.rst'
66+
source_suffix = {'.rst': 'restructuredtext'}
6767

6868
locale_dirs = ['locale/'] # path is example but recommended.
6969

docs/pubsub.rst

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,30 @@ Example directive:
2727
broker:
2828
type: mqtt
2929
url: mqtt://localhost:1883
30+
channel: messages/a/data # optional
31+
32+
HTTP
33+
----
34+
35+
Example directive:
36+
37+
.. code-block:: yaml
38+
39+
pubsub:
40+
broker:
41+
type: http
42+
url: https://ntfy.sh
43+
channel: messages/a/data # optional
3044
3145
.. note::
3246

33-
For MQTT endpoints requiring authentication, encode the ``url`` value as follows: ``mqtt://username:password@localhost:1883``
47+
For any Pub/Sub endpoints requiring authentication, encode the ``url`` value as follows:
48+
49+
* ``mqtt://username:password@localhost:1883``
50+
* ``https://username:password@localhost``
51+
52+
.. note::
3453

54+
If no ``channel`` is defined, the relevant OGC API endpoint is used.
3555

3656
.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html

pycsw/broker/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,6 @@ def load_client(def_: dict) -> BasePubSubClient:
4848

4949

5050
CLIENTS = {
51-
'mqtt': 'pycsw.broker.mqtt.MQTTPubSubClient'
51+
'mqtt': 'pycsw.broker.mqtt.MQTTPubSubClient',
52+
'http': 'pycsw.broker.http.HTTPPubSubClient'
5253
}

pycsw/broker/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def __init__(self, publisher_def: dict):
5050

5151
self.type = None
5252
self.client_id = f'pycsw-pubsub-{random.randint(0, 1000)}'
53+
self.channel = publisher_def.get('channel')
5354

5455
self.show_link = publisher_def.get('show_link', True)
5556
self.broker = publisher_def['url']

pycsw/broker/http.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <[email protected]>
4+
# Angelos Tzotsos <[email protected]>
5+
#
6+
# Copyright (c) 2025 Tom Kralidis
7+
# Copyright (c) 2025 Angelos Tzotsos
8+
#
9+
# Permission is hereby granted, free of charge, to any person
10+
# obtaining a copy of this software and associated documentation
11+
# files (the "Software"), to deal in the Software without
12+
# restriction, including without limitation the rights to use,
13+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
14+
# copies of the Software, and to permit persons to whom the
15+
# Software is furnished to do so, subject to the following
16+
# conditions:
17+
#
18+
# The above copyright notice and this permission notice shall be
19+
# included in all copies or substantial portions of the Software.
20+
#
21+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
23+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
25+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
26+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
27+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
28+
# OTHER DEALINGS IN THE SOFTWARE.
29+
#
30+
# =================================================================
31+
32+
import logging
33+
34+
import requests
35+
36+
from pycsw.broker.base import BasePubSubClient
37+
38+
LOGGER = logging.getLogger(__name__)
39+
40+
41+
class HTTPPubSubClient(BasePubSubClient):
42+
"""HTTP client"""
43+
44+
def __init__(self, broker_url):
45+
"""
46+
Initialize object
47+
48+
:param publisher_def: provider definition
49+
50+
:returns: pycsw.pubsub.http.HTTPPubSubClient
51+
"""
52+
53+
super().__init__(broker_url)
54+
self.type = 'http'
55+
self.auth = None
56+
57+
msg = f'Initializing to broker {self.broker_safe_url} with id {self.client_id}' # noqa
58+
LOGGER.debug(msg)
59+
60+
if None not in [self.broker_url.username, self.broker_url.password]:
61+
LOGGER.debug('Setting credentials')
62+
self.auth = (
63+
self.broker_url.username,
64+
self.broker_url.password
65+
)
66+
67+
def connect(self) -> None:
68+
"""
69+
Connect to an HTTP broker
70+
71+
:returns: None
72+
"""
73+
74+
LOGGER.debug('No connection to HTTP')
75+
pass
76+
77+
def pub(self, channel: str, message: str, qos: int = 1) -> bool:
78+
"""
79+
Publish a message to a broker/channel
80+
81+
:param channel: `str` of topic
82+
:param message: `str` of message
83+
84+
:returns: `bool` of publish result
85+
"""
86+
87+
LOGGER.debug(f'Publishing to broker {self.broker_safe_url}')
88+
LOGGER.debug(f'Channel: {channel}')
89+
LOGGER.debug(f'Message: {message}')
90+
91+
url = f'{self.broker}/{channel}'
92+
93+
try:
94+
response = requests.post(url, auth=self.auth, json=message)
95+
response.raise_for_status()
96+
except Exception as err:
97+
LOGGER.debug(f'Message publishing failed: {err}')
98+
99+
def __repr__(self):
100+
return f'<HTTPPubSubClient> {self.broker_safe_url}'

pycsw/ogc/pubsub/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def publish_message(pubsub_client, action: str, collection: str = None,
4848
:returns: `bool` of whether message publishing was successful
4949
"""
5050

51-
channel = f'collections/{collection}'
51+
channel = pubsub_client.channel or f'collections/{collection}'
5252
type_ = f'org.ogc.api.collection.item.{action}'
5353

5454
if action in ['create', 'update']:

0 commit comments

Comments
 (0)