XRootD
Loading...
Searching...
No Matches
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor.
 ~Stream ()
 Destructor.
bool CanCollapse (const URL &url)
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty.
XRootDStatus EnableLink (PathID &path)
void Finalize ()
void ForceConnect ()
 Force connection.
void ForceError (XRootDStatus status, const bool hush, const uint64_t sess)
 Force error.
std::shared_ptr< ChannelGetChannel ()
const std::string & GetName () const
 Return stream name.
const URLGetURL () const
 Get the URL.
XRootDStatus Initialize ()
 Initializer.
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
MsgHandlerInstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
void OnConnect (uint16_t subStream)
 Call back when a message has been reconstructed.
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error.
void OnError (uint16_t subStream, XRootDStatus status)
 On error.
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed.
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout.
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout.
Status Query (uint16_t query, AnyObject &result)
 Query the stream.
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler.
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler.
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending.
void SetChannel (std::weak_ptr< Channel > &channel)
 Sets a weak_ptr of our owning Channel.
void SetChannelData (AnyObject *channelData)
 Set the channel data.
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue.
void SetJobManager (JobManager *jobManager)
 Set job manager.
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams.
void SetPoller (Poller *poller)
 Set the poller.
void SetTaskManager (TaskManager *taskManager)
 Set task manager.
void SetTransport (TransportHandler *transport)
 Set the transport.
void Tick (time_t now)

Detailed Description

Stream.

Definition at line 169 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 175 of file XrdClStream.hh.

176 {
177 Disconnected = 0,
178 Connected = 1,
179 Connecting = 2,
180 Error = 3
181 };
@ Disconnected
Not connected.
@ Connected
Connected.
@ Connecting
In the process of being connected.

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() )

Constructor.

Definition at line 299 of file XrdClStream.cc.

299 :
300 pUrl( url ),
301 pPrefer( prefer ),
302 pTransport( 0 ),
303 pPoller( 0 ),
304 pTaskManager( 0 ),
305 pJobManager( 0 ),
306 pIncomingQueue( 0 ),
307 pChannelData( 0 ),
308 pLastStreamError( 0 ),
309 pConnectionCount( 0 ),
310 pConnectionInitTime( 0 ),
311 pAddressType( Utils::IPAll ),
312 pSessionId( 0 ),
313 pBytesSent( 0 ),
314 pBytesReceived( 0 )
315 {
316 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
317 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
318
319 std::ostringstream o;
320 o << pUrl->GetHostId();
321 pStreamName = o.str();
322
323 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
325 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
327 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
329
330 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
332
333 pAddressType = Utils::String2AddressType( netStack );
334 if( pAddressType == Utils::AddressType::IPAuto )
335 {
337 if( !( stacks & XrdNetUtils::hasIP64 ) )
338 {
339 if( stacks & XrdNetUtils::hasIPv4 )
340 pAddressType = Utils::AddressType::IPv4;
341 else if( stacks & XrdNetUtils::hasIPv6 )
342 pAddressType = Utils::AddressType::IPv6;
343 }
344 }
345
346 Log *log = DefaultEnv::GetLog();
347 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
348 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
349 "Window: %d", pStreamName.c_str(), netStack.c_str(),
350 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
351 }
static Log * GetLog()
Get default log.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdCl::Utils::IPAuto, XrdCl::Utils::IPv4, XrdCl::Utils::IPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, XrdNetUtils::qryINIF, and XrdCl::Utils::String2AddressType().

Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 356 of file XrdClStream.cc.

357 {
358 // Used to disconnect substreams here, but since we are refernce counted
359 // and connected substream hold a count, if we're here they're closed.
360
361 Log *log = DefaultEnv::GetLog();
362 log->Debug( PostMasterMsg, "[%s] Destroying stream",
363 pStreamName.c_str() );
364
365 MonitorDisconnection( XRootDStatus() );
366
367 SubStreamList::iterator it;
368 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
369 delete *it;
370 }

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
: true is this channel can be collapsed using this URL, false otherwise

Definition at line 1395 of file XrdClStream.cc.

