libzypp 17.37.17
asyncqueue.cc
Go to the documentation of this file.
2#include <glib-unix.h>
3#include <ostream>
4
6
7namespace zyppng {
8
11
13 {
14 std::lock_guard lock(_watchLock);
15 _watches.insert( &watch );
16 }
17
19 {
20 std::lock_guard lock(_watchLock);
21 _watches.erase( &watch );
22 }
23
25 {
26 std::lock_guard lock(_watchLock);
27 std::for_each( _watches.begin(), _watches.end(), []( AsyncQueueWatch *w ){
28 w->postNotifyEvent();
29 });
30 }
31
33 , _queue( std::move(q) )
34 {
35 GError *error = NULL;
36
37 if (!g_unix_open_pipe (fds, FD_CLOEXEC, &error))
38 ERR << "Creating pipes for AsyncQueueWatch: " << error->message << std::endl;
39
40 if (!g_unix_set_fd_nonblocking (fds[0], TRUE, &error) ||
41 !g_unix_set_fd_nonblocking (fds[1], TRUE, &error))
42 ERR << "Set pipes non-blocking for AsyncQueueWatch: "<< error->message << std::endl;
43 }
44
46 {
47 close (fds[0]);
48 close (fds[1]);
49 }
50
52
53 AsyncQueueWatch::AsyncQueueWatch(std::shared_ptr<zyppng::AsyncQueueBase> &&queue )
54 : AsyncQueueWatch( *( new AsyncQueueWatchPrivate( std::move(queue), *this ) ) )
55 { }
56
60
61 std::shared_ptr<AsyncQueueWatch> AsyncQueueWatch::create( std::shared_ptr<AsyncQueueBase> queue )
62 {
63 std::shared_ptr<AsyncQueueWatch> ptr ( new AsyncQueueWatch( std::move(queue) ) );
64 auto d = ptr->d_func();
65 ptr->updateFdWatch( d->fds[0], AbstractEventSource::Read );
66 d->_queue->addWatch( *ptr );
67 return ptr;
68 }
69
71 {
72 // sync point, since the queue locks all its watches before changing or notifying them
73 // we should never run into a bad situation where the AsyncQueueWatch is deleted while notified from a different thread.
74 // In case watches are notified this will block and the other way round
75 d_func()->_queue->removeWatch( *this );
76 }
77
79 {
80 Z_D();
81 int res = -1;
82 guint8 one = 1;
83
84 do {
85 errno = 0;
86 res = write (d->fds[1], &one, sizeof one);
87 } while (G_UNLIKELY (res == -1 && errno == EINTR));
88 }
89
91 {
92 return d_func()->_sigMessageAvailable;
93 }
94
96 {
97 Z_D();
98 char buffer[16];
99
100 /* read until it is empty */
101 while (read (d->fds[0], buffer, sizeof buffer) == sizeof buffer);
102 d->_sigMessageAvailable.emit();
103 }
104
106 {
107 }
108
109
110}
111
AbstractEventSourcePrivate(AbstractEventSource &p)
void addWatch(AsyncQueueWatch &watch)
Definition asyncqueue.cc:12
virtual ~AsyncQueueBase()
Definition asyncqueue.cc:9
void removeWatch(AsyncQueueWatch &watch)
Definition asyncqueue.cc:18
std::set< AsyncQueueWatch * > _watches
Definition asyncqueue.h:34
std::recursive_mutex _watchLock
Definition asyncqueue.h:35
AsyncQueueWatchPrivate(std::shared_ptr< AsyncQueueBase > &&q, AsyncQueueWatch &p)
Definition asyncqueue.cc:32
std::shared_ptr< AsyncQueueBase > _queue
~AsyncQueueWatch() override
Definition asyncqueue.cc:70
void onSignal(int signal) override
AsyncQueueWatch(std::shared_ptr< AsyncQueueBase > &&queue)
void onFdReady(int fd, int events) override
Definition asyncqueue.cc:95
SignalProxy< void()> sigMessageAvailable()
Definition asyncqueue.cc:90
static std::shared_ptr< AsyncQueueWatch > create(std::shared_ptr< AsyncQueueBase > queue)
Definition asyncqueue.cc:61
Definition Arch.h:364
#define ERR
Definition Logger.h:102
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105