37 if (!g_unix_open_pipe (
fds, FD_CLOEXEC, &error))
38 ERR <<
"Creating pipes for AsyncQueueWatch: " << error->message << std::endl;
40 if (!g_unix_set_fd_nonblocking (
fds[0], TRUE, &error) ||
41 !g_unix_set_fd_nonblocking (
fds[1], TRUE, &error))
42 ERR <<
"Set pipes non-blocking for AsyncQueueWatch: "<< error->message << std::endl;
63 std::shared_ptr<AsyncQueueWatch> ptr (
new AsyncQueueWatch( std::move(queue) ) );
64 auto d = ptr->d_func();
66 d->_queue->addWatch( *ptr );
75 d_func()->_queue->removeWatch( *
this );
86 res = write (d->fds[1], &one,
sizeof one);
87 }
while (G_UNLIKELY (res == -1 && errno == EINTR));
92 return d_func()->_sigMessageAvailable;
101 while (read (d->fds[0], buffer,
sizeof buffer) ==
sizeof buffer);
102 d->_sigMessageAvailable.emit();
AbstractEventSourcePrivate(AbstractEventSource &p)
void addWatch(AsyncQueueWatch &watch)
virtual ~AsyncQueueBase()
void removeWatch(AsyncQueueWatch &watch)
std::set< AsyncQueueWatch * > _watches
std::recursive_mutex _watchLock
~AsyncQueueWatchPrivate() override
AsyncQueueWatchPrivate(std::shared_ptr< AsyncQueueBase > &&q, AsyncQueueWatch &p)
std::shared_ptr< AsyncQueueBase > _queue
~AsyncQueueWatch() override
void onSignal(int signal) override
AsyncQueueWatch(std::shared_ptr< AsyncQueueBase > &&queue)
void onFdReady(int fd, int events) override
SignalProxy< void()> sigMessageAvailable()
static std::shared_ptr< AsyncQueueWatch > create(std::shared_ptr< AsyncQueueBase > queue)
#define ZYPP_IMPL_PRIVATE(Class)