1396 {
1397 Log *log = DefaultEnv::GetLog();
1398
1399 //--------------------------------------------------------------------------
1400 // Resolve all the addresses of the host we're supposed to connect to
1401 //--------------------------------------------------------------------------
1402 std::vector<XrdNetAddr> prefaddrs;
1403 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1404 if( !st.IsOK() )
1405 {
1406 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1407 , pStreamName.c_str(), url.GetHostName().c_str() );
1408 return false;
1409 }
1410
1411 //--------------------------------------------------------------------------
1412 // Resolve all the addresses of the alias
1413 //--------------------------------------------------------------------------
1414 std::vector<XrdNetAddr> aliasaddrs;
1415 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1416 if( !st.IsOK() )
1417 {
1418 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1419 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1420 return false;
1421 }
1422
1423 //--------------------------------------------------------------------------
1424 // Now check if the preferred host is part of the alias
1425 //--------------------------------------------------------------------------
1426 auto itr = prefaddrs.begin();
1427 for( ; itr != prefaddrs.end() ; ++itr )
1428 {
1429 auto itr2 = aliasaddrs.begin();
1430 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1431 if( itr->Same( &*itr2 ) ) return true;
1432 }
1433
1434 return false;
1435 }
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Here is the call graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t subStream)

Disables respective uplink if empty.

Definition at line 794 of file XrdClStream.cc.

795 {
796 bool closing;
797 StreamMutexHelper scopedLock( pMutex, subStream, closing );
798 if( closing ) return;
799 Log *log = DefaultEnv::GetLog();
800
801 if( pSubStreams[subStream]->outQueue->IsEmpty() )
802 {
803 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
804 pSubStreams[subStream]->socket->GetStreamName().c_str() );
805 pSubStreams[subStream]->socket->DisableUplink();
806 }
807 }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Here is the call graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed, otherwise make sure that the underlying socket handler gets write readiness events, it will update the path with what it has actually enabled

Definition at line 391 of file XrdClStream.cc.

392 {
393 StreamMutexHelper scopedLock( pMutex );
394
395 //--------------------------------------------------------------------------
396 // We are in the process of connecting the main stream, so we do nothing
397 // because when the main stream connection is established it will connect
398 // all the other streams
399 //--------------------------------------------------------------------------
400 if( pSubStreams[0]->status == Socket::Connecting )
401 return XRootDStatus();
402
403 //--------------------------------------------------------------------------
404 // The main stream is connected, so we can verify whether we have
405 // the up and the down stream connected and ready to handle data.
406 // If anything is not right we fall back to stream 0.
407 //--------------------------------------------------------------------------
408 if( pSubStreams[0]->status == Socket::Connected )
409 {
410 if( pSubStreams[path.down]->status != Socket::Connected )
411 path.down = 0;
412
413 if( pSubStreams[path.up]->status == Socket::Disconnected )
414 {
415 path.up = 0;
416 return pSubStreams[0]->socket->EnableUplink();
417 }
418
419 if( pSubStreams[path.up]->status == Socket::Connected )
420 return pSubStreams[path.up]->socket->EnableUplink();
421
422 return XRootDStatus();
423 }
424
425 //--------------------------------------------------------------------------
426 // The main stream is not connected, we need to check whether enough time
427 // has passed since we last encountered an error (if any) so that we could
428 // re-attempt the connection
429 //--------------------------------------------------------------------------
430 Log *log = DefaultEnv::GetLog();
431 time_t now = ::time(0);
432
433 if( now-pLastStreamError < pStreamErrorWindow )
434 return pLastFatalError;
435
436 gettimeofday( &pConnectionStarted, 0 );
437 ++pConnectionCount;
438
439 //--------------------------------------------------------------------------
440 // Resolve all the addresses of the host we're supposed to connect to
441 //--------------------------------------------------------------------------
442 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
443 if( !st.IsOK() )
444 {
445 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
446 "the host", pStreamName.c_str() );
447 pLastStreamError = now;
448 st.status = stFatal;
449 pLastFatalError = st;
450 return st;
451 }
452
453 if( pPrefer.IsValid() )
454 {
455 std::vector<XrdNetAddr> addrresses;
456 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
457 if( !st.IsOK() )
458 {
459 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
460 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
461 }
462 else
463 {
464 std::vector<XrdNetAddr> tmp;
465 tmp.reserve( pAddresses.size() );
466 // first add all remaining addresses
467 auto itr = pAddresses.begin();
468 for( ; itr != pAddresses.end() ; ++itr )
469 {
470 if( !HasNetAddr( *itr, addrresses ) )
471 tmp.push_back( *itr );
472 }
473 // then copy all 'preferred' addresses
474 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
475 // and keep the result
476 pAddresses.swap( tmp );
477 }
478 }
479
480 Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
481 pAddresses );
482
483 while( !pAddresses.empty() )
484 {
485 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
486 pAddresses.pop_back();
487 pConnectionInitTime = ::time( 0 );
488 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
489 if( st.IsOK() )
490 {
491 pSubStreams[0]->status = Socket::Connecting;
492 break;
493 }
494 }
495 return st;
496 }
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
const uint16_t stFatal
Fatal error, it's still an error.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Finalize()

