37#include <sys/random.h>
// Out-of-class definition of CurlOperation::m_stall_interval, initialised from
// m_default_stall_interval.  TransferStalled() compares elapsed time against
// this value and also uses it as the time constant of its rate average.
43std::chrono::steady_clock::duration CurlOperation::m_stall_interval{CurlOperation::m_default_stall_interval};
// Per-thread source of bits for generated fake endpoints.  The value -1 means
// "not seeded yet"; GenerateFakeEndpoint() seeds it on first use.
53thread_local int64_t fake_dns_counter = -1;
// Per-thread forward table: "host:port" key -> {fake endpoint text, pointer to
// the address string stored for that endpoint}.
61thread_local std::unordered_map<std::string, std::pair<std::string, std::string*>> fake_dns_map;
// Per-thread reverse table: fake endpoint text -> {"host:port" key, pointer to
// the same address string that the forward table refers to}.
64thread_local std::unordered_map<std::string, std::pair<std::string, std::string*>> reverse_fake_dns_map;
// Bookkeeping record for one generated fake endpoint.
//
// The struct must stay an aggregate: it is created with
// `refcount_entry{0, std::move(addr_ptr), now}`, so the member order below
// (count, addr, last_used) is part of its contract.
struct refcount_entry {
    // Number of sockets currently associated with the endpoint; incremented
    // when a socket is opened and decremented when one is closed.
    // NOTE(review): the member's declaration is not visible in the listing;
    // `int` is inferred from the `0` initialiser and the `count <= 0` checks.
    int count;
    // Owns the endpoint text; the raw pointer to this string is used as the
    // lookup key elsewhere, so the string must not be reallocated or replaced.
    std::unique_ptr<std::string> addr;
    // Last time a socket was opened against this endpoint.
    std::chrono::steady_clock::time_point last_used;

