libzypp 17.37.17
providequeue.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8\---------------------------------------------------------------------*/
9
12#include "private/provide_p.h"
14
19#include <zypp-media/MediaException>
20#include <zypp-media/auth/CredentialManager>
21
22#include <zypp-core/Globals.h>
23#include <bitset>
24
25namespace zyppng {
26
28 {
29 if ( !_request )
30 return false;
31 return ( _request->code () == ProvideMessage::Code::Attach );
32 }
33
35 {
36 if ( !_request )
37 return false;
38 return ( _request->code () == ProvideMessage::Code::Prov );
39 }
40
42 {
43 if ( !_request )
44 return false;
45 return ( _request->code () == ProvideMessage::Code::Detach );
46 }
47
50
52 {
54 if ( this->_activeItems.size() || this->_waitQueue.size() ) {
55 DBG << "Queue shutdown with Items still running" << std::endl;
56 }
57 }
58 immediateShutdown(std::make_exception_ptr(zypp::media::MediaException("Cancelled by queue shutdown")));
59 }
60
 // Prepares the worker for this queue and hands over to doStartup().
 //
 // The worker binary is looked up as "zypp-media-<workerScheme>" below the
 // parent's workerPath(). It has to exist and be executable by the user, and
 // workDir has to be creatable via assert_dir().
 //
 // Returns true if a worker process object already exists (only an error is
 // logged in that case), false if any of the checks fails, otherwise whatever
 // doStartup() returns.
 //
 // NOTE(review): this listing skips some line numbers (after 67, 88 and 90),
 // so statements may be missing from this view. The hostname parameter is not
 // referenced in the visible lines, and _workerProc is dereferenced below
 // although the visible code only gets there when it is unset. Presumably the
 // omitted lines store the hostname and create the process - verify against
 // the real file.
61 bool ProvideQueue::startup(const std::string &workerScheme, const zypp::filesystem::Pathname &workDir, const std::string &hostname ) {
62
 // a second startup is treated as success, the existing worker is kept
63 if ( _workerProc ) {
64 ERR << "Queue Worker was already initialized" << std::endl;
65 return true;
66 }
67
69
 // the worker executable is derived from the scheme name
70 const auto &pN = _parent.workerPath() / ( "zypp-media-"+workerScheme ) ;
71 MIL << "Trying to start " << pN << std::endl;
72 const auto &pi = zypp::PathInfo( pN );
73 if ( !pi.isExist() ) {
74 ERR << "Failed to find worker for " << workerScheme << std::endl;
75 return false;
76 }
77
78 if ( !pi.userMayX() ) {
79 ERR << "Failed to start worker for " << workerScheme << " binary " << pi.asString() << " is not executable." << std::endl;
80 return false;
81 }
82
 // assert_dir() reports success with 0
83 if ( zypp::filesystem::assert_dir( workDir ) != 0 ) {
84 ERR << "Failed to assert working directory '" << workDir << "' for worker " << workerScheme << std::endl;
85 return false;
86 }
87
 // remember the executable, doStartup() refuses to run with an empty _currentExe
88 _currentExe = pN;
90 _workerProc->setWorkingDirectory ( workDir );
92 return doStartup();
93 }
94
95
96 void ProvideQueue::enqueue( ProvideRequestRef request )
97 {
98 Item i;
99 i._request = request;
100 i._request->provideMessage().setRequestId( nextRequestId() );
101 request->setCurrentQueue( shared_this<ProvideQueue>() );
102 _waitQueue.push_back( std::move(i) );
103 if ( _parent.isRunning() )
104 scheduleNext();
105 }
106
107 void ProvideQueue::cancel( ProvideRequest *item , std::exception_ptr error )
108 {
109 const auto &isSameItem = [item]( const Item &i ){
110 if ( i.isDetachRequest () )
111 return false;
112 return i._request.get() == item;
113 };
114
115 if ( !item )
116 return;
117
118 if ( item->code() != ProvideMessage::Code::Attach
119 && item->code() != ProvideMessage::Code::Prov ) {
120 ERR << "Can not cancel a " << item->code() << " request!" << std::endl;
121 return;
122 }
123
124 if ( auto i = std::find_if( _waitQueue.begin(), _waitQueue.end(), isSameItem ); i != _waitQueue.end() ) {
125 auto &reqRef = i->_request;
126 reqRef->setCurrentQueue(nullptr);
127 if ( reqRef->owner() )
128 reqRef->owner()->finishReq( this, reqRef, error );
129 _waitQueue.erase(i);
130 _parent.schedule( ProvidePrivate::FinishReq ); // let the parent scheduler run since we have a open spot now
131 } else if ( auto i = std::find_if( _activeItems.begin(), _activeItems.end(), isSameItem ); i != _activeItems.end() ) {
132 cancelActiveItem(i, error);
133 }
134 }
135
136 std::list<ProvideQueue::Item>::iterator ProvideQueue::dequeueActive( std::list<Item>::iterator it )
137 {
138 if ( it == _activeItems.end() )
139 return it;
140
141 if ( it->_request )
142 it->_request->setCurrentQueue( nullptr );
143
144 auto i = _activeItems.erase(it);
145 _parent.schedule ( ProvidePrivate::FinishReq ); // Trigger the scheduler
146 scheduleNext (); // keep the active items full
147 return i;
148 }
149
150 void ProvideQueue::fatalWorkerError( const std::exception_ptr &reason )
151 {
152 immediateShutdown( reason ? reason : std::make_exception_ptr( zypp::media::MediaException("Fatal worker error")) );
153 }
154
 // Shuts the queue down right away.
 //
 // Marks the queue as shutting down, fails every waiting request with
 // reason, cancels the active requests with the same reason and finally,
 // if the worker process is still running, flushes it, closes its write
 // channel and waits for it to exit.
 //
 // Detach requests and requests without an owner are not notified.
