42#include <sys/socket.h>
124 mlist.emplace_front();
134 mlist.emplace_back();
136 std::tie( mit, ins ) =
mthmap.insert(
139 if( mit->second ==
mlist.begin() )
156 if(
mlist.empty() ) {
157 mlist.emplace_front();
167 mlist.emplace_back();
169 std::tie( mit, ins ) =
mthmap.insert(
172 if( mit->second ==
mlist.begin() )
180 mlist.erase( mit->second );
201 mlist.emplace_front( func );
203 auto lit =
mlist.begin();
219 mlist.emplace_back( func );
226 if( mit->second ==
mlist.begin() )
241 std::function<void()> keepfn;
245 if( mit ==
mthmap.end() )
return;
248 assert( mit->second ==
mlist.begin() );
250 const size_t cnt = --
mlist.front().cnt;
256 std::swap( keepfn,
mlist.front().fn );
259 mlist.erase( mit->second );
263 assert(
mlist.empty() ||
mlist.front().cnt == 0 );
267 auto &lfn =
mlist.front().fn;
308 pLastStreamError( 0 ),
309 pConnectionCount( 0 ),
310 pConnectionInitTime( 0 ),
311 pAddressType(
Utils::IPAll ),
316 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
317 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
319 std::ostringstream o;
320 o << pUrl->GetHostId();
321 pStreamName = o.str();
348 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
349 "Window: %d", pStreamName.c_str(), netStack.c_str(),
350 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
363 pStreamName.c_str() );
367 SubStreamList::iterator it;
368 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
377 if( !pTransport || !pPoller || !pChannelData )
381 pChannelData, 0,
this );
383 pSubStreams[0]->socket = s;
416 return pSubStreams[0]->socket->EnableUplink();
420 return pSubStreams[path.
up]->socket->EnableUplink();
431 time_t now = ::time(0);
433 if( now-pLastStreamError < pStreamErrorWindow )
434 return pLastFatalError;
436 gettimeofday( &pConnectionStarted, 0 );
446 "the host", pStreamName.c_str() );
447 pLastStreamError = now;
449 pLastFatalError = st;
453 if( pPrefer.IsValid() )
455 std::vector<XrdNetAddr> addrresses;
460 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
464 std::vector<XrdNetAddr> tmp;
465 tmp.reserve( pAddresses.size() );
467 auto itr = pAddresses.begin();
468 for( ; itr != pAddresses.end() ; ++itr )
470 if( !HasNetAddr( *itr, addrresses ) )
471 tmp.push_back( *itr );
474 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
476 pAddresses.swap( tmp );
483 while( !pAddresses.empty() )
485 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
486 pAddresses.pop_back();
487 pConnectionInitTime = ::time( 0 );
488 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
520 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
521 if( pSubStreams.size() <= path.
up )
524 "substream %d, using 0 instead", pStreamName.c_str(),
530 "substream %d expecting answer at %d", pStreamName.c_str(),
539 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
541 pSubStreams[path.
up]->outQueue->PushBack( msg, handler,
572 SubStreamList::iterator it;
573 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
575 (*it)->socket->Close();
591 SubStreamList::iterator it;
592 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
597 pIncomingQueue->ReportTimeout( now );
612 StreamConnectorTask(
const XrdCl::URL &url,
const std::string &n ):
615 std::string name =
"StreamConnectorTask for ";
638 ServerResponse *rsp =
reinterpret_cast<ServerResponse*
>( response.GetBuffer() );
641 ClientCloseRequest *req;
644 memcpy( req->
fhandle,
reinterpret_cast<uint8_t*
>( rsp->
body.buffer.data ), 4 );
646 msg->SetSessionId( pSessionId );
647 NullResponseHandler *handler =
new NullResponseHandler();
648 MessageSendParams params;
650 params.followRedirects =
false;
651 params.stateful =
true;
659 bool Stream::IsPartial(
Message &msg )
661 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
667 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
679 std::shared_ptr<Message> msg,
680 uint32_t bytesReceived )
682 msg->SetSessionId( pSessionId );
683 pBytesReceived += bytesReceived;
694 if( !IsPartial( *msg ) )
696 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
703 RequestClose( *msg );
717 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
718 pStreamName.c_str(), (
void*)msg.get(), rsp->
hdr.
status,
727 pStreamName.c_str(), (
void*)msg.get() );
732 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
735 if( IsPartial( *msg ) )
744 Job *job =
new HandleIncMsgJob( handler );
745 pJobManager->QueueJob( job );
751 std::pair<Message *, MsgHandler *>
758 if( pSubStreams[subStream]->outQueue->IsEmpty() )
761 pSubStreams[subStream]->socket->GetStreamName().c_str() );
763 pSubStreams[subStream]->socket->DisableUplink();
768 h.
msg = pSubStreams[subStream]->outQueue->PopMessage( h.
handler,
773 "from out-queue to in-queue, starting to send outgoing.",
774 pUrl->GetHostId().c_str(), (
void*)h.
handler,
782 pIncomingQueue->AddMessageHandler( h.
handler, rmMsg );
787 pStreamName.c_str() );
798 if( closing )
return;
801 if( pSubStreams[subStream]->outQueue->IsEmpty() )
804 pSubStreams[subStream]->socket->GetStreamName().c_str() );
805 pSubStreams[subStream]->socket->DisableUplink();
816 pTransport->MessageSent( msg, subStream, bytesSent,
819 pBytesSent += bytesSent;
823 pIncomingQueue->AssignTimeout( h.
handler );
828 pSubStreams[subStream]->outMsgHelper.
Reset();
839 if( closing )
return;
842 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
845 subStream, ipstack.c_str() );
849 pLastStreamError = 0;
851 pConnectionCount = 0;
852 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
853 pSessionId = ++sSessCntGen;
858 if( pSubStreams.size() == 1 && numSub > 1 )
860 for( uint16_t i = 1; i < numSub; ++i )
862 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
864 pChannelData, i,
this );
866 pSubStreams[i]->socket = s;
875 if( pSubStreams.size() > 1 )
878 pStreamName.c_str(), pSubStreams.size() - 1 );
879 for(
size_t i = 1; i < pSubStreams.size(); ++i )
883 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
884 SockHandlerClose( i );
886 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
887 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
890 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
891 SockHandlerClose( i );
905 gettimeofday( &pConnectionDone, 0 );
910 i.
server = pUrl->GetHostId();
911 i.
sTOD = pConnectionStarted;
912 i.
eTOD = pConnectionDone;
913 i.
streams = pSubStreams.size();
916 std::string *qryResponse =
nullptr;
918 qryResult.
Get( qryResponse );
921 i.
auth = *qryResponse;
935 else if( pOnDataConnJob )
940 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
952 if( closing )
return;
954 SockHandlerClose( subStream );
955 time_t now = ::time(0);
968 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
971 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
973 OnFatalError( 0, st, scopedLock );
980 OnFatalError( subStream, status, scopedLock );
987 time_t elapsed = now-pConnectionInitTime;
989 pStreamName.c_str(), (
long long) elapsed, pConnectionWindow );
994 if( !pAddresses.empty() )
999 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
1000 pAddresses.pop_back();
1001 pConnectionInitTime = ::time( 0 );
1002 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
1004 while( !pAddresses.empty() && !st.
IsOK() );
1007 OnFatalError( subStream, st, scopedLock );
1017 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
1021 pStreamName.c_str(), (
long long) (pConnectionWindow - elapsed) );
1024 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
1025 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
1032 else if( pConnectionCount < pConnectionRetry && !status.
IsFatal() )
1039 OnFatalError( subStream, st, scopedLock );
1046 OnFatalError( subStream, status, scopedLock );
1057 if( closing )
return;
1059 SockHandlerClose( subStream );
1062 pStreamName.c_str(), subStream, status.
ToString().c_str() );
1067 Reinsert( subStream );
1076 if( pSubStreams[subStream]->outQueue->IsEmpty() )
1081 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
1084 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
1086 OnFatalError( 0, st, scopedLock );
1090 OnFatalError( subStream, status, scopedLock );
1099 if( subStream == 0 )
1101 MonitorDisconnection( status );
1104 SubStreamList::iterator it;
1105 size_t outstanding = 0;
1106 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1107 outstanding += (*it)->outQueue->GetSizeStateless();
1115 OnFatalError( 0, st, scopedLock );
1125 "message handlers.", pStreamName.c_str() );
1127 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1146 [
this, channel, status, hush, sess]()
1150 if( closing )
return;
1151 if( sess && sess != pSessionId )
return;
1154 for(
size_t substream = 0; substream < pSubStreams.size(); ++substream )
1157 SockHandlerClose( substream );
1161 pStreamName.c_str(), status.
ToString().c_str() );
1166 Reinsert( substream );
1169 pConnectionCount = 0;
1177 "message handlers.", pStreamName.c_str() );
1179 SubStreamList::iterator it;
1181 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1183 scopedLock.UnLock();
1194 void Stream::OnFatalError( uint16_t subStream,
1199 SockHandlerClose( subStream );
1201 pStreamName.c_str(), status.
ToString().c_str() );
1209 pConnectionCount = 0;
1210 pLastStreamError = ::time(0);
1211 pLastFatalError = status;
1214 SubStreamList::iterator it;
1216 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1217 q.GrabItems( *(*it)->outQueue );
1230 void Stream::MonitorDisconnection(
XRootDStatus status )
1235 Monitor::DisconnectInfo i;
1236 i.server = pUrl->GetHostId();
1237 i.rBytes = pBytesReceived;
1238 i.sBytes = pBytesSent;
1239 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1253 if( substream != 0 )
1263 SubStreamList::iterator it;
1264 time_t now = time(0);
1268 if( closing )
return false;
1269 uint32_t outgoingMessages = 0;
1270 time_t lastActivity = 0;
1271 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1273 outgoingMessages += (*it)->outQueue->GetSize();
1274 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1275 if( lastActivity < sockLastActivity )
1276 lastActivity = sockLastActivity;
1279 if( !outgoingMessages )
1281 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1286 pStreamName.c_str() );
1287 const uint64_t sess = pSessionId;
1304 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1328 pChannelEvHandlers.AddHandler( handler );
1336 pChannelEvHandlers.RemoveHandler( handler );
1347 mh.
handler = pIncomingQueue->GetHandlerForMessage( msg,
1375 pIncomingQueue->RemoveMessageHandler( mh.
handler );
1402 std::vector<XrdNetAddr> prefaddrs;
1407 , pStreamName.c_str(), url.
GetHostName().c_str() );
1414 std::vector<XrdNetAddr> aliasaddrs;
1419 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1426 auto itr = prefaddrs.begin();
1427 for( ; itr != prefaddrs.end() ; ++itr )
1429 auto itr2 = aliasaddrs.begin();
1430 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1431 if( itr->Same( &*itr2 ) )
return true;
1446 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpAddr() ),
false );
1452 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpStack() ),
false );
1458 result.
Set(
new std::string( pSubStreams[0]->socket->GetHostName() ),
false );
1471 void Stream::Reinsert( uint16_t subStream )
1476 if( pSubStreams[subStream]->outMsgHelper.msg )
1492 pSubStreams[subStream]->outMsgHelper.Reset();
1498 if( pSubStreams[subStream]->inMsgHelper.handler )
1500 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
1501 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1502 XRootDMsgHandler *xrdHandler =
dynamic_cast<XRootDMsgHandler*
>( h.handler );
1503 if( xrdHandler ) xrdHandler->PartialReceived();
1512 void Stream::SockHandlerClose( uint16_t subStream )
1514 SubStreamData *sd = pSubStreams[subStream];
1516 pMutex.AddClosing(subStream);
1517 sd->socket->PreClose();
1518 AsyncSocketHandler *s =
new AsyncSocketHandler( *sd->socket );
1519 Job *job =
new SocketDestroyJob( sd->socket );
1520 pJobManager->QueueJob( job );
1522 pMutex.RemoveClosing(subStream);
struct ServerResponseBody_Status bdy
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ StreamBroken
The stream is broken.
@ FatalError
Stream has been broken and won't be recovered.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
bool HasUnsetTimeout(MsgHandler *handler)
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
virtual void OnWaitingToSend(Message *msg)
Called to indicate the message is waiting to be sent.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual void Run(void *)
The job logic.
SocketDestroyJob(AsyncSocketHandler *socket)
virtual ~SocketDestroyJob()
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
void AddClosing(uint16_t subStream)
AddClosing. Notified that subStream will be closed.
std::map< pthread_t, std::list< MtxInfo >::iterator > mthmap
std::list< MtxInfo >::iterator fnlistit
void Lock()
Lock. Regular, non-subStream aware recursive lock.
void RemoveClosing(uint16_t subStream)
RemoveClosing. Notified that subStream close has completed.
std::map< uint16_t, size_t > mclosing
std::list< MtxInfo > mlist
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
std::shared_ptr< Channel > GetChannel()
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Interface for a task to be run by the TaskManager.
@ RequestClose
Send a close request.
const std::string & GetHostName() const
Get the name of the target host.
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static pthread_t ID(void)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
union ServerResponse::@040373375333017131300127053271011057331004327334 body
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.