    // Returns true when more than one minute has elapsed between `last_used`
    // and `now`.  Exactly one minute is still considered fresh.
    bool IsExpired(std::chrono::steady_clock::time_point now) const {
        return (now - last_used) > std::chrono::minutes(1);
    }
};
// Per-thread table keyed by the raw pointer to an endpoint's address string.
// The mapped refcount_entry owns that string through its `addr` member, so
// erasing an entry here also destroys the string the key points at.
80thread_local std::unordered_map<std::string *, std::unique_ptr<refcount_entry>> fake_dns_refcount;
// Build a fake "169.254.C.D:PORT" endpoint string from the bits of the
// per-thread fake_dns_counter.
//
// NOTE(review): several lines of this function (the branches around the two
// seeding calls and whatever advances the counter between calls) are not
// visible in this listing; the comments below describe only the visible lines.
82std::string GenerateFakeEndpoint() {
// -1 is the "not seeded yet" marker for the counter.
83 if (fake_dns_counter == -1) {
// Two seeding calls appear here: arc4random() and getrandom().  Presumably
// they are selected per platform by preprocessor lines that are not shown.
85 fake_dns_counter = arc4random();
// Keep asking for random bytes while the counter is negative or errno reports
// an interrupted call.
88 while (fake_dns_counter < 0 || errno == EINTR) {
89 if (getrandom((
void*)&fake_dns_counter,
sizeof(fake_dns_counter), 0) ==
sizeof(fake_dns_counter)) {
// Bits 0-7 become the last octet, bits 8-15 the third octet, and the remaining
// bits select a port in the range 1024..65534.
95 uint64_t addr =
static_cast<uint64_t
>(fake_dns_counter);
96 uint32_t class_d = addr & 0xff;
97 uint32_t class_c = (addr >> 8) & 0xff;
98 uint32_t port = 1024 + ((addr >> 16) % (65535 - 1024));
101 return std::string(
"169.254.") + std::to_string(class_c) +
"." + std::to_string(class_d) +
":" + std::to_string(port);
// Return the fake endpoint string associated with host:port on this thread,
// creating and registering a new one when none exists yet.
//
// The returned pointer is not owned by the caller: the string is owned by the
// refcount_entry stored in fake_dns_refcount.
//
// NOTE(review): the handling of an address collision and the final return
// statement are not visible in this listing.
104std::string *GetFakeEndpointForHost(
const std::string &host,
int port) {
// The forward table is keyed by "host:port".
105 std::string key = host +
":" + std::to_string(port);
106 auto it = fake_dns_map.find(key);
107 if (it != fake_dns_map.end()) {
// Already mapped: hand back the existing address string.
108 return it->second.second;
110 auto addr = GenerateFakeEndpoint();
// The generated address is already assigned to another key (body not shown).
111 if (reverse_fake_dns_map.find(addr) != reverse_fake_dns_map.end()) {
// The raw pointer is kept because it is needed as the refcount-table key after
// ownership of the string has been moved into the new entry.
114 auto addr_ptr_raw =
new std::string(addr);
115 std::unique_ptr<std::string> addr_ptr(addr_ptr_raw);
116 fake_dns_map[key] = {addr, addr_ptr.get()};
117 reverse_fake_dns_map[addr] = {key, addr_ptr.get()};
// New entries start with a count of zero and "last used" set to now.
118 std::unique_ptr<refcount_entry> new_entry(
new refcount_entry{0, std::move(addr_ptr), std::chrono::steady_clock::now()});
119 fake_dns_refcount[addr_ptr_raw] = std::move(new_entry);
// Split a URL (or a bare "host[:port]" authority) into host and port.
//
// @param location  URL such as "https://user@host:8443/path", or a bare
//                  authority such as "host:1094".
// @return {host, port}.  When no port is given, the port defaults to 443 for
//         the "https" and "davs" schemes and to 80 otherwise.  On a malformed
//         authority or port the function returns {"", -1}, which is the
//         failure value callers test for (`host.empty() || port == -1`).
//
// Parsing rules:
//   * the authority ends at the first '/', '?' or '#', so an '@' or ':' that
//     appears later in the path or query is never mistaken for userinfo or a
//     port separator;
//   * userinfo is everything up to the last '@' inside the authority;
//   * a bracketed IPv6 literal ("[::1]:8443") is returned with its brackets;
//   * the port must consist solely of decimal digits and be <= 65535.
// The function never throws on bad port text.
std::pair<std::string, int> ParseHostPort(const std::string &location) {
    const std::pair<std::string, int> parse_failure{"", -1};

    const auto scheme_end = location.find("://");
    const bool has_scheme = (scheme_end != std::string::npos);
    const std::string schema = has_scheme ? location.substr(0, scheme_end) : std::string();
    const int std_port = (schema == "https" || schema == "davs") ? 443 : 80;

    // Isolate the authority first, then drop any userinfo prefix.
    std::string hostport = has_scheme ? location.substr(scheme_end + 3) : location;
    const auto authority_end = hostport.find_first_of("/?#");
    if (authority_end != std::string::npos) {
        hostport.erase(authority_end);
    }
    const auto at_pos = hostport.rfind('@');
    if (at_pos != std::string::npos) {
        hostport.erase(0, at_pos + 1);
    }

    std::string host;
    std::string port_text;
    if (!hostport.empty() && hostport.front() == '[') {
        // IPv6 literal: the colons inside the brackets are part of the host.
        const auto close = hostport.find(']');
        if (close == std::string::npos) {
            return parse_failure;
        }
        host = hostport.substr(0, close + 1);
        if (close + 1 == hostport.size()) {
            return {host, std_port};
        }
        if (hostport[close + 1] != ':') {
            return parse_failure;
        }
        port_text = hostport.substr(close + 2);
    } else {
        const auto colon = hostport.find(':');
        if (colon == std::string::npos) {
            return {hostport, std_port};
        }
        host = hostport.substr(0, colon);
        port_text = hostport.substr(colon + 1);
    }

    // Strict decimal parse; the range check inside the loop also prevents
    // integer overflow on absurdly long digit strings.
    if (port_text.empty()) {
        return parse_failure;
    }
    int port = 0;
    for (const char ch : port_text) {
        if (ch < '0' || ch > '9') {
            return parse_failure;
        }
        port = port * 10 + (ch - '0');
        if (port > 65535) {
            return parse_failure;
        }
    }
    return {host, port};
}
// Map the WebDAV URL schemes onto the HTTP schemes.
//
// @param url  Any URL.
// @return `url` with a leading "dav://" replaced by "http://" or a leading
//         "davs://" replaced by "https://"; every other input (including the
//         empty string) is returned unchanged.  The scheme match is
//         case-sensitive.
std::string DavToHttp(const std::string &url) {
    // compare() clamps the length to the string size, so URLs shorter than
    // the prefix simply fail to match.
    if (url.compare(0, 6, "dav://") == 0) {
        return "http://" + url.substr(6);
    }
    if (url.compare(0, 7, "davs://") == 0) {
        return "https://" + url.substr(7);
    }
    return url;
}
160 if (timeout.tv_sec == 0 && timeout.tv_nsec == 0) {
161 return std::chrono::steady_clock::now() + std::chrono::seconds(30);
163 return std::chrono::steady_clock::now() + std::chrono::seconds(timeout.tv_sec) + std::chrono::nanoseconds(timeout.tv_nsec);
173 std::chrono::steady_clock::time_point expiry,
XrdCl::Log *logger,
177 m_last_reset(std::chrono::steady_clock::now()),
178 m_last_header_reset(m_last_reset),
179 m_start_op(m_last_reset),
180 m_header_start(m_last_reset),
181 m_conn_callout(callout),
182 m_url(DavToHttp(url)),
184 m_curl(nullptr, &curl_easy_cleanup),
203 handle->HandleResponse(status,
nullptr);
208 m_callback_error_code = ecode;
209 m_callback_error_str =
emsg;
219 m_header_slist.reset();
221 m_header_slist.reset(curl_slist_append(m_header_slist.release(),
222 (header.first +
": " + header.second).c_str()));
224 return curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_slist.get()) == CURLE_OK;
229 if (!extra_headers) {
233 m_header_slist.reset();
234 for (
const auto &header : *extra_headers) {
235 if (!strcasecmp(header.first.c_str(),
"Content-Length")) {
236 auto upload_size = std::stoull(header.second);
237 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, upload_size);
240 m_header_slist.reset(curl_slist_append(m_header_slist.release(),
241 (header.first +
": " + header.second).c_str()));
243 return curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_slist.get()) == CURLE_OK;
// Header callback with the libcurl (buffer, size, nitems, userdata) shape.
// Records header timing on the operation and forwards the header text to
// Header().
//
// Returns size * nitems when Header() reports success and 0 otherwise.
// NOTE(review): the return type and the line that derives `me` (presumably
// from `this_ptr`) are not visible in this listing.
273CurlOperation::HeaderCallback(
char *buffer,
size_t size,
size_t nitems,
void *this_ptr)
// The buffer is copied using the explicit length size * nitems.
275 std::string header(buffer, size * nitems);
277 auto now = std::chrono::steady_clock::now();
// The first header seen marks the start of the header phase.
278 if (!me->m_received_header) {
279 me->m_received_header =
true;
280 me->m_header_start = now;
// Every header refreshes the "last header activity" timestamp.
282 me->m_header_lastop = now;
283 auto rv = me->Header(header);
284 return rv ? (size * nitems) : 0;
// Process one response header line.
//
// NOTE(review): most of this function is not visible in this listing; only the
// response-info bookkeeping below can be described.
288CurlOperation::Header(
const std::string &header)
// Create the ResponseInfo on first use.
296 if (!m_response_info) {
297 m_response_info.reset(
new ResponseInfo());
// Append the headers accumulated in m_headers, moving them out of the parser.
299 m_response_info->AddResponse(
m_headers.MoveHeaders());
308 m_conn_callout_result = -1;
309 m_conn_callout_listener = -1;
310 m_tried_broker =
false;
313 if (location.empty()) {
318 if (location.size() && location[0] ==
'/') {
319 std::string_view orig_url(
m_url);
320 auto scheme_loc = orig_url.find(
"://");
321 if (scheme_loc == std::string_view::npos) {
325 auto path_loc = orig_url.find(
'/', scheme_loc + 3);
326 if (path_loc == std::string_view::npos) {
327 location =
m_url + location;
329 location = std::string(orig_url.substr(0, path_loc)) + location;
334 curl_easy_setopt(
m_curl.get(), CURLOPT_URL, location.c_str());
337 if (env->GetInt(
"HttpDisableX509", disable_x509) && !disable_x509) {
338 std::string cert, key;
339 env->GetString(
"HttpClientCertFile", cert);
340 env->GetString(
"HttpClientKeyFile", key);
342 curl_easy_setopt(
m_curl.get(), CURLOPT_SSLCERT, cert.c_str());
344 curl_easy_setopt(
m_curl.get(), CURLOPT_SSLKEY, key.c_str());
348 if (m_conn_callout) {
349 auto conn_callout = m_conn_callout(location, *m_response_info);
350 if (conn_callout !=
nullptr) {
352 auto [host, port] = ParseHostPort(location);
353 if (host.empty() || port == -1) {
357 auto fake_addr = GetFakeEndpointForHost(host, port);
358 if (!fake_addr || fake_addr->empty()) {
362 m_resolve_slist.reset(curl_slist_append(m_resolve_slist.release(),
363 (host +
":" + std::to_string(port) +
":" + *fake_addr).c_str()));
364 m_logger->Debug(
kLogXrdClHttp,
"For connection callout in redirect, mapping %s:%d -> %s", host.c_str(), port, fake_addr->c_str());
366 m_callout.reset(conn_callout);
369 if ((m_conn_callout_listener = m_callout->BeginCallout(err,
m_header_expiry)) == -1) {
370 auto errMsg =
"Failed to start a connection callout request: " + err;
374 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, CurlOperation::OpenSocketCallback);
375 curl_easy_setopt(
m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, CurlOperation::CloseSocketCallback);
376 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETDATA,
this);
377 curl_easy_setopt(
m_curl.get(), CURLOPT_CLOSESOCKETDATA, fake_addr);
378 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTFUNCTION, CurlOperation::SockOptCallback);
379 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTDATA,
this);
380 curl_easy_setopt(
m_curl.get(), CURLOPT_CONNECT_TO, m_resolve_slist.get());
383 m_received_header =
false;
385 m_last_header_reset = m_last_reset = m_header_start = m_start_op = m_header_lastop = std::chrono::steady_clock::now();
// Write callback that discards the data: it ignores the buffer and the user
// pointer and reports all size * nitems bytes as consumed.
// NOTE(review): the return type line is not visible in this listing.
392NullCallback(
char * ,
size_t size,
size_t nitems,
void * )
394 return size * nitems;
401 m_is_paused = paused;
403 m_pause_start = std::chrono::steady_clock::now();
404 }
else if (m_pause_start != std::chrono::steady_clock::time_point{}) {
405 m_pause_duration += std::chrono::steady_clock::now() - m_pause_start;
406 m_pause_start = std::chrono::steady_clock::time_point{};
413 if ((m_conn_callout_listener = m_callout->BeginCallout(err,
m_header_expiry)) == -1) {
414 err =
"Failed to start a callout for a socket connection: " + err;
421std::tuple<uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration>
423 auto now = std::chrono::steady_clock::now();
424 std::chrono::steady_clock::duration pre_header{}, post_header{}, pause_duration{};
425 if (m_received_header) {
426 if (m_last_header_reset < m_header_start) {
427 pre_header = m_header_start - m_last_header_reset;
428 m_last_header_reset = m_header_start;
430 post_header = now - ((m_last_reset < m_header_start) ? m_header_start : m_last_reset);
433 pre_header = now - m_last_header_reset;
434 m_last_header_reset = now;
437 m_pause_duration += now - m_pause_start;
440 if (m_pause_duration != std::chrono::steady_clock::duration::zero()) {
441 pause_duration = m_pause_duration;
442 m_pause_duration = std::chrono::steady_clock::duration::zero();
444 auto bytes = m_bytes;
446 return {bytes, pre_header, post_header, pause_duration};
451 if (m_received_header)
return false;
463 !m_received_header) {
478 if (m_last_xfer == std::chrono::steady_clock::time_point()) {
479 m_last_xfer = m_header_lastop;
481 auto elapsed = now - m_last_xfer;
482 uint64_t xfer_diff = 0;
483 if (xfer > m_last_xfer_count) {
484 xfer_diff = xfer - m_last_xfer_count;
485 m_last_xfer_count = xfer;
490 if (elapsed > m_stall_interval && xfer_diff == 0) {
496 if (xfer_diff == 0) {
504 auto elapsed_since_last_headerop = now - m_header_lastop;
505 if (elapsed_since_last_headerop < m_stall_interval) {
507 }
else if (m_ema_rate < 0) {
508 m_ema_rate = xfer / std::chrono::duration<double>(elapsed_since_last_headerop).count();
511 double elapsed_seconds = std::chrono::duration<double>(elapsed).count();
512 auto recent_rate =
static_cast<double>(xfer_diff) / elapsed_seconds;
513 auto alpha = 1.0 - exp(-elapsed_seconds / std::chrono::duration<double>(m_stall_interval).count());
514 m_ema_rate = (1.0 - alpha) * m_ema_rate + alpha * recent_rate;
525 if (curl ==
nullptr) {
526 throw std::runtime_error(
"Unable to setup curl operation with no handle");
529 if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
530 throw std::runtime_error(
"Unable to get current time");
534 m_last_header_reset = m_last_reset = m_start_op = m_header_start = m_header_lastop = std::chrono::steady_clock::now();
537 m_curl_error_buffer[0] =
'\0';
538 curl_easy_setopt(
m_curl.get(), CURLOPT_URL,
m_url.c_str());
539 curl_easy_setopt(
m_curl.get(), CURLOPT_ERRORBUFFER, m_curl_error_buffer);
540 curl_easy_setopt(
m_curl.get(), CURLOPT_HEADERFUNCTION, CurlStatOp::HeaderCallback);
541 curl_easy_setopt(
m_curl.get(), CURLOPT_HEADERDATA,
this);
542 curl_easy_setopt(
m_curl.get(), CURLOPT_WRITEFUNCTION, NullCallback);
543 curl_easy_setopt(
m_curl.get(), CURLOPT_WRITEDATA,
nullptr);
544 curl_easy_setopt(
m_curl.get(), CURLOPT_XFERINFOFUNCTION, CurlOperation::XferInfoCallback);
545 curl_easy_setopt(
m_curl.get(), CURLOPT_XFERINFODATA,
this);
546 curl_easy_setopt(
m_curl.get(), CURLOPT_NOPROGRESS, 0L);
549 curl_easy_setopt(
m_curl.get(), CURLOPT_NOSIGNAL, 1L);
554 if ((env->GetInt(
"HttpDisableX509", disable_x509) && !disable_x509)) {
558 curl_easy_setopt(
m_curl.get(), CURLOPT_SSLCERT, cert.c_str());
562 curl_easy_setopt(
m_curl.get(), CURLOPT_SSLKEY, key.c_str());
567 if (m_conn_callout) {
569 auto callout = m_conn_callout(
m_url, info);
571 m_callout.reset(callout);
572 m_conn_callout_listener = -1;
573 m_conn_callout_result = -1;
574 m_tried_broker =
false;
576 auto [host, port] = ParseHostPort(
m_url);
577 if (host.empty() || port == -1) {
578 throw std::runtime_error (
"Failed to parse host and port from URL " +
m_url);
580 auto fake_addr = GetFakeEndpointForHost(host, port);
581 if (!fake_addr || fake_addr->empty()) {
582 throw std::runtime_error(
"Failed to generate a fake address for host " + host);
584 m_resolve_slist.reset(curl_slist_append(m_resolve_slist.release(),
585 (host +
":" + std::to_string(port) +
":" + *fake_addr).c_str()));
586 m_logger->Debug(
kLogXrdClHttp,
"For connection callout in operation setup, mapping %s:%d -> %s", host.c_str(), port, fake_addr->c_str());
588 curl_easy_setopt(
m_curl.get(), CURLOPT_CONNECT_TO, m_resolve_slist.get());
590 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, CurlOperation::OpenSocketCallback);
591 curl_easy_setopt(
m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, CurlOperation::CloseSocketCallback);
592 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETDATA,
this);
593 curl_easy_setopt(
m_curl.get(), CURLOPT_CLOSESOCKETDATA, fake_addr);
594 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTFUNCTION, CurlOperation::SockOptCallback);
595 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTDATA,
this);
605 m_conn_callout_listener = -1;
606 m_conn_callout_result = -1;
607 m_tried_broker =
false;
610 if (
m_curl ==
nullptr)
return;
611 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETFUNCTION,
nullptr);
612 curl_easy_setopt(
m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION,
nullptr);
613 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETDATA,
nullptr);
614 curl_easy_setopt(
m_curl.get(), CURLOPT_CLOSESOCKETDATA,
nullptr);
615 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTFUNCTION,
nullptr);
616 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTDATA,
nullptr);
617 curl_easy_setopt(
m_curl.get(), CURLOPT_SSLCERT,
nullptr);
618 curl_easy_setopt(
m_curl.get(), CURLOPT_SSLKEY,
nullptr);
619 curl_easy_setopt(
m_curl.get(), CURLOPT_HTTPHEADER,
nullptr);
620 curl_easy_setopt(
m_curl.get(), CURLOPT_CONNECT_TO,
nullptr);
621 m_header_slist.reset();
// Open-socket callback: hands libcurl the socket produced by the connection
// callout instead of creating one, and records the socket against the fake
// endpoint that libcurl was told to connect to.
//
// Returns CURL_SOCKET_BAD when a callout cannot be started or when the fake
// endpoint is unknown to this thread's tables.
// NOTE(review): the return type, the derivation of `me` from `clientp`, the
// condition guarding the BeginCallout() call and the final return are not
// visible in this listing.
626CurlOperation::OpenSocketCallback(
void *clientp, curlsocktype purpose,
struct curl_sockaddr *address)
// Take the callout result and reset the stored value to -1 so that the same
// descriptor is not handed out twice.
629 auto fd = me->m_conn_callout_result;
630 me->m_conn_callout_result = -1;
// Start a new callout; a return of -1 from BeginCallout() is a failure.
633 if ((me->m_conn_callout_listener = me->m_callout->BeginCallout(err, me->m_header_expiry)) == -1) {
634 me->m_logger->Debug(
kLogXrdClHttp,
"Failed to start a connection callout request: %s", err.c_str());
636 return CURL_SOCKET_BAD;
// The address is interpreted as IPv4 (sockaddr_in / AF_INET) and rendered as
// "a.b.c.d:port".  The buffer has room for the dotted quad, ':' and a
// five-digit port.
638 sockaddr_in *inaddr =
reinterpret_cast<sockaddr_in*
>(&address->addr);
639 char ip_str[INET_ADDRSTRLEN];
640 char full_address_str[INET_ADDRSTRLEN + 6];
641 inet_ntop(AF_INET, &(inaddr->sin_addr), ip_str, INET_ADDRSTRLEN);
642 int port = ntohs(inaddr->sin_port);
643 snprintf(full_address_str,
sizeof(full_address_str),
"%s:%d", ip_str, port);
644 me->m_logger->Debug(
kLogXrdClHttp,
"Recording socket %d for %s", fd, full_address_str);
// Map the textual endpoint back to its table entry.
645 auto reverse_iter = reverse_fake_dns_map.find(full_address_str);
646 if (reverse_iter == reverse_fake_dns_map.end()) {
647 me->m_logger->Error(
kLogXrdClHttp,
"Failed to find fake DNS reverse entry for %s", full_address_str);
649 return CURL_SOCKET_BAD;
// The refcount table is keyed by the address-string pointer stored in the
// reverse entry.
651 auto iter = fake_dns_refcount.find(reverse_iter->second.second);
652 if (iter == fake_dns_refcount.end()) {
653 me->m_logger->Error(
kLogXrdClHttp,
"Failed to find fake DNS refcount entry for %s", full_address_str);
655 return CURL_SOCKET_BAD;
// One more socket uses this endpoint; refresh its last-used time.
657 iter->second->count++;
658 iter->second->last_used = std::chrono::steady_clock::now();
// Socket-option callback.  Always returns CURL_SOCKOPT_ALREADY_CONNECTED,
// which per the libcurl documentation tells libcurl that the socket is already
// connected.  None of the parameters are used in the visible code.
// NOTE(review): the return type line is not visible in this listing.
666CurlOperation::SockOptCallback(
void *clientp, curl_socket_t curlfd, curlsocktype purpose)
668 return CURL_SOCKOPT_ALREADY_CONNECTED;
// Close-socket callback.  `clientp` is the fake endpoint's address string (the
// pointer registered as CURLOPT_CLOSESOCKETDATA), not the operation object.
// Drops one reference on the endpoint and removes it from all three per-thread
// tables once it is unreferenced and expired.
//
// NOTE(review): the return type and the statements after the table cleanup
// (presumably the actual close of `fd` and the return value) are not visible
// in this listing.
672CurlOperation::CloseSocketCallback(
void *clientp, curl_socket_t fd)
675 auto me =
reinterpret_cast<std::string*
>(clientp);
// No endpoint attached: nothing to account for; report success.
676 if (me ==
nullptr) {
return 0;}
677 auto iter = fake_dns_refcount.find(me);
678 if (iter != fake_dns_refcount.end()) {
679 iter->second->count--;
// Only drop the mapping when no socket uses it and it has expired.
680 if (iter->second->count <= 0 && iter->second->IsExpired(std::chrono::steady_clock::now())) {
// Remove the forward and reverse entries before erasing the refcount entry:
// the latter owns the string that `me` points at.
681 auto rev_iter = reverse_fake_dns_map.find(*me);
682 if (rev_iter != reverse_fake_dns_map.end()) {
683 fake_dns_map.erase(rev_iter->second.first);
684 reverse_fake_dns_map.erase(rev_iter);
686 fake_dns_refcount.erase(iter);
696 auto now = std::chrono::steady_clock::now();
697 for (
auto it = fake_dns_refcount.begin(); it != fake_dns_refcount.end(); ) {
698 if (it->second->count <= 0 && it->second->IsExpired(now)) {
699 auto rev_iter = reverse_fake_dns_map.find(*it->first);
700 if (rev_iter != reverse_fake_dns_map.end()) {
701 fake_dns_map.erase(rev_iter->second.first);
702 reverse_fake_dns_map.erase(rev_iter);
704 it = fake_dns_refcount.erase(it);
// Progress callback: checks the operation's header and overall timeouts and
// its stall detector against the current transfer counters.
//
// NOTE(review): the return type, the derivation of `me` from `clientp` and the
// bodies of both `if` statements (including the values returned) are not
// visible in this listing.
712CurlOperation::XferInfoCallback(
void *clientp, curl_off_t , curl_off_t dlnow, curl_off_t , curl_off_t ulnow)
715 auto now = std::chrono::steady_clock::now();
// Either timeout having expired takes this branch.
716 if (me->HeaderTimeoutExpired(now) || me->OperationTimeoutExpired(now)) {
// Use whichever direction has moved more bytes as the progress measure.
719 uint64_t xfer_bytes = dlnow > ulnow ? dlnow : ulnow;
720 if (me->TransferStalled(xfer_bytes, now)) {
729 m_conn_callout_result = m_callout ? m_callout->FinishCallout(err) : -1;
730 if (m_callout && m_conn_callout_result == -1) {
732 }
else if (m_callout) {
735 return m_conn_callout_result;
std::chrono::steady_clock::time_point CalculateExpiry(struct timespec timeout)
int emsg(int rc, char *msg)
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
static int m_minimum_transfer_rate
bool FinishSetup(CURL *curl)
std::chrono::steady_clock::time_point m_header_expiry
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
bool TransferStalled(uint64_t xfer_bytes, const std::chrono::steady_clock::time_point &now)
static const std::string GetVerbString(HttpVerb)
virtual HttpVerb GetVerb() const =0
virtual void ReleaseHandle()
static void CleanupDnsCache()
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
static constexpr int m_default_minimum_rate
std::vector< std::pair< std::string, std::string > > m_headers_list
HeaderCallout * m_header_callout
bool HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now)
virtual int WaitSocketCallback(std::string &err)
std::chrono::steady_clock::time_point m_operation_expiry
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
virtual RedirectAction Redirect(std::string &target)
XrdCl::ResponseHandler * m_handler
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
void SetPaused(bool paused)
bool StartConnectionCallout(std::string &err)
bool OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now)
virtual bool Setup(CURL *curl, CurlWorker &)
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
static Env * GetEnv()
Get default client environment.
Handle an async response.
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp
const uint16_t errErrorResponse
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.