Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*.cmake
CMakeCache.txt
test/tester
src/libmsghub.a
examples/server
examples/client
32 changes: 32 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
PROJECT(msghub)
CMAKE_MINIMUM_REQUIRED(VERSION 3.5)
#enable_testing()

SET(CMAKE_EXPORT_COMPILE_COMMANDS On)

# Threads
SET(THREADS_PREFER_PTHREAD_FLAG ON)
FIND_PACKAGE(Threads REQUIRED)

# Boost libraries
SET(Boost_USE_MULTITHREAD ON)
SET(Boost_USE_STATIC_LIBS OFF)

FIND_PACKAGE(Boost 1.65.0
COMPONENTS system thread unit_test_framework test_exec_monitor
REQUIRED)

LINK_DIRECTORIES(${Boost_LIBRARY_DIRS})
# https://cmake.org/cmake/help/latest/prop_tgt/CXX_STANDARD.html
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic ")
SET(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -g -O3 -march=native")
SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address,undefined")
SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -ggdb -fno-omit-frame-pointer -O0")

ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(test)
ADD_SUBDIRECTORY(examples)
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Examples
I. Create hub, subscribe on "any topic" and publish "new message" into "any topic":
```c++
// Message handler
void on_message(const std::string& topic, std::vector<char>& message)
void on_message(const std::string& topic, std::vector<char> const& message)
{
// handle message
}
Expand All @@ -34,7 +34,7 @@ I. Create hub, subscribe on "any topic" and publish "new message" into "any topi
msghub.subscribe("any topic", on_message);
// Current or any another client
msghub.publish("any topic", "new message");
io_service.run();
io_service.run(); // keep server active
}
```
II. Connect to hub on "localhost" and publish "new message" into "any topic":
Expand All @@ -45,6 +45,6 @@ II. Connect to hub on "localhost" and publish "new message" into "any topic":
msghub msghub(io_service);
msghub.connect("localhost", 0xbee);
msghub.publish("any topic", "new message");
io_service.run();
// msghub.join(); // implied
}
```
11 changes: 11 additions & 0 deletions containers/issue1
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM ubuntu:18.04
ENV DEBIAN_FRONTEND=noninteractive
RUN \
apt update; \
apt -qqyy install build-essential git cmake libboost-all-dev

RUN git clone https://github.com/sehe/msghub --depth 1 -b cmake
WORKDIR /msghub
RUN cmake . && make

CMD ["/bin/bash", "-c", "(./examples/server & ./examples/client ; kill %1; wait)"]
7 changes: 7 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#LINK_LIBRARIES(msghub)

ADD_EXECUTABLE(server server.cpp)
TARGET_LINK_LIBRARIES(server msghub)

ADD_EXECUTABLE(client client.cpp)
TARGET_LINK_LIBRARIES(client msghub)
16 changes: 16 additions & 0 deletions examples/client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include <msghub.h>
#include <iostream>

int main() {
boost::asio::io_service io_service;
msghub msghub(io_service);
if (!msghub.connect("localhost", 1334)) {
std::cerr << "Unable to connect to hub\n";
} else {
if (!msghub.publish("Publish", "new message")) {
std::cerr << "Unable to post message on Publish channel\n";
}
}

//msghub.join(); // implied in destructor
}
28 changes: 28 additions & 0 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include <boost/asio.hpp>
#include <msghub.h>
#include <iostream>
#include <iomanip>

void on_message(const std::string& topic, std::vector<char> const& message)
{
std::cout << "on_message: " << std::quoted(topic) << " " <<
std::quoted(std::string(message.begin(), message.end())) << std::endl;
}

int main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
// Create hub to listen on 0xbee port
msghub msghub(io_service);
if (!msghub.create(1334))
std::cerr << "Error creating hub server\n";
// Subscribe on "any topic"
if (!msghub.subscribe("Publish", on_message))
std::cerr << "Error creating subscription\n";

// Current or any another client
//msghub.publish("Publish", "hello");

io_service.run(); // blocks for-ever
}
13 changes: 2 additions & 11 deletions pub/msghub.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class msghub_impl;
class msghub
{
public:
typedef std::function< void(const std::string& topic, std::vector<char>& message) > onmessage;
typedef std::function< void(const std::string& topic, std::vector<char> const& message) > onmessage;

public:
msghub( boost::asio::io_service& s );
Expand All @@ -24,19 +24,10 @@ class msghub
bool publish(const std::string& topic, const std::vector<char>& message);
bool publish(const std::string& topic, const std::string& message);

//template<typename T>
//publish(const std::string& topic, const T& buff)
//{
// std::copy(buff.begin(), buff.end(), back_inserter(data));
// type = msgtype;
// length = sizeof(type) + sizeof(length) + data.size();
//}


void join();

private:
boost::shared_ptr<msghub_impl> pimpl;
};

#endif
#endif
16 changes: 16 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
SET(MSGHUB_SRC
msghub.cpp
msghub_impl.cpp
hubclient.cpp
hubconnection.cpp
hubmessage.cpp
)

ADD_LIBRARY(msghub STATIC ${MSGHUB_SRC})
TARGET_COMPILE_DEFINITIONS(msghub PUBLIC "-DBOOST_BIND_NO_PLACEHOLDERS=1")
TARGET_INCLUDE_DIRECTORIES(msghub SYSTEM PUBLIC ${Boost_INCLUDE_DIRS})
TARGET_LINK_LIBRARIES(msghub Threads::Threads)
TARGET_LINK_LIBRARIES(msghub ${Boost_SYSTEM_LIBRARY})
TARGET_LINK_LIBRARIES(msghub ${Boost_THREAD_LIBRARY})

TARGET_INCLUDE_DIRECTORIES(msghub SYSTEM PUBLIC ../pub/)
4 changes: 2 additions & 2 deletions src/hub.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ class hub
{
public:
virtual void distribute(boost::shared_ptr<hubclient> subscriber, hubmessage& msg) = 0;
virtual void deliver(boost::shared_ptr<hubconnection> publisher, hubmessage& msg) = 0;
virtual void deliver(hubmessage& msg) = 0;
};
#endif
#endif
54 changes: 39 additions & 15 deletions src/hubconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
//#include <boost/thread/thread.hpp>
//#include <boost/lexical_cast.hpp>
//#include <boost/thread/mutex.hpp>
#include <iostream>

using boost::asio::ip::tcp;

hubconnection::hubconnection(boost::asio::io_service& io_service, hub& courier)
: io_service_(io_service)
, socket_(io_service)
, courier_(courier)
, is_closing(false)
{}

bool hubconnection::init(const std::string& host, uint16_t port)
Expand Down Expand Up @@ -64,6 +66,7 @@ bool hubconnection::write(const hubmessage& msg, bool wait)
{
io_service_.post(boost::bind(&hubconnection::do_write, shared_from_this(), msg));
}
return true;
}
catch (std::exception&)
{
Expand All @@ -72,9 +75,9 @@ bool hubconnection::write(const hubmessage& msg, bool wait)

}

void hubconnection::close()
void hubconnection::close(bool forced)
{
io_service_.post(boost::bind(&hubconnection::do_close, shared_from_this()));
io_service_.post(boost::bind(&hubconnection::do_close, shared_from_this(), forced));
}

void hubconnection::handle_read_header(const boost::system::error_code& error)
Expand All @@ -89,15 +92,15 @@ void hubconnection::handle_read_header(const boost::system::error_code& error)
}
else
{
do_close();
do_close(true);
}
}

void hubconnection::handle_read_body(const boost::system::error_code& error)
{
if (!error)
{
courier_.deliver(shared_from_this(), inmsg_);
courier_.deliver(inmsg_);

boost::asio::async_read(socket_,
boost::asio::buffer(inmsg_.data(), inmsg_.header_length()),
Expand All @@ -106,15 +109,18 @@ void hubconnection::handle_read_body(const boost::system::error_code& error)
}
else
{
do_close();
do_close(true);
}
}

void hubconnection::do_write(hubmessage msg)
{
boost::mutex::scoped_lock lock(write_msgs_lock_);
bool iswriting = !outmsg_queue_.empty();
outmsg_queue_.push_back(msg);
bool iswriting = true;
{
boost::mutex::scoped_lock lock(write_msgs_lock_);
iswriting = !outmsg_queue_.empty();
outmsg_queue_.push_back(msg);
}
if (!iswriting)
{
boost::asio::async_write(socket_,
Expand All @@ -129,25 +135,43 @@ void hubconnection::handle_write(const boost::system::error_code& error)
{
if (!error)
{
boost::mutex::scoped_lock lock(write_msgs_lock_);
outmsg_queue_.pop_front();
if (!outmsg_queue_.empty())
bool remaining_messages = false;
{
boost::mutex::scoped_lock lock(write_msgs_lock_);
outmsg_queue_.pop_front();
remaining_messages = !outmsg_queue_.empty();
}
if (remaining_messages)
{
boost::asio::async_write(socket_,
boost::asio::buffer(outmsg_queue_.front().data(),
outmsg_queue_.front().length()),
boost::bind(&hubconnection::handle_write, shared_from_this(),
boost::asio::placeholders::error));
}
} else if (is_closing) {
do_close(false);
}
}
else
{
do_close();
do_close(true);
}
}

void hubconnection::do_close()
void hubconnection::do_close(bool forced)
{
is_closing = true;

// TODO: Unsubscribe?
socket_.close();

bool immediate = forced;
if (!forced) {
boost::mutex::scoped_lock lock(write_msgs_lock_);
immediate |= outmsg_queue_.empty();
}

if (immediate) {
if (socket_.is_open())
socket_.close();
}
}
10 changes: 6 additions & 4 deletions src/hubconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>
#include <deque>

#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
Expand All @@ -29,23 +30,24 @@ class hubconnection : public boost::enable_shared_from_this<hubconnection>
hubconnection(boost::asio::io_service& io_service, hub& courier);
bool init(const std::string& host, uint16_t port);
bool write(const hubmessage& msg, bool wait = false);
void close();
void close(bool forced);

private:

void handle_read_header(const boost::system::error_code& error);
void handle_read_body(const boost::system::error_code& error);
void do_write(hubmessage msg);
void handle_write(const boost::system::error_code& error);
void do_close();
void do_close(bool forced);

private:
hub& courier_;
boost::asio::io_service& io_service_;
tcp::socket socket_;
hub& courier_;
hubmessage inmsg_;
hubmessage_queue outmsg_queue_;
boost::atomic_bool is_closing;
boost::mutex write_msgs_lock_;
};

#endif
#endif
Loading