libzypp 17.37.17
stompframestream.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9
10#include "stompframestream.h"
11#include <zypp-core/ByteCount.h>
13
15#include <zypp-core/zyppng/base/AutoDisconnect>
16
17namespace zyppng {
18
// Size limits applied while parsing an incoming frame. Exceeding a limit is
// reported as a malformed message and triggers the parse error handling.
// MAX_CMDLEN: limit for the command line (checked while waiting for a complete line).
19 constexpr auto MAX_CMDLEN = 256;
// MAX_HDRLEN: limit for a single header line (checked while waiting for a complete line).
20 constexpr auto MAX_HDRLEN = 8 * 1024; // we might send long paths in headers
// MAX_BODYLEN: limit for a body that is read until the \0 terminator.
// NOTE(review): the corresponding check for bodies announced via the
// content-length header is compiled out with `#if 0` in the header parsing code,
// so such bodies are currently not bounded by this value.
21 constexpr auto MAX_BODYLEN = 1024 * 1024; // 1 MiB for now, we do not want to use up all the memory
22
24 : zypp::Exception( zypp::str::Str() << "Invalid Message received: (" << msg <<")" )
25 { }
26
36
38 {
39 const auto &parseError = [this](){
41 _pendingMessage.reset();
42 _pendingBodyLen.reset();
44 };
45
46 // ATTENTION: Remember to also update the parser logic in zypp-core/rpc/PluginFrame.cc
47 // if code here is changed or features are added.
48
49 // loop until we have a full message, or we have no more data to read
50 while(true) {
51 switch( _parserState ) {
52 case ParseError: {
53 // we got a parse error before, try to recover by reading until the next \0
54 bool gotTerm = false;
55 while ( _ioDev->bytesAvailable ( ) ) {
56 auto d = _ioDev->read (1);
57 if ( !d.size() )
58 break;
59
60 if ( d.front () == '\0' ){
61 gotTerm = true;
63 break;
64 }
65 }
66
67 if ( gotTerm )
68 continue;
69
70 return false;
71 }
72 case ReceiveCommand: {
73 const auto ba = _ioDev->readBufferCount();
74 if ( !_ioDev->canReadLine() ) {
75 if ( ba > MAX_CMDLEN ) {
76 ERR << "Received malformed message from peer, CMD line exceeds: " << MAX_CMDLEN << " bytes" << std::endl;
77 parseError();
78 continue;
79 }
80 return false;
81 }
82
83 ByteArray command = _ioDev->readLine( MAX_CMDLEN );
84 command.pop_back(); // remove \n
85 if ( command.empty() ) {
86 // STOMP spec says multiple EOLs after a message are allowed, so we just ignore empty lines
87 // if they happen before a new frame starts
88 WAR << "Received empty line before command, ignoring" << std::endl;
89 return false;
90 }
91
92 if ( !_pendingMessage )
94 _pendingMessage->setCommand( command.asString() );
96
97 break;
98 }
99
100 case ReceiveHeaders: {
101 const auto ba = _ioDev->readBufferCount();
102 if ( !_ioDev->canReadLine() ) {
103 if ( ba > MAX_HDRLEN ) {
104 ERR << "Received malformed message from peer, header line exceeds: " << MAX_HDRLEN << " bytes" << std::endl;
105 parseError();
106 continue;
107 }
108 return false;
109 }
110
111 ByteArray header = _ioDev->readLine( MAX_HDRLEN );
112 header.pop_back(); // remove \n
113 if ( header.empty () ) {
114 // --> empty line sep. header and body
116
117 // if we received a content-length header we set the flag for the body parser to know it has to read
118 // n bytes before expecting the \0 terminator
119 const auto &contentLen = _pendingMessage->getHeaderNT( zypp::PluginFrame::contentLengthHeader(), std::string() );
120 std::optional<uint64_t> cLen;
121 if ( !contentLen.empty() ) {
122 cLen = zyppng::str::safe_strtonum<uint64_t>(contentLen);
123 if ( !cLen ) {
124 ERR << "Received malformed message from peer: Invalid value for " << zypp::PluginFrame::contentLengthHeader() << ":" << contentLen << std::endl;
125 parseError();
126 continue;
127 }
128#if 0
129 if ( (*cLen) > MAX_BODYLEN ) {
130 ERR << "Message body exceeds maximum length: " << zypp::ByteCount( *cLen ) << " vs " << zypp::ByteCount( MAX_BODYLEN ) << std::endl;
131 parseError();
132 continue;
133 }
134#endif
135
136 _pendingBodyLen = *cLen;
138 }
139 } else {
140 try {
141 _pendingMessage->addRawHeader ( header );
142 } catch ( const zypp::Exception &e ) {
143 ZYPP_CAUGHT(e);
144 ERR << "Received malformed message from peer, header format invalid: " << header.asStringView() << " (" << e << ")" << std::endl;
145 parseError();
146 continue;
147 }
148 }
149 break;
150 }
151
152 case ReceiveBody: {
153
154 ByteArray body;
155 if ( _pendingBodyLen ) {
156 // we need to read the required body bytes plus the terminating \0
157 const auto reqBytes = (*_pendingBodyLen) + 1;
158 if ( _ioDev->bytesAvailable() < reqBytes )
159 return false;
160
161 body = _ioDev->read( reqBytes );
162 if ( body.back () != '\0' ) {
163 ERR << "Received malformed message from peer: Body was not terminated with \\0" << std::endl;
164 parseError();
165 continue;
166 }
167
168 body.pop_back (); // remove \0
169
170 } else {
171 // we do not know the body size, need to read until \0
172 const auto ba = _ioDev->readBufferCount();
173 if ( !_ioDev->canReadUntil( _ioDev->currentReadChannel (), '\0' ) ) {
174 if ( ba > MAX_BODYLEN ) {
175 ERR << "Message body exceeds maximum length: " << zypp::ByteCount( _ioDev->readBufferCount() ) << " vs " << zypp::ByteCount( MAX_BODYLEN ) << std::endl;
176 parseError();
177 continue;
178 }
179 return false;
180 }
181
182 body = _ioDev->channelReadUntil( _ioDev->currentReadChannel (), '\0' );
183 body.pop_back(); // remove the \0
184 }
185
186 // if we reach this place we have a full message, store the body and lets go)
187 _pendingMessage->setBody( std::move(body) );
188
189 _messages.emplace_back( std::move(*_pendingMessage) );
190 _pendingMessage.reset();
191 _pendingBodyLen.reset();
193
194 _sigNextMessage.emit ();
195
196 if ( _messages.size() ) {
197 // nag the user code until all messages have been used up
198 _nextMessageTimer->start(0);
199 }
200
201 // once we have a message, exit the loop so other things can be done
202 return true;
203 }
204 }
205 }
206 }
207
209 {
210 if ( _messages.size() )
211 _sigNextMessage.emit();
212
213 if ( !_messages.size() )
214 _nextMessageTimer->stop();
215 }
216
// Takes one message out of the queue of already received messages.
//
// msgName: if empty, the oldest queued message is returned; otherwise the first
//          queued message whose command() equals msgName is returned.
// Returns: the removed message, or an empty optional if the queue is empty or
//          no queued message matches msgName.
//
// On every path that gets past the initial empty-queue check, _nextMessageTimer
// is started with 0 while messages remain queued and stopped once the queue is empty.
217 std::optional<zypp::PluginFrame> zyppng::StompFrameStream::nextMessage( const std::string &msgName )
218 {
219 if ( !_messages.size () ) {
220
221 // try to read the next messages from the fd
222 {
    // _sigNextMessage is blocked for the duration of this inner scope and
    // unblocked again through the OnScopeExit guard.
223 _sigNextMessage.block ();
224 zypp::OnScopeExit unblock([&](){
225 _sigNextMessage.unblock();
226 });
    // NOTE(review): listing line 227 is absent here; presumably it holds the call
    // that actually reads pending data from the device - confirm against the real file.
228 }
229
    // still nothing queued: report "no message" without touching the timer
230 if ( !_messages.size () )
231 return {};
232 }
233
234 std::optional<zypp::PluginFrame> res;
235
236 if( msgName.empty() ) {
    // no filter: hand out the message at the front of the queue
237 res = std::move( _messages.front () );
238 _messages.pop_front();
239
240 } else {
    // filter by command name: take the first match, leave all other messages queued
241 const auto i = std::find_if( _messages.begin(), _messages.end(), [&]( const zypp::PluginFrame &msg ) {
242 return msg.command() == msgName;
243 });
244
    // without a match res stays empty and the queue is left unchanged
245 if ( i != _messages.end() ) {
246 res = std::move(*i);
247 _messages.erase(i);
248 }
249 }
250
    // keep the timer running only while there are still messages left in the queue
251 if ( _messages.size() )
252 _nextMessageTimer->start(0);
253 else
254 _nextMessageTimer->stop();
255
256 return res;
257 }
258
// Waits on the IO device until a message can be returned via nextMessage().
//
// msgName: if empty, waiting ends as soon as any message is queued; otherwise
//          waiting continues until a message with that command name was received.
// Returns: the message, or an empty optional if waitForReadyRead() failed or if
//          the loop ended (invalid message flagged, device no longer open or
//          readable) without a matching message in the queue.
259 std::optional<zypp::PluginFrame> StompFrameStream::nextMessageWait( const std::string &msgName )
260 {
261 // make sure the signal is not emitted until we have the next message
262 _sigNextMessage.block ();
263 zypp::OnScopeExit unblock([&](){
264 _sigNextMessage.unblock();
265 });
266
267 bool receivedInvalidMsg = false;
    // NOTE(review): listing line 268 is absent here; the remaining fragment sets
    // receivedInvalidMsg from inside what looks like a callback - presumably one
    // connected to the invalid-message signal. Confirm against the real file.
269 receivedInvalidMsg = true;
270 }));
271
272 const bool hasMsgName = msgName.size();
273 while ( !receivedInvalidMsg && _ioDev->isOpen() && _ioDev->canRead() ) {
274 if ( _messages.size() ) {
275 if ( hasMsgName ) {
    // a specific message was requested: return it if it is queued,
    // otherwise keep waiting for more data
276 std::optional<zypp::PluginFrame> msg = nextMessage(msgName);
277 if ( msg ) return msg;
278 }
279 else {
    // any message will do: leave the loop and fetch it below
280 break;
281 }
282 }
283
    // NOTE(review): -1 presumably means "wait without timeout" - confirm in IODevice
284 if ( !_ioDev->waitForReadyRead ( -1 ) ) {
285 // this can only mean that an error happened, e.g. the device was closed
286 return {};
287 }
288 }
    // the result may still be empty if the loop ended without a suitable message