155 void ProvideQueue::immediateShutdown( const std::exception_ptr &reason )
156 {
 // procFinished() checks this flag to tell an expected exit from an unexpected one
157 _queueShuttingDown = true;
158
 // drain the wait queue front to back, notifying owners on the way
159 while ( _waitQueue.size() ) {
160 auto &item = _waitQueue.front();
161 auto &reqRef = item._request;
162 if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
163 reqRef->owner()->finishReq( this, reqRef, reason );
164 _waitQueue.pop_front();
165 }
166
 // cancelActiveItem() returns the iterator to continue with, items it is
 // not called for are simply skipped and stay in the list
167 for ( auto i = _activeItems.begin(); i != _activeItems.end(); ) {
168 auto &reqRef = i->_request;
169 if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
170 i = cancelActiveItem(i, reason );
171 } else {
172 i++;
173 }
174 }
175
 // NOTE(review): the listing skips a line number right after waitForExit(),
 // a statement may be missing from this view - verify against the real file
176 if ( _workerProc && _workerProc->isRunning() ) {
177 _workerProc->flush();
178 _workerProc->closeWriteChannel();
179 _workerProc->waitForExit();
181 }
182 }
183
184 std::list< ProvideQueue::Item >::iterator ProvideQueue::cancelActiveItem( std::list< Item >::iterator i , const std::__exception_ptr::exception_ptr &error )
185 {
186 auto &reqRef = i->_request;
187
188 // already in cancelling process or finished
189 if ( i->_state == Item::Cancelling || i->_state == Item::Finished )
190 return (++i);
191
192 // not possible but lets be safe
193 if ( i->_state == Item::Pending ) {
194 reqRef->setCurrentQueue(nullptr);
195 if ( reqRef->owner() )
196 reqRef->owner()->finishReq( this, reqRef, error );
197 return dequeueActive(i);
198 }
199
200 // we first need to cancel the item
201 auto c = ProvideMessage::createCancel ( i->_request->provideMessage().requestId() );
202 if( !_messageStream->sendMessage(c) )
203 ERR << "Failed to send cancel message to worker" << std::endl;
204
205 i->_state = Item::Cancelling;
206 reqRef->setCurrentQueue(nullptr);
207 if ( reqRef->owner() )
208 reqRef->owner()->finishReq( this, reqRef, error );
209 reqRef.reset();
210 return (++i);
211 }
212
214 {
215 if ( _queueShuttingDown )
216 return;
217
218 while ( _waitQueue.size() && canScheduleMore() ) {
219 auto item = std::move( _waitQueue.front() );
220 _waitQueue.pop_front();
221
222 auto &reqRef = item._request;
223 if ( !reqRef->activeUrl() ) {
224 ERR << "Item without active URL enqueued, this is a BUG." << std::endl;
225 if ( reqRef->owner() )
226 reqRef->owner()->finishReq( this, reqRef, ZYPP_EXCPT_PTR (zypp::media::MediaException("Item needs a activeURL to be queued.")) );
227 continue;
228 }
229
230 if ( !_messageStream->sendMessage( reqRef->provideMessage() ) ) {
231 ERR << "Failed to send message to worker process." << std::endl;
232 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
233 return;
234 }
235
236 item._state = Item::Queued;
237 _activeItems.push_back( std::move(item) );
238 _idleSince.reset();
239 }
240
241 if ( _waitQueue.empty() && _activeItems.empty() ) {
243 if ( !_idleSince )
244 _idleSince = std::chrono::steady_clock::now();
245 _sigIdle.emit();
246 }
247 }
248
250 {
251 return ( _activeItems.size() == 0 || ( _capabilities.cfg_flags () & zyppng::WorkerCaps::Pipeline ) == zyppng::WorkerCaps::Pipeline );
252 }
253
255 {
256 return ( empty() );
257 }
258
259 std::optional<ProvideQueue::TimePoint> ProvideQueue::idleSince() const
260 {
261 return _idleSince;
262 }
263
265 {
266 return ( _activeItems.empty() && _waitQueue.empty() );
267 }
268
270 {
271 return _activeItems.size() + _waitQueue.size();
272 }
273
275 {
276 return _activeItems.size();
277 }
278
280 {
281 zypp::ByteCount dlSize;
282 for ( const auto &i : _waitQueue ) {
283 if ( i.isDetachRequest () )
284 continue;
285
286 auto &reqRef = i._request;
287 if ( reqRef->code() != ProvideMessage::Code::Prov )
288 continue;
289 dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
290 }
291 for ( const auto &i : _activeItems ) {
292 if ( i.isDetachRequest () )
293 continue;
294 auto &reqRef = i._request;
295 if ( reqRef->code() != ProvideMessage::Code::Prov )
296 continue;
297 dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
298 }
299 return dlSize;
300 }
301
302 const std::string &ProvideQueue::hostname() const
303 {
304 return _myHostname;
305 }
306
308 {
309 return _capabilities;
310 }
311
313 {
314 return _sigIdle;
315 }
316
318 {
319 if ( _currentExe.empty() )
320 return false;
321
322 //const char *argv[] = { "gdbserver", ":10000", _currentExe.c_str(), nullptr };
323 const char *argv[] = { _currentExe.c_str(), nullptr };
324 if ( !_workerProc->start( argv) ) {
325 ERR << "Failed to execute worker" << std::endl;
326
327 _messageStream.reset ();
328 _workerProc.reset ();
329
330 return false;
331 }
332
333 // make sure the default read channel is StdOut so RpcMessageStream gets all the rpc messages
334 _workerProc->setReadChannel ( Process::StdOut );
335
336 // we are ready to send the data
337
339 // @TODO actually write real config data :D
340 conf.insert ( { AGENT_STRING_CONF.data (), "ZYpp " LIBZYPP_VERSION_STRING } );
341 conf.insert ( { ATTACH_POINT.data (), _workerProc->workingDirectory().asString() } );
342 conf.insert ( { PROVIDER_ROOT.data (), _parent.z_func()->providerWorkdir().asString() } );
343
344 const auto &cleanupOnErr = [&](){
346 _messageStream.reset ();
347 _workerProc->close();
348 _workerProc.reset();
349 return false;
350 };
351
352 if ( !_messageStream->sendMessage( conf ) ) {
353 ERR << "Failed to send initial message to queue worker" << std::endl;
354 return cleanupOnErr();
355 }
356
357 // wait for the data to be written
358 _workerProc->flush ();
359
360 // wait until we receive a message
361 const auto &caps = _messageStream->nextMessageWait();
362 if ( !caps || caps->command() != WorkerCaps::typeName ) {
363 ERR << "Worker did not sent a capabilities message, aborting" << std::endl;
364 return cleanupOnErr();
365 }
366
367 {
368 auto p = _messageStream->parseMessage<WorkerCaps>( *caps );
369 if ( !p )
370 return cleanupOnErr();
371
372 _capabilities = std::move(*p);
373 }
374
375 DBG << "Received config for worker: " << this->_currentExe.asString() << " Worker Type: " << this->_capabilities.worker_type() << " Flags: " << std::bitset<32>( _capabilities.cfg_flags() ).to_string() << std::endl;
376
377 // now we can set up signals and start processing messages
381
382 // make sure we do not miss messages
384 return true;
385 }
386
388
389 const auto &getRequest = [&]( const auto &exp ) -> decltype(_activeItems)::iterator {
390 if ( !exp ) {
391 ERR << "Ignoring invalid request!" << std::endl;
392 return _activeItems.end();
393 }
394
395 auto i = std::find_if( _activeItems.begin(), _activeItems.end(), [&]( const auto &elem ) {
396 if ( ! elem._request )
397 return false;
398 return exp->requestId() == elem._request->provideMessage().requestId();
399 });
400
401 if ( i == _activeItems.end() ) {
402 ERR << "Ignoring unknown request ID: " << exp->requestId() << std::endl;
403 return _activeItems.end();
404 }
405
406 return i;
407 };
408
409 const auto &sendErrorToWorker = [&]( const uint32_t reqId, const MessageCodes code, const std::string &reason, bool transient = false ) {
410 auto r = ProvideMessage::createErrorResponse ( reqId, code, reason, transient );
411 if ( !_messageStream->sendMessage( r ) ) {
412 ERR << "Failed to send Error message to worker process." << std::endl;
413 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
414 return false;
415 }
416 return true;
417 };
418
419 const bool doesDownload = this->_capabilities.worker_type() == Config::Downloading;
420 const bool fileNeedsCleanup = doesDownload || ( _capabilities.worker_type() == Config::CPUBound && _capabilities.cfg_flags() & Config::FileArtifacts );
421
422 while ( auto msg = _messageStream->nextMessage () ) {
423
424 if ( msg->command() == ProvideMessage::typeName ) {
425
426 const auto &provMsg = ProvideMessage::create(*msg);
427 if ( !provMsg ) {
428 fatalWorkerError( provMsg.error() );
429 return;
430 }
431
432 const auto &reqIter = getRequest( provMsg );
433 if ( reqIter == _activeItems.end() ) {
434 if ( provMsg->code() == ProvideMessage::Code::ProvideFinished && fileNeedsCleanup ) {
435 const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
436 if ( !_parent.isInCache(locFName) ) {
437 MIL << "Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
438 zypp::filesystem::unlink( locFName );
439 }
440 }
441 continue;
442 }
443
444 auto &req = *reqIter;
445 auto &reqRef =req._request;
446
447 const auto code = provMsg->code();
448
449 if ( code >= ProvideMessage::Code::FirstInformalCode && code <= ProvideMessage::Code::LastInformalCode ) {
450
451 // send the message to the item but don't dequeue
452 if ( reqRef && reqRef->owner() )
453 reqRef->owner()->informalMessage ( *this, reqRef, *provMsg );
454 continue;
455
456 } else if ( code >= ProvideMessage::Code::FirstSuccessCode && code <= ProvideMessage::Code::LastSuccessCode ) {
457
458 if ( req._state == Item::Cancelling ) {
459 req._state = Item::Finished;
460 dequeueActive( reqIter );
461 continue;
462 }
463
464 if ( code == ProvideMessage::Code::ProvideFinished ) {
465
466 // we are going to register the file to the cache if this is a downloading worker, so it can not leak
467 // no matter if the item does the correct dance or not, this code is duplicated by all ProvideItems that receive ProvideFinished
468 // results that require file cleanups.
469 // we keep the ref around until after sending the result to the item. At that point it should take a reference
470 std::optional<zypp::ManagedFile> dataRef;
471
472 if ( !reqIter->isFileRequest() ) {
473 ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
475 return;
476 }
477
478 const auto &isCheckExistsOnlyVal = reqRef->provideMessage().value( ProvideMsgFields::CheckExistOnly );
479 bool isCheckExistsOnly = isCheckExistsOnlyVal.valid() ? isCheckExistsOnlyVal.asBool() : false;
480
481 // when a worker is downloading we keep a internal book of cache files
482 if ( doesDownload && !isCheckExistsOnly ) {
483 const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
484 if ( provMsg->value( ProvideFinishedMsgFields::CacheHit, false ).asBool()) {
485 dataRef = _parent.addToFileCache ( locFName );
486 if ( !dataRef ) {
487 MIL << "CACHE MISS, file " << locFName << " was already removed, queueing again" << std::endl;
488 if ( reqRef->owner() )
489 reqRef->owner()->cacheMiss( reqRef );
490 reqRef->provideMessage().setRequestId( InvalidId );
491 req._state = Item::Pending;
492 _waitQueue.push_front( req );
493 dequeueActive( reqIter );
494 continue;
495 }
496 } else {
497
498 dataRef = _parent.addToFileCache ( locFName );
499
500 // unlikely this can happen but better be safe than sorry
501 if ( !dataRef ) {
502 req._state = Item::Finished;
503 reqRef->setCurrentQueue(nullptr);
504 auto resp = ProvideMessage::createErrorResponse ( provMsg->requestId(), ProvideMessage::Code::InternalError, "File vanished between downloading and adding it to cache." );
505 if ( reqRef->owner() )
506 reqRef->owner()->finishReq( *this, reqRef, resp );
507 dequeueActive( reqIter );
508 continue;
509 }
510 }
511 }
512 }
513
514 // send the message to the item and dequeue
515 reqRef->setCurrentQueue(nullptr);
516 if ( reqRef->owner() )
517 reqRef->owner()->finishReq( *this, reqRef, *provMsg );
518 req._state = Item::Finished;
519 dequeueActive( reqIter );
520 continue;
521
522 } else if ( code >= ProvideMessage::Code::FirstClientErrCode && code <= ProvideMessage::Code::LastSrvErrCode ) {
523
524 if ( req._state == Item::Cancelling ) {
525 req._state = Item::Finished;
526 dequeueActive( reqIter );
527 continue;
528 }
529
530 // send the message to the item and dequeue
531 reqRef->setCurrentQueue(nullptr);
532
533 if ( reqRef->owner() )
534 reqRef->owner()->finishReq( *this, reqRef, *provMsg );
535
536 req._state = Item::Finished;
537 dequeueActive( reqIter );
538 continue;
539
540 } else if ( code >= ProvideMessage::Code::FirstRedirCode && code <= ProvideMessage::Code::LastRedirCode ) {
541
542 // redir is like a finished message, we can simply forgot about a cancelling request
543 if ( req._state == Item::Cancelling ) {
544 req._state = Item::Finished;
545 dequeueActive( reqIter );
546 continue;
547 }
548
549 // send the message to the item and dequeue
550 reqRef->setCurrentQueue(nullptr);
551 if ( reqRef->owner() )
552 reqRef->owner()->finishReq( *this, reqRef, *provMsg );
553 req._state = Item::Finished;
554 dequeueActive( reqIter );
555 continue;
556
557 } else if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
558
559 ERR << "Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
560 fatalWorkerError ( ZYPP_EXCPT_PTR( zypp::media::MediaException("Controller message received from worker.") ) );
561 return;
562
563 } else if ( code >= ProvideMessage::Code::FirstWorkerCode && code <= ProvideMessage::Code::LastWorkerCode ) {
564
565 if ( code == ProvideMessage::Code::AuthDataRequest ) {
566 if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
567 ERR << "Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
569 return;
570 }
571
572 // if the file was cancelled we send a failure back
573 if( reqIter->_state == Item::Cancelling ) {
574 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item was cancelled") )
575 return;
576 continue;
577 }
578
579 // we need a owner item to fetch the auth data for us
580 if ( !reqRef->owner() ) {
581 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Request has no owner" ) )
582 return;
583 continue;
584 }
585
586 if ( !reqRef->activeUrl() ) {
587 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item has no active URL, this is a bug." ) )
588 return;
589 continue;
590 }
591
592 try {
593 zypp::Url u( provMsg->value( AuthDataRequestMsgFields::EffectiveUrl ).asString() );
594
595 std::map<std::string, std::string> extraVals;
596 for( const auto &hdr : provMsg->headers() ) {
597
600 continue;
601
602 if ( !hdr.second.isString() ) {
603 WAR << "Ignoring non string value for " << hdr.first << std::endl;
604 continue;
605 }
606
607 extraVals[hdr.first] = hdr.second.asString();
608 }
609
610 const auto &authOpt = reqRef->owner()->authenticationRequired( *this, reqRef, u, provMsg->value( AuthDataRequestMsgFields::LastAuthTimestamp ).asInt64(), extraVals );
611 if ( !authOpt ) {
612 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "No auth given by user." ) )
613 return;
614 continue;
615 }
616
617 auto r = ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
618 if ( !_messageStream->sendMessage( r ) ) {
619 ERR << "Failed to send AuthorizationInfo to worker process." << std::endl;
620 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
621 return;
622 }
623 continue;
624
625 } catch ( const zypp::Exception &e ) {
626 ZYPP_CAUGHT(e);
627 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, e.asString() ) )
628 return;
629 continue;
630 }
631
632 } else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
633
634 if ( !reqIter->isAttachRequest() ) {
635 ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
637 return;
638 }
639
640 // if the file was cancelled we send a failure back
641 if( reqIter->_state == Item::Cancelling ) {
642 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Item was cancelled" ) )
643 return;
644 continue;
645 }
646
647 MIL << "Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
648
649 //const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc
650 std::vector<std::string> freeDevs;
651 for ( const auto &val : provMsg->values( MediaChangeRequestMsgFields::Device) ) {
652 freeDevs.push_back( val.asString() );
653 }
654
655 std::optional<std::string> desc;
656 const auto &descVal = provMsg->value( MediaChangeRequestMsgFields::Desc );
657 if ( descVal.valid () && descVal.isString() )
658 desc = descVal.asString();
659
660 auto res = _parent._sigMediaChange.emit(
661 _parent.queueName(*this),
662 provMsg->value( MediaChangeRequestMsgFields::Label ).asString(),
663 provMsg->value( MediaChangeRequestMsgFields::MediaNr ).asInt(),
664 freeDevs,
665 desc
666 );
667
668 auto action = res ? *res : Provide::Action::ABORT;
669 switch ( action ) {
671 MIL << "Sending back a MediaChanged message, retrying to find medium " << std::endl;
672 auto r = ProvideMessage::createMediaChanged ( reqIter->_request->provideMessage().requestId() );
673 if ( !_messageStream->sendMessage( r ) ){
674 ERR << "Failed to send MediaChanged to worker process." << std::endl;
675 fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
676 return;
677 }
678 continue;
679 }
681 MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
682 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Cancelled by User" ) )
683 return;
684 continue;
685 }
687 MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
688 if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip, "Skipped by User" ) )
689 return;
690 continue;
691 }
692 }
693 } else {
694 // if there is a unsupported worker request we need to stop immediately because the worker will be blocked until it gets a answer
695 ERR << "Unsupported worker request: "<<code<<", this is a fatal error!" << std::endl;
697 return;
698 }
699
700 } else {
701 // unknown code
702 ERR << "Received unsupported message " << msg->command() << " with code " << code << " ignoring! " << std::endl;
703 }
704
705 } else {
706 ERR << "Received unsupported message " << msg->command() << "ignoring" << std::endl;
707 }
708 }
709 }
710
716 {
717 // read all stderr data so we get the full logs
718 auto ba = _workerProc->channelReadLine(Process::StdErr);
719 while ( !ba.empty() ) {
720 forwardToLog(std::string( ba.data(), ba.size() ) );
721 ba = _workerProc->channelReadLine(Process::StdErr);
722 }
723 }
724
 // Writes a log line received from the worker into our own log.
 //
 // Two paths are visible: the line is either pushed unformatted to the
 // LogControl instance via logRawLine(), or written as a regular MIL message
 // prefixed with the worker name taken from the received capabilities.
 //
 // NOTE(review): the condition selecting between the two paths is on a line
 // that is missing from this listing (the number after 726 is skipped) -
 // verify against the real file.
