37 LOG_DEBUG(logger,
"Event: {}, Topic: {}", ev, topic);
40 Lock<FastMutex> lock(mtxSubscribers);
42 if (subscribers.count(topic) > 0)
44 vector<EventHandlerBase*>& subs = subscribers.at(topic);
45 auto begin = subs.begin();
46 auto end = subs.end();
48 for (
auto it = begin; it != end; it++)
53 if ((*it) != subscriber)
62 D(assert(delayMs >= EVENT_BROKER_MIN_DELAY &&
63 "delayMs must be longer or equal to EVENT_BROKER_MIN_DELAY"));
65 delayMs = std::max(delayMs, EVENT_BROKER_MIN_DELAY);
67 Lock<FastMutex> lock(mtxDelayedEvents);
70 long long delayTicks =
static_cast<long long>(delayMs);
72 DelayedEvent dev{eventCounter++, ev, topic,
78 for (
auto it = delayedEvents.begin(); it != delayedEvents.end(); it++)
80 if (dev.deadline < (*it).deadline)
82 delayedEvents.insert(it, dev);
89 delayedEvents.push_back(dev);
96 Lock<FastMutex> lock(mtxDelayedEvents);
97 for (
auto it = delayedEvents.begin(); it != delayedEvents.end(); it++)
99 if ((*it).schedId ==
id)
101 delayedEvents.erase(it);
109 Lock<FastMutex> lock(mtxSubscribers);
110 subscribers[topic].push_back(subscriber);
115 Lock<FastMutex> lock(mtxSubscribers);
117 deleteSubscriber(subscribers.at(topic), subscriber);
122 Lock<FastMutex> lock(mtxSubscribers);
123 for (
auto it = subscribers.begin(); it != subscribers.end(); it++)
124 deleteSubscriber(it->second, subscriber);
129 Lock<FastMutex> lock(mtxDelayedEvents);
130 delayedEvents.clear();
134void EventBroker::run()
136 Lock<FastMutex> lock(mtxDelayedEvents);
139 while (delayedEvents.size() > 0 &&
143 DelayedEvent dev = delayedEvents.front();
144 delayedEvents.erase(delayedEvents.begin());
149 Unlock<FastMutex> unlock(lock);
150 post(dev.event, dev.topic);
157 if (delayedEvents.size() > 0)
160 if (delayedEvents.front().deadline < sleepUntil)
166 Unlock<FastMutex> unlock(lock);
174void EventBroker::deleteSubscriber(vector<EventHandlerBase*>& subs,
175 EventHandlerBase* subscriber)
177 auto it = subs.begin();
179 while (it != subs.end())
180 if (*it == subscriber)
#define LOG_DEBUG(logger,...)
bool shouldStop()
Tells whether or not the ActiveObject should stop its execution.
void post(const Event &ev, uint8_t topic, EventHandlerBase *subscriber=nullptr)
uint16_t postDelayed(const Event &ev, uint8_t topic, unsigned int delayMs)
void subscribe(EventHandlerBase *subscriber, uint8_t topic)
void removeDelayed(uint16_t id)
void unsubscribe(EventHandlerBase *subscriber, uint8_t topic)
Unsubscribe an EventHandler from a specific topic This function should be used only for testing purpo...
EventBroker()
Construct a new Event Broker object.
void clearDelayedEvents()
Unschedules all pending events. This function should be used only for testing purposes.
static StackLogger & getInstance()
void sleepUntil(long long absoluteTimeMs)
Sleep until a given time in milliseconds.
long long getOldTick()
Get the current time in milliseconds.
This file includes all the types the logdecoder script will decode.