XRootD
Loading...
Searching...
No Matches
XrdClPollerBuiltIn.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClSocket.hh"
31#include "XrdSys/XrdSysE2T.hh"
33
34namespace
35{
36 //----------------------------------------------------------------------------
37 // A helper struct passed to the callback as a custom arg
38 //----------------------------------------------------------------------------
39 struct PollerHelper
40 {
41 PollerHelper():
42 channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43 readTimeout(0), writeTimeout(0)
44 {}
45 XrdSys::IOEvents::Channel *channel;
46 XrdSys::IOEvents::CallBack *callBack;
47 bool readEnabled;
48 bool writeEnabled;
49 time_t readTimeout;
50 time_t writeTimeout;
51 };
52
53 //----------------------------------------------------------------------------
54 // Call back implementation
55 //----------------------------------------------------------------------------
// Callback object handed to an IOEvents channel for one socket. Event()
// translates the IOEvents flags into SocketHandler event bits and forwards
// them to the handler. A shared DisableControl lets another thread request
// that no further events are delivered and, if needed, wait until a callback
// that is currently running has finished.
56 class SocketCallBack: public XrdSys::IOEvents::CallBack
57 {
58 public:
// Bit flags stored in DisableControl::pFlags.
59 enum CallBackFlags
60 {
61 kRunningCallBack = 1, // set by Event() on entry, cleared when it is done
62 kIdSet = 2, // pSelfId holds the id of the thread that ran Event()
63 kWantDisable = 4, // DisableCallBack() has been called
64 kDisabled = 8 // Event() has seen kWantDisable and acknowledged it
65 };
66
// State shared (via shared_ptr) between the callback object and whoever
// asks for callbacks to be disabled, so that it can outlive the callback.
67 struct DisableControl
68 {
69 DisableControl() : pFlags( 0 ), pCnd( 0 ) { }
70 ~DisableControl() { }
71
72 //------------------------------------------------------------------------
73 // Want to disable further callbacks. If callback is currently running
74 // in a different thread from our caller, wait until callback finishes.
75 //------------------------------------------------------------------------
76 void DisableCallBack()
77 {
// fetch_or publishes the request and returns the flags as they were before.
78 const int flags = pFlags.fetch_or( kWantDisable );
// No thread id recorded yet, or no callback in progress: nothing to wait for.
// Any later Event() call will observe kWantDisable itself.
79 if( !(flags & kIdSet) ) return;
80 if( !(flags & kRunningCallBack) ) return;
81 XrdSysCondVarHelper lck( pCnd );
// Called from the thread that ran the callback: return instead of waiting
// for a kDisabled flag that only that thread's Event() sets.
82 if( XrdSysThread::Same( XrdSysThread::ID(), pSelfId ) ) return;
// Event() sets kDisabled and broadcasts on pCnd once it has finished.
83 while( !(pFlags.load() & kDisabled) ) pCnd.Wait();
84 }
85
86 std::atomic<int> pFlags; // combination of CallBackFlags bits
87 XrdSysCondVar pCnd; // signalled when kDisabled gets set
88 pthread_t pSelfId; // written by Event() before kIdSet is raised
89 };
90
// Binds the callback to a socket and its handler and creates the shared
// control block. Neither pointer is owned or deleted by this class.
91 SocketCallBack( XrdCl::Socket *sock, XrdCl::SocketHandler *sh ):
92 pSocket( sock ), pHandler( sh )
93 {
94 pControl = std::make_shared<DisableControl>();
95 }
96
97 virtual ~SocketCallBack() {};
98
// Entry point invoked with the event flags for the channel. chP and cbArg
// are not used. Returns false once a disable request has been acknowledged
// and true otherwise.
// NOTE(review): how the caller interprets the return value is defined by
// XrdSys::IOEvents::CallBack and is not visible here - confirm.
99 virtual bool Event( XrdSys::IOEvents::Channel *chP,
100 void *cbArg,
101 int evFlags )
102 {
103 using namespace XrdCl;
104 uint8_t ev = 0;
105
// Map each IOEvents flag onto the SocketHandler bit of the same name.
106 if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
107 if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
108 if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
109 if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
110
// Only build the event description string when dump-level logging is on.
111 Log *log = DefaultEnv::GetLog();
112 if( unlikely(log->GetLevel() >= Log::DumpMsg) )
113 {
114 log->Dump( PollerMsg, "%s Got an event: %s",
115 pSocket->GetName().c_str(),
116 SocketHandler::EventTypeToString( ev ).c_str() );
117 }
118
// Mark the callback as running; flags holds the previous state.
119 int flags = pControl->pFlags.fetch_or( kRunningCallBack );
120 if( !( flags & kIdSet ) )
121 {
// First invocation: record the calling thread's id under the condvar lock
// before raising kIdSet, then refresh flags with the state seen by that
// second fetch_or.
122 XrdSysCondVarHelper lck( pControl->pCnd );
123 pControl->pSelfId = XrdSysThread::ID();
124 flags = pControl->pFlags.fetch_or( kIdSet );
125 }
126 if( flags & kWantDisable )
127 {
// Disable was requested before the handler ran: skip the handler,
// acknowledge the request and wake any thread waiting in DisableCallBack().
128 XrdSysCondVarHelper lck( pControl->pCnd );
129 pControl->pFlags &= ~kRunningCallBack;
130 pControl->pFlags |= kDisabled;
131 pControl->pCnd.Broadcast();
132 return false;
133 }
134
135 //----------------------------------------------------------------------
136 // If the event handler calls RemoveSocket for itself this object may
137 // be destroyed during the Event call.
138 //----------------------------------------------------------------------
// Hence the local shared_ptr copy: after the handler returns only 'control'
// is used, never a member of this object.
139 auto control = pControl;
140 pHandler->Event( ev, pSocket );
141
// Clear the running bit; flags is the state just before it was cleared.
142 flags = control->pFlags.fetch_and( ~kRunningCallBack );
143 if( flags & kWantDisable )
144 {
// Disable was requested while the handler was running: acknowledge it and
// release the waiter.
145 XrdSysCondVarHelper lck( control->pCnd );
146 control->pFlags |= kDisabled;
147 control->pCnd.Broadcast();
148 return false;
149 }
150 return true;
151 }
152
// Hands out a shared reference to the control block so that a caller can
// invoke DisableCallBack() on it without holding on to this object.
153 std::shared_ptr<DisableControl> GetControl()
154 {
155 return pControl;
156 }
157
158 private:
159 XrdCl::Socket *pSocket; // socket the events belong to
160 XrdCl::SocketHandler *pHandler; // receiver of the translated events
161 std::shared_ptr<DisableControl> pControl; // shared disable state
162 };
163}
164
165
166namespace XrdCl
167{
168 //----------------------------------------------------------------------------
169 // Initialize the poller
170 //----------------------------------------------------------------------------
172 {
173 return true;
174 }
175
176 //----------------------------------------------------------------------------
177 // Finalize the poller
178 //----------------------------------------------------------------------------
180 {
181 //--------------------------------------------------------------------------
182 // Clean up the channels
183 //--------------------------------------------------------------------------
184 SocketMap::iterator it;
185 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
186 {
187 PollerHelper *helper = (PollerHelper*)it->second;
188 if( helper->channel ) helper->channel->Delete();
189 delete helper->callBack;
190 delete helper;
191 }
192 pSocketMap.clear();
193
194 return true;
195 }
196
197 //------------------------------------------------------------------------
198 // Start polling
199 //------------------------------------------------------------------------
201 {
202 //--------------------------------------------------------------------------
203 // Start the poller
204 //--------------------------------------------------------------------------
205 using namespace XrdSys;
206
207 Log *log = DefaultEnv::GetLog();
208 log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
209 XrdSysMutexHelper scopedLock( pMutex );
210 int errNum = 0;
211 const char *errMsg = 0;
212
213 for( int i = 0; i < pNbPoller; ++i )
214 {
215 XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
216 if( !poller )
217 {
218 log->Error( PollerMsg, "Unable to create the internal poller object: "
219 "%s (%s)", XrdSysE2T( errno ), errMsg );
220 return false;
221 }
222 pPollerPool.push_back( poller );
223 }
224
225 pNext = pPollerPool.begin();
226
227 log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
228
229 //--------------------------------------------------------------------------
230 // Check if we have any descriptors to reinsert from the last time we
231 // were started
232 //--------------------------------------------------------------------------
233 SocketMap::iterator it;
234 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
235 {
236 PollerHelper *helper = (PollerHelper*)it->second;
237 Socket *socket = it->first;
238 helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
239 helper->callBack );
240 if( helper->readEnabled )
241 {
242 bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
243 helper->readTimeout, &errMsg );
244 if( !status )
245 {
246 log->Error( PollerMsg, "Unable to enable read notifications "
247 "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
248
249 return false;
250 }
251 }
252
253 if( helper->writeEnabled )
254 {
255 bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
256 helper->writeTimeout, &errMsg );
257 if( !status )
258 {
259 log->Error( PollerMsg, "Unable to enable write notifications "
260 "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
261
262 return false;
263 }
264 }
265 }
266 return true;
267 }
268
269 //------------------------------------------------------------------------
270 // Stop polling
271 //------------------------------------------------------------------------
273 {
274 using namespace XrdSys::IOEvents;
275
276 Log *log = DefaultEnv::GetLog();
277 log->Debug( PollerMsg, "Stopping the poller..." );
278
279 XrdSysMutexHelper scopedLock( pMutex );
280
281 if( pPollerPool.empty() )
282 {
283 log->Debug( PollerMsg, "Stopping a poller that has not been started" );
284 return true;
285 }
286
287 while( !pPollerPool.empty() )
288 {
289 XrdSys::IOEvents::Poller *poller = pPollerPool.back();
290 if( *pNext == poller )
291 pNext = pPollerPool.begin();
292 pPollerPool.pop_back();
293
294 if( !poller ) continue;
295
296 scopedLock.UnLock();
297 poller->Stop();
298 delete poller;
299 scopedLock.Lock( &pMutex );
300 }
301 pNext = pPollerPool.end();
302 pPollerMap.clear();
303
304 SocketMap::iterator it;
305 const char *errMsg = 0;
306
307 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
308 {
309 PollerHelper *helper = (PollerHelper*)it->second;
310 if( !helper->channel ) continue;
311 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
312 if( !status )
313 {
314 Socket *socket = it->first;
315 log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
316 socket->GetName().c_str(), errMsg );
317 }
318 helper->channel->Delete();
319 helper->channel = 0;
320 }
321
322 return true;
323 }
324
325 //------------------------------------------------------------------------
326 // Add socket to the polling queue
327 //------------------------------------------------------------------------
329 SocketHandler *handler )
330 {
331 Log *log = DefaultEnv::GetLog();
332 XrdSysMutexHelper scopedLock( pMutex );
333
334 if( !socket )
335 {
336 log->Error( PollerMsg, "Invalid socket, impossible to poll" );
337 return false;
338 }
339
340 if( socket->GetStatus() != Socket::Connected &&
341 socket->GetStatus() != Socket::Connecting )
342 {
343 log->Error( PollerMsg, "Socket is not in a state valid for polling" );
344 return false;
345 }
346
347 log->Debug( PollerMsg, "Adding socket %p to the poller", (void*)socket );
348
349 //--------------------------------------------------------------------------
350 // Check if the socket is already registered
351 //--------------------------------------------------------------------------
352 SocketMap::const_iterator it = pSocketMap.find( socket );
353 if( it != pSocketMap.end() )
354 {
355 log->Warning( PollerMsg, "%s Already registered with this poller",
356 socket->GetName().c_str() );
357 return false;
358 }
359
360 //--------------------------------------------------------------------------
361 // Create the socket helper
362 //--------------------------------------------------------------------------
363 XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
364
365 if( !poller )
366 {
367 log->Error( PollerMsg, "No poller available, can not add socket" );
368 return false;
369 }
370
371 PollerHelper *helper = new PollerHelper();
372 helper->callBack = new ::SocketCallBack( socket, handler );
373
374 if( poller )
375 {
376 helper->channel = new XrdSys::IOEvents::Channel( poller,
377 socket->GetFD(),
378 helper->callBack );
379 }
380
381 handler->Initialize( this );
382 pSocketMap[socket] = helper;
383 return true;
384 }
385
386 //----------------------------------------------------------------------------
387 // This disables further callbacks to the socket's handler from the poller
388 // thread. Will wait until any current callback completes (unless our caller
389 // is the same thread). We do not yet remove the socket from the poller.
390 // Removal may still block until the relevant poller thread can run.
391 //----------------------------------------------------------------------------
393 {
394 XrdSysMutexHelper scopedLock( pMutex );
395 SocketMap::iterator it = pSocketMap.find( socket );
396 if( it == pSocketMap.end() )
397 return;
398
399 PollerHelper *helper = (PollerHelper*)it->second;
400 if( !helper ) return;
401 XrdSys::IOEvents::CallBack *cb = helper->callBack;
402 if( !cb ) return;
403 SocketCallBack *scb = dynamic_cast<SocketCallBack*>( cb );
404 if( !scb ) return;
405 auto dc = scb->GetControl();
406 scopedLock.UnLock();
407 dc->DisableCallBack();
408 }
409
410 //------------------------------------------------------------------------
411 // Remove the socket
412 //------------------------------------------------------------------------
414 {
415 using namespace XrdSys::IOEvents;
416 Log *log = DefaultEnv::GetLog();
417
418 //--------------------------------------------------------------------------
419 // Find the right socket
420 //--------------------------------------------------------------------------
421 XrdSysMutexHelper scopedLock( pMutex );
422 SocketMap::iterator it = pSocketMap.find( socket );
423 if( it == pSocketMap.end() )
424 return true;
425
426 log->Debug( PollerMsg, "%s Removing socket from the poller",
427 socket->GetName().c_str() );
428
429 // unregister from the poller it's currently associated with
430 UnregisterFromPoller( socket );
431
432 //--------------------------------------------------------------------------
433 // Remove the socket
434 //--------------------------------------------------------------------------
435 PollerHelper *helper = (PollerHelper*)it->second;
436 pSocketMap.erase( it );
437 scopedLock.UnLock();
438
439 if( helper->channel )
440 {
441 const char *errMsg;
442 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
443 if( !status )
444 {
445 log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
446 socket->GetName().c_str(), errMsg );
447 return false;
448 }
449 helper->channel->Delete();
450 }
451 delete helper->callBack;
452 delete helper;
453 return true;
454 }
455
456 //----------------------------------------------------------------------------
457 // Notify the handler about read events
458 //----------------------------------------------------------------------------
460 bool notify,
461 time_t timeout )
462 {
463 using namespace XrdSys::IOEvents;
464 Log *log = DefaultEnv::GetLog();
465
466 if( !socket )
467 {
468 log->Error( PollerMsg, "Invalid socket, read events unavailable" );
469 return false;
470 }
471
472 //--------------------------------------------------------------------------
473 // Check if the socket is registered
474 //--------------------------------------------------------------------------
475 XrdSysMutexHelper scopedLock( pMutex );
476 SocketMap::const_iterator it = pSocketMap.find( socket );
477 if( it == pSocketMap.end() )
478 {
479 log->Warning( PollerMsg, "%s Socket is not registered",
480 socket->GetName().c_str() );
481 return false;
482 }
483
484 PollerHelper *helper = (PollerHelper*)it->second;
485 XrdSys::IOEvents::Poller *poller = GetPoller( socket );
486
487 //--------------------------------------------------------------------------
488 // Enable read notifications
489 //--------------------------------------------------------------------------
490 if( notify )
491 {
492 if( helper->readEnabled )
493 return true;
494 helper->readTimeout = timeout;
495
496 log->Dump( PollerMsg, "%s Enable read notifications, timeout: %lld",
497 socket->GetName().c_str(), (long long)timeout );
498
499 if( poller )
500 {
501 const char *errMsg;
502 bool status = helper->channel->Enable( Channel::readEvents, timeout,
503 &errMsg );
504 if( !status )
505 {
506 log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
507 socket->GetName().c_str(), errMsg );
508 return false;
509 }
510 }
511 helper->readEnabled = true;
512 }
513
514 //--------------------------------------------------------------------------
515 // Disable read notifications
516 //--------------------------------------------------------------------------
517 else
518 {
519 if( !helper->readEnabled )
520 return true;
521
522 log->Dump( PollerMsg, "%s Disable read notifications",
523 socket->GetName().c_str() );
524
525 if( poller )
526 {
527 const char *errMsg;
528 bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
529 if( !status )
530 {
531 log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
532 socket->GetName().c_str(), errMsg );
533 return false;
534 }
535 }
536 helper->readEnabled = false;
537 }
538 return true;
539 }
540
541 //----------------------------------------------------------------------------
542 // Notify the handler about write events
543 //----------------------------------------------------------------------------
545 bool notify,
546 time_t timeout )
547 {
548 using namespace XrdSys::IOEvents;
549 Log *log = DefaultEnv::GetLog();
550
551 if( !socket )
552 {
553 log->Error( PollerMsg, "Invalid socket, write events unavailable" );
554 return false;
555 }
556
557 //--------------------------------------------------------------------------
558 // Check if the socket is registered
559 //--------------------------------------------------------------------------
560 XrdSysMutexHelper scopedLock( pMutex );
561 SocketMap::const_iterator it = pSocketMap.find( socket );
562 if( it == pSocketMap.end() )
563 {
564 log->Warning( PollerMsg, "%s Socket is not registered",
565 socket->GetName().c_str() );
566 return false;
567 }
568
569 PollerHelper *helper = (PollerHelper*)it->second;
570 XrdSys::IOEvents::Poller *poller = GetPoller( socket );
571
572 //--------------------------------------------------------------------------
573 // Enable write notifications
574 //--------------------------------------------------------------------------
575 if( notify )
576 {
577 if( helper->writeEnabled )
578 return true;
579
580 helper->writeTimeout = timeout;
581
582 log->Dump( PollerMsg, "%s Enable write notifications, timeout: %lld",
583 socket->GetName().c_str(), (long long)timeout );
584
585 if( poller )
586 {
587 const char *errMsg;
588 bool status = helper->channel->Enable( Channel::writeEvents, timeout,
589 &errMsg );
590 if( !status )
591 {
592 log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
593 socket->GetName().c_str(), errMsg );
594 return false;
595 }
596 }
597 helper->writeEnabled = true;
598 }
599
600 //--------------------------------------------------------------------------
601 // Disable read notifications
602 //--------------------------------------------------------------------------
603 else
604 {
605 if( !helper->writeEnabled )
606 return true;
607
608 log->Dump( PollerMsg, "%s Disable write notifications",
609 socket->GetName().c_str() );
610 if( poller )
611 {
612 const char *errMsg;
613 bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
614 if( !status )
615 {
616 log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
617 socket->GetName().c_str(), errMsg );
618 return false;
619 }
620 }
621 helper->writeEnabled = false;
622 }
623 return true;
624 }
625
626 //----------------------------------------------------------------------------
627 // Check whether the socket is registered with the poller
628 //----------------------------------------------------------------------------
630 {
631 XrdSysMutexHelper scopedLock( pMutex );
632 SocketMap::iterator it = pSocketMap.find( socket );
633 return it != pSocketMap.end();
634 }
635
636 //----------------------------------------------------------------------------
637 // Return poller threads in round-robin fashion
638 //----------------------------------------------------------------------------
639 XrdSys::IOEvents::Poller* PollerBuiltIn::GetNextPoller()
640 {
641 if( pPollerPool.empty() ) return 0;
642
643 PollerPool::iterator ret = pNext;
644 ++pNext;
645 if( pNext == pPollerPool.end() )
646 pNext = pPollerPool.begin();
647 return *ret;
648 }
649
650 //----------------------------------------------------------------------------
651 // Return the poller associated with the respective channel
652 //----------------------------------------------------------------------------
653 XrdSys::IOEvents::Poller* PollerBuiltIn::RegisterAndGetPoller(const Socket * socket)
654 {
655 PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
656
657 if( itr == pPollerMap.end() )
658 {
659 XrdSys::IOEvents::Poller* poller = GetNextPoller();
660 if( poller )
661 pPollerMap[socket->GetFD()] = poller;
662 return poller;
663 }
664
665 return itr->second;
666 }
667
668 void PollerBuiltIn::UnregisterFromPoller( const Socket *socket )
669 {
670 PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
671 if( itr == pPollerMap.end() ) return;
672 pPollerMap.erase( itr );
673 }
674
675 XrdSys::IOEvents::Poller* PollerBuiltIn::GetPoller(const Socket * socket)
676 {
677 PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
678 if( itr == pPollerMap.end() ) return 0;
679 return itr->second;
680 }
681
682 //----------------------------------------------------------------------------
683 // Get the initial value for pNbPoller
684 //----------------------------------------------------------------------------
685 int PollerBuiltIn::GetNbPollerInit()
686 {
687 Env * env = DefaultEnv::GetEnv();
689 env->GetInt("ParallelEvtLoop", ret);
690 return ret;
691 }
692}
#define unlikely(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition XrdClLog.hh:258
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool EnableReadNotification(Socket *socket, bool notify, time_t timeout=60)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool Stop()
Stop polling.
virtual void ShutdownEvents(Socket *socket)
virtual bool EnableWriteNotification(Socket *socket, bool notify, time_t timeout=60)
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
A network socket.
std::string GetName() const
Get the string representation of the socket.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
int GetFD() const
Get the file descriptor.
SocketStatus GetStatus() const
Get the socket status.
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static pthread_t ID(void)
@ allEvents
All of the above.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Enable(int events, int timeout=0, const char **eText=0)
bool Disable(int events, const char **eText=0)
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
const uint64_t PollerMsg
const int DefaultParallelEvtLoop
XrdSysError Log
Definition XrdConfig.cc:113