void XrdCl::Stream::Finalize ( )

Used at finalize time, disconnects the stream. Assumes poller and jobmanager are not running.

Definition at line 568 of file XrdClStream.cc.

569 {
570 auto channel = GetChannel();
571 StreamMutexHelper scopedLock( pMutex );
572 SubStreamList::iterator it;
573 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
574 {
575 (*it)->socket->Close();
576 (*it)->status = Socket::Disconnected;
577 }
578 pSessionId = 0;
579 }
std::shared_ptr< Channel > GetChannel()

References XrdCl::Socket::Disconnected, and GetChannel().

Here is the call graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 552 of file XrdClStream.cc.

553 {
554 StreamMutexHelper scopedLock( pMutex );
555 if( pSubStreams[0]->status == Socket::Connecting )
556 {
557 pSubStreams[0]->status = Socket::Disconnected;
558 XrdCl::PathID path( 0, 0 );
559 XrdCl::XRootDStatus st = EnableLink( path );
560 if( !st.IsOK() )
561 OnConnectError( 0, st );
562 }
563 }
XRootDStatus EnableLink(PathID &path)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool IsOK() const
We're fine.

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Here is the call graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus status,
const bool hush,
const uint64_t sess )

Force error.

Definition at line 1141 of file XrdClStream.cc.

1142 {
1143 auto channel = GetChannel();
1144 bool closing;
1145 StreamMutexHelper scopedLock( pMutex,
1146 [this, channel, status, hush, sess]()
1147 {
1148 this->ForceError(status, hush, sess);
1149 }, closing );
1150 if( closing ) return;
1151 if( sess && sess != pSessionId ) return;
1152
1153 Log *log = DefaultEnv::GetLog();
1154 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1155 {
1156 if( pSubStreams[substream]->status != Socket::Connected ) continue;
1157 SockHandlerClose( substream );
1158
1159 if( !hush )
1160 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1161 pStreamName.c_str(), status.ToString().c_str() );
1162
1163 //--------------------------------------------------------------------
1164 // Reinsert the stuff that we have failed to sent
1165 //--------------------------------------------------------------------
1166 Reinsert( substream );
1167 }
1168
1169 pConnectionCount = 0;
1170 pSessionId = 0;
1171
1172 //------------------------------------------------------------------------
1173 // We're done here, unlock the stream mutex to avoid deadlocks and
1174 // report the disconnection event to the handlers
1175 //------------------------------------------------------------------------
1176 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1177 "message handlers.", pStreamName.c_str() );
1178
1179 SubStreamList::iterator it;
1180 OutQueue q;
1181 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1182 q.GrabItems( *(*it)->outQueue );
1183 scopedLock.UnLock();
1184
1185 q.Report( status );
1186
1187 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1188 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1189 }
@ Broken
The stream is broken.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), ForceError(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::ChannelEventHandler::StreamBroken, and XrdCl::Status::ToString().

Referenced by ForceError().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetChannel()

std::shared_ptr< Channel > XrdCl::Stream::GetChannel ( )
inline

Gets a shared_ptr of our owning Channel. Used to by our AsyncSocketHandlers to obtain a ref count, and internally to ensure Channel (and thus ourselves) remains alive during error-event handlers that close sockets.

Definition at line 414 of file XrdClStream.hh.

415 {
416 return pChannel.lock();
417 }

Referenced by Finalize(), ForceError(), OnConnect(), OnConnectError(), OnError(), and OnReadTimeout().

Here is the caller graph for this function:

◆ GetName()

const std::string & XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 297 of file XrdClStream.hh.

298 {
299 return pStreamName;
300 }

◆ GetURL()

const URL * XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 284 of file XrdClStream.hh.

285 {
286 return pUrl;
287 }

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 375 of file XrdClStream.cc.

376 {
377 if( !pTransport || !pPoller || !pChannelData )
378 return XRootDStatus( stError, errUninitialized );
379
380 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
381 pChannelData, 0, this );
382 pSubStreams.push_back( new SubStreamData() );
383 pSubStreams[0]->socket = s;
384 return XRootDStatus();
385 }
const uint16_t errUninitialized
const uint16_t stError
An error occurred that could potentially be retried.

