73 MIL <<
"Provider is not started, NOT scheduling" << std::endl;
78 DBG_PRV <<
"Scheduling triggered during scheduling, returning immediately." << std::endl;
83#ifdef _SC_NPROCESSORS_ONLN
84 sysconf(_SC_NPROCESSORS_ONLN) * 2;
90 constexpr auto findLaziestWorker = [](
const auto &workerQueues,
const auto &idleNames ) {
91 auto candidate = workerQueues.end();
95 for (
const auto &name : idleNames ) {
96 auto thisElem = workerQueues.find(name);
97 if ( thisElem == workerQueues.end() )
100 const auto idleS = thisElem->second->idleSince();
102 && ( candidate == workerQueues.end() || *idleS < candidateIdleSince ) ) {
103 candidateIdleSince = *idleS;
104 candidate = thisElem;
108 if ( candidate != workerQueues.end() )
109 MIL_PRV <<
"Found idle worker:" << candidate->first <<
" idle since: " << candidateIdleSince.time_since_epoch().count() << std::endl;
117 if ( (*iMedia)->refCount() > 1 ) {
118 MIL_PRV <<
"Not releasing media " << (*iMedia)->_name <<
" refcount is not zero" << std::endl;
124 if ( (*iMedia)->_idleSince && std::chrono::steady_clock::now() - (*iMedia)->_idleSince.value() >= std::chrono::hours(1) ) {
125 MIL <<
"Detaching medium " << (*iMedia)->_name <<
" for baseUrl " << (*iMedia)->attachedUrl() << std::endl;
129 MIL_PRV <<
"Not releasing media " << (*iMedia)->_name <<
" downloading worker and not timed out yet." << std::endl;
133 auto bQueue = (*iMedia)->_backingQueue.lock();
137 url.setAuthority( (*iMedia)->_name );
140 MIL <<
"Detaching medium " << (*iMedia)->_name <<
" for baseUrl " << (*iMedia)->attachedUrl() << std::endl;
141 bQueue->enqueue ( *req );
145 ERR <<
"Could not send detach request, creating the request failed" << std::endl;
148 ERR <<
"Could not send detach request since no backing queue was defined" << std::endl;
157 const auto schedStart = std::chrono::steady_clock::now();
158 MIL_PRV <<
"Start scheduling" << std::endl;
161 const auto dur = std::chrono::steady_clock::now() - schedStart;
162 MIL_PRV <<
"Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>( dur ).count () << std::endl;
166 for (
auto it =
_items.begin (); it !=
_items.end(); ) {
182 for(
auto queueIter =
_queues.begin(); queueIter !=
_queues.end(); queueIter ++ ) {
184 const auto &scheme = queueIter->_schemeName;
185 auto &queue = queueIter->_requests;
192 MIL_PRV <<
"Start scheduling for scheme:" << scheme <<
" queue size is: " << queue.size() << std::endl;
196 ERR <<
"Scheme: " << scheme <<
" failed to return a valid configuration." << std::endl;
198 while( queue.size() ) {
199 auto item = std::move( queue.front() );
209 const auto &config = configOpt.get();
213 for(
auto i = queue.begin (); i != queue.end(); ) {
217 while ( i != queue.end() && !(*i) ) {
221 if ( i == queue.end() )
224 ProvideRequestRef item = *i;
228 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
231 item->owner()->finishReq(
nullptr, item,
ZYPP_EXCPT_PTR(
zypp::Exception(
"Downloading Queues do not support ProvideMessage::Code::Attach requests") ) );
235 MIL_PRV <<
"Trying to schedule request: " << item->origin().authority() << std::endl;
238 int existingTypeWorkers = 0;
241 int existingConnections = 0;
244 std::vector< std::pair<zypp::Url, ProvideQueue*> > possibleHostWorkers;
247 std::vector<std::string> idleWorkers;
250 std::vector<zypp::Url> mirrsWithoutWorker;
251 for (
const auto &endpoint : item->origin() ) {
254 MIL <<
"Mirror URL " << endpoint <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
258 if( item->owner()->canRedirectTo( item, endpoint.url() ) )
259 mirrsWithoutWorker.push_back( endpoint.url() );
261 MIL_PRV <<
"URL was rejected" << endpoint << std::endl;
266 if( mirrsWithoutWorker.size() == 0 ) {
267 MIL <<
"Request has NO usable URLs" << std::endl;
279 existingTypeWorkers ++;
280 existingConnections += workerQueue->activeRequests();
282 if ( workerQueue->isIdle() )
288 for (
auto i = mirrsWithoutWorker.begin (); i != mirrsWithoutWorker.end(); ) {
290 if ( u.getHost() == workerQueue->hostname() ) {
292 possibleHostWorkers.push_back( {u, workerQueue.get()} );
293 i = mirrsWithoutWorker.erase( i );
302 MIL <<
"Current stats: " << std::endl;
303 MIL <<
"Existing type workers: " << existingTypeWorkers << std::endl;
304 MIL <<
"Existing active connections: " << existingConnections << std::endl;
305 MIL <<
"Possible host workers: "<< possibleHostWorkers.size() << std::endl;
306 MIL <<
"Mirrors without worker: " << mirrsWithoutWorker.size() << std::endl;
311 MIL_PRV <<
"Reached maximum nr of connections, break" << std::endl;
318 && mirrsWithoutWorker.size() ) {
320 MIL_PRV <<
"Free worker slots and available mirror URLs, starting a new worker" << std::endl;
324 for(
const auto &
url : mirrsWithoutWorker ) {
327 if ( !item->owner()->safeRedirectTo ( item,
url ) )
330 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
331 if ( !q->startup( scheme,
_workDir / scheme /
url.getHost(),
url.getHost() ) ) {
335 MIL_PRV <<
"Started worker for " <<
url.getHost() <<
" enqueing request" << std::endl;
337 item->setActiveUrl(
url);
354 if ( possibleHostWorkers.size() ) {
356 MIL_PRV <<
"No free worker slots, looking for the best existing worker" << std::endl;
358 while( possibleHostWorkers.size () ) {
359 std::vector< std::pair<zypp::Url, ProvideQueue *> >::iterator candidate = possibleHostWorkers.begin();
360 for (
auto i = candidate+1; i != possibleHostWorkers.end(); i++ ) {
361 if ( i->second->activeRequests () < candidate->second->activeRequests () )
365 if ( !item->owner()->safeRedirectTo( item, candidate->first ) ) {
366 possibleHostWorkers.erase( candidate );
370 MIL_PRV <<
"Using existing worker " << candidate->first.getHost() <<
" to download request" << std::endl;
373 item->setActiveUrl( candidate->first );
374 candidate->second->enqueue( item );
386 if ( idleWorkers.size() && mirrsWithoutWorker.size() ) {
388 MIL_PRV <<
"No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
390 auto candidate = findLaziestWorker(
_workerQueues, idleWorkers );
399 for(
const auto &
url : mirrsWithoutWorker ) {
401 if ( !item->owner()->safeRedirectTo ( item,
url ) )
404 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
405 if ( !q->startup( scheme,
_workDir / scheme /
url.getHost(),
url.getHost() ) ) {
409 MIL_PRV <<
"Replaced worker for " <<
url.getHost() <<
", enqueing request" << std::endl;
411 item->setActiveUrl(
url);
428 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
434 for(
auto i = queue.begin (); i != queue.end(); ) {
438 while ( i != queue.end() && !(*i) ) {
442 if ( i == queue.end() )
446 ProvideRequestRef item = *i;
450 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
452 if ( item->owner () )
453 item->owner()->finishReq(
nullptr, item,
ZYPP_EXCPT_PTR(
zypp::Exception(
"CPU bound Queues do not support ProvideAttachSpecRef requests") ) );
457 MIL_PRV <<
"Trying to schedule request: " << item->origin().authority() << std::endl;
460 int existingTypeWorkers = 0;
461 int existingSchemeWorkers = 0;
464 std::vector< ProvideQueue* > possibleWorkers;
467 std::vector<std::string> idleWorkers;
473 for (
const auto &tmpurl : item->origin() ) {
475 MIL <<
"Mirror URL " << tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
483 if( !
url.isValid() ) {
484 MIL <<
"Request has NO usable URLs" << std::endl;
498 existingTypeWorkers ++;
500 existingSchemeWorkers++;
501 if ( workerQueue->canScheduleMore() )
502 possibleWorkers.push_back(workerQueue.get());
505 if ( workerQueue->isIdle() )
510 MIL <<
"Current stats: " << std::endl;
511 MIL <<
"Existing type workers: " << existingTypeWorkers << std::endl;
512 MIL <<
"Possible CPU workers: "<< possibleWorkers.size() << std::endl;
516 if ( possibleWorkers.size() ) {
518 for (
auto &w : possibleWorkers ) {
520 MIL_PRV <<
"Using existing idle worker to provide request" << std::endl;
522 item->owner()->redirectTo ( item,
url );
523 item->setActiveUrl(
url );
535 if ( existingTypeWorkers < cpuLimit ) {
537 MIL_PRV <<
"Free CPU slots, starting a new worker" << std::endl;
540 item->owner()->redirectTo ( item,
url );
542 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
543 if ( q->startup( scheme,
_workDir / scheme ) ) {
545 item->setActiveUrl(
url);
562 if ( possibleWorkers.size() ) {
563 MIL_PRV <<
"No free CPU slots, looking for the best existing worker" << std::endl;
565 if( possibleWorkers.size () ) {
566 std::vector<ProvideQueue *>::iterator candidate = possibleWorkers.begin();
567 for (
auto i = candidate+1; i != possibleWorkers.end(); i++ ) {
568 if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
573 item->owner()->redirectTo ( item,
url );
575 MIL_PRV <<
"Using existing worker to provide request" << std::endl;
576 item->setActiveUrl(
url );
577 (*candidate)->enqueue( item );
585 if ( idleWorkers.size() ) {
587 MIL_PRV <<
"No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
589 auto candidate = findLaziestWorker(
_workerQueues, idleWorkers );
595 item->owner()->redirectTo ( item,
url );
597 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
598 if ( q->startup( scheme,
_workDir / scheme ) ) {
600 MIL_PRV <<
"Replaced worker, enqueing request" << std::endl;
602 item->setActiveUrl(
url);
618 MIL_PRV <<
"No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
623 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
630 for(
auto i = queue.begin (); i != queue.end(); ) {
634 while ( i != queue.end() && !(*i) ) {
638 if ( i == queue.end() )
642 ProvideRequestRef item = *i;
643 MIL_PRV <<
"Trying to schedule request: " << item->origin().authority() << std::endl;
648 for (
const auto &tmpurl : item->origin() ) {
650 MIL <<
"Mirror URL " << tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
658 if( !
url.isValid() ) {
659 MIL <<
"Request has NO usable URLs" << std::endl;
669 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
670 if ( !q->startup( scheme,
_workDir / scheme ) ) {
671 ERR <<
"Worker startup failed!" << std::endl;
680 MIL_PRV <<
"Started worker, enqueing request" << std::endl;
684 MIL_PRV <<
"Found worker, enqueing request" << std::endl;
689 item->owner()->redirectTo ( item,
url );
691 item->setActiveUrl(
url);