libzypp 17.37.17
networkrequestdispatcher.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9#include <zypp-core/Globals.h>
14#include <zypp-core/zyppng/base/Timer>
15#include <zypp-core/zyppng/base/SocketNotifier>
16#include <zypp-core/zyppng/base/EventDispatcher>
18#include <assert.h>
19
20#include <zypp/base/Logger.h>
21#include <zypp/base/String.h>
22#include <zypp-core/base/DtorReset>
23
24using namespace boost;
25
26L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_CURL_DEBUG)
27
28
29namespace zyppng {
30
31static const std::string & defaultAgentString()
32{
33 // we need to add the release and identifier to the
34 // agent string.
35 // The target could be not initialized, and then this information
36 // is guessed.
37 static const std::string _value(
39 "ZYpp " LIBZYPP_VERSION_STRING " (curl %s)"
40 , curl_version_info(CURLVERSION_NOW)->version
41 )
42 );
43 return _value;
44}
45
46
48 : BasePrivate( p )
49 , _timer( Timer::create() )
50 , _multi ( curl_multi_init() )
52{
54
55 curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
56 curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
57 curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
58 curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
59
60 // disabled explicit pipelining since it breaks our tests on releases < 15.2
61 // we could consider enabling it starting with a specific CURL version
62 // curl_multi_setopt( _multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX|CURLPIPE_HTTP1 );
63
64 _timer->setSingleShot( true );
66}
67
73
74//called by curl to setup a timer
75int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
76{
77 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
78 assert( that != nullptr );
79
80 if ( timeout_ms >= 0 ) {
81 that->_timer->start( static_cast<uint64_t>(timeout_ms) );
82 } else {
83 //cancel the timer
84 that->_timer->stop();
85 }
86 return 0;
87}
88
90{
91 handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
92}
93
94int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
95{
96 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
97 assert( that != nullptr );
98 return that->socketCallback( easy, s, what, socketp );
99}
100
101int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
102{
103 std::shared_ptr<SocketNotifier> socketp;
104
105 if ( _socketHandler.count( s ) == 0 ) {
106 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
107 return 0;
108
109 socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
110 _socketHandler.insert( std::make_pair( s, socketp ) );
111
113 } else {
114 socketp = _socketHandler[s];
115 }
116
117 //should never happen
118 if ( !socketp ) {
119 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
120 return 0;
121
122 if ( _socketHandler.count( s ) > 0 )
123 _socketHandler.erase( s );
124
125 void *privatePtr = nullptr;
126 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
127 privatePtr = nullptr; //make sure this was not filled with bad info
128 }
129
130 if ( privatePtr ) {
131 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
132 //we stop the download, if we can not listen for socket changes we can not correctly do anything
133 setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
134 return 0;
135 } else {
136 //a broken handle without anything assigned, also should never happen but make sure and clean it up
137 WAR << "Cleaning up unassigned easy handle" << std::endl;
138 curl_multi_remove_handle( _multi, easy );
139 curl_easy_cleanup( easy );
140 return 0;
141 }
142 }
143
144 //remove the socket
145 if ( what == CURL_POLL_REMOVE ) {
146 socketp->setEnabled( false );
147 _socketHandler.erase( s );
148 return 0;
149 }
150
151 if ( what == CURL_POLL_IN ) {
152 socketp->setMode( SocketNotifier::Read );
153 } else if ( what == CURL_POLL_OUT ) {
154 socketp->setMode( SocketNotifier::Write );
155 } else if ( what == CURL_POLL_INOUT ) {
156 socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
157 }
158
159 socketp->setEnabled();
160 return 0;
161}
162
164{
165 int evBitmask = 0;
166 if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
167 evBitmask |= CURL_CSELECT_IN;
169 evBitmask |= CURL_CSELECT_OUT;
171 evBitmask |= CURL_CSELECT_ERR;
172
173 handleMultiSocketAction( listener.socket(), evBitmask );
174}
175
176void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
177{
178 int running = 0;
179
180 // when inside a curl callback we can not call another multi curl API,
181 // for now just lock the thing, but we should consider rewriting this
182 // to post events instead of doing direct calls simply to decouple from
183 // that limitation
184 CURLMcode rc = CURLM_OK;
185 {
186 zypp::DtorReset lockSet( _locked );
187 _locked = true;
188 rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
189 }
190 if (rc != 0) {
191 //we can not recover from a error like that, cancel all and stop
193 cancelAll( err );
194 //emit error
195 _lastError = err;
196 _sigError.emit( *z_func() );
197 return;
198 }
199
200 // make sure we dequeue pending requests ( in case a call to dequeue was blocked during the API call )
201 zypp::OnScopeExit scopeFinally([this](){
202 this->dequeuePending();
203 });
204
205 int msgs_left = 0;
206 CURLMsg *msg = nullptr;
207 while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
208 if(msg->msg == CURLMSG_DONE) {
209 CURL *easy = msg->easy_handle;
210 CURLcode res = msg->data.result;
211
212 void *privatePtr = nullptr;
213 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
214 WAR << "Unable to get CURLINFO_PRIVATE" << std::endl;
215 continue;
216 }
217
218 if ( !privatePtr ) {
219 //broken easy handle not associated, should never happen but clean it up
220 WAR << "Cleaning up unassigned easy handle" << std::endl;
221 curl_multi_remove_handle( _multi, easy );
222 curl_easy_cleanup( easy );
223 continue;
224 }
225
226 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
227 request->dequeueNotify();
228
229 if ( request->hasMoreWork() && ( res == CURLE_OK || request->canRecover() ) ) {
230 std::string errBuf = "Broken easy handle in request";
231 if ( !request->_easyHandle ) {
233 setFinished( *request->z_func(), e );
234 continue;
235 }
236
237 // remove the handle from multi to change options
238 curl_multi_remove_handle( _multi, request->_easyHandle );
239
240 errBuf = "Failed to reinitialize the request";
241 if ( !request->prepareToContinue ( errBuf ) ) {
243 setFinished( *request->z_func(), e );
244 } else {
245 // add the request back to the multi handle, it is not done
246 if ( !addRequestToMultiHandle( *request->z_func() ) )
247 continue;
248
249 request->aboutToStart( );
250 }
251 } else {
252 // trigger notification about file downloaded
253 // we create a error from the CURL code, there might be a already cached Result which will be used instead
254 // in cases like a RangeFail where we could not recover but there also is no real error code
255 NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->errorMessage() );
256 setFinished( *request->z_func(), e );
257 }
258 //attention request could be deleted from here on
259 }
260 }
261}
262
264{
265 //prevent dequeuePending from filling up the runningDownloads again
266 zypp::DtorReset lockReset( _locked );
267 _locked = true;
268
269 while ( _runningDownloads.size() ) {
270 std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
271 setFinished(*req, result );
272 }
273 while ( _pendingDownloads.size() ) {
274 std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
275 setFinished(*req, result );
276 }
277}
278
280{
281 auto delReq = []( auto &list, NetworkRequest &req ) -> std::shared_ptr<NetworkRequest> {
282 auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
283 return req.d_func() == r->d_func();
284 } );
285 if ( it != list.end() ) {
286 auto ptr = *it;
287 list.erase( it );
288 return ptr;
289 }
290 return nullptr;
291 };
292
293 // We have a tricky situation if a network request is called when inside a callback. In those cases, it is
294 // not allowed to call curl_multi_remove_handle. We need to tell the callback to fail, so the download
295 // is cancelled by curl itself. We also need to store the current result for later
296 auto rmode = std::get_if<NetworkRequestPrivate::running_t>( &req.d_func()->_runningMode );
297 if ( rmode ) {
298 if ( rmode->_isInCallback ) {
299 // the first cached result wins)
300 if ( !rmode->_cachedResult )
301 rmode->_cachedResult = result;
302 return;
303 } else if ( rmode->_cachedResult ) {
304 result = rmode->_cachedResult.value();
305 }
306 }
307
308 auto rLocked = delReq( _runningDownloads, req );
309 if ( !rLocked )
310 rLocked = delReq( _pendingDownloads, req );
311
312 void *easyHandle = req.d_func()->_easyHandle;
313 if ( easyHandle ) {
314 MIL_MEDIA << "Removing easy handle: " << easyHandle << std::endl;
315 curl_multi_remove_handle( _multi, easyHandle );
316 }
317
318 req.d_func()->_dispatcher = nullptr;
319
320 //first set the result, the Request might have a checksum to check as well so a currently
321 //successful request could fail later on
322 req.d_func()->setResult( std::move(result) );
323 _sigDownloadFinished.emit( *z_func(), req );
324
325 //we got a open slot, try to dequeue or send the finished signals if all queues are empty
327}
328
330{
331 CURLMcode rc = curl_multi_add_handle( _multi, req.d_func()->_easyHandle );
332 if ( rc != 0 ) {
334 return false;
335 }
336
337 MIL_MEDIA << "Added easy handle: " << req.d_func()->_easyHandle << std::endl;
338 // make sure to wake up once to register what we have now
339 _timer->start(0);
340 return true;
341}
342
344{
345 if ( !_isRunning || _locked )
346 return;
347
348 while ( _maxConnections == -1 || ( (std::size_t)_maxConnections > _runningDownloads.size() ) ) {
349 if ( !_pendingDownloads.size() )
350 break;
351
352 std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
353 _pendingDownloads.pop_front();
354
355 std::string errBuf = "Failed to initialize easy handle";
356 if ( !req->d_func()->initialize( errBuf ) ) {
357 //@TODO store the CURL error in the errors extra info
359 continue;
360 }
361
362 if ( !addRequestToMultiHandle( *req ) )
363 continue;
364
365 req->d_func()->aboutToStart();
366 _sigDownloadStarted.emit( *z_func(), *req );
367
368 _runningDownloads.push_back( std::move(req) );
369 }
370
371 //check for empty queues
372 if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
373 //once we finished all requests, cancel the timer too, so curl is not called without requests
374 _timer->stop();
375 _sigQueueFinished.emit( *z_func() );
376 }
377}
378
379ZYPP_IMPL_PRIVATE(NetworkRequestDispatcher)
380
381NetworkRequestDispatcher::NetworkRequestDispatcher( )
382 : Base( * new NetworkRequestDispatcherPrivate ( *this ) )
383{
384
385}
386
387bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
388{
389 curl_version_info_data *curl_info = nullptr;
390 curl_info = curl_version_info(CURLVERSION_NOW);
391 // curl_info does not need any free (is static)
392 if (curl_info->protocols)
393 {
394 const char * const *proto = nullptr;
395 std::string scheme( url.getScheme() );
396 bool found = false;
397 for(proto=curl_info->protocols; !found && *proto; ++proto) {
398 if( scheme == std::string((const char *)*proto))
399 found = true;
400 }
401 return found;
402 }
403 return true;
404}
405
406void NetworkRequestDispatcher::setMaximumConcurrentConnections( const int maxConn )
407{
408 d_func()->_maxConnections = maxConn;
409}
410
411int NetworkRequestDispatcher::maximumConcurrentConnections () const
412{
413 return d_func()->_maxConnections;
414}
415
416void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
417{
418 if ( !req )
419 return;
420 Z_D();
421
422 if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
423 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
424 return;
425 }
426
427 if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
428 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
429 return;
430 }
431
432 req->d_func()->_dispatcher = this;
433 if ( req->priority() == NetworkRequest::Normal )
434 d->_pendingDownloads.push_back( req );
435 else {
436 auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [ prio = req->priority() ]( const auto &pendingReq ){
437 return pendingReq->priority() < prio;
438 });
439
440 //if we have a valid iterator, decrement we found a pending download request with lower prio, insert before that
441 if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
442 it--;
443 d->_pendingDownloads.insert( it, req );
444 }
445
446 //dequeue if running and we have capacity
447 d->dequeuePending();
448}
449
450void NetworkRequestDispatcher::setAgentString( const std::string &agent )
451{
452 Z_D();
453 if ( agent.empty() )
454 d->_userAgent = defaultAgentString();
455 else
456 d->_userAgent = agent;
457}
458
459const std::string &NetworkRequestDispatcher::agentString() const
460{
461 return d_func()->_userAgent;
462}
463
464void NetworkRequestDispatcher::setHostSpecificHeader( const std::string &host, const std::string &headerName, const std::string &value )
465{
466 Z_D();
467 if ( value.empty() ) {
468 if ( auto i = d->_customHeaders.find( host ); i != d->_customHeaders.end() ) {
469 if ( auto v = i->second.find( headerName ); v != i->second.end() ) {
470 i->second.erase (v);
471 }
472 if ( i->second.empty() )
473 d->_customHeaders.erase(i);
474 }
475 return;
476 }
477 d->_customHeaders[host][headerName] = value;
478}
479
480const NetworkRequestDispatcher::SpecificHeaderMap &NetworkRequestDispatcher::hostSpecificHeaders() const
481{
482 return d_func()->_customHeaders;
483}
484
485void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
486{
487 cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
488}
489
490void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
491{
492 Z_D();
493
494 if ( req.d_func()->_dispatcher != this ) {
495 //TODO throw exception
496 return;
497 }
498
499 d->setFinished( req, err );
500}
501
502void NetworkRequestDispatcher::cancelAll(std::string reason)
503{
504 cancelAll( NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
505}
506
507void NetworkRequestDispatcher::cancelAll(const NetworkRequestError &err)
508{
509 d_func()->cancelAll ( err );
510}
511
512void NetworkRequestDispatcher::run()
513{
514 Z_D();
515 d->_isRunning = true;
516
517 if ( d->_pendingDownloads.size() )
518 d->dequeuePending();
519}
520
521void NetworkRequestDispatcher::reschedule()
522{
523 Z_D();
524 if ( !d->_pendingDownloads.size() )
525 return;
526
527 std::stable_sort( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &a, const auto &b ){
528 return a->priority() < b->priority();
529 });
530
531 d->dequeuePending();
532}
533
534size_t NetworkRequestDispatcher::count()
535{
536 Z_D();
537 return d->_pendingDownloads.size() + d->_runningDownloads.size();
538}
539
540const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
541{
542 return d_func()->_lastError;
543}
544
545SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
546{
547 return d_func()->_sigDownloadStarted;
548}
549
550SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
551{
552 return d_func()->_sigDownloadFinished;
553}
554
555SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
556{
557 return d_func()->_sigQueueFinished;
558}
559
560SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
561{
562 return d_func()->_sigError;
563}
564
565}
Assign a vaiable a certain value when going out of scope.
Definition dtorreset.h:50
BasePrivate(Base &b)
Definition base_p.h:17
NetworkRequestDispatcherPrivate(NetworkRequestDispatcher &p)
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
Signal< void(NetworkRequestDispatcher &)> _sigError
Signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
void setFinished(NetworkRequest &req, NetworkRequestError result)
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
void onSocketActivated(const SocketNotifier &listener, int events)
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
void cancelAll(const NetworkRequestError &result)
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const std::string &nativeError)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
The NetworkRequestError class Represents a error that occured in.
std::string errorMessage() const
Definition request.cc:569
bool prepareToContinue(std::string &errBuf)
Definition request.cc:414
static Ptr create(int socket, int evTypes, bool enable=true)
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition timer.cc:120
unsigned short a
unsigned short b
#define MIL_MEDIA
Boost libraries.
void globalInitCurlOnce()
Definition curlhelper.cc:64
std::string form(const char *format,...) __attribute__((format(printf
Printf style construction of std::string.
Definition String.cc:39
AutoDispose< void > OnScopeExit
zypp::Url Url
Definition url.h:15
static const std::string & defaultAgentString()
Provides API related macros.
#define WAR
Definition Logger.h:101
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition Logger.h:117
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105