References XrdCl::errUninitialized, and XrdCl::stError.

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t stream,
MsgHandler *& incHandler )

In case the message is a kXR_status response it needs further attention

Returns
: a MsgHandler in case we need to read out raw data

Definition at line 1364 of file XrdClStream.cc.

1366 {
1367 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1368 if( !mh.handler )
1370
1371 uint16_t action = mh.handler->InspectStatusRsp();
1372 mh.action |= action;
1373
1374 if( action & MsgHandler::RemoveHandler )
1375 pIncomingQueue->RemoveMessageHandler( mh.handler );
1376
1377 if( action & MsgHandler::Raw )
1378 {
1379 incHandler = mh.handler;
1380 return MsgHandler::Raw;
1381 }
1382
1383 if( action & MsgHandler::Corrupted )
1384 return MsgHandler::Corrupted;
1385
1386 if( action & MsgHandler::More )
1387 return MsgHandler::More;
1388
1389 return MsgHandler::None;
1390 }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, and XrdCl::MsgHandler::RemoveHandler.

Here is the call graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > & msg,
uint16_t stream )

Install a message handler for the given message if there is one available, if the handler want's to be called in the raw mode it will be returned, the message ownership flag is returned in any case

Parameters
msgmessage header
streamstream concerned
Returns
a pair containing the handler and ownership flag

Definition at line 1343 of file XrdClStream.cc.

1344 {
1345 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1346 if( !mh.handler )
1347 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1348 mh.expires,
1349 mh.action );
1350
1351 if( !mh.handler )
1352 return nullptr;
1353
1354 if( mh.action & MsgHandler::Raw )
1355 return mh.handler;
1356 return nullptr;
1357 }

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t subStream)

Call back when a message has been reconstructed.

Definition at line 834 of file XrdClStream.cc.

835 {
836 auto channel = GetChannel();
837 bool closing;
838 StreamMutexHelper scopedLock( pMutex, subStream, closing );
839 if( closing ) return;
840 pSubStreams[subStream]->status = Socket::Connected;
841
842 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
843 Log *log = DefaultEnv::GetLog();
844 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
845 subStream, ipstack.c_str() );
846
847 if( subStream == 0 )
848 {
849 pLastStreamError = 0;
850 pLastFatalError = XRootDStatus();
851 pConnectionCount = 0;
852 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
853 pSessionId = ++sSessCntGen;
854
855 //------------------------------------------------------------------------
856 // Create the streams if they don't exist yet
857 //------------------------------------------------------------------------
858 if( pSubStreams.size() == 1 && numSub > 1 )
859 {
860 for( uint16_t i = 1; i < numSub; ++i )
861 {
862 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
863 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
864 pChannelData, i, this );
865 pSubStreams.push_back( new SubStreamData() );
866 pSubStreams[i]->socket = s;
867 }
868 }
869
870 //------------------------------------------------------------------------
871 // Connect the extra streams, if we fail we move all the outgoing items
872 // to stream 0, we don't need to enable the uplink here, because it
873 // should be already enabled after the handshaking process is completed.
874 //------------------------------------------------------------------------
875 if( pSubStreams.size() > 1 )
876 {
877 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
878 pStreamName.c_str(), pSubStreams.size() - 1 );
879 for( size_t i = 1; i < pSubStreams.size(); ++i )
880 {
881 if( pSubStreams[i]->status != Socket::Disconnected )
882 {
883 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
884 SockHandlerClose( i );
885 }
886 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
887 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
888 if( !st.IsOK() )
889 {
890 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
891 SockHandlerClose( i );
892 }
893 else
894 {
895 pSubStreams[i]->status = Socket::Connecting;
896 }
897 }
898 }
899
900 //------------------------------------------------------------------------
901 // Inform monitoring
902 //------------------------------------------------------------------------
903 pBytesSent = 0;
904 pBytesReceived = 0;
905 gettimeofday( &pConnectionDone, 0 );
906 Monitor *mon = DefaultEnv::GetMonitor();
907 if( mon )
908 {
909 Monitor::ConnectInfo i;
910 i.server = pUrl->GetHostId();
911 i.sTOD = pConnectionStarted;
912 i.eTOD = pConnectionDone;
913 i.streams = pSubStreams.size();
914
915 AnyObject qryResult;
916 std::string *qryResponse = nullptr;
917 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
918 qryResult.Get( qryResponse );
919
920 if (qryResponse) {
921 i.auth = *qryResponse;
922 delete qryResponse;
923 } else {
924 i.auth = "";
925 }
926
927 mon->Event( Monitor::EvConnect, &i );
928 }
929
930 //------------------------------------------------------------------------
931 // For every connected control-stream call the global on-connect handler
932 //------------------------------------------------------------------------
934 }
935 else if( pOnDataConnJob )
936 {
937 //------------------------------------------------------------------------
938 // For every connected data-stream call the on-connect handler
939 //------------------------------------------------------------------------
940 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
941 }
942 }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::TransportQuery::Auth, XrdCl::Monitor::ConnectInfo::auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, and XrdCl::Monitor::ConnectInfo::streams.

