XRootD
Loading...
Searching...
No Matches
XrdClXRootDTransport.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClSocket.hh"
29#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClTls.hh"
35#include "XrdNet/XrdNetAddr.hh"
36#include "XrdNet/XrdNetUtils.hh"
39#include "XrdOuc/XrdOucUtils.hh"
40#include "XrdOuc/XrdOucCRC.hh"
42#include "XrdSys/XrdSysTimer.hh"
47#include "XrdSys/XrdSysE2T.hh"
48#include "XrdCl/XrdClTls.hh"
49#include "XrdCl/XrdClSocket.hh"
51#include "XrdVersion.hh"
52
53#include <arpa/inet.h>
54#include <sys/types.h>
55#include <unistd.h>
56#include <dlfcn.h>
57#include <sstream>
58#include <iomanip>
59#include <set>
60#include <limits>
61
62#include <atomic>
63
65
66namespace XrdCl
67{
69 {
71
72 static void UnloadHandler()
73 {
74 UnloadHandler( "root" );
75 UnloadHandler( "xroot" );
76 }
77
78 static void UnloadHandler( const std::string &trProt )
79 {
81 TransportHandler *trHandler = trManager->GetHandler( trProt );
82 trHandler->WaitBeforeExit();
83 }
84
85 void Register( const std::string &protocol )
86 {
87 XrdSysRWLockHelper scope( lock, false ); // obtain write lock
88 std::pair< std::set<std::string>::iterator, bool > ret = protocols.insert( protocol );
89 // if that's the first time we are using the protocol, the sec lib
90 // was just loaded so now's the time to register the atexit handler
91 if( ret.second )
92 {
93 atexit( UnloadHandler );
94 }
95 }
96
99 std::set<std::string> protocols;
100 };
101
102 //----------------------------------------------------------------------------
104 //----------------------------------------------------------------------------
106 {
107 //--------------------------------------------------------------------------
108 // Define the stream status for the link negotiation purposes
109 //--------------------------------------------------------------------------
122
123 //--------------------------------------------------------------------------
124 // Constructor
125 //--------------------------------------------------------------------------
127 serverFlags( 0 )
128 {
129 }
130
132 uint8_t pathId;
133 uint32_t serverFlags;
134 };
135
136 //----------------------------------------------------------------------------
138 //----------------------------------------------------------------------------
140 {
141 StreamSelector( uint16_t size )
142 {
143 //----------------------------------------------------------------------
144 // Subtract one because we shouldn't take into account the control
145 // stream.
146 //----------------------------------------------------------------------
147 strmqueues.resize( size - 1, 0 );
148 }
149
150 //------------------------------------------------------------------------
151 // @param size : number of streams
152 //------------------------------------------------------------------------
153 void AdjustQueues( uint16_t size )
154 {
155 strmqueues.resize( size - 1, 0);
156 }
157
158 //------------------------------------------------------------------------
159 // @param connected : bitarray stating if given sub-stream is connected
160 //
161 // @return : substream number
162 //------------------------------------------------------------------------
163 uint16_t Select( const std::vector<bool> &connected )
164 {
165 uint16_t ret = 0;
166 size_t minval = std::numeric_limits<size_t>::max();
167
168 for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
169 {
170 if( !connected[i] ) continue;
171
172 if( strmqueues[i] < minval )
173 {
174 ret = i;
175 minval = strmqueues[i];
176 }
177 }
178
179 ++strmqueues[ret];
180 return ret + 1;
181 }
182
183 //--------------------------------------------------------------------------
184 // Update queue for given substream
185 //--------------------------------------------------------------------------
186 void MsgReceived( uint16_t substrm )
187 {
188 if( substrm > 0 )
189 --strmqueues[substrm - 1];
190 }
191
192 private:
193
194 std::vector<size_t> strmqueues;
195 };
196
198 {
199 BindPrefSelector( std::vector<std::string> && bindprefs ) :
200 bindprefs( std::move( bindprefs ) ), next( 0 )
201 {
202 }
203
204 inline const std::string& Get()
205 {
206 std::string &ret = bindprefs[next];
207 ++next;
208 if( next >= bindprefs.size() )
209 next = 0;
210 return ret;
211 }
212
213 private:
214 std::vector<std::string> bindprefs;
215 size_t next;
216 };
217
218 //----------------------------------------------------------------------------
220 //----------------------------------------------------------------------------
222 {
223 //--------------------------------------------------------------------------
224 // Constructor
225 //--------------------------------------------------------------------------
226 XRootDChannelInfo( const URL &url ):
227 serverFlags(0),
229 firstLogIn(true),
230 authBuffer(0),
231 authProtocol(0),
232 authParams(0),
233 authEnv(0),
234 finstcnt(0),
235 openFiles(0),
236 waitBarrier(0),
237 protection(0),
238 protRespBody(0),
239 protRespSize(0),
240 encrypted(false),
241 istpc(false)
242 {
244 memset( sessionId, 0, 16 );
245 memset( oldSessionId, 0, 16 );
246 }
247
248 //--------------------------------------------------------------------------
249 // Destructor
250 //--------------------------------------------------------------------------
252 {
253 delete [] authBuffer;
254 }
255
256 typedef std::vector<XRootDStreamInfo> StreamInfoVector;
257
258 //--------------------------------------------------------------------------
259 // Data
260 //--------------------------------------------------------------------------
261 uint32_t serverFlags;
263 uint8_t sessionId[16];
264 uint8_t oldSessionId[16];
266 std::shared_ptr<SIDManager> sidManager;
272 std::string streamName;
273 std::string authProtocolName;
274 std::set<uint16_t> sentOpens;
275 std::set<uint16_t> sentCloses;
276 std::atomic<uint32_t> finstcnt; // file instance count
277 uint32_t openFiles;
281 unsigned int protRespSize;
282 std::unique_ptr<StreamSelector> strmSelector;
284 bool istpc;
285 std::unique_ptr<BindPrefSelector> bindSelector;
286 std::string logintoken;
288 };
289
290 //----------------------------------------------------------------------------
291 // Constructor
292 //----------------------------------------------------------------------------
294 pSecUnloadHandler( new PluginUnloadHandler() )
295 {
296 }
297
298 //----------------------------------------------------------------------------
299 // Destructor
300 //----------------------------------------------------------------------------
302 {
303 delete pSecUnloadHandler; pSecUnloadHandler = 0;
304 }
305
306 //----------------------------------------------------------------------------
307 // Read message header from socket
308 //----------------------------------------------------------------------------
310 {
311 //--------------------------------------------------------------------------
312 // A new message - allocate the space needed for the header
313 //--------------------------------------------------------------------------
314 if( message.GetCursor() == 0 && message.GetSize() < 8 )
315 message.Allocate( 8 );
316
317 //--------------------------------------------------------------------------
318 // Read the message header
319 //--------------------------------------------------------------------------
320 if( message.GetCursor() < 8 )
321 {
322 size_t leftToBeRead = 8 - message.GetCursor();
323 while( leftToBeRead )
324 {
325 int bytesRead = 0;
326 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
327 leftToBeRead, bytesRead );
328 if( !status.IsOK() || status.code == suRetry )
329 return status;
330
331 leftToBeRead -= bytesRead;
332 message.AdvanceCursor( bytesRead );
333 }
334 UnMarshallHeader( message );
335
336 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
337 Log *log = DefaultEnv::GetLog();
338 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
339 "body", (void*)&message, bodySize );
340
341 return XRootDStatus( stOK, suDone );
342 }
344 }
345
346 //----------------------------------------------------------------------------
347 // Read message body from socket
348 //----------------------------------------------------------------------------
350 {
351 //--------------------------------------------------------------------------
352 // Retrieve the body
353 //--------------------------------------------------------------------------
354 size_t leftToBeRead = 0;
355 uint32_t bodySize = 0;
357 bodySize = rsphdr->dlen;
358
359 if( message.GetSize() < bodySize + 8 )
360 message.ReAllocate( bodySize + 8 );
361
362 leftToBeRead = bodySize-(message.GetCursor()-8);
363 while( leftToBeRead )
364 {
365 int bytesRead = 0;
366 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
367
368 if( !status.IsOK() || status.code == suRetry )
369 return status;
370
371 leftToBeRead -= bytesRead;
372 message.AdvanceCursor( bytesRead );
373 }
374
375 return XRootDStatus( stOK, suDone );
376 }
377
378 //----------------------------------------------------------------------------
379 // Read more of the message body from socket
380 //----------------------------------------------------------------------------
382 {
384 if( rsphdr->status != kXR_status )
386
387 //--------------------------------------------------------------------------
388 // In case of non kXR_status responses we read all the response, including
389 // data. For kXR_status responses we first read only the remainder of the
390 // header. The header must then be unmarshalled, and then a second call to
391 // GetMore (repeated for suRetry as needed) will read the data.
392 //--------------------------------------------------------------------------
393
394 uint32_t bodySize = rsphdr->dlen;
395 if( bodySize+8 < sizeof( ServerResponseStatus ) )
397 "kXR_status: invalid message size." );
398
400 bodySize += rspst->bdy.dlen;
401
402 if( message.GetSize() < bodySize + 8 )
403 message.ReAllocate( bodySize + 8 );
404
405 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
406 while( leftToBeRead )
407 {
408 int bytesRead = 0;
409 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
410
411 if( !status.IsOK() || status.code == suRetry )
412 return status;
413
414 leftToBeRead -= bytesRead;
415 message.AdvanceCursor( bytesRead );
416 }
417
418 // Unmarchal to message body
419 Log *log = DefaultEnv::GetLog();
421 if( !st.IsOK() && st.code == errDataError )
422 {
423 log->Error( XRootDTransportMsg, "[msg: %p] %s", (void*)&message,
424 st.GetErrorMessage().c_str() );
425 return st;
426 }
427
428 if( !st.IsOK() )
429 {
430 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
431 (void*)&message );
432 return st;
433 }
434
435 return XRootDStatus( stOK, suDone );
436 }
437
438 //----------------------------------------------------------------------------
439 // Initialize channel
440 //----------------------------------------------------------------------------
442 AnyObject &channelData )
443 {
444 XRootDChannelInfo *info = new XRootDChannelInfo( url );
445 XrdSysMutexHelper scopedLock( info->mutex );
446 channelData.Set( info );
447
448 Env *env = DefaultEnv::GetEnv();
449 int streams = DefaultSubStreamsPerChannel;
450 env->GetInt( "SubStreamsPerChannel", streams );
451 if( streams < 1 ) streams = 1;
452 info->stream.resize( streams );
453 info->strmSelector.reset( new StreamSelector( streams ) );
454 info->encrypted = url.IsSecure();
455 info->istpc = url.IsTPC();
456 info->logintoken = url.GetLoginToken();
457 }
458
459 //----------------------------------------------------------------------------
460 // Finalize channel
461 //----------------------------------------------------------------------------
465
466 //----------------------------------------------------------------------------
467 // HandShake
468 //----------------------------------------------------------------------------
470 AnyObject &channelData )
471 {
472 XRootDChannelInfo *info = 0;
473 channelData.Get( info );
474
475 if (!info)
477
478 XrdSysMutexHelper scopedLock( info->mutex );
479
480 if( info->stream.size() <= handShakeData->subStreamId )
481 {
482 Log *log = DefaultEnv::GetLog();
484 "[%s] Internal error: not enough substreams",
485 handShakeData->streamName.c_str() );
487 }
488
489 if( handShakeData->subStreamId == 0 )
490 {
491 info->streamName = handShakeData->streamName;
492 return HandShakeMain( handShakeData, channelData );
493 }
494 return HandShakeParallel( handShakeData, channelData );
495 }
496
497 //----------------------------------------------------------------------------
498 // Hand shake the main stream
499 //----------------------------------------------------------------------------
500 XRootDStatus XRootDTransport::HandShakeMain( HandShakeData *handShakeData,
501 AnyObject &channelData )
502 {
503 XRootDChannelInfo *info = 0;
504 channelData.Get( info );
505
506 if (!info) {
508 "[%s] Internal error: no channel info",
509 handShakeData->streamName.c_str());
511 }
512
513 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
514
515 //--------------------------------------------------------------------------
516 // First step - we need to create and initial handshake and send it out
517 //--------------------------------------------------------------------------
518 if( sInfo.status == XRootDStreamInfo::Disconnected ||
519 sInfo.status == XRootDStreamInfo::Broken )
520 {
521 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
523 sInfo.status = XRootDStreamInfo::HandShakeSent;
524 return XRootDStatus( stOK, suContinue );
525 }
526
527 //--------------------------------------------------------------------------
528 // Second step - we got the reply message to the initial handshake
529 //--------------------------------------------------------------------------
530 if( sInfo.status == XRootDStreamInfo::HandShakeSent )
531 {
532 XRootDStatus st = ProcessServerHS( handShakeData, info );
533 if( st.IsOK() )
535 else
536 sInfo.status = XRootDStreamInfo::Broken;
537 return st;
538 }
539
540 //--------------------------------------------------------------------------
541 // Third step - we got the response to the protocol request, we need
542 // to process it and send out a login request
543 //--------------------------------------------------------------------------
544 if( sInfo.status == XRootDStreamInfo::HandShakeReceived )
545 {
546 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
547
548 if( !st.IsOK() )
549 {
550 sInfo.status = XRootDStreamInfo::Broken;
551 return st;
552 }
553
554 if( st.code == suRetry )
555 {
556 handShakeData->out = GenerateProtocol( handShakeData, info,
559 return XRootDStatus( stOK, suRetry );
560 }
561
562 handShakeData->out = GenerateLogIn( handShakeData, info );
563 sInfo.status = XRootDStreamInfo::LoginSent;
564 return XRootDStatus( stOK, suContinue );
565 }
566
567 //--------------------------------------------------------------------------
568 // Fourth step - handle the log in response and proceed with the
569 // authentication if required by the server
570 //--------------------------------------------------------------------------
571 if( sInfo.status == XRootDStreamInfo::LoginSent )
572 {
573 XRootDStatus st = ProcessLogInResp( handShakeData, info );
574
575 if( !st.IsOK() )
576 {
577 sInfo.status = XRootDStreamInfo::Broken;
578 return st;
579 }
580
581 if( st.IsOK() && st.code == suDone )
582 {
583 //----------------------------------------------------------------------
584 // If it's not our first log in we need to end the previous session
585 // to make sure that the server noticed our disconnection and closed
586 // all the writable handles that we owned
587 //----------------------------------------------------------------------
588 if( !info->firstLogIn )
589 {
590 handShakeData->out = GenerateEndSession( handShakeData, info );
592 return XRootDStatus( stOK, suContinue );
593 }
594
595 sInfo.status = XRootDStreamInfo::Connected;
596 info->firstLogIn = false;
597 return st;
598 }
599
600 st = DoAuthentication( handShakeData, info );
601 if( !st.IsOK() )
602 sInfo.status = XRootDStreamInfo::Broken;
603 else
604 sInfo.status = XRootDStreamInfo::AuthSent;
605 return st;
606 }
607
608 //--------------------------------------------------------------------------
609 // Fifth step and later - proceed with the authentication
610 //--------------------------------------------------------------------------
611 if( sInfo.status == XRootDStreamInfo::AuthSent )
612 {
613 XRootDStatus st = DoAuthentication( handShakeData, info );
614
615 if( !st.IsOK() )
616 {
617 sInfo.status = XRootDStreamInfo::Broken;
618 return st;
619 }
620
621 if( st.IsOK() && st.code == suDone )
622 {
623 //----------------------------------------------------------------------
624 // If it's not our first log in we need to end the previous session
625 //----------------------------------------------------------------------
626 if( !info->firstLogIn )
627 {
628 handShakeData->out = GenerateEndSession( handShakeData, info );
630 return XRootDStatus( stOK, suContinue );
631 }
632
633 sInfo.status = XRootDStreamInfo::Connected;
634 info->firstLogIn = false;
635 return st;
636 }
637
638 return st;
639 }
640
641 //--------------------------------------------------------------------------
642 // The last step - kXR_endsess returned
643 //--------------------------------------------------------------------------
644 if( sInfo.status == XRootDStreamInfo::EndSessionSent )
645 {
646 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
647
648 if( st.IsOK() && st.code == suDone )
649 {
650 sInfo.status = XRootDStreamInfo::Connected;
651 }
652 else if( !st.IsOK() )
653 {
654 sInfo.status = XRootDStreamInfo::Broken;
655 }
656
657 return st;
658 }
659
660 return XRootDStatus( stOK, suDone );
661 }
662
663 //----------------------------------------------------------------------------
664 // Hand shake parallel stream
665 //----------------------------------------------------------------------------
666 XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
667 AnyObject &channelData )
668 {
669 XRootDChannelInfo *info = 0;
670 channelData.Get( info );
671
672 if (!info) {
674 "[%s] Internal error: no channel info",
675 handShakeData->streamName.c_str());
676 return XRootDStatus(stFatal, errInternal);
677 }
678
679 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
680
681 //--------------------------------------------------------------------------
682 // First step - we need to create and initial handshake and send it out
683 //--------------------------------------------------------------------------
684 if( sInfo.status == XRootDStreamInfo::Disconnected ||
685 sInfo.status == XRootDStreamInfo::Broken )
686 {
687 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
689 sInfo.status = XRootDStreamInfo::HandShakeSent;
690 return XRootDStatus( stOK, suContinue );
691 }
692
693 //--------------------------------------------------------------------------
694 // Second step - we got the reply message to the initial handshake,
695 // if successful we need to send bind
696 //--------------------------------------------------------------------------
697 if( sInfo.status == XRootDStreamInfo::HandShakeSent )
698 {
699 XRootDStatus st = ProcessServerHS( handShakeData, info );
700 if( st.IsOK() )
702 else
703 sInfo.status = XRootDStreamInfo::Broken;
704 return st;
705 }
706
707 //--------------------------------------------------------------------------
708 // Second step bis - we got the response to the protocol request, we need
709 // to process it and send out a bind request
710 //--------------------------------------------------------------------------
711 if( sInfo.status == XRootDStreamInfo::HandShakeReceived )
712 {
713 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
714
715 if( !st.IsOK() )
716 {
717 sInfo.status = XRootDStreamInfo::Broken;
718 return st;
719 }
720
721 handShakeData->out = GenerateBind( handShakeData, info );
722 sInfo.status = XRootDStreamInfo::BindSent;
723 return XRootDStatus( stOK, suContinue );
724 }
725
726 //--------------------------------------------------------------------------
727 // Third step - we got the response to the kXR_bind
728 //--------------------------------------------------------------------------
729 if( sInfo.status == XRootDStreamInfo::BindSent )
730 {
731 XRootDStatus st = ProcessBindResp( handShakeData, info );
732
733 if( !st.IsOK() )
734 {
735 sInfo.status = XRootDStreamInfo::Broken;
736 return st;
737 }
738 sInfo.status = XRootDStreamInfo::Connected;
739 return XRootDStatus();
740 }
741 return XRootDStatus();
742 }
743
744 //------------------------------------------------------------------------
745 // @return true if handshake has been done and stream is connected,
746 // false otherwise
747 //------------------------------------------------------------------------
749 AnyObject &channelData )
750 {
751 XRootDChannelInfo *info = 0;
752 channelData.Get( info );
753
754 if (!info) {
756 "[%s] Internal error: no channel info",
757 handShakeData->streamName.c_str());
758 return false;
759 }
760
761 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
762 return ( sInfo.status == XRootDStreamInfo::Connected );
763 }
764
765 //----------------------------------------------------------------------------
766 // Check if the stream should be disconnected
767 //----------------------------------------------------------------------------
768 bool XRootDTransport::IsStreamTTLElapsed( time_t inactiveTime,
769 AnyObject &channelData )
770 {
771 XRootDChannelInfo *info = 0;
772 channelData.Get( info );
773
774 Env *env = DefaultEnv::GetEnv();
775 Log *log = DefaultEnv::GetLog();
776
777 if (!info) {
779 "Internal error: no channel info, behaving as if TTL has elapsed");
780 return true;
781 }
782
783 //--------------------------------------------------------------------------
784 // Check the TTL settings for the current server
785 //--------------------------------------------------------------------------
786 int ttl;
787 if( info->serverFlags & kXR_isServer )
788 {
790 env->GetInt( "DataServerTTL", ttl );
791 }
792 else
793 {
795 env->GetInt( "LoadBalancerTTL", ttl );
796 }
797
798 //--------------------------------------------------------------------------
799 // See whether we can give a go-ahead for the disconnection
800 //--------------------------------------------------------------------------
801 XrdSysMutexHelper scopedLock( info->mutex );
802 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
803 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
804 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
805 info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
806 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
807
808 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
809 return false;
810
811 if( !allocatedSIDs && inactiveTime > ttl )
812 return true;
813
814 return false;
815 }
816
817 //----------------------------------------------------------------------------
818 // Check the stream is broken - ie. TCP connection got broken and
819 // went undetected by the TCP stack
820 //----------------------------------------------------------------------------
822 AnyObject &channelData )
823 {
824 XRootDChannelInfo *info = 0;
825 channelData.Get( info );
826 Env *env = DefaultEnv::GetEnv();
827 Log *log = DefaultEnv::GetLog();
828
829 if (!info) {
831 "Internal error: no channel info, behaving as if stream is broken");
832 return true;
833 }
834
835 int streamTimeout = DefaultStreamTimeout;
836 env->GetInt( "StreamTimeout", streamTimeout );
837
838 XrdSysMutexHelper scopedLock( info->mutex );
839
840 const time_t now = time(0);
841 const bool anySID =
842 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
843
844 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
845 "stream timeout: %d, any SID: %d, wait barrier: %s",
846 info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
847 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
848
849 if( inactiveTime < streamTimeout )
850 return Status();
851
852 if( now < info->waitBarrier )
853 return Status();
854
855 if( !anySID )
856 return Status();
857
859 }
860
861 //----------------------------------------------------------------------------
862 // Multiplex
863 //----------------------------------------------------------------------------
865 {
866 return PathID( 0, 0 );
867 }
868
869 //----------------------------------------------------------------------------
870 // Multiplex
871 //----------------------------------------------------------------------------
873 AnyObject &channelData,
874 PathID *hint )
875 {
876 XRootDChannelInfo *info = 0;
877 channelData.Get( info );
878
879 if (!info) {
881 "Internal error: no channel info, cannot multiplex");
882 return PathID(0,0);
883 }
884
885 XrdSysMutexHelper scopedLock( info->mutex );
886
887 //--------------------------------------------------------------------------
888 // If we're not connected to a data server or we don't know that yet
889 // we stream through 0
890 //--------------------------------------------------------------------------
891 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
892 return PathID( 0, 0 );
893
894 //--------------------------------------------------------------------------
895 // Select the streams
896 //--------------------------------------------------------------------------
897 Log *log = DefaultEnv::GetLog();
898 uint16_t upStream = 0;
899 uint16_t downStream = 0;
900
901 if( hint )
902 {
903 upStream = hint->up;
904 downStream = hint->down;
905 }
906 else
907 {
908 upStream = 0;
909 std::vector<bool> connected;
910 connected.reserve( info->stream.size() - 1 );
911 size_t nbConnected = 0;
912 for( size_t i = 1; i < info->stream.size(); ++i )
913 if( info->stream[i].status == XRootDStreamInfo::Connected )
914 {
915 connected.push_back( true );
916 ++nbConnected;
917 }
918 else
919 connected.push_back( false );
920
921 if( nbConnected == 0 )
922 downStream = 0;
923 else
924 downStream = info->strmSelector->Select( connected );
925 }
926
927 if( upStream >= info->stream.size() )
928 {
930 "[%s] Up link stream %d does not exist, using 0",
931 info->streamName.c_str(), upStream );
932 upStream = 0;
933 }
934
935 if( downStream >= info->stream.size() )
936 {
938 "[%s] Down link stream %d does not exist, using 0",
939 info->streamName.c_str(), downStream );
940 downStream = 0;
941 }
942
943 //--------------------------------------------------------------------------
944 // Modify the message
945 //--------------------------------------------------------------------------
946 UnMarshallRequest( msg );
948 switch( hdr->requestid )
949 {
950 //------------------------------------------------------------------------
951 // Read - we update the path id to tell the server where we want to
952 // get the response, but we still send the request through stream 0
953 // We need to allocate space for read_args if we don't have it
954 // included yet
955 //------------------------------------------------------------------------
956 case kXR_read:
957 {
958 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
959 {
960 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
961 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
962 memset( newBuf, 0, 8 );
964 req->dlen += 8;
965 }
966 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
967 args->pathid = info->stream[downStream].pathId;
968 break;
969 }
970
971
972 //------------------------------------------------------------------------
973 // PgRead - we update the path id to tell the server where we want to
974 // get the response, but we still send the request through stream 0
975 // We need to allocate space for ClientPgReadReqArgs if we don't have it
976 // included yet
977 //------------------------------------------------------------------------
978 case kXR_pgread:
979 {
980 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
981 {
982 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
983 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
984 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
986 req->dlen += sizeof( ClientPgReadReqArgs );
987 }
988 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
989 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
990 args->pathid = info->stream[downStream].pathId;
991 break;
992 }
993
994 //------------------------------------------------------------------------
995 // ReadV - the situation is identical to read but we don't need any
996 // additional structures to specify the return path
997 //------------------------------------------------------------------------
998 case kXR_readv:
999 {
1001 req->pathid = info->stream[downStream].pathId;
1002 break;
1003 }
1004
1005 //------------------------------------------------------------------------
1006 // Write - multiplexing writes doesn't work properly in the server
1007 //------------------------------------------------------------------------
1008 case kXR_write:
1009 {
1010// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
1011// req->pathid = info->stream[downStream].pathId;
1012 break;
1013 }
1014
1015 //------------------------------------------------------------------------
1016 // WriteV - multiplexing writes doesn't work properly in the server
1017 //------------------------------------------------------------------------
1018 case kXR_writev:
1019 {
1020// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1021// req->pathid = info->stream[downStream].pathId;
1022 break;
1023 }
1024
1025 //------------------------------------------------------------------------
1026 // PgWrite - multiplexing writes doesn't work properly in the server
1027 //------------------------------------------------------------------------
1028 case kXR_pgwrite:
1029 {
1030// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1031// req->pathid = info->stream[downStream].pathId;
1032 break;
1033 }
1034 };
1035 MarshallRequest( msg );
1036 return PathID( upStream, downStream );
1037 }
1038
1039 //----------------------------------------------------------------------------
1040 // Return a number of substreams per stream that should be created
1041 // This depends on the environment and whether we are connected to
1042 // a data server or not
1043 //----------------------------------------------------------------------------
1045 {
1046 XRootDChannelInfo *info = 0;
1047 channelData.Get( info );
1048
1049 if (!info) {
1050 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1051 return 1;
1052 }
1053
1054 XrdSysMutexHelper scopedLock( info->mutex );
1055
1056 //--------------------------------------------------------------------------
1057 // If the connection has been opened in order to orchestrate a TPC or
1058 // the remote server is a Manager or Metamanager we will need only one
1059 // (control) stream.
1060 //--------------------------------------------------------------------------
1061 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1062
1063 //--------------------------------------------------------------------------
1064 // Number of streams requested by user
1065 //--------------------------------------------------------------------------
1066 uint16_t ret = info->stream.size();
1067
1069 int nodata = DefaultTlsNoData;
1070 env->GetInt( "TlsNoData", nodata );
1071
1072 // Does the server require the stream 0 to be encrypted?
1073 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1074 ( info->serverFlags & kXR_tlsLogin ) ||
1075 ( info->serverFlags & kXR_tlsSess );
1076 // Does the server NOT require the data streams to be encrypted?
1077 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1078 // Does the user require the stream 0 to be encrypted?
1079 bool usrTlsStrm0 = info->encrypted;
1080 // Does the user NOT require the data streams to be encrypted?
1081 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1082
1083 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1084 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1085 {
1086 //------------------------------------------------------------------------
1087 // The server or user asked us to encrypt stream 0, but to send the data
1088 // (read/write) using a plain TCP connection
1089 //------------------------------------------------------------------------
1090 if( ret == 1 ) ++ret;
1091 }
1092
1093 if( ret > info->stream.size() )
1094 {
1095 info->stream.resize( ret );
1096 info->strmSelector->AdjustQueues( ret );
1097 }
1098
1099 return ret;
1100 }
1101
1102 //----------------------------------------------------------------------------
1103 // Marshall
1104 //----------------------------------------------------------------------------
1106 {
1107 ClientRequest *req = (ClientRequest*)msg;
1108 switch( req->header.requestid )
1109 {
1110 //------------------------------------------------------------------------
1111 // kXR_protocol
1112 //------------------------------------------------------------------------
1113 case kXR_protocol:
1114 req->protocol.clientpv = htonl( req->protocol.clientpv );
1115 break;
1116
1117 //------------------------------------------------------------------------
1118 // kXR_login
1119 //------------------------------------------------------------------------
1120 case kXR_login:
1121 req->login.pid = htonl( req->login.pid );
1122 break;
1123
1124 //------------------------------------------------------------------------
1125 // kXR_locate
1126 //------------------------------------------------------------------------
1127 case kXR_locate:
1128 req->locate.options = htons( req->locate.options );
1129 break;
1130
1131 //------------------------------------------------------------------------
1132 // kXR_query
1133 //------------------------------------------------------------------------
1134 case kXR_query:
1135 req->query.infotype = htons( req->query.infotype );
1136 break;
1137
1138 //------------------------------------------------------------------------
1139 // kXR_truncate
1140 //------------------------------------------------------------------------
1141 case kXR_truncate:
1142 req->truncate.offset = htonll( req->truncate.offset );
1143 break;
1144
1145 //------------------------------------------------------------------------
1146 // kXR_mkdir
1147 //------------------------------------------------------------------------
1148 case kXR_mkdir:
1149 req->mkdir.mode = htons( req->mkdir.mode );
1150 break;
1151
1152 //------------------------------------------------------------------------
1153 // kXR_chmod
1154 //------------------------------------------------------------------------
1155 case kXR_chmod:
1156 req->chmod.mode = htons( req->chmod.mode );
1157 break;
1158
1159 //------------------------------------------------------------------------
1160 // kXR_open
1161 //------------------------------------------------------------------------
1162 case kXR_open:
1163 req->open.mode = htons( req->open.mode );
1164 req->open.options = htons( req->open.options );
1165 req->open.optiont = htons( req->open.optiont );
1166 break;
1167
1168 //------------------------------------------------------------------------
1169 // kXR_read
1170 //------------------------------------------------------------------------
1171 case kXR_read:
1172 req->read.offset = htonll( req->read.offset );
1173 req->read.rlen = htonl( req->read.rlen );
1174 break;
1175
1176 //------------------------------------------------------------------------
1177 // kXR_write
1178 //------------------------------------------------------------------------
1179 case kXR_write:
1180 req->write.offset = htonll( req->write.offset );
1181 break;
1182
1183 //------------------------------------------------------------------------
1184 // kXR_mv
1185 //------------------------------------------------------------------------
1186 case kXR_mv:
1187 req->mv.arg1len = htons( req->mv.arg1len );
1188 break;
1189
1190 //------------------------------------------------------------------------
1191 // kXR_readv
1192 //------------------------------------------------------------------------
1193 case kXR_readv:
1194 {
1195 uint16_t numChunks = (req->readv.dlen)/16;
1196 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1197 for( size_t i = 0; i < numChunks; ++i )
1198 {
1199 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1200 dataChunk[i].offset = htonll( dataChunk[i].offset );
1201 }
1202 break;
1203 }
1204
1205 case kXR_clone:
1206 {
1207 uint32_t numChunks = (req->clone.dlen)/sizeof(XrdProto::clone_list);
1208 XrdProto::clone_list *dataChunk =
1209 (XrdProto::clone_list*)( msg + sizeof( ClientRequestHdr ) );
1210 for( size_t i = 0; i < numChunks; ++i )
1211 {
1212 dataChunk[i].srcOffs = htonll( dataChunk[i].srcOffs );
1213 dataChunk[i].srcLen = htonll( dataChunk[i].srcLen );
1214 dataChunk[i].dstOffs = htonll( dataChunk[i].dstOffs );
1215 }
1216 break;
1217 }
1218
1219 //------------------------------------------------------------------------
1220 // kXR_writev
1221 //------------------------------------------------------------------------
1222 case kXR_writev:
1223 {
1224 uint16_t numChunks = (req->writev.dlen)/16;
1225 XrdProto::write_list *wrtList =
1226 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1227 for( size_t i = 0; i < numChunks; ++i )
1228 {
1229 wrtList[i].wlen = htonl( wrtList[i].wlen );
1230 wrtList[i].offset = htonll( wrtList[i].offset );
1231 }
1232
1233 break;
1234 }
1235
1236 case kXR_pgread:
1237 {
1238 req->pgread.offset = htonll( req->pgread.offset );
1239 req->pgread.rlen = htonl( req->pgread.rlen );
1240 break;
1241 }
1242
1243 case kXR_pgwrite:
1244 {
1245 req->pgwrite.offset = htonll( req->pgwrite.offset );
1246 break;
1247 }
1248
1249 //------------------------------------------------------------------------
1250 // kXR_prepare
1251 //------------------------------------------------------------------------
1252 case kXR_prepare:
1253 {
1254 req->prepare.optionX = htons( req->prepare.optionX );
1255 req->prepare.port = htons( req->prepare.port );
1256 break;
1257 }
1258
1259 case kXR_chkpoint:
1260 {
1261 if( req->chkpoint.opcode == kXR_ckpXeq )
1262 MarshallRequest( msg + 24 );
1263 break;
1264 }
1265 };
1266
1267 req->header.requestid = htons( req->header.requestid );
1268 req->header.dlen = htonl( req->header.dlen );
1269 return XRootDStatus();
1270 }
1271
1272 //----------------------------------------------------------------------------
1273 // Unmarshall the request - sometimes the requests need to be rewritten,
1274 // so we need to unmarshall them
1275 //----------------------------------------------------------------------------
1277 {
1278 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1279 // We rely on the marshaling process to be symmetric!
1280 // First we unmarshall the request ID and the length because
1281 // MarshallRequest() relies on these, and then we need to unmarshall these
1282 // two again, because they get marshalled in MarshallRequest().
1283 // All this is pretty damn ugly and should be rewritten.
1284 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1285 req->header.requestid = htons( req->header.requestid );
1286 req->header.dlen = htonl( req->header.dlen );
1287 XRootDStatus st = MarshallRequest( msg );
1288 req->header.requestid = htons( req->header.requestid );
1289 req->header.dlen = htonl( req->header.dlen );
1290 msg->SetIsMarshalled( false );
1291 return st;
1292 }
1293
1294 //----------------------------------------------------------------------------
1295 // Unmarshall the body of the incoming message
1296 //----------------------------------------------------------------------------
1298 {
1300
1301 //--------------------------------------------------------------------------
1302 // kXR_ok
1303 //--------------------------------------------------------------------------
1304 if( m->hdr.status == kXR_ok )
1305 {
1306 switch( reqType )
1307 {
1308 //----------------------------------------------------------------------
1309 // kXR_protocol
1310 //----------------------------------------------------------------------
1311 case kXR_protocol:
1312 if( m->hdr.dlen < 8 )
1313 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1314 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1315 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1316 break;
1317 }
1318 }
1319 //--------------------------------------------------------------------------
1320 // kXR_error
1321 //--------------------------------------------------------------------------
1322 else if( m->hdr.status == kXR_error )
1323 {
1324 if( m->hdr.dlen < 4 )
1325 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1326 m->body.error.errnum = ntohl( m->body.error.errnum );
1327 }
1328
1329 //--------------------------------------------------------------------------
1330 // kXR_wait
1331 //--------------------------------------------------------------------------
1332 else if( m->hdr.status == kXR_wait )
1333 {
1334 if( m->hdr.dlen < 4 )
1335 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1336 m->body.wait.seconds = htonl( m->body.wait.seconds );
1337 }
1338
1339 //--------------------------------------------------------------------------
1340 // kXR_redirect
1341 //--------------------------------------------------------------------------
1342 else if( m->hdr.status == kXR_redirect )
1343 {
1344 if( m->hdr.dlen < 4 )
1345 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1346 m->body.redirect.port = htonl( m->body.redirect.port );
1347 }
1348
1349 //--------------------------------------------------------------------------
1350 // kXR_waitresp
1351 //--------------------------------------------------------------------------
1352 else if( m->hdr.status == kXR_waitresp )
1353 {
1354 if( m->hdr.dlen < 4 )
1355 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1356 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1357 }
1358
1359 //--------------------------------------------------------------------------
1360 // kXR_attn
1361 //--------------------------------------------------------------------------
1362 else if( m->hdr.status == kXR_attn )
1363 {
1364 if( m->hdr.dlen < 4 )
1365 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1366 m->body.attn.actnum = htonl( m->body.attn.actnum );
1367 }
1368
1369 return XRootDStatus();
1370 }
1371
1372 //------------------------------------------------------------------------
1374 //------------------------------------------------------------------------
1376 {
1377 //--------------------------------------------------------------------------
1378 // Calculate the crc32c before the unmarshaling the body!
1379 //--------------------------------------------------------------------------
1381 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1382 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1383 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1384
1385 size_t stlen = sizeof( ServerResponseStatus );
1386 switch( reqType )
1387 {
1388 case kXR_pgread:
1389 {
1390 stlen += sizeof( ServerResponseBody_pgRead );
1391 break;
1392 }
1393
1394 case kXR_pgwrite:
1395 {
1396 stlen += sizeof( ServerResponseBody_pgWrite );
1397 break;
1398 }
1399 }
1400
1401 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1402 "kXR_status: invalid message size." );
1403
1404 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1405 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1406
1407 switch( reqType )
1408 {
1409 case kXR_pgread:
1410 {
1412 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1413 break;
1414 }
1415
1416 case kXR_pgwrite:
1417 {
1419 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1420 break;
1421 }
1422 }
1423
1424 //--------------------------------------------------------------------------
1425 // Do the integrity checks
1426 //--------------------------------------------------------------------------
1427 if( crcval != rspst->bdy.crc32c )
1428 {
1429 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1430 "corrupted (crc32c integrity check failed)." );
1431 }
1432
1433 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1434 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1435 {
1436 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1437 "(stream ID mismatch)." );
1438 }
1439
1440
1441
1442 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1443 {
1444 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1445 "(request ID mismatch)." );
1446 }
1447
1448 return XRootDStatus();
1449 }
1450
1452 {
1454 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1455
1456 switch( reqType )
1457 {
1458 case kXR_pgwrite:
1459 {
1460 //--------------------------------------------------------------------------
1461 // If there's no additional data there's nothing to unmarshal
1462 //--------------------------------------------------------------------------
1463 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1464 //--------------------------------------------------------------------------
1465 // If there's not enough data to form correction-segment report an error
1466 //--------------------------------------------------------------------------
1467 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1469 "kXR_status: invalid message size." );
1470
1471 //--------------------------------------------------------------------------
1472 // Calculate the crc32c for the additional data
1473 //--------------------------------------------------------------------------
1475 cse->cseCRC = ntohl( cse->cseCRC );
1476 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1477 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1478 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1479
1480 //--------------------------------------------------------------------------
1481 // Do the integrity checks
1482 //--------------------------------------------------------------------------
1483 if( crcval != cse->cseCRC )
1484 {
1485 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1486 "corrupted (crc32c integrity check failed)." );
1487 }
1488
1489 cse->dlFirst = ntohs( cse->dlFirst );
1490 cse->dlLast = ntohs( cse->dlLast );
1491
1492 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1493 sizeof( kXR_int64 );
1494 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1495 sizeof( ServerResponseBody_pgWrCSE ) );
1496
1497 for( size_t i = 0; i < pgcnt; ++i )
1498 pgoffs[i] = ntohll( pgoffs[i] );
1499
1500 return XRootDStatus();
1501 break;
1502 }
1503
1504 default:
1505 break;
1506 }
1507
1509 }
1510
1511 //----------------------------------------------------------------------------
1512 // Unmarshall the header of the incoming message
1513 //----------------------------------------------------------------------------
1515 {
1517 header->status = ntohs( header->status );
1518 header->dlen = ntohl( header->dlen );
1519 }
1520
1521 //----------------------------------------------------------------------------
1522 // Log server error response
1523 //----------------------------------------------------------------------------
1525 {
1526 Log *log = DefaultEnv::GetLog();
1527 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1528 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1529 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1530 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1531 rsp->body.error.errnum, errmsg );
1532 delete [] errmsg;
1533 }
1534
1535 //------------------------------------------------------------------------
1536 // Number of currently connected data streams
1537 //------------------------------------------------------------------------
1539 {
1540 XRootDChannelInfo *info = 0;
1541 channelData.Get( info );
1542
1543 if (!info) {
1544 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1545 return 0;
1546 }
1547
1548 XrdSysMutexHelper scopedLock( info->mutex );
1549
1550 uint16_t nbConnected = 0;
1551 for( size_t i = 1; i < info->stream.size(); ++i )
1552 if( info->stream[i].status == XRootDStreamInfo::Connected )
1553 ++nbConnected;
1554
1555 return nbConnected;
1556 }
1557
1558 //----------------------------------------------------------------------------
1559 // The stream has been disconnected, do the cleanups
1560 //----------------------------------------------------------------------------
1562 uint16_t subStreamId )
1563 {
1564 XRootDChannelInfo *info = 0;
1565 channelData.Get( info );
1566
1567 if (!info) {
1568 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1569 return;
1570 }
1571
1572 XrdSysMutexHelper scopedLock( info->mutex );
1573
1574 if( !info->stream.empty() )
1575 {
1576 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1578 }
1579
1580 if( subStreamId == 0 )
1581 {
1582 CleanUpProtection( info );
1583 info->sidManager->ReleaseAllTimedOut();
1584 info->sentOpens.clear();
1585 info->sentCloses.clear();
1586 info->openFiles = 0;
1587 info->waitBarrier = 0;
1588 }
1589 }
1590
1591 //------------------------------------------------------------------------
1592 // Query the channel
1593 //------------------------------------------------------------------------
1595 AnyObject &result,
1596 AnyObject &channelData )
1597 {
1598 XRootDChannelInfo *info = 0;
1599 channelData.Get( info );
1600
1601 if (!info)
1603
1604 XrdSysMutexHelper scopedLock( info->mutex );
1605
1606 switch( query )
1607 {
1608 //------------------------------------------------------------------------
1609 // Protocol name
1610 //------------------------------------------------------------------------
1612 result.Set( (const char*)"XRootD", false );
1613 return Status();
1614
1615 //------------------------------------------------------------------------
1616 // Authentication
1617 //------------------------------------------------------------------------
1619 result.Set( new std::string( info->authProtocolName ), false );
1620 return Status();
1621
1622 //------------------------------------------------------------------------
1623 // Server flags
1624 //------------------------------------------------------------------------
1626 result.Set( new int( info->serverFlags ), false );
1627 return Status();
1628
1629 //------------------------------------------------------------------------
1630 // Protocol version
1631 //------------------------------------------------------------------------
1633 result.Set( new int( info->protocolVersion ), false );
1634 return Status();
1635
1637 result.Set( new bool( info->encrypted ), false );
1638 return Status();
1639 };
1641 }
1642
1643 //----------------------------------------------------------------------------
1644 // Check whether the transport can hijack the message
1645 //----------------------------------------------------------------------------
1647 uint16_t subStream,
1648 AnyObject &channelData )
1649 {
1650 XRootDChannelInfo *info = 0;
1651 channelData.Get( info );
1652 XrdSysMutexHelper scopedLock( info->mutex );
1653 Log *log = DefaultEnv::GetLog();
1654
1655 //--------------------------------------------------------------------------
1656 // Update the substream queues
1657 //--------------------------------------------------------------------------
1658 info->strmSelector->MsgReceived( subStream );
1659
1660 //--------------------------------------------------------------------------
1661 // Check whether this message is a response to a request that has
1662 // timed out, and if so, drop it
1663 //--------------------------------------------------------------------------
1665 if( rsp->hdr.status == kXR_attn )
1666 {
1667 return NoAction;
1668 }
1669
1670 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1671 {
1672 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1673 "response that we're no longer interested in (timed out)",
1674 (void*)&msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1675 //------------------------------------------------------------------------
1676 // If it is kXR_waitresp there will be another one,
1677 // so we don't release the sid yet
1678 //------------------------------------------------------------------------
1679 if( rsp->hdr.status != kXR_waitresp )
1680 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1681 //------------------------------------------------------------------------
1682 // If it is a successful response to an open request
1683 // that timed out, we need to send a close
1684 //------------------------------------------------------------------------
1685 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1686 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1687 if( sidIt != info->sentOpens.end() )
1688 {
1689 info->sentOpens.erase( sidIt );
1690 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1691 }
1692 return DigestMsg;
1693 }
1694
1695 //--------------------------------------------------------------------------
1696 // If we have a wait or waitresp
1697 //--------------------------------------------------------------------------
1698 uint32_t seconds = 0;
1699 if( rsp->hdr.status == kXR_wait )
1700 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1701 // to re-send the request
1702 else if( rsp->hdr.status == kXR_waitresp )
1703 {
1704 seconds = ntohl( rsp->body.waitresp.seconds );
1705
1706 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1707 "setting up wait barrier.",
1708 info->streamName.c_str(),
1709 seconds );
1710 }
1711
1712 time_t barrier = time(0) + seconds;
1713 if( info->waitBarrier < barrier )
1714 info->waitBarrier = barrier;
1715
1716 //--------------------------------------------------------------------------
1717 // If we got a response to an open request, we may need to bump the counter
1718 // of open files
1719 //--------------------------------------------------------------------------
1720 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1721 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1722 if( sidIt != info->sentOpens.end() )
1723 {
1724 if( rsp->hdr.status == kXR_waitresp )
1725 return NoAction;
1726 info->sentOpens.erase( sidIt );
1727 if( rsp->hdr.status == kXR_ok )
1728 {
1729 ++info->openFiles;
1730 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1731 }
1732 return NoAction;
1733 }
1734
1735 //--------------------------------------------------------------------------
1736 // If we got a response to a close, we may need to decrement the counter of
1737 // open files
1738 //--------------------------------------------------------------------------
1739 sidIt = info->sentCloses.find( sid );
1740 if( sidIt != info->sentCloses.end() )
1741 {
1742 if( rsp->hdr.status == kXR_waitresp )
1743 return NoAction;
1744 info->sentCloses.erase( sidIt );
1745 --info->openFiles;
1746 return NoAction;
1747 }
1748 return NoAction;
1749 }
1750
1751 //----------------------------------------------------------------------------
1752 // Notify the transport about a message having been sent
1753 //----------------------------------------------------------------------------
1755 uint16_t subStream,
1756 uint32_t bytesSent,
1757 AnyObject &channelData )
1758 {
1759 // Called when a message has been sent. For messages that return on a
1760 // different pathid (and hence may use a different poller) it is possible
1761 // that the server has already replied and the reply will trigger
1762 // MessageReceived() before this method has been called. However for open
1763 // and close this is never the case and this method is used for tracking
1764 // only those.
1765 XRootDChannelInfo *info = 0;
1766 channelData.Get( info );
1767 XrdSysMutexHelper scopedLock( info->mutex );
1768 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1769 uint16_t reqid = ntohs( req->header.requestid );
1770
1771
1772 //--------------------------------------------------------------------------
1773 // We need to track opens to know if we can close streams due to idleness
1774 //--------------------------------------------------------------------------
1775 uint16_t sid;
1776 memcpy( &sid, req->header.streamid, 2 );
1777
1778 if( reqid == kXR_open )
1779 info->sentOpens.insert( sid );
1780 else if( reqid == kXR_close )
1781 info->sentCloses.insert( sid );
1782 }
1783
1784
1785 //----------------------------------------------------------------------------
1786 // Get signature for given message
1787 //----------------------------------------------------------------------------
1789 {
1790 XRootDChannelInfo *info = 0;
1791 channelData.Get( info );
1792 return GetSignature( toSign, sign, info );
1793 }
1794
1795 //------------------------------------------------------------------------
1797 //------------------------------------------------------------------------
1799 Message *&sign,
1800 XRootDChannelInfo *info )
1801 {
1802 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1803 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1804
1805 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1806 if( !info ) return Status( stError, errInternal );
1807 if( info->protection )
1808 {
1809 SecurityRequest *newreq = 0;
1810 // check if we have to secure the request in the first place
1811 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1812 // secure (sign/encrypt) the request
1813 int rc = info->protection->Secure( newreq, *thereq, 0 );
1814 // there was an error
1815 if( rc < 0 )
1816 return Status( stError, errInternal, -rc );
1817
1818 sign = new Message();
1819 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1820 }
1821
1822 return Status();
1823 }
1824
1825 //------------------------------------------------------------------------
1827 //------------------------------------------------------------------------
1829 {
1830 XRootDChannelInfo *info = 0;
1831 channelData.Get( info );
1832 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1833 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1834 }
1835
1836 //----------------------------------------------------------------------------
1837 // Wait before exit
1838 //----------------------------------------------------------------------------
1840 {
1841 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1842 pSecUnloadHandler->unloaded = true;
1843 }
1844
1845 //----------------------------------------------------------------------------
1846 // @return : true if encryption should be turned on, false otherwise
1847 //----------------------------------------------------------------------------
1849 AnyObject &channelData )
1850 {
1851 XRootDChannelInfo *info = 0;
1852 channelData.Get( info );
1853
1855 int notlsok = DefaultNoTlsOK;
1856 env->GetInt( "NoTlsOK", notlsok );
1857
1858
1859 if( notlsok )
1860 return info->encrypted;
1861
1862 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1863
1864 // Did the server instructed us to switch to TLS right away?
1865 if( sInfo.serverFlags & kXR_gotoTLS )
1866 {
1867 if( handShakeData->subStreamId == 0 ) info->encrypted = true;
1868 return true ;
1869 }
1870
1871 //--------------------------------------------------------------------------
1872 // The control stream (sub-stream 0) might need to switch to TLS before
1873 // login or after login
1874 //--------------------------------------------------------------------------
1875 if( handShakeData->subStreamId == 0 )
1876 {
1877 //------------------------------------------------------------------------
1878 // We are about to login and the server asked to start encrypting
1879 // before login
1880 //------------------------------------------------------------------------
1881 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1882 ( info->serverFlags & kXR_tlsLogin ) )
1883 {
1884 info->encrypted = true;
1885 return true;
1886 }
1887
1888 //--------------------------------------------------------------------
1889 // The hand-shake is done and the server requested to encrypt the session
1890 //--------------------------------------------------------------------
1891 if( (sInfo.status == XRootDStreamInfo::Connected ||
1892 //--------------------------------------------------------------------
1893 // we really need to turn on TLS before we sent kXR_endsess and we
1894 // are about to do so (1st enable encryption, then send kXR_endsess)
1895 //--------------------------------------------------------------------
1897 ( info->serverFlags & kXR_tlsSess ) )
1898 {
1899 info->encrypted = true;
1900 return true;
1901 }
1902 }
1903 //--------------------------------------------------------------------------
1904 // A data stream (sub-stream > 0) if need be will be switched to TLS before
1905 // bind.
1906 //--------------------------------------------------------------------------
1907 else
1908 {
1909 //------------------------------------------------------------------------
1910 // We are about to bind a data stream and the server asked to start
1911 // encrypting before bind
1912 //------------------------------------------------------------------------
1913 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1914 ( info->serverFlags & kXR_tlsData ) )
1915 {
1916 return true;
1917 }
1918 }
1919
1920 return false;
1921 }
1922
1923 //------------------------------------------------------------------------
1924 // Get bind preference for the next data stream
1925 //------------------------------------------------------------------------
1927 AnyObject &channelData )
1928 {
1929 XRootDChannelInfo *info = 0;
1930 channelData.Get( info );
1931
1932 if(!info || !info->bindSelector)
1933 return url;
1934
1935 return URL( info->bindSelector->Get() );
1936 }
1937
1938 //----------------------------------------------------------------------------
1939 // Generate the message to be sent as an initial handshake
1940 // (handshake+kXR_protocol)
1941 //----------------------------------------------------------------------------
1942 Message *XRootDTransport::GenerateInitialHSProtocol( HandShakeData *hsData,
1943 XRootDChannelInfo *info,
1944 kXR_char expect )
1945 {
1946 Log *log = DefaultEnv::GetLog();
1948 "[%s] Sending out the initial hand shake + kXR_protocol",
1949 hsData->streamName.c_str() );
1950
1951 Message *msg = new Message();
1952
1953 msg->Allocate( 20+sizeof(ClientProtocolRequest) );
1954 msg->Zero();
1955
1957 init->fourth = htonl(4);
1958 init->fifth = htonl(2012);
1959
1961 InitProtocolReq( proto, info, expect );
1962
1963 return msg;
1964 }
1965
1966 //------------------------------------------------------------------------
1967 // Generate the protocol message
1968 //------------------------------------------------------------------------
1969 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1970 XRootDChannelInfo *info,
1971 kXR_char expect )
1972 {
1973 Log *log = DefaultEnv::GetLog();
1974 log->Debug( XRootDTransportMsg,
1975 "[%s] Sending out the kXR_protocol",
1976 hsData->streamName.c_str() );
1977
1978 Message *msg = new Message();
1979 msg->Allocate( sizeof(ClientProtocolRequest) );
1980 msg->Zero();
1981
1982 ClientProtocolRequest *proto = (ClientProtocolRequest *)msg->GetBuffer();
1983 InitProtocolReq( proto, info, expect );
1984
1985 return msg;
1986 }
1987
1988 //------------------------------------------------------------------------
1989 // Initialize protocol request
1990 //------------------------------------------------------------------------
1991 void XRootDTransport::InitProtocolReq( ClientProtocolRequest *request,
1992 XRootDChannelInfo *info,
1993 kXR_char expect )
1994 {
1995 request->requestid = htons(kXR_protocol);
1996 request->clientpv = htonl(kXR_PROTOCOLVERSION);
1999
2000 int notlsok = DefaultNoTlsOK;
2001 int tlsnodata = DefaultTlsNoData;
2002
2003 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
2004
2005 env->GetInt( "NoTlsOK", notlsok );
2006
2008 env->GetInt( "TlsNoData", tlsnodata );
2009
2010 if (info->encrypted || InitTLS())
2012
2013 if (info->encrypted && !(notlsok || tlsnodata))
2015
2016 request->expect = expect;
2017
2018 //--------------------------------------------------------------------------
2019 // If we are in the curse of establishing a connection in the context of
2020 // TPC update the expect! (this will be never followed be a bind)
2021 //--------------------------------------------------------------------------
2022 if( info->istpc )
2024 }
2025
2026 //----------------------------------------------------------------------------
2027 // Process the server initial handshake response
2028 //----------------------------------------------------------------------------
2029 XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
2030 XRootDChannelInfo *info )
2031 {
2032 Log *log = DefaultEnv::GetLog();
2033
2034 Message *msg = hsData->in;
2035 ServerResponseHeader *respHdr = (ServerResponseHeader *)msg->GetBuffer();
2036 ServerInitHandShake *hs = (ServerInitHandShake *)msg->GetBuffer(4);
2037
2038 if( respHdr->status != kXR_ok )
2039 {
2040 log->Error( XRootDTransportMsg, "[%s] Invalid hand shake response",
2041 hsData->streamName.c_str() );
2042
2043 return XRootDStatus( stFatal, errHandShakeFailed, 0, "Invalid hand shake response." );
2044 }
2045
2046 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2047 const uint32_t pv = ntohl(hs->protover);
2048 sInfo.serverFlags = ntohl(hs->msgval) == kXR_DataServer ?
2051
2052 if( hsData->subStreamId == 0 )
2053 {
2054 info->protocolVersion = pv;
2055 info->serverFlags = sInfo.serverFlags;
2056 }
2057
2058 log->Debug( XRootDTransportMsg,
2059 "[%s] Got the server hand shake response (%s, protocol "
2060 "version %x)",
2061 hsData->streamName.c_str(),
2062 ServerFlagsToStr( sInfo.serverFlags ).c_str(),
2063 info->protocolVersion );
2064
2065 return XRootDStatus( stOK, suContinue );
2066 }
2067
2068 //----------------------------------------------------------------------------
2069 // Process the protocol response
2070 //----------------------------------------------------------------------------
2071 XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
2072 XRootDChannelInfo *info )
2073 {
2074 Log *log = DefaultEnv::GetLog();
2075
2076 XRootDStatus st = UnMarshallBody( hsData->in, kXR_protocol );
2077 if( !st.IsOK() )
2078 return st;
2079
2080 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2081
2082
2083 if( rsp->hdr.status != kXR_ok )
2084 {
2085 log->Error( XRootDTransportMsg, "[%s] kXR_protocol request failed",
2086 hsData->streamName.c_str() );
2087
2088 return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_protocol request failed" );
2089 }
2090
2091 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2092 if( rsp->body.protocol.pval >= 0x297 )
2093 sInfo.serverFlags = rsp->body.protocol.flags;
2094
2095 if( hsData->subStreamId > 0 )
2096 return XRootDStatus( stOK, suContinue );
2097
2098 info->serverFlags = sInfo.serverFlags;
2099
2100 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
2101 int notlsok = DefaultNoTlsOK;
2102 env->GetInt( "NoTlsOK", notlsok );
2103
2104 if( rsp->body.protocol.pval < kXR_PROTTLSVERSION && info->encrypted )
2105 {
2106 //------------------------------------------------------------------------
2107 // User requested an encrypted connection but the server is to old to
2108 // support it!
2109 //------------------------------------------------------------------------
2110 if( !notlsok ) return XRootDStatus( stFatal, errTlsError, ENOTSUP, "TLS not supported" );
2111
2112 //------------------------------------------------------------------------
2113 // We are falling back to unencrypted data transmission, as configured
2114 // in XRD_NOTLSOK environment variable
2115 //------------------------------------------------------------------------
2116 log->Info( XRootDTransportMsg,
2117 "[%s] Falling back to unencrypted transmission, server does "
2118 "not support TLS encryption.",
2119 hsData->streamName.c_str() );
2120 info->encrypted = false;
2121 }
2122
2123 if( rsp->body.protocol.pval >= 0x297 )
2124 info->serverFlags = rsp->body.protocol.flags;
2125
2126 if( rsp->hdr.dlen > 8 )
2127 {
2128 info->protRespBody = new ServerResponseBody_Protocol();
2129 info->protRespBody->flags = rsp->body.protocol.flags;
2130 info->protRespBody->pval = rsp->body.protocol.pval;
2131
2132 char* bodybuff = reinterpret_cast<char*>( &rsp->body.protocol.secreq );
2133 size_t bodysize = rsp->hdr.dlen - 8;
2134 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2135 if( !st.IsOK() )
2136 return st;
2137 }
2138
2139 log->Debug( XRootDTransportMsg,
2140 "[%s] kXR_protocol successful (%s, protocol version %x)",
2141 hsData->streamName.c_str(),
2142 ServerFlagsToStr( info->serverFlags ).c_str(),
2143 info->protocolVersion );
2144
2145 if( !( info->serverFlags & kXR_haveTLS ) && info->encrypted )
2146 {
2147 //------------------------------------------------------------------------
2148 // User requested an encrypted connection but the server was not configured
2149 // to support encryption!
2150 //------------------------------------------------------------------------
2151 return XRootDStatus( stFatal, errTlsError, ECONNREFUSED,
2152 "Server was not configured to support encryption." );
2153 }
2154
2155 //--------------------------------------------------------------------------
2156 // Now see if we have to enforce encryption in case the server does not
2157 // support PgRead/PgWrite
2158 //--------------------------------------------------------------------------
2159 int tlsOnNoPgrw = DefaultWantTlsOnNoPgrw;
2160 env->GetInt( "WantTlsOnNoPgrw", tlsOnNoPgrw );
2161 if( !( info->serverFlags & kXR_suppgrw ) && tlsOnNoPgrw )
2162 {
2163 //------------------------------------------------------------------------
2164 // If user requested encryption just make sure it is not switched off for
2165 // data
2166 //------------------------------------------------------------------------
2167 if( info->encrypted )
2168 {
2169 log->Debug( XRootDTransportMsg,
2170 "[%s] Server does not support PgRead/PgWrite and"
2171 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2172 hsData->streamName.c_str() );
2173 env->PutInt( "TlsNoData", DefaultTlsNoData );
2174 }
2175 //------------------------------------------------------------------------
2176 // Otherwise, if server is not enforcing data encryption, we will need to
2177 // redo the protocol request with kXR_wantTLS set.
2178 //------------------------------------------------------------------------
2179 else if( !( info->serverFlags & kXR_tlsData ) &&
2180 ( info->serverFlags & kXR_haveTLS ) )
2181 {
2182 info->encrypted = true;
2183 return XRootDStatus( stOK, suRetry );
2184 }
2185 }
2186
2187 return XRootDStatus( stOK, suContinue );
2188 }
2189
2190 XRootDStatus XRootDTransport::ProcessProtocolBody( char *bodybuff,
2191 size_t bodysize,
2192 XRootDChannelInfo *info )
2193 {
2194 //--------------------------------------------------------------------------
2195 // Parse bind preferences
2196 //--------------------------------------------------------------------------
2197 XrdProto::bifReqs *bifreq = reinterpret_cast<XrdProto::bifReqs*>( bodybuff );
2198 if( bodysize >= sizeof( XrdProto::bifReqs ) && bifreq->theTag == 'B' )
2199 {
2200 bodybuff += sizeof( XrdProto::bifReqs );
2201 bodysize -= sizeof( XrdProto::bifReqs );
2202
2203 if( bodysize < bifreq->bifILen )
2204 return XRootDStatus( stError, errDataError, 0, "Received incomplete "
2205 "protocol response." );
2206 std::string bindprefs_str( bodybuff, bifreq->bifILen );
2207 std::vector<std::string> bindprefs;
2208 Utils::splitString( bindprefs, bindprefs_str, "," );
2209 info->bindSelector.reset( new BindPrefSelector( std::move( bindprefs ) ) );
2210 bodybuff += bifreq->bifILen;
2211 bodysize -= bifreq->bifILen;
2212 }
2213 //--------------------------------------------------------------------------
2214 // Parse security requirements
2215 //--------------------------------------------------------------------------
2216 XrdProto::secReqs *secreq = reinterpret_cast<XrdProto::secReqs*>( bodybuff );
2217 if( bodysize >= 6 /*XrdProto::secReqs*/ && secreq->theTag == 'S' )
2218 {
2219 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2220 info->protRespSize = bodysize + 8 /*pval & flags*/;
2221 }
2222
2223 return XRootDStatus();
2224 }
2225
2226 //----------------------------------------------------------------------------
2227 // Generate the bind message
2228 //----------------------------------------------------------------------------
2229 Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2230 XRootDChannelInfo *info )
2231 {
2232 Log *log = DefaultEnv::GetLog();
2233
2234 log->Debug( XRootDTransportMsg,
2235 "[%s] Sending out the bind request",
2236 hsData->streamName.c_str() );
2237
2238
2239 Message *msg = new Message( sizeof( ClientBindRequest ) );
2240 ClientBindRequest *bindReq = (ClientBindRequest *)msg->GetBuffer();
2241
2242 bindReq->requestid = kXR_bind;
2243 memcpy( bindReq->sessid, info->sessionId, 16 );
2244 bindReq->dlen = 0;
2245 MarshallRequest( msg );
2246 return msg;
2247 }
2248
2249 //----------------------------------------------------------------------------
2250 // Generate the bind message
2251 //----------------------------------------------------------------------------
2252 XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2253 XRootDChannelInfo *info )
2254 {
2255 Log *log = DefaultEnv::GetLog();
2256
2257 XRootDStatus st = UnMarshallBody( hsData->in, kXR_bind );
2258 if( !st.IsOK() )
2259 return st;
2260
2261 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2262
2263 if( rsp->hdr.status != kXR_ok )
2264 {
2265 log->Error( XRootDTransportMsg, "[%s] kXR_bind request failed",
2266 hsData->streamName.c_str() );
2267 return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_bind request failed" );
2268 }
2269
2270 info->stream[hsData->subStreamId].pathId = rsp->body.bind.substreamid;
2271 log->Debug( XRootDTransportMsg, "[%s] kXR_bind successful",
2272 hsData->streamName.c_str() );
2273
2274 return XRootDStatus();
2275 }
2276
2277 //----------------------------------------------------------------------------
2278 // Generate the login message
2279 //----------------------------------------------------------------------------
2280 Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2281 XRootDChannelInfo *info )
2282 {
2283 Log *log = DefaultEnv::GetLog();
2284 Env *env = DefaultEnv::GetEnv();
2285
2286 //--------------------------------------------------------------------------
2287 // Compute the login cgi
2288 //--------------------------------------------------------------------------
2289 int timeZone = XrdSysTimer::TimeZone();
2290 char *hostName = XrdNetUtils::MyHostName();
2291 std::string countryCode = Utils::FQDNToCC( hostName );
2292 char *cgiBuffer = new char[1024 + info->logintoken.size()];
2293 std::string appName;
2294 std::string monInfo;
2295 env->GetString( "AppName", appName );
2296 env->GetString( "MonInfo", monInfo );
2297 if( info->logintoken.empty() )
2298 {
2299 snprintf( cgiBuffer, 1024,
2300 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2301 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2302 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2303 }
2304 else
2305 {
2306 snprintf( cgiBuffer, 1024,
2307 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2308 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2309 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2310 }
2311 uint16_t cgiLen = strlen( cgiBuffer );
2312 free( hostName );
2313
2314 //--------------------------------------------------------------------------
2315 // Generate the message
2316 //--------------------------------------------------------------------------
2317 Message *msg = new Message( sizeof(ClientLoginRequest) + cgiLen );
2318 ClientLoginRequest *loginReq = (ClientLoginRequest *)msg->GetBuffer();
2319
2320 loginReq->requestid = kXR_login;
2321 loginReq->pid = ::getpid();
2322 loginReq->capver[0] = (kXR_char) kXR_asyncap | (kXR_char) kXR_ver005;
2323 loginReq->dlen = cgiLen;
2325#ifdef WITH_XRDEC
2326 loginReq->ability2 = kXR_ecredir;
2327#endif
2328
2329 int multiProtocol = 0;
2330 env->GetInt( "MultiProtocol", multiProtocol );
2331 if(multiProtocol)
2332 loginReq->ability |= kXR_multipr;
2333
2334 //--------------------------------------------------------------------------
2335 // Check the IP stacks
2336 //--------------------------------------------------------------------------
2338 bool dualStack = false;
2339 bool privateIPv6 = false;
2340 bool privateIPv4 = false;
2341
2342 if( (stacks & XrdNetUtils::hasIP64) == XrdNetUtils::hasIP64 )
2343 {
2344 dualStack = true;
2345 loginReq->ability |= kXR_hasipv64;
2346 }
2347
2348 if( (stacks & XrdNetUtils::hasIPv6) && !(stacks & XrdNetUtils::hasPub6) )
2349 {
2350 privateIPv6 = true;
2351 loginReq->ability |= kXR_onlyprv6;
2352 }
2353
2354 if( (stacks & XrdNetUtils::hasIPv4) && !(stacks & XrdNetUtils::hasPub4) )
2355 {
2356 privateIPv4 = true;
2357 loginReq->ability |= kXR_onlyprv4;
2358 }
2359
2360 // The following code snippet tries to overcome the problem that this host
2361 // may still be dual-stacked but we don't know it because one of the
2362 // interfaces was not registered in DNS.
2363 //
2364 if( !dualStack && hsData->serverAddr )
2365 {if ( ( ( stacks & XrdNetUtils::hasIPv4 )
2366 && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv6))
2367 || ( ( stacks & XrdNetUtils::hasIPv6 )
2368 && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv4)))
2369 {dualStack = true;
2370 loginReq->ability |= kXR_hasipv64;
2371 }
2372 }
2373
2374 //--------------------------------------------------------------------------
2375 // Check the username
2376 //--------------------------------------------------------------------------
2377 std::string buffer( 8, 0 );
2378 if( hsData->url->GetUserName().length() )
2379 buffer = hsData->url->GetUserName();
2380 else
2381 {
2382 char *name = new char[1024];
2383 if( !XrdOucUtils::UserName( geteuid(), name, 1024 ) )
2384 buffer = name;
2385 else
2386 buffer = "_anon_";
2387 delete [] name;
2388 }
2389 buffer.resize( 8, 0 );
2390 std::copy( buffer.begin(), buffer.end(), (char*)loginReq->username );
2391
2392 msg->Append( cgiBuffer, cgiLen, 24 );
2393
2394 log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_login request, "
2395 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2396 "private IPv6: %s", hsData->streamName.c_str(),
2397 loginReq->username, cgiBuffer, dualStack ? "true" : "false",
2398 privateIPv4 ? "true" : "false",
2399 privateIPv6 ? "true" : "false" );
2400
2401 delete [] cgiBuffer;
2402 MarshallRequest( msg );
2403 return msg;
2404 }
2405
2406 //----------------------------------------------------------------------------
2407 // Process the protocol response
2408 //----------------------------------------------------------------------------
2409 XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2410 XRootDChannelInfo *info )
2411 {
2412 Log *log = DefaultEnv::GetLog();
2413
2414 XRootDStatus st = UnMarshallBody( hsData->in, kXR_login );
2415 if( !st.IsOK() )
2416 return st;
2417
2418 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2419
2420 if( rsp->hdr.status != kXR_ok )
2421 {
2422 log->Error( XRootDTransportMsg, "[%s] Got invalid login response",
2423 hsData->streamName.c_str() );
2424 return XRootDStatus( stFatal, errLoginFailed, 0, "Got invalid login response." );
2425 }
2426
2427 if( !info->firstLogIn )
2428 memcpy( info->oldSessionId, info->sessionId, 16 );
2429
2430 if( rsp->hdr.dlen == 0 && info->protocolVersion <= 0x289 )
2431 {
2432 //--------------------------------------------------------------------------
2433 // This if statement is there only to support dCache inaccurate
2434 // implementation of XRoot protocol, that in some cases returns
2435 // an empty login response for protocol version <= 2.8.9.
2436 //--------------------------------------------------------------------------
2437 memset( info->sessionId, 0, 16 );
2438 log->Warning( XRootDTransportMsg,
2439 "[%s] Logged in, accepting empty login response.",
2440 hsData->streamName.c_str() );
2441 return XRootDStatus();
2442 }
2443
2444 if( rsp->hdr.dlen < 16 )
2445 return XRootDStatus( stError, errDataError, 0, "Login response too short." );
2446
2447 memcpy( info->sessionId, rsp->body.login.sessid, 16 );
2448
2449 std::string sessId = Utils::Char2Hex( rsp->body.login.sessid, 16 );
2450
2451 log->Debug( XRootDTransportMsg, "[%s] Logged in, session: %s",
2452 hsData->streamName.c_str(), sessId.c_str() );
2453
2454 //--------------------------------------------------------------------------
2455 // We have an authentication info to process
2456 //--------------------------------------------------------------------------
2457 if( rsp->hdr.dlen > 16 )
2458 {
2459 size_t len = rsp->hdr.dlen-16;
2460 info->authBuffer = new char[len+1];
2461 info->authBuffer[len] = 0;
2462 memcpy( info->authBuffer, rsp->body.login.sec, len );
2463 log->Debug( XRootDTransportMsg, "[%s] Authentication is required: %s",
2464 hsData->streamName.c_str(), info->authBuffer );
2465
2466 return XRootDStatus( stOK, suContinue );
2467 }
2468
2469 return XRootDStatus();
2470 }
2471
2472 //----------------------------------------------------------------------------
2473 // Do the authentication
2474 //----------------------------------------------------------------------------
2475 XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2476 XRootDChannelInfo *info )
2477 {
2478 //--------------------------------------------------------------------------
2479 // Prepare
2480 //--------------------------------------------------------------------------
2481 Log *log = DefaultEnv::GetLog();
2482 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2483 XrdSecCredentials *credentials = 0;
2484 std::string protocolName;
2485
2486 //--------------------------------------------------------------------------
2487 // We're doing this for the first time
2488 //--------------------------------------------------------------------------
2489 if( sInfo.status == XRootDStreamInfo::LoginSent )
2490 {
2491 log->Debug( XRootDTransportMsg, "[%s] Sending authentication data",
2492 hsData->streamName.c_str() );
2493
2494 //------------------------------------------------------------------------
2495 // Set up the authentication environment
2496 //------------------------------------------------------------------------
2497 info->authEnv = new XrdOucEnv();
2498 info->authEnv->Put( "sockname", hsData->clientName.c_str() );
2499 info->authEnv->Put( "username", hsData->url->GetUserName().c_str() );
2500 info->authEnv->Put( "password", hsData->url->GetPassword().c_str() );
2501
2502 const URL::ParamsMap &urlParams = hsData->url->GetParams();
2503 URL::ParamsMap::const_iterator it;
2504 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2505 {
2506 if( it->first.compare( 0, 4, "xrd." ) == 0 ||
2507 it->first.compare( 0, 6, "xrdcl." ) == 0 )
2508 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2509 }
2510
2511 //------------------------------------------------------------------------
2512 // Initialize some other structs
2513 //------------------------------------------------------------------------
2514 size_t authBuffLen = strlen( info->authBuffer );
2515 char *pars = (char *)malloc( authBuffLen + 1 );
2516 memcpy( pars, info->authBuffer, authBuffLen );
2517 info->authParams = new XrdSecParameters( pars, authBuffLen );
2518 sInfo.status = XRootDStreamInfo::AuthSent;
2519 delete [] info->authBuffer;
2520 info->authBuffer = 0;
2521
2522 //------------------------------------------------------------------------
2523 // Find a protocol that gives us valid credentials
2524 //------------------------------------------------------------------------
2525 XRootDStatus st = GetCredentials( credentials, hsData, info );
2526 if( !st.IsOK() )
2527 {
2528 CleanUpAuthentication( info );
2529 return st;
2530 }
2531 protocolName = info->authProtocol->Entity.prot;
2532 }
2533
2534 //--------------------------------------------------------------------------
2535 // We've been here already
2536 //--------------------------------------------------------------------------
2537 else
2538 {
2539 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2540 protocolName = info->authProtocol->Entity.prot;
2541
2542 //------------------------------------------------------------------------
2543 // We're required to send out more authentication data
2544 //------------------------------------------------------------------------
2545 if( rsp->hdr.status == kXR_authmore )
2546 {
2547 log->Debug( XRootDTransportMsg,
2548 "[%s] Sending more authentication data for %s",
2549 hsData->streamName.c_str(), protocolName.c_str() );
2550
2551 uint32_t len = rsp->hdr.dlen;
2552 char *secTokenData = (char*)malloc( len );
2553 memcpy( secTokenData, rsp->body.authmore.data, len );
2554 XrdSecParameters *secToken = new XrdSecParameters( secTokenData, len );
2555 XrdOucErrInfo ei( "", info->authEnv);
2556 credentials = info->authProtocol->getCredentials( secToken, &ei );
2557 delete secToken;
2558
2559 //----------------------------------------------------------------------
2560 // The protocol handler refuses to give us the data
2561 //----------------------------------------------------------------------
2562 if( !credentials )
2563 {
2564 log->Error( XRootDTransportMsg,
2565 "[%s] Auth protocol handler for %s refuses to give "
2566 "us more credentials %s",
2567 hsData->streamName.c_str(), protocolName.c_str(),
2568 ei.getErrText() );
2569 CleanUpAuthentication( info );
2570 return XRootDStatus( stFatal, errAuthFailed, 0, ei.getErrText() );
2571 }
2572 }
2573
2574 //------------------------------------------------------------------------
2575 // We have succeeded
2576 //------------------------------------------------------------------------
2577 else if( rsp->hdr.status == kXR_ok )
2578 {
2579 info->authProtocolName = info->authProtocol->Entity.prot;
2580
2581 //----------------------------------------------------------------------
2582 // Do we need protection?
2583 //----------------------------------------------------------------------
2584 if( info->protRespBody )
2585 {
2586 int rc = XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2587 if( rc > 0 )
2588 {
2589 log->Debug( XRootDTransportMsg,
2590 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2591 }
2592 else if( rc == 0 )
2593 {
2594 log->Debug( XRootDTransportMsg,
2595 "[%s] XrdSecProtect: no protection needed.",
2596 hsData->streamName.c_str() );
2597 }
2598 else
2599 {
2600 log->Debug( XRootDTransportMsg,
2601 "[%s] Failed to load XrdSecProtect: %s",
2602 hsData->streamName.c_str(), XrdSysE2T( -rc ) );
2603 CleanUpAuthentication( info );
2604
2605 return XRootDStatus( stError, errAuthFailed, -rc, XrdSysE2T( -rc ) );
2606 }
2607 }
2608
2609 if( !info->protection )
2610 CleanUpAuthentication( info );
2611 else
2612 pSecUnloadHandler->Register( info->authProtocolName );
2613
2614 log->Debug( XRootDTransportMsg,
2615 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2616 protocolName.c_str() );
2617
2618 //--------------------------------------------------------------------
2619 // Clear the SSL error queue of the calling thread, as there might be
2620 // some leftover from the authentication!
2621 //--------------------------------------------------------------------
2623
2624 return XRootDStatus();
2625 }
2626 //------------------------------------------------------------------------
2627 // Failure
2628 //------------------------------------------------------------------------
2629 else if( rsp->hdr.status == kXR_error )
2630 {
2631 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
2632 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
2633 log->Error( XRootDTransportMsg,
2634 "[%s] Authentication with %s failed: %s",
2635 hsData->streamName.c_str(), protocolName.c_str(),
2636 errmsg );
2637 delete [] errmsg;
2638
2639 info->authProtocol->Delete();
2640 info->authProtocol = 0;
2641
2642 //----------------------------------------------------------------------
2643 // Find another protocol that gives us valid credentials
2644 //----------------------------------------------------------------------
2645 XRootDStatus st = GetCredentials( credentials, hsData, info );
2646 if( !st.IsOK() )
2647 {
2648 CleanUpAuthentication( info );
2649 return st;
2650 }
2651 protocolName = info->authProtocol->Entity.prot;
2652 }
2653 //------------------------------------------------------------------------
2654 // God knows what
2655 //------------------------------------------------------------------------
2656 else
2657 {
2658 info->authProtocolName = info->authProtocol->Entity.prot;
2659 CleanUpAuthentication( info );
2660
2661 log->Error( XRootDTransportMsg,
2662 "[%s] Authentication with %s failed: unexpected answer",
2663 hsData->streamName.c_str(), protocolName.c_str() );
2664 return XRootDStatus( stFatal, errAuthFailed, 0, "Authentication failed: unexpected answer." );
2665 }
2666 }
2667
2668 //--------------------------------------------------------------------------
2669 // Generate the client request
2670 //--------------------------------------------------------------------------
2671 Message *msg = new Message( sizeof(ClientAuthRequest)+credentials->size );
2672 msg->Zero();
2673 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
2674 char *reqBuffer = msg->GetBuffer(sizeof(ClientAuthRequest));
2675
2676 req->header.requestid = kXR_auth;
2677 req->auth.dlen = credentials->size;
2678 memcpy( req->auth.credtype, protocolName.c_str(),
2679 protocolName.length() > 4 ? 4 : protocolName.length() );
2680
2681 memcpy( reqBuffer, credentials->buffer, credentials->size );
2682 hsData->out = msg;
2683 MarshallRequest( msg );
2684 delete credentials;
2685
2686 //------------------------------------------------------------------------
2687 // Clear the SSL error queue of the calling thread, as there might be
2688 // some leftover from the authentication!
2689 //------------------------------------------------------------------------
2691
2692 return XRootDStatus( stOK, suContinue );
2693 }
2694
2695 //------------------------------------------------------------------------
2696 // Get the initial credentials using one of the protocols
2697 //------------------------------------------------------------------------
2698 XRootDStatus XRootDTransport::GetCredentials( XrdSecCredentials *&credentials,
2699 HandShakeData *hsData,
2700 XRootDChannelInfo *info )
2701 {
2702 //--------------------------------------------------------------------------
2703 // Set up the auth handler
2704 //--------------------------------------------------------------------------
2705 Log *log = DefaultEnv::GetLog();
2706 XrdOucErrInfo ei( "", info->authEnv);
2707 XrdSecGetProt_t authHandler = GetAuthHandler();
2708 if( !authHandler )
2709 return XRootDStatus( stFatal, errAuthFailed, 0, "Could not load authentication handler." );
2710
2711 //--------------------------------------------------------------------------
2712 // Retrieve secuid and secgid, if available. These will override the fsuid
2713 // and fsgid of the current thread reading the credentials to prevent
2714 // security holes in case this process is running with elevated permissions.
2715 //--------------------------------------------------------------------------
2716 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secuid") : 0;
2717 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secgid") : 0;
2718
2719 int secuid = -1;
2720 int secgid = -1;
2721
2722 if(secuidc) secuid = atoi(secuidc);
2723 if(secgidc) secgid = atoi(secgidc);
2724
2725#ifdef __linux__
2726 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2727 if(!uidSetter.IsOk()) {
2728 log->Error( XRootDTransportMsg, "[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2729 hsData->streamName.c_str(), secuid, secgid );
2730 return XRootDStatus( stFatal, errAuthFailed, 0, "Error while setting (fsuid, fsgid)." );
2731 }
2732#else
2733 if(secuid >= 0 || secgid >= 0) {
2734 log->Error( XRootDTransportMsg, "[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2735 hsData->streamName.c_str() );
2736 return XRootDStatus( stFatal, errAuthFailed, 0, "xrdcl.secuid and xrdcl.secgid"
2737 " only supported on Linux" );
2738 }
2739#endif
2740
2741 //--------------------------------------------------------------------------
2742 // Loop over the possible protocols to find one that gives us valid
2743 // credentials
2744 //--------------------------------------------------------------------------
2745 XrdNetAddr &srvAddrInfo = *const_cast<XrdNetAddr *>(hsData->serverAddr);
2746 srvAddrInfo.SetTLS( info->encrypted );
2747 while(1)
2748 {
2749 //------------------------------------------------------------------------
2750 // Get the protocol
2751 //------------------------------------------------------------------------
2752 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2753 srvAddrInfo,
2754 *info->authParams,
2755 &ei );
2756 if( !info->authProtocol )
2757 {
2758 log->Error( XRootDTransportMsg, "[%s] No protocols left to try",
2759 hsData->streamName.c_str() );
2760 return XRootDStatus( stFatal, errAuthFailed, 0, "No protocols left to try" );
2761 }
2762
2763 std::string protocolName = info->authProtocol->Entity.prot;
2764 log->Debug( XRootDTransportMsg, "[%s] Trying to authenticate using %s",
2765 hsData->streamName.c_str(), protocolName.c_str() );
2766
2767 //------------------------------------------------------------------------
2768 // Get the credentials from the current protocol
2769 //------------------------------------------------------------------------
2770 credentials = info->authProtocol->getCredentials( 0, &ei );
2771 if( !credentials )
2772 {
2773 log->Debug( XRootDTransportMsg,
2774 "[%s] Cannot get credentials for protocol %s: %s",
2775 hsData->streamName.c_str(), protocolName.c_str(),
2776 ei.getErrText() );
2777 info->authProtocol->Delete();
2778 continue;
2779 }
2780 return XRootDStatus( stOK, suContinue );
2781 }
2782 }
2783
2784 //------------------------------------------------------------------------
2785 // Clean up the data structures created for the authentication process
2786 //------------------------------------------------------------------------
2787 Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2788 {
2789 if( info->authProtocol )
2790 info->authProtocol->Delete();
2791 delete info->authParams;
2792 delete info->authEnv;
2793 info->authProtocol = 0;
2794 info->authParams = 0;
2795 info->authEnv = 0;
2797 return Status();
2798 }
2799
2800 //------------------------------------------------------------------------
2801 // Clean up the data structures created for the protection purposes
2802 //------------------------------------------------------------------------
2803 Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2804 {
2805 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
2806 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
2807
2808 if( info->protection )
2809 {
2810 info->protection->Delete();
2811 info->protection = 0;
2812
2813 CleanUpAuthentication( info );
2814 }
2815
2816 if( info->protRespBody )
2817 {
2818 delete info->protRespBody;
2819 info->protRespBody = 0;
2820 info->protRespSize = 0;
2821 }
2822
2823 return Status();
2824 }
2825
2826 //----------------------------------------------------------------------------
2827 // Get the authentication function handle
2828 //----------------------------------------------------------------------------
2829 XrdSecGetProt_t XRootDTransport::GetAuthHandler()
2830 {
2831 Log *log = DefaultEnv::GetLog();
2832 char errorBuff[1024];
2833
2834 // the static constructor is invoked only once and it is guaranteed that this
2835 // is thread safe
2836 static std::atomic<XrdSecGetProt_t> authHandler( XrdSecLoadSecFactory( errorBuff, 1024 ) );
2837 auto ret = authHandler.load( std::memory_order_relaxed );
2838 if( ret ) return ret;
2839
2840 // if we are here it means we failed to load the security library for the
2841 // first time and we hope the environment changed
2842
2843 // obtain a lock
2844 static XrdSysMutex mtx;
2845 XrdSysMutexHelper lck( mtx );
2846 // check if in the meanwhile some else didn't load the library
2847 ret = authHandler.load( std::memory_order_relaxed );
2848 if( ret ) return ret;
2849
2850 // load the library
2851 ret = XrdSecLoadSecFactory( errorBuff, 1024 );
2852 authHandler.store( ret, std::memory_order_relaxed );
2853 // if we failed report an error
2854 if( !ret )
2855 {
2856 log->Error( XRootDTransportMsg,
2857 "Unable to get the security framework: %s", errorBuff );
2858 return 0;
2859 }
2860 return ret;
2861 }
2862
2863 //----------------------------------------------------------------------------
2864 // Generate the end session message
2865 //----------------------------------------------------------------------------
2866 Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2867 XRootDChannelInfo *info )
2868 {
2869 Log *log = DefaultEnv::GetLog();
2870
2871 //--------------------------------------------------------------------------
2872 // Generate the message
2873 //--------------------------------------------------------------------------
2874 Message *msg = new Message( sizeof(ClientEndsessRequest) );
2875 ClientEndsessRequest *endsessReq = (ClientEndsessRequest *)msg->GetBuffer();
2876
2877 endsessReq->requestid = kXR_endsess;
2878 memcpy( endsessReq->sessid, info->oldSessionId, 16 );
2879 std::string sessId = Utils::Char2Hex( endsessReq->sessid, 16 );
2880
2881 log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_endsess for session:"
2882 " %s", hsData->streamName.c_str(), sessId.c_str() );
2883
2884 MarshallRequest( msg );
2885
2886 Message *sign = 0;
2887 GetSignature( msg, sign, info );
2888 if( sign )
2889 {
2890 //------------------------------------------------------------------------
2891 // Now place both the signature and the request in a single buffer
2892 //------------------------------------------------------------------------
2893 uint32_t size = sign->GetSize();
2894 sign->ReAllocate( size + msg->GetSize() );
2895 char* buffer = sign->GetBuffer( size );
2896 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2897 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2898 }
2899
2900 return msg;
2901 }
2902
2903 //----------------------------------------------------------------------------
2904 // Process the protocol response
2905 //----------------------------------------------------------------------------
2906 Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2907 XRootDChannelInfo *info )
2908 {
2909 Log *log = DefaultEnv::GetLog();
2910
2911 Status st = UnMarshallBody( hsData->in, kXR_endsess );
2912 if( !st.IsOK() )
2913 return st;
2914
2915 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2916
2917 // If we're good, we're good!
2918 if( rsp->hdr.status == kXR_ok )
2919 return Status();
2920
2921 // we ignore not found errors as such an error means the connection
2922 // has been already terminated
2923 if( rsp->hdr.status == kXR_error && rsp->body.error.errnum == kXR_NotFound )
2924 return Status();
2925
2926 // other errors
2927 if( rsp->hdr.status == kXR_error )
2928 {
2929 std::string errorMsg( rsp->body.error.errmsg, rsp->hdr.dlen - 4 );
2930 log->Error( XRootDTransportMsg, "[%s] Got error response to "
2931 "kXR_endsess: %s", hsData->streamName.c_str(),
2932 errorMsg.c_str() );
2933 return Status( stFatal, errHandShakeFailed );
2934 }
2935
2936 // Wait Response.
2937 if( rsp->hdr.status == kXR_wait )
2938 {
2939 std::string msg( rsp->body.wait.infomsg, rsp->hdr.dlen - 4 );
2940 log->Info( XRootDTransportMsg, "[%s] Got wait response to "
2941 "kXR_endsess: %s", hsData->streamName.c_str(),
2942 msg.c_str() );
2943 hsData->out = GenerateEndSession( hsData, info );
2944 return Status( stOK, suRetry );
2945 }
2946
2947 // Any other response is protocol violation
2948 return Status( stError, errDataError );
2949 }
2950
2951 //----------------------------------------------------------------------------
2952 // Get a string representation of the server flags
2953 //----------------------------------------------------------------------------
2954 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2955 {
2956 std::string repr = "type: ";
2957 if( flags & kXR_isManager )
2958 repr += "manager ";
2959
2960 else if( flags & kXR_isServer )
2961 repr += "server ";
2962
2963 repr += "[";
2964
2965 if( flags & kXR_attrMeta )
2966 repr += "meta ";
2967
2968 else if( flags & kXR_attrCache )
2969 repr += "cache ";
2970
2971 else if( flags & kXR_attrProxy )
2972 repr += "proxy ";
2973
2974 else if( flags & kXR_attrSuper )
2975 repr += "super ";
2976
2977 else
2978 repr += " ";
2979
2980 repr.erase( repr.length()-1, 1 );
2981
2982 repr += "]";
2983 return repr;
2984 }
2985}
2986
2987namespace
2988{
2989 // Extract file name from a request
2990 //----------------------------------------------------------------------------
2991 char *GetDataAsString( char *msg )
2992 {
2994 char *fn = new char[req->dlen+1];
2995 memcpy( fn, msg + 24, req->dlen );
2996 fn[req->dlen] = 0;
2997 return fn;
2998 }
2999}
3000
3001namespace XrdCl
3002{
3003 //----------------------------------------------------------------------------
3004 // Get the description of a message
3005 //----------------------------------------------------------------------------
3006 void XRootDTransport::GenerateDescription( char *msg, std::ostringstream &o )
3007 {
3008 Log *log = DefaultEnv::GetLog();
3009 if( log->GetLevel() < Log::ErrorMsg )
3010 return;
3011
3012 ClientRequestHdr *req = (ClientRequestHdr *)msg;
3013 switch( req->requestid )
3014 {
3015 //------------------------------------------------------------------------
3016 // kXR_open
3017 //------------------------------------------------------------------------
3018 case kXR_open:
3019 {
3020 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
3021 o << "kXR_open (";
3022 char *fn = GetDataAsString( msg );
3023 o << "file: " << fn << ", ";
3024 delete [] fn;
3025 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3026 o << std::setbase(10);
3027 o << "flags: ";
3028 if( sreq->options == 0 )
3029 o << "none ";
3030 else
3031 {
3032 if( sreq->options & kXR_compress )
3033 o << "kXR_compress ";
3034 if( sreq->options & kXR_delete )
3035 o << "kXR_delete ";
3036 if( sreq->options & kXR_force )
3037 o << "kXR_force ";
3038 if( sreq->options & kXR_mkpath )
3039 o << "kXR_mkpath ";
3040 if( sreq->options & kXR_new )
3041 o << "kXR_new ";
3042 if( sreq->options & kXR_nowait )
3043 o << "kXR_nowait ";
3044 if( sreq->options & kXR_open_apnd )
3045 o << "kXR_open_apnd ";
3046 if( sreq->options & kXR_open_read )
3047 o << "kXR_open_read ";
3048 if( sreq->options & kXR_open_updt )
3049 o << "kXR_open_updt ";
3050 if( sreq->options & kXR_open_wrto )
3051 o << "kXR_open_wrto ";
3052 if( sreq->options & kXR_posc )
3053 o << "kXR_posc ";
3054 if( sreq->options & kXR_prefname )
3055 o << "kXR_prefname ";
3056 if( sreq->options & kXR_refresh )
3057 o << "kXR_refresh ";
3058 if( sreq->options & kXR_4dirlist )
3059 o << "kXR_4dirlist ";
3060 if( sreq->options & kXR_replica )
3061 o << "kXR_replica ";
3062 if( sreq->options & kXR_seqio )
3063 o << "kXR_seqio ";
3064 if( sreq->options & kXR_async )
3065 o << "kXR_async ";
3066 if( sreq->options & kXR_retstat )
3067 o << "kXR_retstat ";
3068 }
3069 o << "flagt: ";
3070 if( sreq->optiont == 0 )
3071 o << "none ";
3072 else
3073 {
3074 if( sreq->optiont & kXR_dup )
3075 o << "kXR_dup ";
3076 if( sreq->options & kXR_samefs )
3077 o << "kXR_samefs ";
3078 }
3079 o << "fhtemplt: " << FileHandleToStr( sreq->fhtemplt );
3080 o << ")";
3081 break;
3082 }
3083
3084 //------------------------------------------------------------------------
3085 // kXR_clone
3086 //------------------------------------------------------------------------
3087 case kXR_clone:
3088 {
3090 XrdProto::clone_list *dataChunk = (XrdProto::clone_list*)(msg + 24 );
3091 o << "kXR_clone ( ";
3092 o << "handle: " << FileHandleToStr( sreq->fhandle );
3093 o << std::setbase(10);
3094 o << " list [ ";
3095 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::clone_list); ++i )
3096 {
3097 o << "(src_handle: ";
3098 o << FileHandleToStr( dataChunk[i].srcFH );
3099 o << ", ";
3100 o << std::setbase(10);
3101 o << "src_offset: " << dataChunk[i].srcOffs;
3102 o << ", src_length: " << dataChunk[i].srcLen;
3103 o << ", dst_offset: " << dataChunk[i].dstOffs << "); ";
3104 }
3105
3106 o << " ] )";
3107 break;
3108 }
3109
3110 //------------------------------------------------------------------------
3111 // kXR_close
3112 //------------------------------------------------------------------------
3113 case kXR_close:
3114 {
3116 o << "kXR_close (";
3117 o << "handle: " << FileHandleToStr( sreq->fhandle );
3118 o << ")";
3119 break;
3120 }
3121
3122 //------------------------------------------------------------------------
3123 // kXR_stat
3124 //------------------------------------------------------------------------
3125 case kXR_stat:
3126 {
3127 ClientStatRequest *sreq = (ClientStatRequest *)msg;
3128 o << "kXR_stat (";
3129 if( sreq->dlen )
3130 {
3131 char *fn = GetDataAsString( msg );;
3132 o << "path: " << fn << ", ";
3133 delete [] fn;
3134 }
3135 else
3136 {
3137 o << "handle: " << FileHandleToStr( sreq->fhandle );
3138 o << ", ";
3139 }
3140 o << "flags: ";
3141 if( sreq->options == 0 )
3142 o << "none";
3143 else
3144 {
3145 if( sreq->options & kXR_vfs )
3146 o << "kXR_vfs";
3147 }
3148 o << ")";
3149 break;
3150 }
3151
3152 //------------------------------------------------------------------------
3153 // kXR_read
3154 //------------------------------------------------------------------------
3155 case kXR_read:
3156 {
3157 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3158 o << "kXR_read (";
3159 o << "handle: " << FileHandleToStr( sreq->fhandle );
3160 o << std::setbase(10);
3161 o << ", ";
3162 o << "offset: " << sreq->offset << ", ";
3163 o << "size: " << sreq->rlen << ")";
3164 break;
3165 }
3166
3167 //------------------------------------------------------------------------
3168 // kXR_pgread
3169 //------------------------------------------------------------------------
3170 case kXR_pgread:
3171 {
3173 o << "kXR_pgread (";
3174 o << "handle: " << FileHandleToStr( sreq->fhandle );
3175 o << std::setbase(10);
3176 o << ", ";
3177 o << "offset: " << sreq->offset << ", ";
3178 o << "size: " << sreq->rlen << ")";
3179 break;
3180 }
3181
3182 //------------------------------------------------------------------------
3183 // kXR_write
3184 //------------------------------------------------------------------------
3185 case kXR_write:
3186 {
3188 o << "kXR_write (";
3189 o << "handle: " << FileHandleToStr( sreq->fhandle );
3190 o << std::setbase(10);
3191 o << ", ";
3192 o << "offset: " << sreq->offset << ", ";
3193 o << "size: " << sreq->dlen << ")";
3194 break;
3195 }
3196
3197 //------------------------------------------------------------------------
3198 // kXR_pgwrite
3199 //------------------------------------------------------------------------
3200 case kXR_pgwrite:
3201 {
3203 o << "kXR_pgwrite (";
3204 o << "handle: " << FileHandleToStr( sreq->fhandle );
3205 o << std::setbase(10);
3206 o << ", ";
3207 o << "offset: " << sreq->offset << ", ";
3208 o << "size: " << sreq->dlen << ")";
3209 break;
3210 }
3211
3212 //------------------------------------------------------------------------
3213 // kXR_fattr
3214 //------------------------------------------------------------------------
3215 case kXR_fattr:
3216 {
3218 int nattr = sreq->numattr;
3219 int options = sreq->options;
3220 o << "kXR_fattr";
3221 switch (sreq->subcode) {
3222 case kXR_fattrGet:
3223 o << "Get";
3224 break;
3225 case kXR_fattrSet:
3226 o << "Set";
3227 break;
3228 case kXR_fattrList:
3229 o << "List";
3230 break;
3231 case kXR_fattrDel:
3232 o << "Delete";
3233 break;
3234 default:
3235 o << " unknown subcode: " << sreq->subcode;
3236 break;
3237 }
3238 o << " (handle: " << FileHandleToStr( sreq->fhandle );
3239 o << std::setbase(10);
3240 if (nattr)
3241 o << ", numattr: " << nattr;
3242 if (options) {
3243 o << ", options: ";
3244 if (options & 0x01)
3245 o << "new";
3246 if (options & 0x10)
3247 o << "list values";
3248 }
3249 o << ", total size: " << req->dlen << ")";
3250 break;
3251 }
3252
3253 //------------------------------------------------------------------------
3254 // kXR_sync
3255 //------------------------------------------------------------------------
3256 case kXR_sync:
3257 {
3258 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3259 o << "kXR_sync (";
3260 o << "handle: " << FileHandleToStr( sreq->fhandle );
3261 o << ")";
3262 break;
3263 }
3264
3265 //------------------------------------------------------------------------
3266 // kXR_truncate
3267 //------------------------------------------------------------------------
3268 case kXR_truncate:
3269 {
3271 o << "kXR_truncate (";
3272 if( !sreq->dlen )
3273 o << "handle: " << FileHandleToStr( sreq->fhandle );
3274 else
3275 {
3276 char *fn = GetDataAsString( msg );
3277 o << "file: " << fn;
3278 delete [] fn;
3279 }
3280 o << std::setbase(10);
3281 o << ", ";
3282 o << "offset: " << sreq->offset;
3283 o << ")";
3284 break;
3285 }
3286
3287 //------------------------------------------------------------------------
3288 // kXR_readv
3289 //------------------------------------------------------------------------
3290 case kXR_readv:
3291 {
3292 unsigned char *fhandle = 0;
3293 o << "kXR_readv (";
3294
3295 o << "handle: ";
3296 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3297 fhandle = dataChunk[0].fhandle;
3298 if( fhandle )
3299 o << FileHandleToStr( fhandle );
3300 else
3301 o << "unknown";
3302 o << ", ";
3303 o << std::setbase(10);
3304 o << "chunks: [";
3305 uint64_t size = 0;
3306 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3307 {
3308 size += dataChunk[i].rlen;
3309 o << "(offset: " << dataChunk[i].offset;
3310 o << ", size: " << dataChunk[i].rlen << "); ";
3311 }
3312 o << "], ";
3313 o << "total size: " << size << ")";
3314 break;
3315 }
3316
3317 //------------------------------------------------------------------------
3318 // kXR_writev
3319 //------------------------------------------------------------------------
3320 case kXR_writev:
3321 {
3322 unsigned char *fhandle = 0;
3323 o << "kXR_writev (";
3324
3325 XrdProto::write_list *wrtList =
3326 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3327 uint64_t size = 0;
3328 uint32_t numChunks = 0;
3329 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3330 {
3331 fhandle = wrtList[i].fhandle;
3332 size += wrtList[i].wlen;
3333 ++numChunks;
3334 }
3335 o << "handle: ";
3336 if( fhandle )
3337 o << FileHandleToStr( fhandle );
3338 else
3339 o << "unknown";
3340 o << ", ";
3341 o << std::setbase(10);
3342 o << "chunks: " << numChunks << ", ";
3343 o << "total size: " << size << ")";
3344 break;
3345 }
3346
3347 //------------------------------------------------------------------------
3348 // kXR_locate
3349 //------------------------------------------------------------------------
3350 case kXR_locate:
3351 {
3353 char *fn = GetDataAsString( msg );;
3354 o << "kXR_locate (";
3355 o << "path: " << fn << ", ";
3356 delete [] fn;
3357 o << "flags: ";
3358 if( sreq->options == 0 )
3359 o << "none";
3360 else
3361 {
3362 if( sreq->options & kXR_refresh )
3363 o << "kXR_refresh ";
3364 if( sreq->options & kXR_prefname )
3365 o << "kXR_prefname ";
3366 if( sreq->options & kXR_nowait )
3367 o << "kXR_nowait ";
3368 if( sreq->options & kXR_force )
3369 o << "kXR_force ";
3370 if( sreq->options & kXR_compress )
3371 o << "kXR_compress ";
3372 }
3373 o << ")";
3374 break;
3375 }
3376
3377 //------------------------------------------------------------------------
3378 // kXR_mv
3379 //------------------------------------------------------------------------
3380 case kXR_mv:
3381 {
3382 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3383 o << "kXR_mv (";
3384 o << "source: ";
3385 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3386 o << ", ";
3387 o << "destination: ";
3388 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3389 o << ")";
3390 break;
3391 }
3392
3393 //------------------------------------------------------------------------
3394 // kXR_query
3395 //------------------------------------------------------------------------
3396 case kXR_query:
3397 {
3399 o << "kXR_query (";
3400 o << "code: ";
3401 switch( sreq->infotype )
3402 {
3403 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3404 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3405 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3406 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3407 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3408 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3409 case kXR_QPrep: o << "kXR_QPrep"; break;
3410 case kXR_Qspace: o << "kXR_Qspace"; break;
3411 case kXR_QStats: o << "kXR_QStats"; break;
3412 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3413 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3414 default: o << sreq->infotype; break;
3415 }
3416 o << ", ";
3417
3418 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3419 {
3420 o << "handle: " << FileHandleToStr( sreq->fhandle );
3421 o << ", ";
3422 }
3423
3424 o << "arg length: " << sreq->dlen << ")";
3425 break;
3426 }
3427
3428 //------------------------------------------------------------------------
3429 // kXR_rm
3430 //------------------------------------------------------------------------
3431 case kXR_rm:
3432 {
3433 o << "kXR_rm (";
3434 char *fn = GetDataAsString( msg );;
3435 o << "path: " << fn << ")";
3436 delete [] fn;
3437 break;
3438 }
3439
3440 //------------------------------------------------------------------------
3441 // kXR_mkdir
3442 //------------------------------------------------------------------------
3443 case kXR_mkdir:
3444 {
3446 o << "kXR_mkdir (";
3447 char *fn = GetDataAsString( msg );
3448 o << "path: " << fn << ", ";
3449 delete [] fn;
3450 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3451 o << std::setbase(10);
3452 o << "flags: ";
3453 if( sreq->options[0] == 0 )
3454 o << "none";
3455 else
3456 {
3457 if( sreq->options[0] & kXR_mkdirpath )
3458 o << "kXR_mkdirpath";
3459 }
3460 o << ")";
3461 break;
3462 }
3463
3464 //------------------------------------------------------------------------
3465 // kXR_rmdir
3466 //------------------------------------------------------------------------
3467 case kXR_rmdir:
3468 {
3469 o << "kXR_rmdir (";
3470 char *fn = GetDataAsString( msg );
3471 o << "path: " << fn << ")";
3472 delete [] fn;
3473 break;
3474 }
3475
3476 //------------------------------------------------------------------------
3477 // kXR_chmod
3478 //------------------------------------------------------------------------
3479 case kXR_chmod:
3480 {
3482 o << "kXR_chmod (";
3483 char *fn = GetDataAsString( msg );
3484 o << "path: " << fn << ", ";
3485 delete [] fn;
3486 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3487 break;
3488 }
3489
3490 //------------------------------------------------------------------------
3491 // kXR_ping
3492 //------------------------------------------------------------------------
3493 case kXR_ping:
3494 {
3495 o << "kXR_ping ()";
3496 break;
3497 }
3498
3499 //------------------------------------------------------------------------
3500 // kXR_protocol
3501 //------------------------------------------------------------------------
3502 case kXR_protocol:
3503 {
3505 o << "kXR_protocol (";
3506 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3507 break;
3508 }
3509
3510 //------------------------------------------------------------------------
3511 // kXR_dirlist
3512 //------------------------------------------------------------------------
3513 case kXR_dirlist:
3514 {
3515 o << "kXR_dirlist (";
3516 char *fn = GetDataAsString( msg );;
3517 o << "path: " << fn << ")";
3518 delete [] fn;
3519 break;
3520 }
3521
3522 //------------------------------------------------------------------------
3523 // kXR_set
3524 //------------------------------------------------------------------------
3525 case kXR_set:
3526 {
3527 o << "kXR_set (";
3528 char *fn = GetDataAsString( msg );;
3529 o << "data: " << fn << ")";
3530 delete [] fn;
3531 break;
3532 }
3533
3534 //------------------------------------------------------------------------
3535 // kXR_prepare
3536 //------------------------------------------------------------------------
3537 case kXR_prepare:
3538 {
3540 o << "kXR_prepare (";
3541 o << "flags: ";
3542
3543 if( sreq->options == 0 )
3544 o << "none";
3545 else
3546 {
3547 if( sreq->options & kXR_stage )
3548 o << "kXR_stage ";
3549 if( sreq->options & kXR_wmode )
3550 o << "kXR_wmode ";
3551 if( sreq->options & kXR_coloc )
3552 o << "kXR_coloc ";
3553 if( sreq->options & kXR_fresh )
3554 o << "kXR_fresh ";
3555 }
3556
3557 o << ", priority: " << (int) sreq->prty << ", ";
3558
3559 char *fn = GetDataAsString( msg );
3560 char *cursor;
3561 for( cursor = fn; *cursor; ++cursor )
3562 if( *cursor == '\n' ) *cursor = ' ';
3563
3564 o << "paths: " << fn << ")";
3565 delete [] fn;
3566 break;
3567 }
3568
3569 case kXR_chkpoint:
3570 {
3572 o << "kXR_chkpoint (";
3573 o << "opcode: ";
3574 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3575 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3576 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3577 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3578 else if( sreq->opcode == kXR_ckpXeq )
3579 {
3580 o << "kXR_ckpXeq) ";
3581 // In this case our request body will be one of kXR_pgwrite,
3582 // kXR_truncate, kXR_write, or kXR_writev request.
3583 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3584 }
3585
3586 break;
3587 }
3588
3589 //------------------------------------------------------------------------
3590 // Default
3591 //------------------------------------------------------------------------
3592 default:
3593 {
3594 o << "kXR_unknown (length: " << req->dlen << ")";
3595 break;
3596 }
3597 };
3598 }
3599
3600 //----------------------------------------------------------------------------
3601 // Get a string representation of file handle
3602 //----------------------------------------------------------------------------
3603 std::string XRootDTransport::FileHandleToStr( const unsigned char handle[4] )
3604 {
3605 std::ostringstream o;
3606 o << "0x";
3607 for( uint8_t i = 0; i < 4; ++i )
3608 {
3609 o << std::setbase(16) << std::setfill('0') << std::setw(2);
3610 o << (int)handle[i];
3611 }
3612 return o.str();
3613 }
3614}
static const int kXR_ckpRollback
Definition XProtocol.hh:217
@ kXR_NotFound
kXR_int16 arg1len
Definition XProtocol.hh:460
#define kXR_isManager
struct ClientTruncateRequest truncate
Definition XProtocol.hh:917
@ kXR_ecredir
Definition XProtocol.hh:401
#define kXR_tlsLogin
@ kXR_fattrDel
Definition XProtocol.hh:300
@ kXR_fattrSet
Definition XProtocol.hh:303
@ kXR_fattrList
Definition XProtocol.hh:302
@ kXR_fattrGet
Definition XProtocol.hh:301
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:565
kXR_unt16 requestid
Definition XProtocol.hh:424
kXR_char fhandle[4]
Definition XProtocol.hh:823
#define kXR_gotoTLS
#define kXR_attrMeta
struct ClientPgReadRequest pgread
Definition XProtocol.hh:903
kXR_char fhandle[4]
Definition XProtocol.hh:848
#define kXR_haveTLS
kXR_char streamid[2]
Definition XProtocol.hh:158
kXR_char fhandle[4]
Definition XProtocol.hh:812
struct ClientMkdirRequest mkdir
Definition XProtocol.hh:900
kXR_int32 dlen
Definition XProtocol.hh:461
struct ClientAuthRequest auth
Definition XProtocol.hh:888
kXR_char streamid[2]
Definition XProtocol.hh:956
kXR_char fhtemplt[4]
Definition XProtocol.hh:516
kXR_unt16 options
Definition XProtocol.hh:513
static const int kXR_ckpXeq
Definition XProtocol.hh:218
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:904
#define kXR_attrSuper
struct ClientReadVRequest readv
Definition XProtocol.hh:910
kXR_char pathid
Definition XProtocol.hh:689
kXR_char credtype[4]
Definition XProtocol.hh:172
kXR_char username[8]
Definition XProtocol.hh:426
@ kXR_open_wrto
Definition XProtocol.hh:499
@ kXR_compress
Definition XProtocol.hh:482
@ kXR_async
Definition XProtocol.hh:488
@ kXR_delete
Definition XProtocol.hh:483
@ kXR_prefname
Definition XProtocol.hh:491
@ kXR_nowait
Definition XProtocol.hh:497
@ kXR_open_read
Definition XProtocol.hh:486
@ kXR_open_updt
Definition XProtocol.hh:487
@ kXR_mkpath
Definition XProtocol.hh:490
@ kXR_seqio
Definition XProtocol.hh:498
@ kXR_replica
Definition XProtocol.hh:495
@ kXR_posc
Definition XProtocol.hh:496
@ kXR_refresh
Definition XProtocol.hh:489
@ kXR_new
Definition XProtocol.hh:485
@ kXR_force
Definition XProtocol.hh:484
@ kXR_4dirlist
Definition XProtocol.hh:494
@ kXR_open_apnd
Definition XProtocol.hh:492
@ kXR_retstat
Definition XProtocol.hh:493
struct ClientOpenRequest open
Definition XProtocol.hh:902
@ kXR_waitresp
Definition XProtocol.hh:948
@ kXR_redirect
Definition XProtocol.hh:946
@ kXR_status
Definition XProtocol.hh:949
@ kXR_ok
Definition XProtocol.hh:941
@ kXR_authmore
Definition XProtocol.hh:944
@ kXR_attn
Definition XProtocol.hh:943
@ kXR_wait
Definition XProtocol.hh:947
@ kXR_error
Definition XProtocol.hh:945
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:887
kXR_char fhandle[4]
Definition XProtocol.hh:543
kXR_unt16 optiont
Definition XProtocol.hh:514
kXR_char fhandle[4]
Definition XProtocol.hh:681
kXR_char fhandle[4]
Definition XProtocol.hh:695
struct ClientWriteVRequest writev
Definition XProtocol.hh:919
kXR_char fhandle[4]
Definition XProtocol.hh:258
struct ClientLoginRequest login
Definition XProtocol.hh:899
kXR_unt16 requestid
Definition XProtocol.hh:159
kXR_char fhandle[4]
Definition XProtocol.hh:669
kXR_char sessid[16]
Definition XProtocol.hh:183
@ kXR_read
Definition XProtocol.hh:126
@ kXR_open
Definition XProtocol.hh:123
@ kXR_writev
Definition XProtocol.hh:144
@ kXR_clone
Definition XProtocol.hh:145
@ kXR_readv
Definition XProtocol.hh:138
@ kXR_mkdir
Definition XProtocol.hh:121
@ kXR_sync
Definition XProtocol.hh:129
@ kXR_chmod
Definition XProtocol.hh:115
@ kXR_bind
Definition XProtocol.hh:137
@ kXR_dirlist
Definition XProtocol.hh:117
@ kXR_fattr
Definition XProtocol.hh:133
@ kXR_rm
Definition XProtocol.hh:127
@ kXR_query
Definition XProtocol.hh:114
@ kXR_write
Definition XProtocol.hh:132
@ kXR_login
Definition XProtocol.hh:120
@ kXR_auth
Definition XProtocol.hh:113
@ kXR_endsess
Definition XProtocol.hh:136
@ kXR_set
Definition XProtocol.hh:131
@ kXR_rmdir
Definition XProtocol.hh:128
@ kXR_1stRequest
Definition XProtocol.hh:112
@ kXR_truncate
Definition XProtocol.hh:141
@ kXR_protocol
Definition XProtocol.hh:119
@ kXR_mv
Definition XProtocol.hh:122
@ kXR_ping
Definition XProtocol.hh:124
@ kXR_stat
Definition XProtocol.hh:130
@ kXR_pgread
Definition XProtocol.hh:143
@ kXR_chkpoint
Definition XProtocol.hh:125
@ kXR_locate
Definition XProtocol.hh:140
@ kXR_close
Definition XProtocol.hh:116
@ kXR_pgwrite
Definition XProtocol.hh:139
@ kXR_prepare
Definition XProtocol.hh:134
struct ClientChmodRequest chmod
Definition XProtocol.hh:891
#define kXR_isServer
#define kXR_attrCache
struct ClientQueryRequest query
Definition XProtocol.hh:908
struct ClientReadRequest read
Definition XProtocol.hh:909
struct ClientMvRequest mv
Definition XProtocol.hh:901
kXR_int32 rlen
Definition XProtocol.hh:696
kXR_unt16 requestid
Definition XProtocol.hh:182
kXR_char sessid[16]
Definition XProtocol.hh:289
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:890
struct ServerResponseHeader hdr
@ kXR_asyncap
Definition XProtocol.hh:408
#define kXR_attrProxy
kXR_char options[1]
Definition XProtocol.hh:446
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
static const int kXR_ckpCommit
Definition XProtocol.hh:215
kXR_int64 offset
Definition XProtocol.hh:697
@ kXR_vfs
Definition XProtocol.hh:799
struct ClientPrepareRequest prepare
Definition XProtocol.hh:906
@ kXR_mkdirpath
Definition XProtocol.hh:440
@ kXR_wmode
Definition XProtocol.hh:625
@ kXR_fresh
Definition XProtocol.hh:627
@ kXR_coloc
Definition XProtocol.hh:626
@ kXR_stage
Definition XProtocol.hh:624
static const int kXR_ckpQuery
Definition XProtocol.hh:216
#define kXR_tlsSess
#define kXR_DataServer
@ kXR_dup
Definition XProtocol.hh:503
@ kXR_samefs
Definition XProtocol.hh:504
struct ClientWriteRequest write
Definition XProtocol.hh:918
#define kXR_PROTTLSVERSION
Definition XProtocol.hh:72
kXR_char capver[1]
Definition XProtocol.hh:429
struct ClientProtocolRequest protocol
Definition XProtocol.hh:907
@ kXR_QPrep
Definition XProtocol.hh:650
@ kXR_Qopaqug
Definition XProtocol.hh:661
@ kXR_Qconfig
Definition XProtocol.hh:655
@ kXR_Qopaquf
Definition XProtocol.hh:660
@ kXR_Qckscan
Definition XProtocol.hh:654
@ kXR_Qxattr
Definition XProtocol.hh:652
@ kXR_Qspace
Definition XProtocol.hh:653
@ kXR_Qvisa
Definition XProtocol.hh:656
@ kXR_QStats
Definition XProtocol.hh:649
@ kXR_Qcksum
Definition XProtocol.hh:651
@ kXR_Qopaque
Definition XProtocol.hh:659
struct ClientLocateRequest locate
Definition XProtocol.hh:898
kXR_char fhandle[4]
Definition XProtocol.hh:231
@ kXR_ver005
Definition XProtocol.hh:419
#define kXR_tlsData
@ kXR_readrdok
Definition XProtocol.hh:390
@ kXR_fullurl
Definition XProtocol.hh:388
@ kXR_onlyprv4
Definition XProtocol.hh:392
@ kXR_lclfile
Definition XProtocol.hh:394
@ kXR_multipr
Definition XProtocol.hh:389
@ kXR_redirflags
Definition XProtocol.hh:395
@ kXR_hasipv64
Definition XProtocol.hh:391
@ kXR_onlyprv6
Definition XProtocol.hh:393
struct ClientCloneRequest clone
Definition XProtocol.hh:892
static const int kXR_ckpBegin
Definition XProtocol.hh:214
long long kXR_int64
Definition XPtypes.hh:98
unsigned char kXR_char
Definition XPtypes.hh:65
XrdVERSIONINFOREF(XrdCl)
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecBuffer XrdSecCredentials
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
void Zero()
Zero.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
@ ErrorMsg
report errors
Definition XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Definition XrdClTls.cc:422
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:512
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool IsSecure() const
Does the protocol indicate encryption.
Definition XrdClURL.cc:482
bool IsTPC() const
Is the URL used in TPC context.
Definition XrdClURL.cc:490
std::string GetLoginToken() const
Get the login token if present in the opaque info.
Definition XrdClURL.cc:367
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return a number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
void SetTLS(bool val)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
static int TimeZone()
const uint16_t suRetry
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint64_t XRootDMsg
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suDone
const uint16_t suContinue
bool InitTLS()
Definition XrdClTls.cc:96
const int DefaultTlsNoData
const int DefaultNoTlsOK
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
XrdSysError Log
Definition XrdConfig.cc:113
kXR_char fhandle[4]
Definition XProtocol.hh:873
struct ServerResponseBifs_Protocol bifReqs
struct ServerResponseReqs_Protocol secReqs
kXR_char fhandle[4]
Definition XProtocol.hh:318
ServerResponseStatus status
union ServerResponse::@040373375333017131300127053271011057331004327334 body
ServerResponseHeader hdr
BindPrefSelector(std::vector< std::string > &&bindprefs)
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects less loaded stream for read operation over multiple streams.
void AdjustQueues(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Transport name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
std::unique_ptr< BindPrefSelector > bindSelector
std::atomic< uint32_t > finstcnt
ServerResponseBody_Protocol * protRespBody
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.