libzypp 17.38.5
eventdispatcher_glib.cc
Go to the documentation of this file.
1#include "timer.h"
5
9#include <zypp-core/ng/base/UnixSignalSource>
10
11namespace zyppng {
12
13static int inline readMask () {
14 return ( G_IO_IN | G_IO_HUP );
15}
16
17static int inline writeMask () {
18 return ( G_IO_OUT );
19}
20
21static int inline excpMask () {
22 return ( G_IO_PRI );
23}
24
25static int inline evModeToMask ( int mode ) {
26 int cond = 0;
27 if ( mode & AbstractEventSource::Read ) {
28 cond = readMask() | G_IO_ERR;
29 }
30 if ( mode & AbstractEventSource::Write ) {
31 cond = cond | writeMask() | G_IO_ERR;
32 }
33 if ( mode & AbstractEventSource::Exception ) {
34 cond = cond | excpMask() | G_IO_ERR;
35 }
36 return cond;
37}
38
39static int inline gioConditionToEventTypes ( const GIOCondition rEvents, const int requestedEvs ) {
40 int ev = 0;
41 if ( ( rEvents & requestedEvs ) != 0 ) {
42 if ( ( rEvents & readMask() ) && ( requestedEvs & readMask() ) )
44 if ( ( rEvents & writeMask() ) && ( requestedEvs & writeMask() ) )
46 if ( ( rEvents & excpMask()) && ( requestedEvs & excpMask() ) )
48 if ( ( rEvents & G_IO_ERR) && ( requestedEvs & G_IO_ERR ) )
50 }
51 return ev;
52}
53
54static GSourceFuncs abstractEventSourceFuncs = {
58 nullptr,
59 nullptr,
60 nullptr
61};
62
64 GAbstractEventSource *src = nullptr;
65 src = reinterpret_cast<GAbstractEventSource *>(g_source_new(&abstractEventSourceFuncs, sizeof(GAbstractEventSource)));
66 (void) new (&src->pollfds) std::vector<GUnixPollFD>();
67
68 src->eventSource = nullptr;
69 src->_ev = ev;
70 return src;
71}
72
74{
75 for ( GUnixPollFD &fd : src->pollfds ) {
76 if ( fd.tag )
77 g_source_remove_unix_fd( &src->source, fd.tag );
78 }
79
80 src->pollfds.clear();
81 src->pollfds.std::vector< GUnixPollFD >::~vector();
82 g_source_destroy( &src->source );
83 g_source_unref( &src->source );
84}
85
86gboolean GAbstractEventSource::prepare(GSource *, gint *timeout)
87{
88 //we can not yet determine if the GSource is ready, polling FDs also have no
89 //timeout, so lets continue
90 if ( timeout )
91 *timeout = -1;
92 return false;
93}
94
95//here we need to figure out which FDs are pending
97{
98 GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
99
100 //check for pending and remove orphaned entries
101 bool hasPending = false;
102
103 for ( auto fdIt = src->pollfds.begin(); fdIt != src->pollfds.end(); ) {
104 if ( fdIt->tag == nullptr ) {
105 //this pollfd was removed, clear it from the list
106 //for now keep the object in the sources list if the pollfd list gets empty, if it does not register new events until
107 //next check it is removed for good
108 fdIt = src->pollfds.erase( fdIt );
109 } else {
110 GIOCondition pendEvents = g_source_query_unix_fd( source, fdIt->tag );
111 if ( pendEvents & G_IO_NVAL ){
112 //that poll is broken, do we need to do more????
113 fdIt = src->pollfds.erase( fdIt );
114 } else {
115 hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
116 fdIt++;
117 }
118 }
119 }
120
121 //if the pollfds are empty trigger dispatch so this source can be removed
122 return hasPending || src->pollfds.empty();
123}
124
125//Trigger all event sources that have been activated
126gboolean GAbstractEventSource::dispatch(GSource *source, GSourceFunc, gpointer)
127{
128 GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
129
130 if ( !src )
131 return G_SOURCE_REMOVE;
132
133 //sources are only removed here so we do not accidentally mess with the pollfd iterator in the next loop
134 //where we trigger all ready FDs
135 if ( src->pollfds.empty() ) {
136 auto it = std::find( src->_ev->_eventSources.begin(), src->_ev->_eventSources.end(), src );
137
138 if ( it != src->_ev->_eventSources.end() ) {
140 src->_ev->_eventSources.erase( it );
141 return G_SOURCE_REMOVE;
142 }
143 }
144
145 for ( const GUnixPollFD &pollfd : src->pollfds ) {
146 //do not trigger orphaned ones
147 if ( pollfd.tag != nullptr ) {
148 GIOCondition pendEvents = g_source_query_unix_fd( source, pollfd.tag );
149
150 if ( (pendEvents & pollfd.reqEvents ) != 0 ) {
151 int ev = gioConditionToEventTypes( pendEvents, pollfd.reqEvents );
152 // we require all event objects to be used in shared_ptr form, by doing this we make sure that the object is not destroyed
153 // while we still use it. However this WILL throw in case of using the EventSource outside of shared_ptr bounds
154 auto eventSourceLocked = src->eventSource->shared_this<AbstractEventSource>();
155 eventSourceLocked->onFdReady( pollfd.pollfd, ev );
156 }
157 }
158 }
159
160 return G_SOURCE_CONTINUE;
161}
162
163static GSourceFuncs glibTimerSourceFuncs = {
167 nullptr,
168 nullptr,
169 nullptr
170};
171
172//check when this timer expires and set the correct timeout
173gboolean GLibTimerSource::prepare(GSource *src, gint *timeout)
174{
175 GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
176 if ( !source )
177 return false; //not ready for dispatch
178
179 if ( !source->_t )
180 return false;
181
182 uint64_t nextTimeout = source->_t->remaining();
183 if ( timeout ) {
184 //this would be a really looong timeout, but be safe
185 if ( nextTimeout > G_MAXINT )
186 *timeout = G_MAXINT;
187 else
188 *timeout = static_cast<gint>( nextTimeout );
189 }
190 return ( nextTimeout == 0 );
191}
192
193//this is essentially the same as prepare
195{
196 return prepare( source, nullptr );
197}
198
199//emit the expired timers, restart timers that are no single shots
200gboolean GLibTimerSource::dispatch(GSource *src, GSourceFunc, gpointer)
201{
202 GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
203 if ( !source )
204 return true;
205
206 if ( source->_t == nullptr )
207 return true;
208 //this will emit the expired signal and reset the timer
209 //or stop it in case its a single shot timer
210 source->_t->shared_this<Timer>()->expire();
211 return true;
212}
213
215{
216 GLibTimerSource *src = nullptr;
217 src = reinterpret_cast<GLibTimerSource *>(g_source_new(&glibTimerSourceFuncs, sizeof(GLibTimerSource)));
218 src->_t = nullptr;
219 return src;
220}
221
223{
224 g_source_destroy( &src->source );
225 g_source_unref( &src->source );
226}
227
231static gboolean eventLoopIdleFunc ( gpointer user_data )
232{
233 auto dPtr = reinterpret_cast<EventDispatcherPrivate *>( user_data );
234 if ( dPtr ) {
235 if( dPtr->runIdleTasks() ) {
236 return G_SOURCE_CONTINUE;
237 }
238
239 g_source_unref ( dPtr->_idleSource );
240 dPtr->_idleSource = nullptr;
241 }
242 return G_SOURCE_REMOVE;
243}
244
246{
247 source = g_child_watch_source_new( pid );
248}
249
251 : tag( other.tag )
252 , source( other.source )
253 , callback( std::move( other.callback ) )
254{
255 other.source = nullptr;
256}
257
259{
260 if ( source ) {
261 g_source_destroy( source );
262 g_source_unref( source );
263 }
264}
265
267{
268 tag = other.tag;
269 source = other.source;
270 callback = std::move( other.callback );
271 other.source = nullptr;
272 return *this;
273}
274
276{
277 _myThreadId = std::this_thread::get_id();
278
279 //if we get a context specified ( usually when created for main thread ) we use it
280 //otherwise we create our own
281 if ( ctx ) {
282 _ctx = ctx;
283 g_main_context_ref ( _ctx );
284 } else {
285 _ctx = g_main_context_new();
286 }
287 // Enable this again once we switch to a full async API that requires a eventloop before calling any zypp functions
288 // g_main_context_push_thread_default( _ctx );
289}
290
292{
293 std::for_each ( _runningTimers.begin(), _runningTimers.end(), []( GLibTimerSource *src ){
294 GLibTimerSource::destruct( src );
295 });
296 std::for_each ( _eventSources.begin(), _eventSources.end(), []( GAbstractEventSource *src ){
297 GAbstractEventSource::destruct( src );
298 });
299 _runningTimers.clear();
300 _eventSources.clear();
301
302 if ( _idleSource ) {
303 g_source_destroy( _idleSource );
304 g_source_unref ( _idleSource );
305 }
306
307 //g_main_context_pop_thread_default( _ctx );
308 g_main_context_unref( _ctx );
309}
310
312{
313 //run all user defined idle functions
314 //if they return true, they are executed again in the next idle run
315 decltype ( _idleFuncs ) runQueue;
316 runQueue.swap( _idleFuncs );
317
318 while ( runQueue.size() ) {
319 EventDispatcher::IdleFunction fun( std::move( runQueue.front() ) );
320 runQueue.pop();
321 if ( fun() )
322 _idleFuncs.push( std::move(fun) );
323 }
324
325 //keep this as the last thing to call after all user code was executed
326 if ( _unrefLater.size() )
327 _unrefLater.clear();
328
329 return _idleFuncs.size() || _unrefLater.size();
330}
331
333{
334 if ( !_idleSource ) {
335 _idleSource = g_idle_source_new ();
336 g_source_set_callback ( _idleSource, eventLoopIdleFunc, this, nullptr );
337 g_source_attach ( _idleSource, _ctx );
338 }
339}
340
341std::shared_ptr<EventDispatcher> EventDispatcherPrivate::create()
342{
343 return std::shared_ptr<EventDispatcher>( new EventDispatcher() );
344}
345
346void EventDispatcherPrivate::waitPidCallback( GPid pid, gint status, gpointer user_data )
347{
348 EventDispatcherPrivate *that = reinterpret_cast<EventDispatcherPrivate *>( user_data );
349
350 try {
351 auto data = std::move( that->_waitPIDs.at(pid) );
352 that->_waitPIDs.erase( pid );
353
354 if ( data.callback )
355 data.callback( pid, status );
356
357 g_spawn_close_pid( pid );
358
359 // no need to take care of releasing the GSource, the event loop took care of that
360
361 } catch ( const std::out_of_range &e ) {
362 return;
363 }
364}
365
367{
368 if ( !user_data )
369 return false;
370
371 GlibTimeoutData *data = reinterpret_cast<GlibTimeoutData *>(user_data);
372 return data->_callback();
373}
374
376{
377 if ( !user_data )
378 return;
379
380 GlibTimeoutData *data = reinterpret_cast<GlibTimeoutData *>(user_data);
381 delete data;
382}
383
385
387 : Base ( * new EventDispatcherPrivate( reinterpret_cast<GMainContext*>(ctx), *this ) )
388{
389}
390
394
396{
397 Z_D();
398 if ( notifier.eventDispatcher().lock().get() != this )
399 ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to update event source") );
400
401 AbstractEventSource *notifyPtr = &notifier;
402
403 GAbstractEventSource *evSrc = nullptr;
404 auto &evSrcList = d->_eventSources;
405 auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifyPtr ]( const auto elem ){ return elem->eventSource == notifyPtr; } );
406 if ( itToEvSrc == evSrcList.end() ) {
407
408 evSrc = GAbstractEventSource::create( d );
409 evSrc->eventSource = notifyPtr;
410 evSrcList.push_back( evSrc );
411
412 g_source_attach( &evSrc->source, d->_ctx );
413
414 } else
415 evSrc = (*itToEvSrc);
416
417 int cond = evModeToMask( mode );
418 auto it = std::find_if( evSrc->pollfds.begin(), evSrc->pollfds.end(), [fd]( const auto &currPollFd ) {
419 return currPollFd.pollfd == fd;
420 });
421
422 if ( it != evSrc->pollfds.end() ) {
423 //found
424 it->reqEvents = static_cast<GIOCondition>( cond );
425 g_source_modify_unix_fd( &evSrc->source, it->tag, static_cast<GIOCondition>(cond) );
426 } else {
427 evSrc->pollfds.push_back(
429 static_cast<GIOCondition>(cond),
430 fd,
431 g_source_add_unix_fd( &evSrc->source, fd, static_cast<GIOCondition>(cond) )
432 }
433 );
434 }
435}
436
438{
439 Z_D();
440
441 AbstractEventSource *ptr = &notifier;
442
443 if ( notifier.eventDispatcher().lock().get() != this )
444 ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to remove event source") );
445
446 auto &evList = d->_eventSources;
447 auto it = std::find_if( evList.begin(), evList.end(), [ ptr ]( const auto elem ){ return elem->eventSource == ptr; } );
448
449 if ( it == evList.end() )
450 return;
451
452 auto &fdList = (*it)->pollfds;
453
454 if ( fd == -1 ) {
455 //we clear out all unix_fd watches but do not destroy the source just yet. We currently might
456 //be in the dispatch() function of that AbstractEventSource, make sure not to break the iterator
457 //for the fd's
458 for ( auto &pFD : fdList ) {
459 if ( pFD.tag )
460 g_source_remove_unix_fd( &(*it)->source, pFD.tag );
461 pFD.pollfd = -1;
462 pFD.tag = nullptr; //mark as orphaned, do not delete the element here this might break dipatching
463 }
464 } else {
465 auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ]( const auto &pFd ){ return pFd.pollfd == fd; } );
466 if ( fdIt != fdList.end() ) {
467 if ( fdIt->tag )
468 g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
469 //also do not remove here, mark as orphaned only to not break iterating in dispatch()
470 fdIt->tag = nullptr;
471 fdIt->pollfd = -1;
472 }
473 }
474}
475
477{
478 Z_D();
479 //make sure timer is not double registered
480 for ( const GLibTimerSource *t : d->_runningTimers ) {
481 if ( t->_t == &timer )
482 return;
483 }
484
486 newSrc->_t = &timer;
487 d->_runningTimers.push_back( newSrc );
488
489 g_source_attach( &newSrc->source, d->_ctx );
490}
491
493{
494 Z_D();
495 auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ &timer ]( const GLibTimerSource *src ){
496 return src->_t == &timer;
497 });
498
499 if ( it != d->_runningTimers.end() ) {
500 GLibTimerSource *src = *it;
501 d->_runningTimers.erase( it );
503 }
504}
505
507{
508 return d_func()->_ctx;
509}
510
511bool EventDispatcher::waitForFdEvent( const int fd, int events , int &revents , int &timeout )
512{
513 GPollFD pollFd;
514 pollFd.fd = fd;
515 pollFd.events = evModeToMask(events);
516
517 bool eventTriggered = false;
518 zypp::AutoDispose<GTimer *> timer( g_timer_new(), &g_timer_destroy );
519 while ( !eventTriggered ) {
520 g_timer_start( *timer );
521 const int res = eintrSafeCall( g_poll, &pollFd, 1, timeout );
522 switch ( res ) {
523 case 0: //timeout
524 timeout = 0;
525 return false;
526 case -1: { // interrupt
527 // if timeout is -1 we wait until eternity
528 if ( timeout == -1 )
529 continue;
530
531 timeout -= g_timer_elapsed( *timer, nullptr );
532 if ( timeout < 0 ) timeout = 0;
533 if ( timeout <= 0 )
534 return false;
535
536 ERR << "g_poll error: " << strerror(errno) << std::endl;
537 return false;
538 }
539 case 1:
540 eventTriggered = true;
541 break;
542 }
543 }
544
545 revents = gioConditionToEventTypes( (GIOCondition)pollFd.revents, evModeToMask(events) );
546 return true;
547}
548
549void EventDispatcher::trackChildProcess( int pid, std::function<void (int, int)> callback )
550{
551 Z_D();
552 GlibWaitPIDData data ( pid );
553 data.callback = std::move(callback);
554
555 g_source_set_callback ( data.source, reinterpret_cast<GSourceFunc>(&EventDispatcherPrivate::waitPidCallback), d_ptr.get(), nullptr );
556 data.tag = g_source_attach ( data.source, d->_ctx );
557 d->_waitPIDs.insert( std::make_pair( pid, std::move(data) ) );
558}
559
561{
562 Z_D();
563 try {
564 d->_waitPIDs.erase( pid );
565 } catch ( const std::out_of_range &e ) {
566 return false;
567 }
568 return true;
569}
570
572{
573 Z_D();
574 // lazy init
575 UnixSignalSourceRef r;
576 if ( d->_signalSource.expired ()) {
577 d->_signalSource = r = UnixSignalSource::create();
578 } else {
579 r = d->_signalSource.lock ();
580 }
581 return r;
582}
583
585{
586 return g_main_context_iteration( d_func()->_ctx, false );
587}
588
590{
591 auto d = instance()->d_func();
592 d->_idleFuncs.push( std::move(callback) );
593 d->enableIdleSource();
594}
595
597{
598 GlibTimeoutData *userData = new GlibTimeoutData();
599 userData->_callback = std::move(callback);
600
601 GSource *source = g_timeout_source_new ( timeout );
602 g_source_set_callback (source, G_SOURCE_FUNC(&EventDispatcherPrivate::timeoutCallback), userData, &EventDispatcherPrivate::timeoutDestroyCallback );
603 g_source_attach (source, d_func()->_ctx );
604 g_source_unref (source);
605}
606
607void EventDispatcher::unrefLaterImpl( std::shared_ptr<void> &&ptr )
608{
609 Z_D();
610 d->_unrefLater.push_back( std::move(ptr) );
611 d->enableIdleSource();
612}
613
615{
616 d_func()->_unrefLater.clear();
617}
618
620{
621 return d_func()->_runningTimers.size();
622}
623
624std::shared_ptr<EventDispatcher> EventDispatcher::instance()
625{
627}
628
629void EventDispatcher::setThreadDispatcher(const std::shared_ptr<EventDispatcher> &disp)
630{
632}
633
634}
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
Definition Exception.h:459
#define ERR
Definition Logger.h:102
struct _GPollFD GPollFD
Definition ZYppImpl.h:26
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition AutoDispose.h:95
Base class for Exception.
Definition Exception.h:153
std::weak_ptr< EventDispatcher > eventDispatcher() const
virtual void onFdReady(int fd, int events)=0
BasePrivate(Base &b)
Definition base_p.h:17
std::shared_ptr< T > shared_this() const
Definition base.h:113
std::unique_ptr< BasePrivate > d_ptr
Definition base.h:174
std::queue< EventDispatcher::IdleFunction > _idleFuncs
std::vector< std::shared_ptr< void > > _unrefLater
static bool timeoutCallback(gpointer user_data)
static void waitPidCallback(GPid pid, gint status, gpointer user_data)
std::unordered_map< int, GlibWaitPIDData > _waitPIDs
std::vector< GAbstractEventSource * > _eventSources
std::vector< GLibTimerSource * > _runningTimers
static void timeoutDestroyCallback(gpointer user_data)
EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)
static std::shared_ptr< EventDispatcher > create()
friend class AbstractEventSource
UnixSignalSourceRef unixSignalSource()
virtual void registerTimer(Timer &timer)
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
std::function< bool()> TimeoutFunction
void * nativeDispatcherHandle() const
Returns the native dispatcher handle if the used implementation supports it.
std::function< bool()> IdleFunction
EventDispatcher(void *ctx=nullptr)
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
virtual void removeTimer(Timer &timer)
void trackChildProcess(int pid, std::function< void(int, int)> callback)
virtual void updateEventSource(AbstractEventSource &notifier, int fd, int mode)
virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)
static void setThreadDispatcher(const std::shared_ptr< EventDispatcher > &disp)
static std::shared_ptr< EventDispatcher > instance()
void invokeOnIdleImpl(IdleFunction &&callback)
void invokeAfterImpl(TimeoutFunction &&callback, uint32_t timeout)
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
static UnixSignalSourceRef create()
Callbacks light.
Definition Callback.h:146
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user.
static GSourceFuncs glibTimerSourceFuncs
static int gioConditionToEventTypes(const GIOCondition rEvents, const int requestedEvs)
std::string strerror(int errno_r) ZYPP_API
Return string describing the error_r code.
Definition String.cc:56
auto eintrSafeCall(Fun &&function, Args &&... args)
static int evModeToMask(int mode)
static int writeMask()
static GSourceFuncs abstractEventSourceFuncs
static int readMask()
static int excpMask()
static gboolean check(GSource *source)
std::vector< GUnixPollFD > pollfds
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
static void destruct(GAbstractEventSource *src)
static gboolean prepare(GSource *, gint *timeout)
static void destruct(GLibTimerSource *src)
static gboolean prepare(GSource *src, gint *timeout)
static gboolean check(GSource *source)
static GLibTimerSource * create()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
std::function< bool()> _callback
GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept
EventDispatcher::WaitPidCallback callback
std::shared_ptr< EventDispatcher > dispatcher()
Definition threaddata.cc:57
static ZYPP_API ThreadData & current()
Definition threaddata.cc:16
void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)
Definition threaddata.cc:42
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105