Here is the call graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t subStream,
XRootDStatus status )

On connect error.

Definition at line 947 of file XrdClStream.cc.

948 {
949 auto channel = GetChannel();
950 bool closing;
951 StreamMutexHelper scopedLock( pMutex, subStream, closing );
952 if( closing ) return;
953 Log *log = DefaultEnv::GetLog();
954 SockHandlerClose( subStream );
955 time_t now = ::time(0);
956
957 //--------------------------------------------------------------------------
958 // For every connection error call the global connection error handler
959 //--------------------------------------------------------------------------
961
962 //--------------------------------------------------------------------------
963 // If we connected subStream == 0 and cannot connect >0 then we just give
964 // up and move the outgoing messages to another queue
965 //--------------------------------------------------------------------------
966 if( subStream > 0 )
967 {
968 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
969 if( pSubStreams[0]->status == Socket::Connected )
970 {
971 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
972 if( !st.IsOK() )
973 OnFatalError( 0, st, scopedLock );
974 return;
975 }
976
977 if( pSubStreams[0]->status == Socket::Connecting )
978 return;
979
980 OnFatalError( subStream, status, scopedLock );
981 return;
982 }
983
984 //--------------------------------------------------------------------------
985 // Check if we still have time to try and do something in the current window
986 //--------------------------------------------------------------------------
987 time_t elapsed = now-pConnectionInitTime;
988 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
989 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
990
991 //------------------------------------------------------------------------
992 // If we have some IP addresses left we try them
993 //------------------------------------------------------------------------
994 if( !pAddresses.empty() )
995 {
996 XRootDStatus st;
997 do
998 {
999 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
1000 pAddresses.pop_back();
1001 pConnectionInitTime = ::time( 0 );
1002 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
1003 }
1004 while( !pAddresses.empty() && !st.IsOK() );
1005
1006 if( !st.IsOK() )
1007 OnFatalError( subStream, st, scopedLock );
1008 else
1009 pSubStreams[0]->status = Socket::Connecting;
1010
1011 return;
1012 }
1013 //------------------------------------------------------------------------
1014 // If we still can retry with the same host name, we sleep until the end
1015 // of the connection window and try
1016 //------------------------------------------------------------------------
1017 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
1018 && !status.IsFatal() )
1019 {
1020 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
1021 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
1022
1023 pSubStreams[0]->status = Socket::Connecting;
1024 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
1025 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
1026 return;
1027 }
1028 //--------------------------------------------------------------------------
1029 // We are out of the connection window, the only thing we can do here
1030 // is re-resolving the host name and retrying if we still can
1031 //--------------------------------------------------------------------------
1032 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
1033 {
1034 pAddresses.clear();
1035 pSubStreams[0]->status = Socket::Disconnected;
1036 PathID path( 0, 0 );
1037 XRootDStatus st = EnableLink( path );
1038 if( !st.IsOK() )
1039 OnFatalError( subStream, st, scopedLock );
1040 return;
1041 }
1042
1043 //--------------------------------------------------------------------------
1044 // Else, we fail
1045 //--------------------------------------------------------------------------
1046 OnFatalError( subStream, status, scopedLock );
1047 }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), and XrdCl::PostMasterMsg.

Referenced by ForceConnect().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t subStream,
XRootDStatus status )

On error.

Definition at line 1052 of file XrdClStream.cc.

