9 #include <zypp-core/zyppng/base/UnixSignalSource> 14 return ( G_IO_IN | G_IO_HUP );
41 if ( ( rEvents & requestedEvs ) != 0 ) {
48 if ( ( rEvents & G_IO_ERR) && ( requestedEvs & G_IO_ERR ) )
66 (void)
new (&src->
pollfds) std::vector<GUnixPollFD>();
77 g_source_remove_unix_fd( &src->
source, fd.
tag );
81 src->
pollfds.std::vector< GUnixPollFD >::~vector();
82 g_source_destroy( &src->
source );
83 g_source_unref( &src->
source );
101 bool hasPending =
false;
103 for (
auto fdIt = src->
pollfds.begin(); fdIt != src->
pollfds.end(); ) {
104 if ( fdIt->tag ==
nullptr ) {
108 fdIt = src->
pollfds.erase( fdIt );
110 GIOCondition pendEvents = g_source_query_unix_fd(
source, fdIt->tag );
111 if ( pendEvents & G_IO_NVAL ){
113 fdIt = src->
pollfds.erase( fdIt );
115 hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
122 return hasPending || src->
pollfds.empty();
131 return G_SOURCE_REMOVE;
141 return G_SOURCE_REMOVE;
147 if ( pollfd.
tag !=
nullptr ) {
148 GIOCondition pendEvents = g_source_query_unix_fd(
source, pollfd.
tag );
150 if ( (pendEvents & pollfd.
reqEvents ) != 0 ) {
160 return G_SOURCE_CONTINUE;
182 uint64_t nextTimeout =
source->_t->remaining();
185 if ( nextTimeout > G_MAXINT )
188 *timeout =
static_cast<gint
>( nextTimeout );
190 return ( nextTimeout == 0 );
206 if (
source->_t ==
nullptr )
224 g_source_destroy( &src->
source );
225 g_source_unref( &src->
source );
235 if( dPtr->runIdleTasks() ) {
236 return G_SOURCE_CONTINUE;
239 g_source_unref ( dPtr->_idleSource );
240 dPtr->_idleSource =
nullptr;
242 return G_SOURCE_REMOVE;
247 source = g_child_watch_source_new( pid );
252 , source( other.source )
253 , callback( std::move( other.callback ) )
255 other.source =
nullptr;
261 g_source_destroy(
source );
269 source = other.source;
270 callback = std::move( other.callback );
271 other.source =
nullptr;
283 g_main_context_ref (
_ctx );
285 _ctx = g_main_context_new();
308 g_main_context_unref(
_ctx );
318 while ( runQueue.size() ) {
351 auto data = std::move( that->
_waitPIDs.at(pid) );
355 data.callback( pid, status );
357 g_spawn_close_pid( pid );
361 }
catch (
const std::out_of_range &e ) {
386 auto &evSrcList = d->_eventSources;
387 auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifyPtr ](
const auto elem ){
return elem->eventSource == notifyPtr; } );
388 if ( itToEvSrc == evSrcList.end() ) {
392 evSrcList.push_back( evSrc );
394 g_source_attach( &evSrc->
source, d->_ctx );
397 evSrc = (*itToEvSrc);
400 auto it = std::find_if( evSrc->
pollfds.begin(), evSrc->
pollfds.end(), [fd](
const auto &currPollFd ) {
401 return currPollFd.pollfd == fd;
404 if ( it != evSrc->
pollfds.end() ) {
406 it->reqEvents =
static_cast<GIOCondition
>( cond );
407 g_source_modify_unix_fd( &evSrc->
source, it->tag, static_cast<GIOCondition>(cond) );
411 static_cast<GIOCondition
>(cond),
413 g_source_add_unix_fd( &evSrc->
source, fd, static_cast<GIOCondition>(cond) )
428 auto &evList = d->_eventSources;
429 auto it = std::find_if( evList.begin(), evList.end(), [ ptr ](
const auto elem ){
return elem->eventSource == ptr; } );
431 if ( it == evList.end() )
434 auto &fdList = (*it)->pollfds;
440 for (
auto &pFD : fdList ) {
442 g_source_remove_unix_fd( &(*it)->source, pFD.tag );
447 auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ](
const auto &pFd ){
return pFd.pollfd == fd; } );
448 if ( fdIt != fdList.end() ) {
450 g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
463 if ( t->
_t == &timer )
469 d->_runningTimers.push_back( newSrc );
471 g_source_attach( &newSrc->
source, d->_ctx );
477 auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ &timer ](
const GLibTimerSource *src ){
478 return src->_t == &timer;
481 if ( it != d->_runningTimers.end() ) {
483 d->_runningTimers.erase( it );
490 return d_func()->_ctx;
499 bool eventTriggered =
false;
501 while ( !eventTriggered ) {
502 g_timer_start( *timer );
503 const int res =
eintrSafeCall( g_poll, &pollFd, 1, timeout );
513 timeout -= g_timer_elapsed( *timer,
nullptr );
514 if ( timeout < 0 ) timeout = 0;
518 ERR <<
"g_poll error: " <<
strerror(errno) << std::endl;
522 eventTriggered =
true;
535 data.
callback = std::move(callback);
538 data.
tag = g_source_attach ( data.
source, d->_ctx );
539 d->_waitPIDs.insert( std::make_pair( pid, std::move(data) ) );
546 d->_waitPIDs.erase( pid );
547 }
catch (
const std::out_of_range &e ) {
557 UnixSignalSourceRef r;
558 if ( d->_signalSource.expired ()) {
561 r = d->_signalSource.lock ();
568 return g_main_context_iteration( d_func()->
_ctx,
false );
574 d->_idleFuncs.push( std::move(callback) );
575 d->enableIdleSource();
581 d->_unrefLater.push_back( std::move(ptr) );
582 d->enableIdleSource();
587 d_func()->_unrefLater.clear();
592 return d_func()->_runningTimers.size();
~EventDispatcherPrivate() override
virtual void removeTimer(Timer &timer)
std::vector< std::shared_ptr< void > > _unrefLater
std::vector< GAbstractEventSource * > _eventSources
GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
std::function< bool()> IdleFunction
static UnixSignalSourceRef create()
static void destruct(GAbstractEventSource *src)
virtual void onFdReady(int fd, int events)=0
static std::shared_ptr< EventDispatcher > create()
static void setThreadDispatcher(const std::shared_ptr< EventDispatcher > &disp)
~EventDispatcher() override
static gboolean check(GSource *source)
std::shared_ptr< EventDispatcher > dispatcher()
GlibWaitPIDData(GPid pid)
static GLibTimerSource * create()
EventDispatcher::WaitPidCallback callback
static int gioConditionToEventTypes(const GIOCondition rEvents, const int requestedEvs)
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user...
void * nativeDispatcherHandle() const
Returns the native dispatcher handle if the used implementation supports it.
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
std::string strerror(int errno_r)
Return string describing the errno_r code.
static void destruct(GLibTimerSource *src)
EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
The Timer class provides repetitive and single-shot timers.
static GSourceFuncs glibTimerSourceFuncs
void trackChildProcess(int pid, std::function< void(int, int)> callback)
virtual void updateEventSource(AbstractEventSource &notifier, int fd, int mode)
std::vector< GLibTimerSource * > _runningTimers
static std::shared_ptr< EventDispatcher > instance()
void clearUnrefLaterList()
ulong runningTimers() const
static GSourceFuncs abstractEventSourceFuncs
EventDispatcherPrivate * _ev
static void waitPidCallback(GPid pid, gint status, gpointer user_data)
virtual void registerTimer(Timer &timer)
static gboolean prepare(GSource *, gint *timeout)
static ZYPP_API ThreadData & current()
std::vector< GUnixPollFD > pollfds
virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)
Base class for Exception.
std::queue< EventDispatcher::IdleFunction > _idleFuncs
auto eintrSafeCall(Fun &&function, Args &&... args)
static int evModeToMask(int mode)
ZYPP_IMPL_PRIVATE(UnixSignalSource)
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static gboolean prepare(GSource *src, gint *timeout)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
std::weak_ptr< EventDispatcher > eventDispatcher() const
AbstractEventSource * eventSource
std::unique_ptr< BasePrivate > d_ptr
bool untrackChildProcess(int pid)
void invokeOnIdleImpl(IdleFunction &&callback)
void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)
UnixSignalSourceRef unixSignalSource()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
std::shared_ptr< T > shared_this() const
static gboolean check(GSource *source)
std::thread::id _myThreadId
std::unordered_map< int, GlibWaitPIDData > _waitPIDs