6#include <zypp-core/zyppng/base/EventDispatcher>
10#include <sys/socket.h>
74 const auto oldState =
state();
76 if ( oldState == newState )
121 auto wbOld = std::move( std::get<ConnectedState>(
_state)._writeBuffer );
129 if ( z_func()->canRead() )
130 z_func()->finishReadChannel( 0 );
136 z_func()->IODevice::close();
149 auto doDelayedConnect = [
this, &
state ](){
150 if ( !
state._connectNotifier ) {
155 if ( !
state._connectTimeout ) {
159 state._connectNotifier.reset();
160 state._connectTimeout.reset();
163 state._connectTimeout->setSingleShot(
true );
164 state._connectTimeout->start( 30000 );
172 if (
_targetAddr->nativeSockAddr()->sa_family == AF_UNIX ) {
174 return doDelayedConnect();
183 return doDelayedConnect();
214 if ( bytesToRead == 0 ) {
220 char *buf = readBuf.reserve( bytesToRead );
221 const auto bytesRead = z_func()->readData( 0, buf, bytesToRead );
223 if ( bytesRead <= 0 ) {
224 readBuf.chop( bytesToRead );
226 switch ( bytesRead ) {
245 if ( bytesToRead > bytesRead )
246 readBuf.chop( bytesToRead-bytesRead );
255 return std::visit( [
this](
auto &s ){
257 if constexpr ( std::is_same_v<T, ConnectedState> || std::is_same_v<T, ClosingState> ) {
258 const auto nwrite = s._writeBuffer.frontSize();
265 const auto nBuf = s._writeBuffer.front();
267 if ( written == -1 ) {
273#if EAGAIN != EWOULDBLOCK
287 s._writeBuffer.discard( written );
289 if ( s._writeBuffer.size() == 0 )
345 std::visit( [
this, &ev ] (
const auto &currState ) {
347 if constexpr ( std::is_same<ConnectingState, T>() ) {
349 if ( this->
_targetAddr->nativeSockAddr()->sa_family == AF_UNIX ) {
357 socklen_t errSize =
sizeof ( err );
358 ::getsockopt(
_socket, SOL_SOCKET, SO_ERROR, &err, &errSize );
360 if ( err == 0 || err == EISCONN ) {
363 if ( err == EINPROGRESS || err == EAGAIN || err == EALREADY )
371 }
else if constexpr ( std::is_same<ConnectedState, T>() ) {
388 }
else if constexpr ( std::is_same<ClosingState, T>() ) {
397 if ( currState._writeBuffer.size() == 0 ) {
402 }
else if constexpr ( std::is_same<ListeningState, T>() ) {
408 DBG <<
"Unexpected state on socket activation" << std::endl;
415 return std::visit([](
const auto &s )
constexpr {
return s.type(); },
_state );
423 sptr->d_func()->_socket = fd;
426 if ( !sptr->setBlocking(
false ) ) {
427 DBG <<
"Failed to unblock socket." << std::endl;
431 if( sptr->d_func()->transition(
state ) )
445 if ( channel != 0 ) {
446 constexpr std::string_view msg(
"Socket does not support multiple read channels");
447 ERR << msg << std::endl;
448 throw std::logic_error( msg.data() );
450 return d_func()->rawBytesAvailable();
460 return Ptr(
new Socket( domain, type, protocol ) );
466 if ( !addr || !d->initSocket() )
470 if ( res >= 0)
return true;
512 if ( !d->initSocket() )
541 if ( d->_socket == -1 )
545 const auto res =
eintrSafeCall( ::accept4, d->_socket, (
struct sockaddr*)
nullptr, (socklen_t *)
nullptr, SOCK_CLOEXEC );
548#if EAGAIN != EWOULDBLOCK
568 socklen_t optlen =
sizeof(domain);
569 int res = getsockopt( fd, SOL_SOCKET, SO_DOMAIN, &domain, &optlen );
571 DBG <<
"Error querying socket domain: " <<
strerr_cxx() << std::endl;
577 optlen =
sizeof(protocol);
578 res = getsockopt( fd, SOL_SOCKET, SO_PROTOCOL, &protocol, &optlen );
580 DBG <<
"Error querying socket protocol: " <<
strerr_cxx() << std::endl;
586 optlen =
sizeof(type);
587 res = getsockopt( fd, SOL_SOCKET, SO_TYPE, &type, &optlen );
589 DBG <<
"Error querying socket type: " <<
strerr_cxx() << std::endl;
601 if ( !d->initSocket() )
604 const int oldFlags = fcntl( d->_socket, F_GETFL, 0 );
605 if (oldFlags == -1)
return false;
607 const int flags = set ? ( oldFlags & ~(O_NONBLOCK) ) : ( oldFlags | O_NONBLOCK );
610 if ( flags == oldFlags )
613 if ( fcntl( d->_socket, F_SETFL, flags ) != 0) {
626 if ( !d->initSocket() )
629 d->_targetAddr = std::move(addr);
647 auto sock = d->_socket;
673 std::visit([&d](
const auto &s ){
675 if constexpr ( std::is_same_v<Type, SocketPrivate::ConnectedState > ) {
677 if ( s._writeBuffer.size() ) {
693 auto &s = std::get<SocketPrivate::ConnectedState>( d->_state );
696 if ( s._writeBuffer.size() ) {
697 s._writeBuffer.append( data, count );
699 d->writePendingData();
703 auto written =
eintrSafeCall( ::send, d->_socket, data, count, MSG_NOSIGNAL );
704 if ( written == -1 ) {
706#if EAGAIN != EWOULDBLOCK
725 if ( written >= 0 ) {
726 if ( written < count ) {
728 s._writeBuffer.append( data + written, count - written );
732 d->_sigBytesWritten.emit( written );
735 if ( s._writeBuffer.size() == 0 )
736 d->_sigAllBytesWritten.emit();
750 d->onSocketActivated( rEvents );
763 bool canContinue =
true;
764 bool bufferEmpty =
false;
766 while ( canContinue && !bufferEmpty ) {
771 std::visit([&](
const auto &s ){
773 if constexpr ( std::is_same_v<T, SocketPrivate::ConnectedState> || std::is_same_v<T, SocketPrivate::ClosingState> ) {
774 if ( s._writeBuffer.size() > 0 ) {
779 d->onSocketActivated( rEvents );
782 if ( s._writeBuffer.size() == 0 ){
802 d->onSocketActivated( rEvents );
813 if ( channel != 0 ) {
814 constexpr std::string_view msg(
"Socket does not support multiple read channels");
815 ERR << msg << std::endl;
816 throw std::logic_error( msg.data() );
829 }
else if (
read < 0 ) {
831#if EAGAIN != EWOULDBLOCK
848 if ( channel != 0 ) {
849 constexpr std::string_view msg(
"Changing the readChannel on a Socket is not supported");
850 ERR << msg << std::endl;
851 throw std::logic_error( msg.data() );
858 return std::visit([&](
const auto &s ) -> int64_t {
860 if constexpr ( std::is_same_v<T, SocketPrivate::ConnectedState> || std::is_same_v<T, SocketPrivate::ClosingState> ) {
861 return s._writeBuffer.size();
869 return d_func()->state();
874 return d_func()->_incomingConnection;
879 return d_func()->_connected;
884 return d_func()->_disconnected;
889 return d_func()->_sigError;
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
Signal< void()> _sigAllBytesWritten
Signal< void(int64_t)> _sigBytesWritten
std::vector< IOBuffer > _readChannels
Signal< void() > _readyRead
Signal< void(uint) > _channelReadyRead
virtual int64_t bytesAvailable() const
ByteArray read(int64_t maxSize)
static Ptr create(int socket, int evTypes, bool enable=true)
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
Socket::SocketState state() const
static Socket::Ptr wrapSocket(int fd, int domain, int type, int protocol, Socket::SocketState state)
std::variant< InitialState, ConnectingState, ConnectedState, ListeningState, ClosingState, ClosedState > _state
bool transition(Socket::SocketState newState)
int64_t rawBytesAvailable() const
bool readRawBytesToBuffer()
bool handleConnectError(int error)
Signal< void()> _disconnected
void onSocketActivated(int ev)
std::shared_ptr< SockAddr > _targetAddr
Signal< void()> _connected
void setError(Socket::SocketError error, std::string &&err, bool emit=true)
void onSocketActivatedSlot(const SocketNotifier &, int ev)
Signal< void()> _incomingConnection
Signal< void(Socket::SocketError)> _sigError
Socket::SocketError _error
SocketError lastError() const
SignalProxy< void()> sigConnected()
bool connect(std::shared_ptr< SockAddr > addr)
static Ptr fromSocket(int fd, SocketState state)
@ UnsupportedSocketOptions
@ InsufficientPermissions
@ ConnectionClosedByRemote
void readChannelChanged(uint channel) override
bool waitForReadyRead(uint channel, int timeout=-1) override
static Ptr create(int domain, int type, int protocol)
SignalProxy< void()> sigDisconnected()
SignalProxy< void()> sigIncomingConnection()
bool waitForAllBytesWritten(int timeout=-1)
bool listen(int backlog=50)
SignalProxy< void(Socket::SocketError)> sigError()
Socket(int domain, int type, int protocol)
bool setBlocking(const bool set=true)
int64_t writeData(const char *data, int64_t count) override
int64_t bytesPending() const override
bool waitForConnected(int timeout=-1)
std::shared_ptr< Socket > Ptr
bool bind(const std::shared_ptr< SockAddr > &addr)
int64_t readData(uint channel, char *buffer, int64_t bufsize) override
int64_t rawBytesAvailable(uint channel=0) const override
SocketState state() const
static std::shared_ptr< Timer > create()
Creates a new Timer object, the timer is not started at this point.
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
typename decay< T >::type decay_t
auto eintrSafeCall(Fun &&function, Args &&... args)
int64_t bytesAvailableOnFD(int fd)
std::string strerr_cxx(const int err=-1)
ClosingState(IOBuffer &&writeBuffer)
SocketNotifier::Ptr _socketNotifier
SocketNotifier::Ptr _socketNotifier
SocketNotifier::Ptr _socketNotifier
#define ZYPP_IMPL_PRIVATE(Class)