libzypp 17.37.17
eventdispatcher_glib.cc
Go to the documentation of this file.
1#include "timer.h"
5
9#include <zypp-core/zyppng/base/UnixSignalSource>
10
11namespace zyppng {
12
13static int inline readMask () {
14 return ( G_IO_IN | G_IO_HUP );
15}
16
17static int inline writeMask () {
18 return ( G_IO_OUT );
19}
20
21static int inline excpMask () {
22 return ( G_IO_PRI );
23}
24
25static int inline evModeToMask ( int mode ) {
26 int cond = 0;
27 if ( mode & AbstractEventSource::Read ) {
28 cond = readMask() | G_IO_ERR;
29 }
30 if ( mode & AbstractEventSource::Write ) {
31 cond = cond | writeMask() | G_IO_ERR;
32 }
33 if ( mode & AbstractEventSource::Exception ) {
34 cond = cond | excpMask() | G_IO_ERR;
35 }
36 return cond;
37}
38
39static int inline gioConditionToEventTypes ( const GIOCondition rEvents, const int requestedEvs ) {
40 int ev = 0;
41 if ( ( rEvents & requestedEvs ) != 0 ) {
42 if ( ( rEvents & readMask() ) && ( requestedEvs & readMask() ) )
44 if ( ( rEvents & writeMask() ) && ( requestedEvs & writeMask() ) )
46 if ( ( rEvents & excpMask()) && ( requestedEvs & excpMask() ) )
48 if ( ( rEvents & G_IO_ERR) && ( requestedEvs & G_IO_ERR ) )
50 }
51 return ev;
52}
53
54static GSourceFuncs abstractEventSourceFuncs = {
58 nullptr,
59 nullptr,
60 nullptr
61};
62
64 GAbstractEventSource *src = nullptr;
65 src = reinterpret_cast<GAbstractEventSource *>(g_source_new(&abstractEventSourceFuncs, sizeof(GAbstractEventSource)));
66 (void) new (&src->pollfds) std::vector<GUnixPollFD>();
67
68 src->eventSource = nullptr;
69 src->_ev = ev;
70 return src;
71}
72
74{
75 for ( GUnixPollFD &fd : src->pollfds ) {
76 if ( fd.tag )
77 g_source_remove_unix_fd( &src->source, fd.tag );
78 }
79
80 src->pollfds.clear();
81 src->pollfds.std::vector< GUnixPollFD >::~vector();
82 g_source_destroy( &src->source );
83 g_source_unref( &src->source );
84}
85
86gboolean GAbstractEventSource::prepare(GSource *, gint *timeout)
87{
88 //we can not yet determine if the GSource is ready, polling FDs also have no
89 //timeout, so lets continue
90 if ( timeout )
91 *timeout = -1;
92 return false;
93}
94
95//here we need to figure out which FDs are pending
97{
98 GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
99
100 //check for pending and remove orphaned entries
101 bool hasPending = false;
102
103 for ( auto fdIt = src->pollfds.begin(); fdIt != src->pollfds.end(); ) {
104 if ( fdIt->tag == nullptr ) {
105 //this pollfd was removed, clear it from the list
106 //for now keep the object in the sources list if the pollfd list gets empty, if it does not register new events until
107 //next check it is removed for good
108 fdIt = src->pollfds.erase( fdIt );
109 } else {
110 GIOCondition pendEvents = g_source_query_unix_fd( source, fdIt->tag );
111 if ( pendEvents & G_IO_NVAL ){
112 //that poll is broken, do we need to do more????
113 fdIt = src->pollfds.erase( fdIt );
114 } else {
115 hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
116 fdIt++;
117 }
118 }
119 }
120
121 //if the pollfds are empty trigger dispatch so this source can be removed
122 return hasPending || src->pollfds.empty();
123}
124
125//Trigger all event sources that have been activated
126gboolean GAbstractEventSource::dispatch(GSource *source, GSourceFunc, gpointer)
127{
128 GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
129
130 if ( !src )
131 return G_SOURCE_REMOVE;
132
133 //sources are only removed here so we do not accidentially mess with the pollfd iterator in the next loop
134 //were we trigger all ready FDs
135 if ( src->pollfds.empty() ) {
136 auto it = std::find( src->_ev->_eventSources.begin(), src->_ev->_eventSources.end(), src );
137
138 if ( it != src->_ev->_eventSources.end() ) {
140 src->_ev->_eventSources.erase( it );
141 return G_SOURCE_REMOVE;
142 }
143 }
144
145 for ( const GUnixPollFD &pollfd : src->pollfds ) {
146 //do not trigger orphaned ones
147 if ( pollfd.tag != nullptr ) {
148 GIOCondition pendEvents = g_source_query_unix_fd( source, pollfd.tag );
149
150 if ( (pendEvents & pollfd.reqEvents ) != 0 ) {
151 int ev = gioConditionToEventTypes( pendEvents, pollfd.reqEvents );
152 // we require all event objects to be used in shared_ptr form, by doing this we make sure that the object is not destroyed
153 // while we still use it. However this WILL throw in case of using the EventSource outside of shared_ptr bounds
154 auto eventSourceLocked = src->eventSource->shared_this<AbstractEventSource>();
155 eventSourceLocked->onFdReady( pollfd.pollfd, ev );
156 }
157 }
158 }
159
160 return G_SOURCE_CONTINUE;
161}
162
163static GSourceFuncs glibTimerSourceFuncs = {
167 nullptr,
168 nullptr,
169 nullptr
170};
171
172//check when this timer expires and set the correct timeout
173gboolean GLibTimerSource::prepare(GSource *src, gint *timeout)
174{
175 GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
176 if ( !source )
177 return false; //not ready for dispatch
178
179 if ( !source->_t )
180 return false;
181
182 uint64_t nextTimeout = source->_t->remaining();
183 if ( timeout ) {
184 //this would be a really looong timeout, but be safe
185 if ( nextTimeout > G_MAXINT )
186 *timeout = G_MAXINT;
187 else
188 *timeout = static_cast<gint>( nextTimeout );
189 }
190 return ( nextTimeout == 0 );
191}
192
193//this is essentially the same as prepare
195{
196 return prepare( source, nullptr );
197}
198
199//emit the expired timers, restart timers that are no single shots
200gboolean GLibTimerSource::dispatch(GSource *src, GSourceFunc, gpointer)
201{
202 GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
203 if ( !source )
204 return true;
205
206 if ( source->_t == nullptr )
207 return true;
208 //this will emit the expired signal and reset the timer
209 //or stop it in case its a single shot timer
210 source->_t->shared_this<Timer>()->expire();
211 return true;
212}
213
215{
216 GLibTimerSource *src = nullptr;
217 src = reinterpret_cast<GLibTimerSource *>(g_source_new(&glibTimerSourceFuncs, sizeof(GLibTimerSource)));
218 src->_t = nullptr;
219 return src;
220}
221
223{
224 g_source_destroy( &src->source );
225 g_source_unref( &src->source );
226}
227
231static gboolean eventLoopIdleFunc ( gpointer user_data )
232{
233 auto dPtr = reinterpret_cast<EventDispatcherPrivate *>( user_data );
234 if ( dPtr ) {
235 if( dPtr->runIdleTasks() ) {
236 return G_SOURCE_CONTINUE;
237 }
238
239 g_source_unref ( dPtr->_idleSource );
240 dPtr->_idleSource = nullptr;
241 }
242 return G_SOURCE_REMOVE;
243}
244
246{
247 source = g_child_watch_source_new( pid );
248}
249
251 : tag( other.tag )
252 , source( other.source )
253 , callback( std::move( other.callback ) )
254{
255 other.source = nullptr;
256}
257
259{
260 if ( source ) {
261 g_source_destroy( source );
262 g_source_unref( source );
263 }
264}
265
267{
268 tag = other.tag;
269 source = other.source;
270 callback = std::move( other.callback );
271 other.source = nullptr;
272 return *this;
273}
274
276{
277 _myThreadId = std::this_thread::get_id();
278
279 //if we get a context specified ( usually when created for main thread ) we use it
280 //otherwise we create our own
281 if ( ctx ) {
282 _ctx = ctx;
283 g_main_context_ref ( _ctx );
284 } else {
285 _ctx = g_main_context_new();
286 }
287 // Enable this again once we switch to a full async API that requires a eventloop before calling any zypp functions
288 // g_main_context_push_thread_default( _ctx );
289}
290
292{
293 std::for_each ( _runningTimers.begin(), _runningTimers.end(), []( GLibTimerSource *src ){
294 GLibTimerSource::destruct( src );
295 });
296 std::for_each ( _eventSources.begin(), _eventSources.end(), []( GAbstractEventSource *src ){
297 GAbstractEventSource::destruct( src );
298 });
299 _runningTimers.clear();
300 _eventSources.clear();
301
302 if ( _idleSource ) {
303 g_source_destroy( _idleSource );
304 g_source_unref ( _idleSource );
305 }
306
307 //g_main_context_pop_thread_default( _ctx );
308 g_main_context_unref( _ctx );
309}
310
312{
313 //run all user defined idle functions
314 //if they return true, they are executed again in the next idle run
315 decltype ( _idleFuncs ) runQueue;
316 runQueue.swap( _idleFuncs );
317
318 while ( runQueue.size() ) {
319 EventDispatcher::IdleFunction fun( std::move( runQueue.front() ) );
320 runQueue.pop();
321 if ( fun() )
322 _idleFuncs.push( std::move(fun) );
323 }
324
325 //keep this as the last thing to call after all user code was executed
326 if ( _unrefLater.size() )
327 _unrefLater.clear();
328
329 return _idleFuncs.size() || _unrefLater.size();
330}
331
333{
334 if ( !_idleSource ) {
335 _idleSource = g_idle_source_new ();
336 g_source_set_callback ( _idleSource, eventLoopIdleFunc, this, nullptr );
337 g_source_attach ( _idleSource, _ctx );
338 }
339}
340
341std::shared_ptr<EventDispatcher> EventDispatcherPrivate::create()
342{
343 return std::shared_ptr<EventDispatcher>( new EventDispatcher() );
344}
345
346void EventDispatcherPrivate::waitPidCallback( GPid pid, gint status, gpointer user_data )
347{
348 EventDispatcherPrivate *that = reinterpret_cast<EventDispatcherPrivate *>( user_data );
349
350 try {
351 auto data = std::move( that->_waitPIDs.at(pid) );
352 that->_waitPIDs.erase( pid );
353
354 if ( data.callback )
355 data.callback( pid, status );
356
357 g_spawn_close_pid( pid );
358
359 // no need to take care of releasing the GSource, the event loop took care of that
360
361 } catch ( const std::out_of_range &e ) {
362 return;
363 }
364}
365
367
369 : Base ( * new EventDispatcherPrivate( reinterpret_cast<GMainContext*>(ctx), *this ) )
370{
371}
372
376
378{
379 Z_D();
380 if ( notifier.eventDispatcher().lock().get() != this )
381 ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to update event source") );
382
383 AbstractEventSource *notifyPtr = &notifier;
384
385 GAbstractEventSource *evSrc = nullptr;
386 auto &evSrcList = d->_eventSources;
387 auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifyPtr ]( const auto elem ){ return elem->eventSource == notifyPtr; } );
388 if ( itToEvSrc == evSrcList.end() ) {
389
390 evSrc = GAbstractEventSource::create( d );
391 evSrc->eventSource = notifyPtr;
392 evSrcList.push_back( evSrc );
393
394 g_source_attach( &evSrc->source, d->_ctx );
395
396 } else
397 evSrc = (*itToEvSrc);
398
399 int cond = evModeToMask( mode );
400 auto it = std::find_if( evSrc->pollfds.begin(), evSrc->pollfds.end(), [fd]( const auto &currPollFd ) {
401 return currPollFd.pollfd == fd;
402 });
403
404 if ( it != evSrc->pollfds.end() ) {
405 //found
406 it->reqEvents = static_cast<GIOCondition>( cond );
407 g_source_modify_unix_fd( &evSrc->source, it->tag, static_cast<GIOCondition>(cond) );
408 } else {
409 evSrc->pollfds.push_back(
411 static_cast<GIOCondition>(cond),
412 fd,
413 g_source_add_unix_fd( &evSrc->source, fd, static_cast<GIOCondition>(cond) )
414 }
415 );
416 }
417}
418
420{
421 Z_D();
422
423 AbstractEventSource *ptr = &notifier;
424
425 if ( notifier.eventDispatcher().lock().get() != this )
426 ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to remove event source") );
427
428 auto &evList = d->_eventSources;
429 auto it = std::find_if( evList.begin(), evList.end(), [ ptr ]( const auto elem ){ return elem->eventSource == ptr; } );
430
431 if ( it == evList.end() )
432 return;
433
434 auto &fdList = (*it)->pollfds;
435
436 if ( fd == -1 ) {
437 //we clear out all unix_fd watches but do not destroy the source just yet. We currently might
438 //be in the dispatch() function of that AbstractEventSource, make sure not to break the iterator
439 //for the fd's
440 for ( auto &pFD : fdList ) {
441 if ( pFD.tag )
442 g_source_remove_unix_fd( &(*it)->source, pFD.tag );
443 pFD.pollfd = -1;
444 pFD.tag = nullptr; //mark as orphaned, do not delete the element here this might break dipatching
445 }
446 } else {
447 auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ]( const auto &pFd ){ return pFd.pollfd == fd; } );
448 if ( fdIt != fdList.end() ) {
449 if ( fdIt->tag )
450 g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
451 //also do not remove here, mark as orphaned only to not break iterating in dispatch()
452 fdIt->tag = nullptr;
453 fdIt->pollfd = -1;
454 }
455 }
456}
457
459{
460 Z_D();
461 //make sure timer is not double registered
462 for ( const GLibTimerSource *t : d->_runningTimers ) {
463 if ( t->_t == &timer )
464 return;
465 }
466
468 newSrc->_t = &timer;
469 d->_runningTimers.push_back( newSrc );
470
471 g_source_attach( &newSrc->source, d->_ctx );
472}
473
475{
476 Z_D();
477 auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ &timer ]( const GLibTimerSource *src ){
478 return src->_t == &timer;
479 });
480
481 if ( it != d->_runningTimers.end() ) {
482 GLibTimerSource *src = *it;
483 d->_runningTimers.erase( it );
485 }
486}
487
489{
490 return d_func()->_ctx;
491}
492
493bool EventDispatcher::waitForFdEvent( const int fd, int events , int &revents , int &timeout )
494{
495 GPollFD pollFd;
496 pollFd.fd = fd;
497 pollFd.events = evModeToMask(events);
498
499 bool eventTriggered = false;
500 zypp::AutoDispose<GTimer *> timer( g_timer_new(), &g_timer_destroy );
501 while ( !eventTriggered ) {
502 g_timer_start( *timer );
503 const int res = eintrSafeCall( g_poll, &pollFd, 1, timeout );
504 switch ( res ) {
505 case 0: //timeout
506 timeout = 0;
507 return false;
508 case -1: { // interrupt
509 // if timeout is -1 we wait until eternity
510 if ( timeout == -1 )
511 continue;
512
513 timeout -= g_timer_elapsed( *timer, nullptr );
514 if ( timeout < 0 ) timeout = 0;
515 if ( timeout <= 0 )
516 return false;
517
518 ERR << "g_poll error: " << strerror(errno) << std::endl;
519 return false;
520 }
521 case 1:
522 eventTriggered = true;
523 break;
524 }
525 }
526
527 revents = gioConditionToEventTypes( (GIOCondition)pollFd.revents, evModeToMask(events) );
528 return true;
529}
530
531void EventDispatcher::trackChildProcess( int pid, std::function<void (int, int)> callback )
532{
533 Z_D();
534 GlibWaitPIDData data ( pid );
535 data.callback = std::move(callback);
536
537 g_source_set_callback ( data.source, reinterpret_cast<GSourceFunc>(&EventDispatcherPrivate::waitPidCallback), d_ptr.get(), nullptr );
538 data.tag = g_source_attach ( data.source, d->_ctx );
539 d->_waitPIDs.insert( std::make_pair( pid, std::move(data) ) );
540}
541
543{
544 Z_D();
545 try {
546 d->_waitPIDs.erase( pid );
547 } catch ( const std::out_of_range &e ) {
548 return false;
549 }
550 return true;
551}
552
554{
555 Z_D();
556 // lazy init
557 UnixSignalSourceRef r;
558 if ( d->_signalSource.expired ()) {
559 d->_signalSource = r = UnixSignalSource::create();
560 } else {
561 r = d->_signalSource.lock ();
562 }
563 return r;
564}
565
567{
568 return g_main_context_iteration( d_func()->_ctx, false );
569}
570
572{
573 auto d = instance()->d_func();
574 d->_idleFuncs.push( std::move(callback) );
575 d->enableIdleSource();
576}
577
578void EventDispatcher::unrefLaterImpl( std::shared_ptr<void> &&ptr )
579{
580 Z_D();
581 d->_unrefLater.push_back( std::move(ptr) );
582 d->enableIdleSource();
583}
584
586{
587 d_func()->_unrefLater.clear();
588}
589
591{
592 return d_func()->_runningTimers.size();
593}
594
595std::shared_ptr<EventDispatcher> EventDispatcher::instance()
596{
598}
599
600void EventDispatcher::setThreadDispatcher(const std::shared_ptr<EventDispatcher> &disp)
601{
603}
604
605}
struct _GPollFD GPollFD
Definition ZYppImpl.h:26
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition AutoDispose.h:95
Base class for Exception.
Definition Exception.h:153
std::weak_ptr< EventDispatcher > eventDispatcher() const
virtual void onFdReady(int fd, int events)=0
BasePrivate(Base &b)
Definition base_p.h:17
std::shared_ptr< T > shared_this() const
Definition base.h:113
std::unique_ptr< BasePrivate > d_ptr
Definition base.h:174
std::queue< EventDispatcher::IdleFunction > _idleFuncs
std::vector< std::shared_ptr< void > > _unrefLater
static void waitPidCallback(GPid pid, gint status, gpointer user_data)
std::unordered_map< int, GlibWaitPIDData > _waitPIDs
std::vector< GAbstractEventSource * > _eventSources
std::vector< GLibTimerSource * > _runningTimers
EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)
static std::shared_ptr< EventDispatcher > create()
friend class AbstractEventSource
UnixSignalSourceRef unixSignalSource()
virtual void registerTimer(Timer &timer)
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
void * nativeDispatcherHandle() const
Returns the native dispatcher handle if the used implementation supports it.
std::function< bool()> IdleFunction
EventDispatcher(void *ctx=nullptr)
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
virtual void removeTimer(Timer &timer)
void trackChildProcess(int pid, std::function< void(int, int)> callback)
virtual void updateEventSource(AbstractEventSource &notifier, int fd, int mode)
virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)
static void setThreadDispatcher(const std::shared_ptr< EventDispatcher > &disp)
static std::shared_ptr< EventDispatcher > instance()
void invokeOnIdleImpl(IdleFunction &&callback)
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
static UnixSignalSourceRef create()
Callbacks light.
Definition Callback.h:146
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user.
static GSourceFuncs glibTimerSourceFuncs
static int gioConditionToEventTypes(const GIOCondition rEvents, const int requestedEvs)
std::string strerror(int errno_r) ZYPP_API
Return string describing the error_r code.
Definition String.cc:56
auto eintrSafeCall(Fun &&function, Args &&... args)
static int evModeToMask(int mode)
static int writeMask()
static GSourceFuncs abstractEventSourceFuncs
static int readMask()
static int excpMask()
static gboolean check(GSource *source)
std::vector< GUnixPollFD > pollfds
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
static void destruct(GAbstractEventSource *src)
static gboolean prepare(GSource *, gint *timeout)
static void destruct(GLibTimerSource *src)
static gboolean prepare(GSource *src, gint *timeout)
static gboolean check(GSource *source)
static GLibTimerSource * create()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept
EventDispatcher::WaitPidCallback callback
std::shared_ptr< EventDispatcher > dispatcher()
Definition threaddata.cc:57
static ZYPP_API ThreadData & current()
Definition threaddata.cc:16
void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)
Definition threaddata.cc:42
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
Definition Exception.h:459
#define ERR
Definition Logger.h:102
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105