1053 {
1054 auto channel = GetChannel();
1055 bool closing;
1056 StreamMutexHelper scopedLock( pMutex, subStream, closing );
1057 if( closing ) return;
1058 Log *log = DefaultEnv::GetLog();
1059 SockHandlerClose( subStream );
1060
1061 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
1062 pStreamName.c_str(), subStream, status.ToString().c_str() );
1063
1064 //--------------------------------------------------------------------------
1065 // Reinsert the stuff that we have failed to sent
1066 //--------------------------------------------------------------------------
1067 Reinsert( subStream );
1068
1069 //--------------------------------------------------------------------------
1070 // We are dealing with an error of a peripheral stream. If we don't have
1071 // anything to send don't bother recovering. Otherwise move the requests
1072 // to stream 0 if possible.
1073 //--------------------------------------------------------------------------
1074 if( subStream > 0 )
1075 {
1076 if( pSubStreams[subStream]->outQueue->IsEmpty() )
1077 return;
1078
1079 if( pSubStreams[0]->status != Socket::Disconnected )
1080 {
1081 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
1082 if( pSubStreams[0]->status == Socket::Connected )
1083 {
1084 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
1085 if( !st.IsOK() )
1086 OnFatalError( 0, st, scopedLock );
1087 return;
1088 }
1089 }
1090 OnFatalError( subStream, status, scopedLock );
1091 return;
1092 }
1093
1094 //--------------------------------------------------------------------------
1095 // If we lost the stream 0 we have lost the session, we re-enable the
1096 // stream if we still have things in one of the outgoing queues, otherwise
1097 // there is not point to recover at this point.
1098 //--------------------------------------------------------------------------
1099 if( subStream == 0 )
1100 {
1101 MonitorDisconnection( status );
1102 pSessionId = 0;
1103
1104 SubStreamList::iterator it;
1105 size_t outstanding = 0;
1106 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1107 outstanding += (*it)->outQueue->GetSizeStateless();
1108
1109 if( outstanding )
1110 {
1111 PathID path( 0, 0 );
1112 XRootDStatus st = EnableLink( path );
1113 if( !st.IsOK() )
1114 {
1115 OnFatalError( 0, st, scopedLock );
1116 return;
1117 }
1118 }
1119
1120 //------------------------------------------------------------------------
1121 // We're done here, unlock the stream mutex to avoid deadlocks and
1122 // report the disconnection event to the handlers
1123 //------------------------------------------------------------------------
1124 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1125 "message handlers.", pStreamName.c_str() );
1126 OutQueue q;
1127 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1128 q.GrabStateful( *(*it)->outQueue );
1129 scopedLock.UnLock();
1130
1131 q.Report( status );
1132 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1133 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1134 return;
1135 }
1136 }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::Status::IsOK(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdCl::StreamMutexHelper::UnLock().

Referenced by OnReadTimeout().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t subStream,
std::shared_ptr< Message > msg,
uint32_t bytesReceived )

Call back when a message has been reconstructed.

Definition at line 678 of file XrdClStream.cc.

681 {
682 msg->SetSessionId( pSessionId );
683 pBytesReceived += bytesReceived;
684
685 MsgHandler *handler = nullptr;
686 uint16_t action = 0;
687 {
688 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
689 handler = mh.handler;
690 action = mh.action;
691 mh.Reset();
692 }
693
694 if( !IsPartial( *msg ) )
695 {
696 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
697 *pChannelData );
698 if( streamAction & TransportHandler::DigestMsg )
699 return;
700
701 if( streamAction & TransportHandler::RequestClose )
702 {
703 RequestClose( *msg );
704 return;
705 }
706 }
707
708 Log *log = DefaultEnv::GetLog();
709
710 //--------------------------------------------------------------------------
711 // No handler, we discard the message ...
712 //--------------------------------------------------------------------------
713 if( !handler )
714 {
715 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
716 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
717 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
718 pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
719 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
720 return;
721 }
722
723 //--------------------------------------------------------------------------
724 // We have a handler, so we call the callback
725 //--------------------------------------------------------------------------
726 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
727 pStreamName.c_str(), (void*)msg.get() );
728
730 {
731 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
732 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
733
734 // if we are handling partial response we have to take down the timeout fence
735 if( IsPartial( *msg ) )
736 {
737 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
738 if( xrdHandler ) xrdHandler->PartialReceived();
739 }
740
741 return;
742 }
743
744 Job *job = new HandleIncMsgJob( handler );
745 pJobManager->QueueJob( job );
746 }
kXR_char streamid[2]
Definition XProtocol.hh:956
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
ServerResponseHeader hdr

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Here is the call graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t subStream,
Message * msg,
uint32_t bytesSent )

Definition at line 812 of file XrdClStream.cc.

