libzypp 17.37.17
asyncdatasource.cpp
Go to the documentation of this file.
2
4#include <zypp-core/zyppng/base/AutoDisconnect>
5#include <zypp-core/zyppng/base/EventDispatcher>
7
8namespace zyppng {
9
11 {
12 if ( _writeNotifier.get() == &notify ) {
13 if ( evTypes & SocketNotifier::Error ) {
14 DBG << "Closing due to error when polling" << std::endl;
16 return;
17 }
18 readyWrite();
19 } else {
20
21 auto dev = std::find_if( _readFds.begin(), _readFds.end(),
22 [ &notify ]( const auto &dev ){ return ( dev._readNotifier.get() == &notify ); } );
23
24 if ( dev == _readFds.end() ) {
25 return;
26 }
27
28 readyRead( std::distance( _readFds.begin(), dev ) );
29 }
30 }
31
33 {
34 auto bytesToRead = z_func()->rawBytesAvailable( channel );
35 if ( bytesToRead == 0 ) {
36 // make sure to check if bytes are available even if the ioctl call returns something different
37 bytesToRead = 4096;
38 }
39
40 auto &_readBuf = _readChannels[channel];
41 char *buf = _readBuf.reserve( bytesToRead );
42 const auto bytesRead = z_func()->readData( channel, buf, bytesToRead );
43
44 if ( bytesRead <= 0 ) {
45 _readBuf.chop( bytesToRead );
46
47 switch( bytesRead ) {
48 // remote close , close the read channel
49 case 0: {
51 break;
52 }
53 // no data is available , just try again later
54 case -2: break;
55 // anything else
56 default:
57 case -1: {
59 break;
60 }
61 }
62 return;
63 }
64
65 if ( bytesToRead > bytesRead )
66 _readBuf.chop( bytesToRead-bytesRead );
67
68 if ( channel == _currentReadChannel )
69 _readyRead.emit();
70
71 _channelReadyRead.emit( channel );
72 return;
73 }
74
76 {
77 const auto nwrite = _writeBuffer.frontSize();
78 if ( !nwrite ) {
79 // disable Write notifications so we do not wake up without the need to
80 _writeNotifier->setEnabled( false );
81 return;
82 }
83
84 const auto nBuf = _writeBuffer.front();
85 const auto written = eintrSafeCall( ::write, _writeFd, nBuf, nwrite );
86 if ( written == -1 ) {
87 switch ( errno ) {
88 case EACCES:
90 return;
91 case EAGAIN:
92#if EAGAIN != EWOULDBLOCK
93 case EWOULDBLOCK:
94#endif
95 return;
96 case EPIPE:
97 case ECONNRESET:
99 return;
100 default:
102 return;
103 }
104 return;
105 }
106 _writeBuffer.discard( written );
107 _sigBytesWritten.emit( written );
108
109 if ( _writeBuffer.size() == 0 )
110 _sigAllBytesWritten.emit();
111 }
112
114 {
115 bool sig = _writeFd >= 0;
116 _writeNotifier.reset();
117 _writeFd = -1;
118 _writeBuffer.clear();
120 if ( sig )
121 _sigWriteFdClosed.emit( reason );
122 }
123
125 {
126 auto &readFd = _readFds[channel];
127 // we do not clear the read buffer so code has the opportunity to read whats left in there
128 bool sig = readFd._readFd >= 0;
129 readFd._readNotifier.reset();
130 readFd._readFd = -1;
131 if ( sig ) {
132 z_func()->finishReadChannel( channel );
133 _sigReadFdClosed.emit( channel, reason );
134 }
135 }
136
138
141
145
147 {
148 return std::shared_ptr<AsyncDataSource>( new AsyncDataSource );
149 }
150
151
152 bool AsyncDataSource::openFds ( const std::vector<int>& readFds, int writeFd )
153 {
154 Z_D();
155
156 if ( d->_mode != IODevice::Closed )
157 return false;
158
159 IODevice::OpenMode mode;
160
161 bool error = false;
162 for ( const auto readFd : readFds ) {
163 if ( readFd >= 0 ) {
164 mode |= IODevice::ReadOnly;
165 d->_readFds.push_back( {
166 readFd,
168 });
170 ERR << "Failed to set read FD to non blocking" << std::endl;
171 error = true;
172 break;
173 }
174 d->_readFds.back()._readNotifier->connect( &SocketNotifier::sigActivated, *d, &AsyncDataSourcePrivate::notifierActivated );
175 }
176 }
177
178 if ( writeFd >= 0 && !error ) {
179 mode |= IODevice::WriteOnly;
181 ERR << "Failed to set write FD to non blocking" << std::endl;
182 error = true;
183 } else {
184 d->_writeFd = writeFd;
185 d->_writeNotifier = SocketNotifier::create( writeFd, SocketNotifier::Write | AbstractEventSource::Error, false );
187 }
188 }
189
190 if( error || !IODevice::open( mode ) ) {
191 d->_mode = IODevice::Closed;
192 d->_readFds.clear();
193 d->_writeNotifier.reset();
194 d->_writeFd = -1;
195 return false;
196 }
197
198 // make sure we have enough read buffers
199 setReadChannelCount( d->_readFds.size() );
200 return true;
201 }
202
203 int64_t zyppng::AsyncDataSource::writeData( const char *data, int64_t count )
204 {
205 Z_D();
206 if ( count > 0 ) {
207 // we always use the write buffer, to make sure the fd is actually writeable
208 d->_writeBuffer.append( data, count );
209 d->_writeNotifier->setEnabled( true );
210 }
211 return count;
212 }
213
214 int64_t zyppng::AsyncDataSource::readData( uint channel, char *buffer, int64_t bufsize )
215 {
216 Z_D();
217 if ( channel >= d->_readFds.size() ) {
218 ERR << constants::outOfRangeErrMsg << std::endl;
219 throw std::logic_error( constants::outOfRangeErrMsg.data() );
220 }
221 const auto read = eintrSafeCall( ::read, d->_readFds[channel]._readFd, buffer, bufsize );
222 if ( read < 0 ) {
223 switch ( errno ) {
224 #if EAGAIN != EWOULDBLOCK
225 case EWOULDBLOCK:
226 #endif
227 case EAGAIN: {
228 return -2;
229 }
230 default:
231 break;
232 }
233 }
234 return read;
235 }
236
237 int64_t AsyncDataSource::rawBytesAvailable( uint channel ) const
238 {
239 Z_D();
240
241 if ( channel >= d->_readFds.size() ) {
242 ERR << constants::outOfRangeErrMsg << std::endl;
243 throw std::logic_error( constants::outOfRangeErrMsg.data() );
244 }
245
246 if ( isOpen() && canRead() )
247 return zyppng::bytesAvailableOnFD( d->_readFds[channel]._readFd );
248 return 0;
249 }
250
252 {
253 Z_D();
254 if ( channel >= d->_readFds.size() ) {
255 ERR << constants::outOfRangeErrMsg << std::endl;
256 throw std::logic_error( constants::outOfRangeErrMsg.data() );
257 }
258 }
259
261 {
262 Z_D();
263 for( uint i = 0; i < d->_readFds.size(); ++i ) {
264 auto &readChan = d->_readFds[i];
265 readChan._readNotifier.reset();
266 if ( readChan._readFd >= 0) {
268 d->_sigReadFdClosed.emit( i, UserRequest );
269 }
270 }
271 d->_readFds.clear();
272
273 d->_writeNotifier.reset();
274 d->_writeBuffer.clear();
275 if ( d->_writeFd >= 0 ) {
276 d->_writeFd = -1;
277 d->_sigWriteFdClosed.emit( UserRequest );
278 }
279
281 }
282
284 {
285 Z_D();
286
287 // if we are open writeOnly, simply call close();
288 if ( !canRead() ) {
289 close();
290 return;
291 }
292
293 d->_mode = ReadOnly;
294 d->_writeNotifier.reset();
295 d->_writeBuffer.clear();
296
297 if ( d->_writeFd >= 0 ) {
298 d->_writeFd = -1;
299 d->_sigWriteFdClosed.emit( UserRequest );
300 }
301 }
302
303 bool AsyncDataSource::waitForReadyRead( uint channel, int timeout )
304 {
305 Z_D();
306 if ( !canRead() )
307 return false;
308
309 if ( channel >= d->_readFds.size() ) {
310 ERR << constants::outOfRangeErrMsg << std::endl;
311 throw std::logic_error( constants::outOfRangeErrMsg.data() );
312 }
313
314 bool gotRR = false;
315 auto rrConn = AutoDisconnect( d->_channelReadyRead.connect([&]( uint activated ){
316 gotRR = ( channel == activated );
317 }) );
318
319 // we can only wait if we are open for reading and still have a valid fd
320 auto &channelRef = d->_readFds[ channel ];
321 while ( readFdOpen(channel) && canRead() && !gotRR ) {
322 int rEvents = 0;
323 if ( EventDispatcher::waitForFdEvent( channelRef._readFd, AbstractEventSource::Read | AbstractEventSource::Error , rEvents, timeout ) ) {
324 //simulate signal from read notifier
325 d->notifierActivated( *channelRef._readNotifier, rEvents );
326 } else {
327 //timeout
328 return false;
329 }
330 }
331 return gotRR;
332 }
333
335 {
336 Z_D();
337 if ( !canWrite() )
338 return;
339
340 int timeout = -1;
341 while ( canWrite() && d->_writeBuffer.frontSize() ) {
342 int rEvents = 0;
344 //simulate signal from write notifier
345 d->readyWrite();
346 } else {
347 //timeout
348 return;
349 }
350 }
351 }
352
354 {
355 return d_func()->_sigWriteFdClosed;
356 }
357
359 {
360 return d_func()->_sigReadFdClosed;
361 }
362
364 {
365 Z_D();
366 if ( !d->_readChannels.size() )
367 return false;
368 return readFdOpen( d_func()->_currentReadChannel );
369 }
370
371 bool AsyncDataSource::readFdOpen(uint channel) const
372 {
373 Z_D();
374 if ( channel >= d->_readFds.size() ) {
375 ERR << constants::outOfRangeErrMsg << std::endl;
376 throw std::logic_error( constants::outOfRangeErrMsg.data() );
377 }
378 auto &channelRef = d->_readFds[ channel ];
379 return ( channelRef._readNotifier && channelRef._readFd >= 0 );
380 }
381
383 {
384 return d_func()->_writeBuffer.size();
385 }
386
387}
void closeReadChannel(uint channel, AsyncDataSource::ChannelCloseReason reason)
void closeWriteChannel(AsyncDataSource::ChannelCloseReason reason)
void notifierActivated(const SocketNotifier &notify, int evTypes)
Signal< void(uint, AsyncDataSource::ChannelCloseReason)> _sigReadFdClosed
std::vector< ReadChannelDev > _readFds
Signal< void(AsyncDataSource::ChannelCloseReason)> _sigWriteFdClosed
int64_t writeData(const char *data, int64_t count) override
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
int64_t bytesPending() const override
void readChannelChanged(uint channel) override
bool waitForReadyRead(uint channel, int timeout) override
bool openFds(const std::vector< int > &readFds, int writeFd=-1)
int64_t readData(uint channel, char *buffer, int64_t bufsize) override
std::shared_ptr< AsyncDataSource > Ptr
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
int64_t rawBytesAvailable(uint channel) const override
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
Signal< void()> _sigAllBytesWritten
Definition iodevice_p.h:47
Signal< void(int64_t)> _sigBytesWritten
Definition iodevice_p.h:46
std::vector< IOBuffer > _readChannels
Definition iodevice_p.h:39
Signal< void() > _readyRead
Definition iodevice_p.h:44
IODevice::OpenMode _mode
Definition iodevice_p.h:43
Signal< void(uint) > _channelReadyRead
Definition iodevice_p.h:45
void setReadChannelCount(uint channels)
Definition iodevice.cc:37
bool canWrite() const
Definition iodevice.cc:90
void finishReadChannel(uint channel)
Definition iodevice.cc:44
virtual void close()
Definition iodevice.cc:30
bool canRead() const
Definition iodevice.cc:85
bool isOpen() const
Definition iodevice.cc:95
ByteArray read(int64_t maxSize)
Definition iodevice.cc:127
virtual bool open(const OpenMode mode)
Definition iodevice.cc:16
static Ptr create(int socket, int evTypes, bool enable=true)
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
@ FailedToSetMode
Failed to block or unblock the fd.
Definition IOTools.h:23
BlockingMode setFDBlocking(int fd, bool mode)
Definition IOTools.cc:31
constexpr std::string_view outOfRangeErrMsg("Channel index out of range")
auto eintrSafeCall(Fun &&function, Args &&... args)
int64_t bytesAvailableOnFD(int fd)
#define DBG
Definition Logger.h:99
#define ERR
Definition Logger.h:102
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105