libzypp 17.37.17
asyncqueue.h
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9#ifndef ZYPP_NG_THREAD_ASYNCQUEUE_H_INCLUDED
10#define ZYPP_NG_THREAD_ASYNCQUEUE_H_INCLUDED
11
12#include <zypp-core/zyppng/base/AbstractEventSource>
13
14#include <queue>
15#include <set>
16#include <mutex>
17#include <memory>
18#include <optional>
19#include <condition_variable>
20
21namespace zyppng {
22
23 class AsyncQueueWatch;
24
// NOTE(review): the listing jumps from line 23 to line 26, so the line that opens this
// class is not shown here. Judging by the destructor name below it declares
// `AsyncQueueBase`; whether it also carries an export macro cannot be told from this view.
//
// Message-type independent part of the queue: it keeps track of the AsyncQueueWatch
// objects registered on a queue. Only declarations are visible here; the page's symbol
// index places the definitions of the destructor, addWatch() and removeWatch() in asyncqueue.cc.
26 public:
// Virtual destructor, so the class can be used as a polymorphic base (AsyncQueue derives from it).
27 virtual ~AsyncQueueBase();
28
// Register a watch on this queue. Takes a reference; _watches below stores raw pointers.
29 void addWatch ( AsyncQueueWatch &watch );
// Unregister a previously added watch.
30 void removeWatch ( AsyncQueueWatch &watch );
// Called from AsyncQueue; presumably pings every registered watch — confirm in asyncqueue.cc.
31 void notifyWatches ( );
32
33 private:
// The registered watches, kept as raw (non-owning as far as this header shows) pointers.
34 std::set<AsyncQueueWatch *> _watches;
// Recursive mutex; presumably guards _watches — confirm against asyncqueue.cc.
35 std::recursive_mutex _watchLock;
36 };
37
42 template< class Message >
43 class AsyncQueue : public AsyncQueueBase {
44
45 public:
46
47 using Ptr = std::shared_ptr<AsyncQueue>;
48
49 static Ptr create () {
50 return Ptr( new AsyncQueue() );
51 }
52
53 AsyncQueue(const AsyncQueue&) = delete;
54 AsyncQueue& operator=(const AsyncQueue&) = delete;
55
63 template< typename T = Message >
64 void pushUnlocked ( T &&value ) {
65 _messages.push( std::forward<T>(value) );
66 }
67
72 template< typename T = Message >
73 void push ( T &&value ) {
74 {
75 std::lock_guard lk( _mut );
76 pushUnlocked( std::forward<T>(value) );
77 }
78 notify();
79 }
80
// NOTE(review): the listing jumps from line 80 to line 85, so the signature line of this
// member function is not shown. From the body it returns a `Message` by value; the name
// is presumably `pop` (compare tryPop() below) — confirm against the real header.
//
// Blocking removal of the oldest message.
// unique_lock (not lock_guard) because the condition variable must be able to release
// and re-acquire the mutex while waiting.
85 std::unique_lock<std::mutex> lk( _mut );
// Sleep until the queue is non-empty; the predicate form re-checks the condition after
// every wakeup, so spurious wakeups do not fall through.
86 _cv.wait( lk, [this](){ return _messages.size() > 0; } );
// Move the front element out before pop() destroys it.
87 Message msg = std::move( _messages.front() );
88 _messages.pop();
89 return msg;
90 }
91
96 std::optional<Message> tryPop () {
97 std::lock_guard lk( _mut );
98 return tryPopUnlocked();
99 }
100
105 std::optional<Message> tryPopUnlocked () {
106 if ( _messages.size() ) {
107 Message msg = std::move( _messages.front() );
108 _messages.pop();
109 return msg;
110 }
111 return {};
112 }
113
118 void lock () {
119 _mut.lock();
120 }
121
126 void unlock () {
127 _mut.unlock();
128 }
129
// Wakes up every thread currently blocked on the queue's condition variable.
// Does not take the queue mutex itself.
134 void notify () {
135 _cv.notify_all();
// NOTE(review): the listing jumps from line 135 to line 137, so one statement of this body
// is not shown. Presumably it forwards to AsyncQueueBase::notifyWatches() — confirm against
// the real header.
137 }
138
139 private:
140 AsyncQueue() = default;
141 std::queue<Message> _messages;
142 std::mutex _mut;
143 std::condition_variable _cv;
144 };
145
// NOTE(review): listing lines 147, 149, 157 and 165 are not shown. Line 147 must hold the
// class head of AsyncQueueWatch (forward-declared further up); the `override` specifiers and
// the "AbstractEventSource interface" comment below suggest it derives from
// AbstractEventSource — confirm. The page's symbol index also lists
// `SignalProxy<void()> sigMessageAvailable()` (defined in asyncqueue.cc), which presumably
// sits on the missing line 157.
148 {
150 public:
151
// Factory; takes shared ownership of the queue to watch (shared_ptr passed by value).
152 static std::shared_ptr<AsyncQueueWatch> create ( std::shared_ptr<AsyncQueueBase> queue );
153 ~AsyncQueueWatch() override;
154
// Declared only; presumably what AsyncQueueBase::notifyWatches() calls on each watch — confirm.
155 void postNotifyEvent ();
156
158
159 // AbstractEventSource interface
160 void onFdReady(int fd, int events) override;
161 void onSignal(int signal) override;
162
163 protected:
// Constructor is protected: instances are meant to be obtained through create().
164 AsyncQueueWatch( std::shared_ptr<AsyncQueueBase> &&queue );
166 };
167
168}
169
170
171#endif
void addWatch(AsyncQueueWatch &watch)
Definition asyncqueue.cc:12
virtual ~AsyncQueueBase()
Definition asyncqueue.cc:9
void removeWatch(AsyncQueueWatch &watch)
Definition asyncqueue.cc:18
std::set< AsyncQueueWatch * > _watches
Definition asyncqueue.h:34
std::recursive_mutex _watchLock
Definition asyncqueue.h:35
void onSignal(int signal) override
AsyncQueueWatch(std::shared_ptr< AsyncQueueBase > &&queue)
void onFdReady(int fd, int events) override
Definition asyncqueue.cc:95
SignalProxy< void()> sigMessageAvailable()
Definition asyncqueue.cc:90
static std::shared_ptr< AsyncQueueWatch > create(std::shared_ptr< AsyncQueueBase > queue)
Definition asyncqueue.cc:61
std::condition_variable _cv
Definition asyncqueue.h:143
std::optional< Message > tryPopUnlocked()
Definition asyncqueue.h:105
AsyncQueue(const AsyncQueue &)=delete
std::optional< Message > tryPop()
Definition asyncqueue.h:96
AsyncQueue()=default
std::queue< Message > _messages
Definition asyncqueue.h:141
void pushUnlocked(T &&value)
Definition asyncqueue.h:64
static Ptr create()
Definition asyncqueue.h:49
std::shared_ptr< AsyncQueue > Ptr
Definition asyncqueue.h:47
AsyncQueue & operator=(const AsyncQueue &)=delete
void push(T &&value)
Definition asyncqueue.h:73
#define ZYPP_DECLARE_PRIVATE(Class)
Definition zyppglobal.h:87
#define LIBZYPP_NG_EXPORT
Definition zyppglobal.h:8