815 {
816 pTransport->MessageSent( msg, subStream, bytesSent,
817 *pChannelData );
818 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
819 pBytesSent += bytesSent;
820 if( h.handler )
821 {
822 // ensure expiration time is assigned if still in queue
823 pIncomingQueue->AssignTimeout( h.handler );
824 // OnStatusReady may cause the handler to delete itself, in
825 // which case the handler or the user callback may also delete msg
826 h.handler->OnStatusReady( msg, XRootDStatus() );
827 }
828 pSubStreams[subStream]->outMsgHelper.Reset();
829 }

References XrdCl::OutQueue::MsgHelper::handler, XrdCl::MsgHandler::OnStatusReady(), and XrdCl::OutQueue::MsgHelper::Reset().

Here is the call graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t subStream)

On read timeout.

Definition at line 1248 of file XrdClStream.cc.

1249 {
1250 //--------------------------------------------------------------------------
1251 // We only take the main stream into account
1252 //--------------------------------------------------------------------------
1253 if( substream != 0 )
1254 return true;
1255
1256 //--------------------------------------------------------------------------
1257 // Check if there is no outgoing messages and if the stream TTL is elapesed.
1258 // It is assumed that the underlying transport makes sure that there is no
1259 // pending requests that are not answered, ie. all possible virtual streams
1260 // are de-allocated
1261 //--------------------------------------------------------------------------
1262 Log *log = DefaultEnv::GetLog();
1263 SubStreamList::iterator it;
1264 time_t now = time(0);
1265
1266 bool closing;
1267 StreamMutexHelper scopedLock( pMutex, substream, closing );
1268 if( closing ) return false;
1269 uint32_t outgoingMessages = 0;
1270 time_t lastActivity = 0;
1271 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1272 {
1273 outgoingMessages += (*it)->outQueue->GetSize();
1274 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1275 if( lastActivity < sockLastActivity )
1276 lastActivity = sockLastActivity;
1277 }
1278
1279 if( !outgoingMessages )
1280 {
1281 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1282 *pChannelData );
1283 if( disconnect )
1284 {
1285 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1286 pStreamName.c_str() );
1287 const uint64_t sess = pSessionId;
1288 scopedLock.UnLock();
1289 //----------------------------------------------------------------------
1290 // Important note!
1291 //
1292 // This destroys the Stream object itself, the underlined
1293 // AsyncSocketHandler object (that called this method) and the Channel
1294 // object that aggregates this Stream.
1295 //----------------------------------------------------------------------
1297 return false;
1298 }
1299 }
1300
1301 //--------------------------------------------------------------------------
1302 // Check if the stream is broken
1303 //--------------------------------------------------------------------------
1304 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1305 *pChannelData );
1306 if( !st.IsOK() )
1307 {
1308 scopedLock.UnLock();
1309 OnError( substream, st );
1310 return false;
1311 }
1312 return true;
1313 }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), OnError(), XrdCl::PostMasterMsg, and XrdCl::StreamMutexHelper::UnLock().

Here is the call graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t subStream)

Definition at line 752 of file XrdClStream.cc.

753 {
754 bool closing;
755 StreamMutexHelper scopedLock( pMutex, subStream, closing );
756 if( closing ) return std::make_pair( (Message *)0, (MsgHandler *)0 );
757 Log *log = DefaultEnv::GetLog();
758 if( pSubStreams[subStream]->outQueue->IsEmpty() )
759 {
760 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
761 pSubStreams[subStream]->socket->GetStreamName().c_str() );
762
763 pSubStreams[subStream]->socket->DisableUplink();
764 return std::make_pair( (Message *)0, (MsgHandler *)0 );
765 }
766
767 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
768 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
769 h.expires,
770 h.stateful );
771
772 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
773 "from out-queue to in-queue, starting to send outgoing.",
774 pUrl->GetHostId().c_str(), (void*)h.handler,
775 h.msg->GetObfuscatedDescription().c_str() );
776
777 scopedLock.UnLock();
778
779 if( h.handler )
780 {
781 bool rmMsg = false;
782 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
783 if( rmMsg )
784 {
785 Log *log = DefaultEnv::GetLog();
786 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
787 pStreamName.c_str() );
788 }
789 h.handler->OnReadyToSend( h.msg );
790 }
791 return std::make_pair( h.msg, h.handler );
792 }

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdCl::StreamMutexHelper::UnLock(), and XrdCl::Log::Warning().

Here is the call graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t subStream)

On write timeout.

