libzypp  17.36.7
asyncdatasource.cpp
Go to the documentation of this file.
2 
4 #include <zypp-core/zyppng/base/AutoDisconnect>
5 #include <zypp-core/zyppng/base/EventDispatcher>
7 
8 namespace zyppng {
9 
10  void AsyncDataSourcePrivate::notifierActivated( const SocketNotifier &notify, int evTypes )
11  {
12  if ( _writeNotifier.get() == &notify ) {
13  if ( evTypes & SocketNotifier::Error ) {
14  DBG << "Closing due to error when polling" << std::endl;
16  return;
17  }
18  readyWrite();
19  } else {
20 
21  auto dev = std::find_if( _readFds.begin(), _readFds.end(),
22  [ &notify ]( const auto &dev ){ return ( dev._readNotifier.get() == &notify ); } );
23 
24  if ( dev == _readFds.end() ) {
25  return;
26  }
27 
28  readyRead( std::distance( _readFds.begin(), dev ) );
29  }
30  }
31 
32  void AsyncDataSourcePrivate::readyRead( uint channel )
33  {
34  auto bytesToRead = z_func()->rawBytesAvailable( channel );
35  if ( bytesToRead == 0 ) {
36  // make sure to check if bytes are available even if the ioctl call returns something different
37  bytesToRead = 4096;
38  }
39 
40  auto &_readBuf = _readChannels[channel];
41  char *buf = _readBuf.reserve( bytesToRead );
42  const auto bytesRead = z_func()->readData( channel, buf, bytesToRead );
43 
44  if ( bytesRead <= 0 ) {
45  _readBuf.chop( bytesToRead );
46 
47  switch( bytesRead ) {
48  // remote close , close the read channel
49  case 0: {
51  break;
52  }
53  // no data is available , just try again later
54  case -2: break;
55  // anything else
56  default:
57  case -1: {
59  break;
60  }
61  }
62  return;
63  }
64 
65  if ( bytesToRead > bytesRead )
66  _readBuf.chop( bytesToRead-bytesRead );
67 
68  if ( channel == _currentReadChannel )
69  _readyRead.emit();
70 
71  _channelReadyRead.emit( channel );
72  return;
73  }
74 
76  {
77  const auto nwrite = _writeBuffer.frontSize();
78  if ( !nwrite ) {
79  // disable Write notifications so we do not wake up without the need to
80  _writeNotifier->setEnabled( false );
81  return;
82  }
83 
84  const auto nBuf = _writeBuffer.front();
85  const auto written = eintrSafeCall( ::write, _writeFd, nBuf, nwrite );
86  if ( written == -1 ) {
87  switch ( errno ) {
88  case EACCES:
90  return;
91  case EAGAIN:
92 #if EAGAIN != EWOULDBLOCK
93  case EWOULDBLOCK:
94 #endif
95  return;
96  case EPIPE:
97  case ECONNRESET:
99  return;
100  default:
102  return;
103  }
104  return;
105  }
106  _writeBuffer.discard( written );
107  _sigBytesWritten.emit( written );
108 
109  if ( _writeBuffer.size() == 0 )
110  _sigAllBytesWritten.emit();
111  }
112 
114  {
115  bool sig = _writeFd >= 0;
116  _writeNotifier.reset();
117  _writeFd = -1;
119  _mode.unsetFlag( AsyncDataSource::WriteOnly );
120  if ( sig )
121  _sigWriteFdClosed.emit( reason );
122  }
123 
125  {
126  auto &readFd = _readFds[channel];
127  // we do not clear the read buffer so code has the opportunity to read whats left in there
128  bool sig = readFd._readFd >= 0;
129  readFd._readNotifier.reset();
130  readFd._readFd = -1;
131  if ( sig ) {
132  z_func()->finishReadChannel( channel );
133  _sigReadFdClosed.emit( channel, reason );
134  }
135  }
136 
138 
140  { }
141 
143  : IODevice(d)
144  {}
145 
147  {
148  return std::shared_ptr<AsyncDataSource>( new AsyncDataSource );
149  }
150 
151 
152  bool AsyncDataSource::openFds ( const std::vector<int>& readFds, int writeFd )
153  {
154  Z_D();
155 
156  if ( d->_mode != IODevice::Closed )
157  return false;
158 
159  IODevice::OpenMode mode;
160 
161  bool error = false;
162  for ( const auto readFd : readFds ) {
163  if ( readFd >= 0 ) {
164  mode |= IODevice::ReadOnly;
165  d->_readFds.push_back( {
166  readFd,
168  });
170  ERR << "Failed to set read FD to non blocking" << std::endl;
171  error = true;
172  break;
173  }
174  d->_readFds.back()._readNotifier->connect( &SocketNotifier::sigActivated, *d, &AsyncDataSourcePrivate::notifierActivated );
175  }
176  }
177 
178  if ( writeFd >= 0 && !error ) {
179  mode |= IODevice::WriteOnly;
181  ERR << "Failed to set write FD to non blocking" << std::endl;
182  error = true;
183  } else {
184  d->_writeFd = writeFd;
185  d->_writeNotifier = SocketNotifier::create( writeFd, SocketNotifier::Write | AbstractEventSource::Error, false );
186  d->_writeNotifier->connect( &SocketNotifier::sigActivated, *d, &AsyncDataSourcePrivate::notifierActivated );
187  }
188  }
189 
190  if( error || !IODevice::open( mode ) ) {
191  d->_mode = IODevice::Closed;
192  d->_readFds.clear();
193  d->_writeNotifier.reset();
194  d->_writeFd = -1;
195  return false;
196  }
197 
198  // make sure we have enough read buffers
199  setReadChannelCount( d->_readFds.size() );
200  return true;
201  }
202 
203  int64_t zyppng::AsyncDataSource::writeData( const char *data, int64_t count )
204  {
205  Z_D();
206  if ( count > 0 ) {
207  // we always use the write buffer, to make sure the fd is actually writeable
208  d->_writeBuffer.append( data, count );
209  d->_writeNotifier->setEnabled( true );
210  }
211  return count;
212  }
213 
214  int64_t zyppng::AsyncDataSource::readData( uint channel, char *buffer, int64_t bufsize )
215  {
216  Z_D();
217  if ( channel >= d->_readFds.size() ) {
218  ERR << constants::outOfRangeErrMsg << std::endl;
219  throw std::logic_error( constants::outOfRangeErrMsg.data() );
220  }
221  const auto read = eintrSafeCall( ::read, d->_readFds[channel]._readFd, buffer, bufsize );
222  if ( read < 0 ) {
223  switch ( errno ) {
224  #if EAGAIN != EWOULDBLOCK
225  case EWOULDBLOCK:
226  #endif
227  case EAGAIN: {
228  return -2;
229  }
230  default:
231  break;
232  }
233  }
234  return read;
235  }
236 
237  int64_t AsyncDataSource::rawBytesAvailable( uint channel ) const
238  {
239  Z_D();
240 
241  if ( channel >= d->_readFds.size() ) {
242  ERR << constants::outOfRangeErrMsg << std::endl;
243  throw std::logic_error( constants::outOfRangeErrMsg.data() );
244  }
245 
246  if ( isOpen() && canRead() )
247  return zyppng::bytesAvailableOnFD( d->_readFds[channel]._readFd );
248  return 0;
249  }
250 
252  {
253  Z_D();
254  if ( channel >= d->_readFds.size() ) {
255  ERR << constants::outOfRangeErrMsg << std::endl;
256  throw std::logic_error( constants::outOfRangeErrMsg.data() );
257  }
258  }
259 
261  {
262  Z_D();
263  for( uint i = 0; i < d->_readFds.size(); ++i ) {
264  auto &readChan = d->_readFds[i];
265  readChan._readNotifier.reset();
266  if ( readChan._readFd >= 0) {
267  finishReadChannel(i);
268  d->_sigReadFdClosed.emit( i, UserRequest );
269  }
270  }
271  d->_readFds.clear();
272 
273  d->_writeNotifier.reset();
274  d->_writeBuffer.clear();
275  if ( d->_writeFd >= 0 ) {
276  d->_writeFd = -1;
277  d->_sigWriteFdClosed.emit( UserRequest );
278  }
279 
280  IODevice::close();
281  }
282 
284  {
285  Z_D();
286 
287  // if we are open writeOnly, simply call close();
288  if ( !canRead() ) {
289  close();
290  return;
291  }
292 
293  d->_mode = ReadOnly;
294  d->_writeNotifier.reset();
295  d->_writeBuffer.clear();
296 
297  if ( d->_writeFd >= 0 ) {
298  d->_writeFd = -1;
299  d->_sigWriteFdClosed.emit( UserRequest );
300  }
301  }
302 
303  bool AsyncDataSource::waitForReadyRead( uint channel, int timeout )
304  {
305  Z_D();
306  if ( !canRead() )
307  return false;
308 
309  if ( channel >= d->_readFds.size() ) {
310  ERR << constants::outOfRangeErrMsg << std::endl;
311  throw std::logic_error( constants::outOfRangeErrMsg.data() );
312  }
313 
314  bool gotRR = false;
315  auto rrConn = AutoDisconnect( d->_channelReadyRead.connect([&]( uint activated ){
316  gotRR = ( channel == activated );
317  }) );
318 
319  // we can only wait if we are open for reading and still have a valid fd
320  auto &channelRef = d->_readFds[ channel ];
321  while ( readFdOpen(channel) && canRead() && !gotRR ) {
322  int rEvents = 0;
323  if ( EventDispatcher::waitForFdEvent( channelRef._readFd, AbstractEventSource::Read | AbstractEventSource::Error , rEvents, timeout ) ) {
324  //simulate signal from read notifier
325  d->notifierActivated( *channelRef._readNotifier, rEvents );
326  } else {
327  //timeout
328  return false;
329  }
330  }
331  return gotRR;
332  }
333 
335  {
336  Z_D();
337  if ( !canWrite() )
338  return;
339 
340  int timeout = -1;
341  while ( canWrite() && d->_writeBuffer.frontSize() ) {
342  int rEvents = 0;
344  //simulate signal from write notifier
345  d->readyWrite();
346  } else {
347  //timeout
348  return;
349  }
350  }
351  }
352 
354  {
355  return d_func()->_sigWriteFdClosed;
356  }
357 
359  {
360  return d_func()->_sigReadFdClosed;
361  }
362 
364  {
365  Z_D();
366  if ( !d->_readChannels.size() )
367  return false;
368  return readFdOpen( d_func()->_currentReadChannel );
369  }
370 
371  bool AsyncDataSource::readFdOpen(uint channel) const
372  {
373  Z_D();
374  if ( channel >= d->_readFds.size() ) {
375  ERR << constants::outOfRangeErrMsg << std::endl;
376  throw std::logic_error( constants::outOfRangeErrMsg.data() );
377  }
378  auto &channelRef = d->_readFds[ channel ];
379  return ( channelRef._readNotifier && channelRef._readFd >= 0 );
380  }
381 
383  {
384  return d_func()->_writeBuffer.size();
385  }
386 
387 }
bool canRead() const
Definition: iodevice.cc:85
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
virtual bool open(const OpenMode mode)
Definition: iodevice.cc:16
void notifierActivated(const SocketNotifier &notify, int evTypes)
void setReadChannelCount(uint channels)
Definition: iodevice.cc:37
Signal< void(AsyncDataSource::ChannelCloseReason)> _sigWriteFdClosed
char * front()
Definition: iobuffer.cc:35
bool isOpen() const
Definition: iodevice.cc:95
static Ptr create(int socket, int evTypes, bool enable=true)
int64_t size() const
Definition: iobuffer.cc:154
void closeReadChannel(uint channel, AsyncDataSource::ChannelCloseReason reason)
Signal< void()> _sigAllBytesWritten
Definition: iodevice_p.h:47
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
#define ERR
Definition: Logger.h:102
std::vector< ReadChannelDev > _readFds
#define Z_D()
Definition: zyppglobal.h:105
Signal< void(uint, AsyncDataSource::ChannelCloseReason)> _sigReadFdClosed
void closeWriteChannel(AsyncDataSource::ChannelCloseReason reason)
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
int64_t rawBytesAvailable(uint channel) const override
int64_t writeData(const char *data, int64_t count) override
Failed to block or unblock the fd.
IODevice::OpenMode _mode
Definition: iodevice_p.h:43
void readChannelChanged(uint channel) override
SocketNotifier::Ptr _writeNotifier
int64_t discard(int64_t bytes)
Definition: iobuffer.cc:55
int64_t frontSize() const
Definition: iobuffer.cc:43
constexpr std::string_view outOfRangeErrMsg("Channel index out of range")
virtual void closeWriteChannel()
auto eintrSafeCall(Fun &&function, Args &&... args)
int64_t readData(uint channel, char *buffer, int64_t bufsize) override
ZYPP_IMPL_PRIVATE(UnixSignalSource)
int64_t bytesPending() const override
bool openFds(const std::vector< int > &readFds, int writeFd=-1)
std::map< std::string, std::string > read(const Pathname &_path)
Read sysconfig file path_r and return (key,valye) pairs.
Definition: sysconfig.cc:34
virtual void close()
Definition: iodevice.cc:30
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
bool write(const Pathname &path_r, const std::string &key_r, const std::string &val_r, const std::string &newcomment_r)
Add or change a value in sysconfig file path_r.
Definition: sysconfig.cc:80
Signal< void(uint) > _channelReadyRead
Definition: iodevice_p.h:45
bool canWrite() const
Definition: iodevice.cc:90
Signal< void() > _readyRead
Definition: iodevice_p.h:44
std::vector< IOBuffer > _readChannels
Definition: iodevice_p.h:39
Signal< void(int64_t)> _sigBytesWritten
Definition: iodevice_p.h:46
int64_t bytesAvailableOnFD(int fd)
Definition: linuxhelpers.cc:62
std::shared_ptr< AsyncDataSource > Ptr
BlockingMode setFDBlocking(int fd, bool mode)
Definition: IOTools.cc:31
#define DBG
Definition: Logger.h:99
bool waitForReadyRead(uint channel, int timeout) override