725 void ProvideQueue::forwardToLog( std::string &&logLine )
726 {
728 zypp::base::LogControl::instance ().logRawLine( std::move(logLine) );
729 else
730 MIL << "Message from worker: " << _capabilities.worker_name() << ":" << logLine << std::endl;
731 }
732
734 // ignore stdout here
735 if ( channel == Process::StdOut )
736 return;
737
738 // forward the stderr output to the log bypassing the formatter
739 // the worker already formatted the line
740 while ( _workerProc->canReadLine(Process::StdErr) ) {
741 const auto &data = _workerProc->channelReadLine( Process::StdErr );
742 if ( data.empty() )
743 return;
744
745 forwardToLog(std::string( data.data(), data.size() ) );
746 }
747 }
748
 // Handles the end of the worker process.
 //
 // If the queue is not already shutting down, the exit is treated as
 // unexpected and the queue is shut down immediately with a MediaException.
 // exitCode is only referenced inside the disabled (#if 0) restart logic.
 //
 // NOTE(review): presumably connected to the worker process' finished
 // signal - the connecting code is not visible here. The listing skips the
 // line numbers after the first two step comments and one inside the
 // disabled block, so the statements belonging to them are missing from
 // this view - verify against the real file.
749 void ProvideQueue::procFinished(int exitCode)
750 {
751 // process all pending messages
753
754 // get all of the log lines
756
757 // shut down
758 // @todo implement worker restart in case of a unexpected exit
 // an exit while _queueShuttingDown is set was requested by us, nothing to do then
759 if ( !_queueShuttingDown )
760 immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
761
 // disabled draft of the worker restart logic mentioned in the @todo above
762#if 0
763 if ( !_queueShuttingDown ) {
764
766 if ( _crashCounter > 3 ) {
767 immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
768 return;
769 }
770
771 MIL << "Unexpected queue worker exit with code: " << exitCode << std::endl;
772 // try to spawn the worker again, move active items back to wait list and start over
773
774 if ( !doStartup () ) {
775
776 }
777 }
778#endif
779 }
780
782 return _parent.nextRequestId();
783 }
784}
Store and operate with byte count.
Definition ByteCount.h:32
Base class for Exception.
Definition Exception.h:153
std::string asString() const
Error message provided by dumpOn as string.
Definition Exception.cc:124
Url manipulation class.
Definition Url.h:93
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
static LogControl instance()
Singleton access.
Definition LogControl.h:102
Wrapper class for stat/lstat.
Definition PathInfo.h:226
Just inherits Exception to separate media exceptions.
std::shared_ptr< T > shared_this() const
Definition base.h:113
WeakPtr parent() const
Definition base.cc:26
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition base.h:142
SignalProxy< void(uint)> sigChannelReadyRead()
Definition iodevice.cc:373
static Ptr create()
Definition process.cpp:49
SignalProxy< void(int)> sigFinished()
Definition process.cpp:294
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createCancel(const uint32_t reqId)
static constexpr std::string_view typeName
static ProvideMessage createMediaChanged(const uint32_t reqId)
static ProvideMessage createErrorResponse(const uint32_t reqId, const Code code, const std::string &reason, bool transient=false)
static expected< ProvideMessage > create(const zypp::PluginFrame &message)
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
zypp::ByteCount expectedProvideSize() const
ProvideQueue(ProvidePrivate &parent)
StompFrameStreamRef _messageStream
void immediateShutdown(const std::exception_ptr &reason)
std::deque< Item > _waitQueue
void cancel(ProvideRequest *item, std::exception_ptr error)
uint requestCount() const
zypp::Pathname _currentExe
Signal< void()> _sigIdle
bool canScheduleMore() const
void processReadyRead(int channel)
const std::string & hostname() const
std::optional< TimePoint > _idleSince
ProvidePrivate & _parent
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
const Config & workerConfig() const
std::optional< TimePoint > idleSince() const
zyppng::WorkerCaps Config
void procFinished(int exitCode)
uint activeRequests() const
std::list< Item > _activeItems
void forwardToLog(std::string &&logLine)
void enqueue(ProvideRequestRef request)
SignalProxy< void()> sigIdle()
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
static constexpr uint32_t InvalidId
SignalProxy< void()> sigMessageReceived()
static Ptr create(IODevice::Ptr iostr)
static constexpr std::string_view typeName
int unlink(const Pathname &path)
Like 'unlink'.
Definition PathInfo.cc:705
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
Definition PathInfo.cc:324
constexpr std::string_view EffectiveUrl("effective_url")
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
constexpr std::string_view Label("label")
constexpr std::string_view Desc("desc")
constexpr std::string_view MediaNr("media_nr")
constexpr std::string_view Device("device")
constexpr std::string_view LocalFilename("local_filename")
constexpr std::string_view CacheHit("cacheHit")
constexpr std::string_view ExpectedFilesize("expected_filesize")
constexpr std::string_view CheckExistOnly("check_existance_only")
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
bool provideDebugEnabled()
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
ProvideRequestRef _request
Provides API related macros.
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition Exception.h:475
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition Exception.h:463
#define DBG
Definition Logger.h:99
#define MIL
Definition Logger.h:100
#define ERR
Definition Logger.h:102
#define WAR
Definition Logger.h:101