289 return nextMessage (msgName);
290 }
291
293 {
294 if ( !_ioDev->canWrite () )
295 return false;
296
297 try {
298 IODeviceOStreamBuf ostrbuf(_ioDev);
299 std::ostream output(&ostrbuf);
300 env.writeTo ( output );
301 } catch ( const zypp::Exception &e ) {
302 ZYPP_CAUGHT(e);
303 ERR << "Failed to serialize message to stream" << std::endl;
304 return false;
305 }
306
307 return true;
308 }
309
314
319
321 {
322 bool cont = true;
323 while ( cont && _ioDev->bytesAvailable() ) {
324 cont = readNextMessage ();
325 }
326 }
327}
std::string asString() const
Definition ByteArray.h:24
Store and operate with byte count.
Definition ByteCount.h:32
Base class for Exception.
Definition Exception.h:153
Exception()
Default ctor.
Definition Exception.cc:94
const std::string & msg() const
Return the message string provided to the ctor.
Definition Exception.h:206
Command frame for communication with PluginScript.
Definition PluginFrame.h:42
static const std::string & contentLengthHeader()
"content-length" header name
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition base.h:163
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition base.h:142
SignalProxy< void()> sigReadyRead()
Definition iodevice.cc:368
std::shared_ptr< IODevice > Ptr
Definition iodevice.h:45
InvalidMessageReceivedException(const std::string &msg={})
SignalProxy< void()> sigInvalidMessageReceived()
std::optional< int64_t > _pendingBodyLen
Signal< void()> _sigNextMessage
std::optional< zypp::PluginFrame > nextMessage(const std::string &msgName="")
void timeout(const zyppng::Timer &)
std::deque< zypp::PluginFrame > _messages
StompFrameStream(IODevice::Ptr iostr)
std::optional< zypp::PluginFrame > _pendingMessage
SignalProxy< void()> sigMessageReceived()
std::optional< zypp::PluginFrame > nextMessageWait(const std::string &msgName="")
enum zyppng::StompFrameStream::ParserState _parserState
Signal< void()> _sigInvalidMessageReceived
bool sendFrame(const zypp::PluginFrame &message)
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition timer.cc:120
Definition Arch.h:364
String related utilities and Regular expression matching.
Namespace intended to collect all environment variables we use.
Definition Env.h:25
Iostream related utilities.
Definition IOStream.cc:26
Easy-to use interface to the ZYPP dependency resolver.
AutoDispose< void > OnScopeExit
std::optional< T > safe_strtonum(const std::string_view &val)
Definition string.h:23
constexpr auto MAX_HDRLEN
constexpr auto MAX_CMDLEN
constexpr auto MAX_BODYLEN
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition Exception.h:475
#define ERR
Definition Logger.h:102
#define WAR
Definition Logger.h:101