libzypp 17.37.17
provideworker.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8\---------------------------------------------------------------------*/
9
10#include "provideworker.h"
11#include <zypp-core/base/DtorReset>
13#include <zypp-core/Url.h>
14#include <zypp-core/Date.h>
15#include <zypp-core/zyppng/pipelines/AsyncResult>
20#include <zypp-core/zyppng/base/AutoDisconnect>
21#include <zypp-core/zyppng/base/EventDispatcher>
22#include <zypp-media/MediaConfig>
23#include <ostream>
24
26
27#undef ZYPP_BASE_LOGGER_LOGGROUP
28#define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
29
30namespace zyppng::worker {
31
32 using namespace zyppng::operators;
33
36
37 ProvideWorker::ProvideWorker(std::string_view workerName) : _workerName(workerName)
38 {
39 // do not change the order of these calls, otherwise showing the threadname does not work
40 // enableLogForwardingMode will initialize the log which would override the current thread name
42 ThreadData::current().setName( workerName );
43
44 // we use a singleshot timer that triggers message handling
46 _msgAvail->setSingleShot(true);
47
48 // another timer to trigger a delayed shutdown
51 }, *this );
52 _delayedShutdown->setSingleShot(true);
53 }
54
57
58 StompFrameStreamRef ProvideWorker::messageStream() const
59 {
60 return _stream;
61 }
62
63 expected<void> ProvideWorker::run( int recv, int send )
64 {
65 // reentry not supported
66 assert ( !_isRunning );
67
69 _isRunning = true;
70
71 initLog();
72
73 zypp::OnScopeExit cleanup([&](){
74 _stream.reset();
75 _controlIO.reset();
76 _loop.reset();
77 });
78
80 if ( !_controlIO->openFds( { recv }, send ) ) {
81 return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to open control FDs")) );
82 }
83
86
88
89 return executeHandshake () | and_then( [&]() {
90 AutoDisconnect disC[] = {
93 };
94 _loop->run();
95 if ( _fatalError )
98 });
99 }
100
101 std::deque<ProvideWorkerItemRef> &ProvideWorker::requestQueue()
102 {
103 return _pendingProvides;
104 }
105
109
113
115 {
116 // by default we log to strErr, if user code wants to change that it can overload this function
118 }
119
120 ProvideWorkerItemRef ProvideWorker::makeItem( ProvideMessage &&spec )
121 {
122 return std::make_shared<ProvideWorkerItem>( std::move(spec) );
123 }
124
125 void ProvideWorker::provideStart(const uint32_t id, const zypp::Url &url, const zypp::filesystem::Pathname &localFile, const zypp::Pathname &stagingFile )
126 {
127 if ( !_stream->sendMessage( ProvideMessage::createProvideStarted ( id
128 , url
129 , localFile.empty () ? std::optional<std::string>() : localFile.asString ()
130 , stagingFile.empty () ? std::optional<std::string>() : stagingFile.asString ()
131 ) ) ) {
132 ERR << "Failed to send ProvideStart message" << std::endl;
133 }
134 }
135
136 void ProvideWorker::provideSuccess(const uint32_t id, bool cacheHit, const zypp::filesystem::Pathname &localFile, const HeaderValueMap extra )
137 {
138 MIL_PRV << "Sending provideSuccess for id " << id << " file " << localFile << std::endl;
139 auto msg = ProvideMessage::createProvideFinished( id ,localFile.asString() ,cacheHit);
140 for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
141 for ( const auto &val : i->second )
142 msg.addValue( i->first, val );
143 }
144 if ( !_stream->sendMessage( msg ) ) {
145 ERR << "Failed to send ProvideSuccess message" << std::endl;
146 }
147 }
148
149 void ProvideWorker::provideFailed(const uint32_t id, const ProvideMessage::Code code, const std::string &reason, const bool transient, const HeaderValueMap extra )
150 {
151 MIL_PRV << "Sending provideFailed for request " << id << " err: " << reason << std::endl;
152 auto msg = ProvideMessage::createErrorResponse ( id, code, reason, transient );
153 for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
154 for ( const auto &val : i->second )
155 msg.addValue( i->first, val );
156 }
157 if ( !_stream->sendMessage( msg ) ) {
158 ERR << "Failed to send ProvideFailed message" << std::endl;
159 }
160 }
161
162
163 void ProvideWorker::provideFailed ( const uint32_t id, const ProvideMessage::Code code, const bool transient, const zypp::Exception &e )
164 {
166 if ( !e.historyEmpty() ) {
168 }
169 provideFailed( id
170 , code
171 , e.asUserString()
172 , transient
173 , extra );
174 }
175
176
177 void ProvideWorker::attachSuccess(const uint32_t id, const std::optional<std::string> &localMountPoint)
178 {
179 MIL_PRV << "Sending attachSuccess for request " << id << std::endl;
180 if ( !_stream->sendMessage( ProvideMessage::createAttachFinished ( id, localMountPoint ) ) ) {
181 ERR << "Failed to send AttachFinished message" << std::endl;
182 } else {
183 MIL << "Sent back attach success" << std::endl;
184 }
185 }
186
187 void ProvideWorker::detachSuccess(const uint32_t id)
188 {
189 MIL_PRV << "Sending detachSuccess for request " << id << std::endl;
190 if ( !_stream->sendMessage( ProvideMessage::createDetachFinished ( id ) ) ) {
191 ERR << "Failed to send DetachFinished message" << std::endl;
192 }
193 }
194
195 expected<ProvideMessage> ProvideWorker::sendAndWaitForResponse( const ProvideMessage &request , const std::vector<uint> &responseCodes )
196 {
197 // make sure immediateShutdown is not called while we are blocking here
200
201 if ( !_stream->sendMessage( request ) )
202 return expected<ProvideMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send message")) );
203
204 // flush the io device, this will block until all bytes are written
205 _controlIO->flush();
206
207 while ( !_fatalError ) {
208
209 const auto &msg = _stream->nextMessageWait() | [&]( auto &&nextMessage ) {
210 if ( !nextMessage ) {
211 if ( _fatalError )
213 else
214 return expected<zypp::PluginFrame>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to wait for response")) );
215 }
216 return expected<zypp::PluginFrame>::success( std::move(*nextMessage) );
217 } | and_then ( [&]( auto && m) {
218 return parseReceivedMessage(m);
219 } );
220
221 if ( !msg ) {
222 ERR << "Failed to receive message" << std::endl;
223 return msg;
224 }
225
226 if ( std::find( responseCodes.begin (), responseCodes.end(), msg->code() ) != responseCodes.end() ) {
227 return msg;
228 }
229
230 // remember other messages for later
231 MIL << "Remembering message for later: " << msg->code () << std::endl;
232 _pendingMessages.push_back(*msg);
233 _msgAvail->start(0);
234 }
236 }
237
238 ProvideWorker::MediaChangeRes ProvideWorker::requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc )
239 {
240 return sendAndWaitForResponse( ProvideMessage::createMediaChangeRequest ( id, label, mediaNr, devices, desc ), { ProvideMessage::Code::MediaChanged, ProvideMessage::Code::MediaChangeAbort, ProvideMessage::Code::MediaChangeSkip } )
241 | [&]( expected<ProvideMessage> &&m ) {
242 if ( !m ) {
243 MIL << "Failed to wait for message, aborting the request " << std::endl;
245 }
246 MIL << "Wait finished, with messages still pending: " << this->_pendingMessages.size() << " and provs still pending: " << this->_pendingProvides.size() << std::endl;
247 if ( m->code() == ProvideMessage::Code::MediaChanged )
249 else if ( m->code() == ProvideMessage::Code::MediaChangeSkip )
251 else
253 };
254 }
255
256 expected<AuthInfo> ProvideWorker::requireAuthorization( const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername, const int64_t lastTimestamp, const std::map<std::string, std::string> &extraFields )
257 {
258 return sendAndWaitForResponse( ProvideMessage::createAuthDataRequest( id, url, lastTriedUsername, lastTimestamp, extraFields ), { ProvideMessage::Code::AuthInfo, ProvideMessage::Code::NoAuthData } )
259 | and_then( [&]( ProvideMessage &&m ) {
260 if ( m.code() == ProvideMessage::Code::AuthInfo ) {
261
262 AuthInfo inf;
263 for( const auto &hdr : m.headers () ) {
264 if ( hdr.first == AuthInfoMsgFields::Username ) {
265 inf.username = hdr.second.asString();
266 } else if ( hdr.first == AuthInfoMsgFields::Password ) {
267 inf.password = hdr.second.asString();
268 } else if ( hdr.first == AuthInfoMsgFields::AuthTimestamp ) {
269 inf.last_auth_timestamp = hdr.second.asInt64();
270 } else {
271 if ( !hdr.second.isString() ) {
272 ERR << "Ignoring invalid extra value, " << hdr.first << " is not of type string" << std::endl;
273 }
274 inf.extraKeys[hdr.first] = hdr.second.asString();
275 }
276 }
277 return expected<AuthInfo>::success(inf);
278
279 }
281 });
282 }
283
285 {
286 return *_controlIO.get();
287 }
288
290 {
291 const auto &helo = _stream->nextMessageWait();
292 if ( !helo ) {
293 ERR << "Could not receive a handshake message, aborting" << std::endl;
294 return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to receive handshake message")) );;
295 }
296
297 auto exp = _stream->parseMessage<zyppng::worker::Configuration>( *helo );
298 if ( !exp ) {
299 invalidMessageReceived( exp.error() );
300 return expected<void>::error(exp.error());
301 }
302
303 return std::move(*exp) | [&]( auto conf ) {
304
305 _workerConf = std::move(conf);
306
307 auto &mediaConf = zypp::MediaConfig::instance();
308 for( const auto &[key,value] : _workerConf ) {
309 zypp::Url keyUrl( key );
310 if ( keyUrl.getScheme() == "zconfig" && keyUrl.getAuthority() == "main" ) {
311 mediaConf.setConfigValue( keyUrl.getAuthority(), zypp::Pathname(keyUrl.getPathName()).basename(), value );
312 }
313 }
314
315 return initialize( _workerConf ) | and_then([&]( WorkerCaps &&caps ){
316
317 caps.set_worker_name( _workerName.data() );
318
319 caps.set_cfg_flags ( WorkerCaps::Flags(caps.cfg_flags() | WorkerCaps::ZyppLogFormat) );
320 if ( !_stream->sendMessage ( caps ) ) {
321 return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send capabilities")) );
322 }
323 return expected<void>::success ();
324 });
325 };
326 }
327
329 {
330 if ( _fatalError )
331 return;
332
333 while ( _pendingMessages.size () ) {
334 auto m = _pendingMessages.front ();
335 _pendingMessages.pop_front ();
337 }
338
339 if ( !_fatalError && _pendingProvides.size() ) {
340 provide();
341 }
342
343 // keep poking until there are no provides anymore
344 if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
345 _msgAvail->start(0);
346 }
347
348 }
349
351 {
352 if ( _inControllerRequest ) {
353 _delayedShutdown->start(0);
354 return;
355 }
356
358 _loop->quit ();
359 }
360
362 {
363 MIL << "Read FD closed, exiting." << std::endl;
365 }
366
368 {
369 MIL << "Write FD closed, exiting." << std::endl;
371 }
372
374 {
375 while ( auto message = _stream->nextMessage() ) {
376 if ( _fatalError )
377 break;
378 pushSingleMessage(*message);
379 }
380 }
381
383 {
384 invalidMessageReceived( std::exception_ptr() );
385 }
386
387 void ProvideWorker::invalidMessageReceived( std::exception_ptr p )
388 {
389 ERR << "Received a invalid message on the input stream, aborting" << std::endl;
390 if ( p )
391 _fatalError = p;
392 else
395 _loop->quit();
396 }
397
399 {
400 const auto code = provide.code();
401 // we only accept requests here
402 if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
403
404 MIL_PRV << "Received request: " << code << std::endl;
405
406 if ( code == ProvideMessage::Code::Cancel ) {
407 const auto &i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [ id = provide.requestId() ]( const auto &it ){ return it->_spec.requestId() == id; } );
408 if ( i != _pendingProvides.end() ) {
409 switch ( (*i)->_state ) {
411 _stream->sendMessage ( ProvideMessage::createErrorResponse ( provide.requestId (), ProvideMessage::Code::Cancelled, "Cancelled by user." ) );
412 _pendingProvides.erase(i);
413 break;
415 cancel(i);
416 break;
418 break;
419 }
420 MIL << "Received Cancel for unknown request: " << provide.requestId() << ", ignoring!" << std::endl;
421 }
422 return;
423 }
424
426 return;
427 }
428 ERR << "Unsupported request with code: " << code << " received!" << std::endl;
429 }
430
432 {
433 const auto &handle = [&]( const zypp::PluginFrame &message ){
434 return parseReceivedMessage (message )
435 | and_then( [&]( ProvideMessage &&provide ){
436 _pendingMessages.push_back(provide);
437 _msgAvail->start(0);
439 })
440 | or_else( [&]( std::exception_ptr ) -> expected<void> {
441 return expected<void>::error( ZYPP_EXCPT_PTR( std::invalid_argument(zypp::str::Str()<<"Unknown message received: " << message.command() )) );
442 });
443 };
444
445 const auto &exp = handle( message );
446 if ( !exp ) {
447 try {
448 std::rethrow_exception ( exp.error () );
449 } catch ( const zypp::Exception &e ) {
450 ERR << "Catched exception during message handling: " << e << std::endl;
451 } catch ( const std::exception &e ) {
452 ERR << "Catched exception during message handling: " << e.what()<< std::endl;
453 } catch ( ... ) {
454 ERR << "Unknown Exception during message handling" << std::endl;
455 }
456 }
457 }
458
460 {
461 auto exp = ProvideMessage::create(m);
462 if ( !exp )
463 invalidMessageReceived( exp.error() );
464 return exp;
465 }
466}
Assign a vaiable a certain value when going out of scope.
Definition dtorreset.h:50
Base class for Exception.
Definition Exception.h:153
std::string asUserString() const
Translated error message as string suitable for the user.
Definition Exception.cc:131
std::string historyAsString() const
The history as string.
Definition Exception.cc:195
const char * what() const override
Return message string.
Definition Exception.h:322
bool historyEmpty() const
Whether the history list is empty.
Definition Exception.h:273
static MediaConfig & instance()
Command frame for communication with PluginScript.
Definition PluginFrame.h:42
const std::string & command() const
Return the frame command.
Url manipulation class.
Definition Url.h:93
std::string getScheme() const
Returns the scheme name of the URL.
Definition Url.cc:551
std::string getAuthority() const
Returns the encoded authority component of the URL.
Definition Url.cc:559
std::string getPathName(EEncoding eflag=zypp::url::E_DECODED) const
Returns the path name from the URL.
Definition Url.cc:622
void logToStdErr()
Log to std::err.
static LogControl instance()
Singleton access.
Definition LogControl.h:102
void enableLogForwardingMode(bool enable=true)
const std::string & asString() const
String representation.
Definition Pathname.h:93
std::string basename() const
Return the last component of this path.
Definition Pathname.h:130
bool empty() const
Test for an empty path.
Definition Pathname.h:116
Just inherits Exception to separate media exceptions.
MediaException()
Ctor taking message.
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition base.h:163
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition base.h:142
ValueMap::iterator beginList()
ValueMap::iterator endList()
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createErrorResponse(const uint32_t reqId, const Code code, const std::string &reason, bool transient=false)
static ProvideMessage createDetachFinished(const uint32_t reqId)
static expected< ProvideMessage > create(const zypp::PluginFrame &message)
static ProvideMessage createAttachFinished(const uint32_t reqId, const std::optional< std::string > &localMountPoint={})
SignalProxy< void()> sigInvalidMessageReceived()
SignalProxy< void()> sigMessageReceived()
static Ptr create(IODevice::Ptr iostr)
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition timer.cc:120
static expected success(ConsParams &&...params)
Definition expected.h:115
void detachSuccess(const uint32_t id)
std::deque< ProvideWorkerItemRef > _pendingProvides
void handleSingleMessage(const ProvideMessage &provide)
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
AsyncDataSource & controlIO()
void attachSuccess(const uint32_t id, const std::optional< std::string > &localMountPoint={})
ProvideWorker(std::string_view workerName)
expected< void > executeHandshake()
AsyncDataSource::Ptr _controlIO
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
StompFrameStreamRef messageStream() const
virtual void cancel(const std::deque< ProvideWorkerItemRef >::iterator &request)=0
std::deque< ProvideMessage > _pendingMessages
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
void pushSingleMessage(const zypp::PluginFrame &msg)
expected< ProvideMessage > parseReceivedMessage(const zypp::PluginFrame &m)
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
ProvideNotificatioMode provNotificationMode() const
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
ProviderConfiguration _workerConf
void provideFailed(const uint32_t id, const ProvideMessage::Code code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
virtual expected< WorkerCaps > initialize(const Configuration &conf)=0
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
StompFrameStreamRef _stream
std::deque< ProvideWorkerItemRef > & requestQueue()
ProvideNotificatioMode _provNotificationMode
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
void invalidMessageReceived(std::exception_ptr p)
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
std::exception_ptr _fatalError
Url details namespace.
Definition UrlBase.cc:58
Easy-to use interface to the ZYPP dependency resolver.
AutoDispose< void > OnScopeExit
constexpr std::string_view Password("password")
constexpr std::string_view Username("username")
constexpr std::string_view AuthTimestamp("auth_timestamp")
constexpr std::string_view History("history")
auto or_else(Fun &&function)
Definition expected.h:630
auto and_then(Fun &&function)
Definition expected.h:623
zyppng::ProviderConfiguration Configuration
zyppng::WorkerCaps WorkerCaps
#define MIL_PRV
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
Definition String.h:213
void setName(T &&name)
static ZYPP_API ThreadData & current()
Definition threaddata.cc:16
std::map< std::string, std::string > extraKeys
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition Exception.h:463
#define MIL
Definition Logger.h:100
#define ERR
Definition Logger.h:102