Definition at line 1318 of file XrdClStream.cc.

1319 {
1320 return true;
1321 }

◆ Query()

Status XrdCl::Stream::Query ( uint16_t query,
AnyObject & result )

Query the stream.

Definition at line 1440 of file XrdClStream.cc.

1441 {
1442 switch( query )
1443 {
1445 {
1446 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1447 return Status();
1448 }
1449
1451 {
1452 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1453 return Status();
1454 }
1455
1457 {
1458 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1459 return Status();
1460 }
1461
1462 default:
1463 return Status( stError, errQueryNotSupported );
1464 }
1465 }
const uint16_t errQueryNotSupported
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Here is the call graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1326 of file XrdClStream.cc.

1327 {
1328 pChannelEvHandlers.AddHandler( handler );
1329 }

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1334 of file XrdClStream.cc.

1335 {
1336 pChannelEvHandlers.RemoveHandler( handler );
1337 }

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool stateful,
time_t expires )

Queue the message for sending.

Definition at line 501 of file XrdClStream.cc.

505 {
506 StreamMutexHelper scopedLock( pMutex );
507 Log *log = DefaultEnv::GetLog();
508
509 //--------------------------------------------------------------------------
510 // Check the session ID and bounce if needed
511 //--------------------------------------------------------------------------
512 if( msg->GetSessionId() &&
513 (pSubStreams[0]->status != Socket::Connected ||
514 pSessionId != msg->GetSessionId()) )
515 return XRootDStatus( stError, errInvalidSession );
516
517 //--------------------------------------------------------------------------
518 // Decide on the path to send the message
519 //--------------------------------------------------------------------------
520 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
521 if( pSubStreams.size() <= path.up )
522 {
523 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
524 "substream %d, using 0 instead", pStreamName.c_str(),
525 msg->GetObfuscatedDescription().c_str(), path.up );
526 path.up = 0;
527 }
528
529 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
530 "substream %d expecting answer at %d", pStreamName.c_str(),
531 msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
532
533 //--------------------------------------------------------------------------
534 // Enable *a* path and insert the message to the right queue
535 //--------------------------------------------------------------------------
536 XRootDStatus st = EnableLink( path );
537 if( st.IsOK() )
538 {
539 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
540 handler->OnWaitingToSend( msg );
541 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
542 expires, stateful );
543 }
544 else
545 st.status = stFatal;
546 return st;
547 }
const uint16_t errInvalidSession

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::MsgHandler::OnWaitingToSend(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Here is the call graph for this function:

◆ SetChannel()

void XrdCl::Stream::SetChannel ( std::weak_ptr< Channel > & channel)
inline

Sets a weak_ptr of our owning Channel.

Definition at line 233 of file XrdClStream.hh.

234 {
235 pChannel = channel;
236 }

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 241 of file XrdClStream.hh.

242 {
243 pChannelData = channelData;
244 }

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 225 of file XrdClStream.hh.

226 {
227 pIncomingQueue = incomingQueue;
228 }

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 257 of file XrdClStream.hh.

258 {
259 pJobManager = jobManager;
260 }

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > & onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 391 of file XrdClStream.hh.

392 {
393 StreamMutexHelper scopedLock( pMutex );
394 pOnDataConnJob = onConnJob;
395 }

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 217 of file XrdClStream.hh.

218 {
219 pPoller = poller;
220 }

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 249 of file XrdClStream.hh.

250 {
251 pTaskManager = taskManager;
252 }

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 209 of file XrdClStream.hh.

210 {
211 pTransport = transport;
212 }

◆ Tick()

void XrdCl::Stream::Tick ( time_t now)

Handle a clock event generated either by socket timeout, or by the task manager event

Definition at line 584 of file XrdClStream.cc.

585 {
586 //--------------------------------------------------------------------------
587 // Check for timed-out requests and incoming handlers
588 //--------------------------------------------------------------------------
589 StreamMutexHelper scopedLock( pMutex );
590 OutQueue q;
591 SubStreamList::iterator it;
592 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
593 q.GrabExpired( *(*it)->outQueue, now );
594 scopedLock.UnLock();
595
596 q.Report( XRootDStatus( stError, errOperationExpired ) );
597 pIncomingQueue->ReportTimeout( now );
598 }
const uint16_t errOperationExpired

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdCl::OutQueue::Report(), XrdCl::stError, and XrdCl::StreamMutexHelper::UnLock().

Here is the call graph for this function:

The documentation for this class was generated from the following files: