22#include <XrdOuc/XrdOucJson.hh>
// Definitions of File's static prefetch statistics counters, all starting at zero.
// Elsewhere in this file they are bumped with relaxed atomic fetch_add calls and
// serialized into the "prefetch" JSON monitoring string.
29std::atomic<uint64_t> File::m_prefetch_count = 0;
30std::atomic<uint64_t> File::m_prefetch_expired_count = 0;
31std::atomic<uint64_t> File::m_prefetch_failed_count = 0;
32std::atomic<uint64_t> File::m_prefetch_reads_hit = 0;
33std::atomic<uint64_t> File::m_prefetch_reads_miss = 0;
34std::atomic<uint64_t> File::m_prefetch_bytes_used = 0;
// Response handler that Open installs around the caller's handler when the file is
// opened in full-download mode. It records the caller's handler, a pointer to the
// "is opened" flag to set on success, and whether response info should be forwarded.
45 OpenFullDownloadResponseHandler(
bool *is_opened,
bool send_response_info, XrdCl::ResponseHandler *handler)
46 : m_send_response_info(send_response_info), m_is_opened(is_opened), m_handler(handler)
// Callback for the completed operation. This object, the response and the status are
// each wrapped in a unique_ptr, so they are destroyed on return unless ownership is
// handed to the wrapped handler via release().
49 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
50 std::unique_ptr<OpenFullDownloadResponseHandler> holder(
this);
51 std::unique_ptr<XrdCl::AnyObject> response_holder(response);
52 std::unique_ptr<XrdCl::XRootDStatus> status_holder(status);
// Failure (or missing status): pass status and response through untouched.
54 if (!status || !status->
IsOK()) {
55 if (m_handler) m_handler->HandleResponse(status_holder.release(), response_holder.release());
// Success: mark the owning file as opened (when a flag pointer was supplied).
58 if (m_is_opened) *m_is_opened =
true;
// When response info is requested, the read response info is repackaged as an
// OpenResponseInfo and delivered in a freshly allocated AnyObject.
62 if (m_send_response_info) {
63 XrdCl::ChunkInfo *ci =
nullptr;
// NOTE(review): the ChunkInfo pointer is downcast to ReadResponseInfo and owned by the
// unique_ptr below; how `ci` is populated is not visible in this excerpt.
69 std::unique_ptr<XrdClHttp::ReadResponseInfo> read_response_info(
static_cast<XrdClHttp::ReadResponseInfo *
>(ci));
70 auto info = read_response_info->GetResponseInfo();
71 XrdClHttp::OpenResponseInfo *open_info(
new XrdClHttp::OpenResponseInfo());
72 open_info->SetResponseInfo(std::move(info));
73 auto obj =
new XrdCl::AnyObject();
// NOTE(review): unlike the failure path, these calls do not test m_handler for null
// on the visible lines — confirm a guard exists in the surrounding code.
75 m_handler->HandleResponse(status_holder.release(), obj);
77 m_handler->HandleResponse(status_holder.release(),
nullptr);
// Whether to forward response info to the wrapped handler on success.
81 bool m_send_response_info;
// Caller-supplied handler that receives the final result; may be null.
83 XrdCl::ResponseHandler *m_handler;
// Response handler that Open installs around the caller's handler for a regular
// (non full-download) open. Stores the "is opened" flag pointer and the wrapped handler.
90 OpenResponseHandler(
bool *is_opened, XrdCl::ResponseHandler *handler)
91 : m_is_opened(is_opened), m_handler(handler)
// Callback for the completed open. This object, the response and the status are held
// in unique_ptrs and released to the wrapped handler when forwarded.
94 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
95 std::unique_ptr<OpenResponseHandler> holder(
this);
96 std::unique_ptr<XrdCl::AnyObject> response_holder(response);
97 std::unique_ptr<XrdCl::XRootDStatus> status_holder(status);
// Failure (or missing status): forward as-is without touching the opened flag.
99 if (!status || !status->
IsOK()) {
100 if (m_handler) m_handler->HandleResponse(status_holder.release(), response_holder.release());
// Success: record that the file is open before notifying the caller.
103 if (m_is_opened) *m_is_opened =
true;
// NOTE(review): no null test on m_handler is visible here, unlike the failure path.
107 m_handler->HandleResponse(status_holder.release(), response_holder.release());
// Caller-supplied handler that receives the final result; may be null.
112 XrdCl::ResponseHandler *m_handler;
// Response handler that ReadPrefetch wraps around the caller's handler (presumably
// only for page reads — the guarding condition is not visible in this excerpt).
120 PgReadResponseHandler(XrdCl::ResponseHandler *handler)
// Callback for the completed read. The handler deletes itself on return via `holder`.
124 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
125 std::unique_ptr<PgReadResponseHandler> holder(
this);
// Failure (or missing status): forward untouched, or drop the response if there is
// nobody to receive it.
126 if (!status || !status->
IsOK()) {
127 if (m_handler) m_handler->HandleResponse(status, response);
128 else delete response;
137 XrdCl::ChunkInfo *ci =
nullptr;
141 if (m_handler) m_handler->HandleResponse(status,
nullptr);
// One 32-bit checksum is collected per page of the returned buffer.
// NOTE(review): the checksum computation itself is on lines not shown here.
144 std::vector<uint32_t> cksums;
147 cksums.reserve(nbpages);
149 auto buffer =
static_cast<const char *
>(ci->
GetBuffer());
151 for (
size_t pg=0; pg<nbpages; ++pg)
// The last page may be shorter than a full page: clamp to the bytes remaining.
154 if (pgsize > size) pgsize = size;
161 auto obj =
new XrdCl::AnyObject();
// Copy the handler pointer into a local before invoking it.
164 auto handle = m_handler;
166 handle->HandleResponse(status, obj);
// Caller-supplied handler that receives the final result; may be null.
170 XrdCl::ResponseHandler *m_handler;
// Response handler used by Close when it issues a zero-length PUT to create an empty
// object; it wraps the caller's close handler.
177 CloseCreateHandler(XrdCl::ResponseHandler *handler)
// Callback for the completed PUT. This object, the status and the response are owned
// by unique_ptrs; only the status is forwarded, the response object is discarded.
181 virtual void HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
182 std::unique_ptr<CloseCreateHandler> self(
this);
183 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
184 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
186 if (m_handler) m_handler->HandleResponse(status.release(),
nullptr);
// Caller-supplied close handler; may be null.
190 XrdCl::ResponseHandler *m_handler;
// Static timeout defaults for File, expressed as struct timespec initializers
// (seconds first, then nanoseconds, assuming the usual POSIX member order).
// m_min_client_timeout is the floor applied to a parsed xrdclhttp.timeout value.
197struct timespec
XrdClHttp::File::m_min_client_timeout = {2, 0};
// Copied into m_header_timeout by Open.
// NOTE(review): the second field is 5, i.e. 5 nanoseconds if the order is
// {tv_sec, tv_nsec} — confirm this is intended.
198struct timespec
XrdClHttp::File::m_default_header_timeout = {9, 5};
199struct timespec
XrdClHttp::File::m_fed_timeout = {5, 0};
// Load the current PUT handler (acquire pairs with the release stores made when the
// handler is published) and block until it reports completion.
// NOTE(review): the enclosing function and the null check on `handler` are not
// visible in this excerpt.
203 auto handler = m_put_handler.load(std::memory_order_acquire);
210 handler->WaitForCompletion();
// Looks up the "XrdClConnectionCallout" property and parses its value as a base-16
// integer holding the callout pointer.
// The guard now triggers when the property is missing OR its value is empty; the
// previous `&&` let a present-but-empty value fall through to std::stoll, which
// throws std::invalid_argument on an empty string.
216File::GetConnCallout()
const {
217 std::string pointer_str;
218 if (!
GetProperty(
"XrdClConnectionCallout", pointer_str) || pointer_str.empty()) {
// The property value is a hexadecimal representation of the pointer.
223 pointer = std::stoll(pointer_str,
nullptr, 16);
// Handling of the xrdclhttp.timeout URL parameter: only a non-empty value is parsed.
237 if (!timeout_string.empty()) {
// A parse failure is logged with the parser's error message.
243 logger->Error(
kLogXrdClHttp,
"Failed to parse xrdclhttp.timeout parameter: %s", errmsg.c_str());
244 }
else if (
ts.tv_sec >= 1) {
// Clamp the resulting timeout so it is never below the minimum held in `mct`
// (compared first on seconds, then on nanoseconds).
251 if (
ts.tv_sec < mct.tv_sec ||
252 (
ts.tv_sec == mct.tv_sec &&
ts.tv_nsec < mct.tv_nsec))
254 ts.tv_sec = mct.tv_sec;
255 ts.tv_nsec = mct.tv_nsec;
// Chooses between the header timeout and the per-operation timeout.
// NOTE(review): what happens when oper_timeout == 0 is on lines not shown; presumably
// a default is substituted — confirm.
264 if (oper_timeout == 0) {
// No usable operation timeout: fall back to the header timeout.
269 if (oper_timeout <= 0) {
270 return header_timeout;
// Same number of seconds: return that many whole seconds (nanoseconds zeroed).
272 if (oper_timeout == header_timeout.tv_sec) {
273 return {header_timeout.tv_sec, 0};
274 }
else if (header_timeout.tv_sec < oper_timeout) {
// The header timeout is the shorter of the two.
275 return header_timeout;
// Otherwise the operation timeout is the shorter one.
277 return {oper_timeout, 0};
// Builds a JSON string with a top-level "prefetch" object holding the current values
// of the static prefetch counters (count, expired, failed, reads_hit, reads_miss,
// bytes_used), each converted with std::to_string.
290 return "{\"prefetch\": {"
291 "\"count\": " + std::to_string(m_prefetch_count) +
","
292 "\"expired\": " + std::to_string(m_prefetch_expired_count) +
","
293 "\"failed\": " + std::to_string(m_prefetch_failed_count) +
","
294 "\"reads_hit\": " + std::to_string(m_prefetch_reads_hit) +
","
295 "\"reads_miss\": " + std::to_string(m_prefetch_reads_miss) +
","
296 "\"bytes_used\": " + std::to_string(m_prefetch_bytes_used) +
// Open: rejects a second open, parses the URL and its special query parameters,
// then queues either a full-download prefetch or a plain open operation.
308 m_logger->Error(
kLogXrdClHttp,
"URL %s already open", url.c_str());
327 (handler ==
nullptr) && (timeout == 0))
332 m_open_flags = flags;
// Start from the class-wide default header timeout.
334 m_header_timeout.tv_nsec = m_default_header_timeout.tv_nsec;
335 m_header_timeout.tv_sec = m_default_header_timeout.tv_sec;
337 parsed_url.SetPort(0);
338 if (!parsed_url.FromString(url)) {
339 m_logger->Error(
kLogXrdClHttp,
"Failed to parse provided URL as a valid URL: %s", url.c_str());
// Pull the xrdclhttp.timeout query parameter (empty string when absent).
342 auto pm = parsed_url.GetParams();
343 auto iter = pm.find(
"xrdclhttp.timeout");
344 std::string timeout_string = (iter == pm.end()) ?
"" : iter->second;
347 parsed_url.SetParams(pm);
// oss.asize must parse completely as a non-negative integer to be accepted.
348 iter = pm.find(
"oss.asize");
349 if (iter != pm.end()) {
351 auto ec = std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), asize);
352 if ((
ec.ec == std::errc()) && (
ec.ptr == iter->second.c_str() + iter->second.size()) && asize >= 0) {
358 parsed_url.SetParams(pm);
361 m_url = parsed_url.GetURL();
367 bool full_download = m_full_download.load(std::memory_order_relaxed);
// A fresh default prefetch handler is created for every open.
368 m_default_prefetch_handler.reset(
new PrefetchDefaultHandler(*
this));
370 m_default_prefetch_handler->m_prefetch_enabled.store(
true, std::memory_order_relaxed);
// Full-download mode: wrap the caller's handler and start a prefetch from offset 0
// with the prefetch size set to the maximum off_t value.
374 m_logger->Debug(
kLogXrdClHttp,
"Opening %s in full download mode", m_url.c_str());
376 handler =
new OpenFullDownloadResponseHandler(&m_is_opened, SendResponseInfo(), handler);
377 m_prefetch_size = std::numeric_limits<off_t>::max();
378 auto [status, ok] = ReadPrefetch(0, 0,
nullptr, handler, timeout,
false);
382 m_logger->Error(
kLogXrdClHttp,
"Failed to start prefetch of data at open (URL %s): %s", m_url.c_str(), status.
ToString().c_str());
// Regular open: wrap the caller's handler and queue a CurlOpenOp.
388 m_logger->Debug(
kLogXrdClHttp,
"Opening %s (with timeout %lld)", m_url.c_str(), (
long long) timeout);
391 handler =
new OpenResponseHandler(&m_is_opened, handler);
393 std::shared_ptr<XrdClHttp::CurlOpenOp> openOp(
395 handler, GetCurrentURL(),
ts, m_logger,
this, SendResponseInfo(), GetConnCallout(),
396 &m_default_header_callout
400 m_queue->Produce(std::move(openOp));
402 m_logger->Warning(
kLogXrdClHttp,
"Failed to add open op to queue");
// Close: finishes an in-progress PUT if there is one; otherwise it may create a
// zero-sized object with an empty PUT.
414 m_logger->Error(
kLogXrdClHttp,
"Cannot close. URL isn't open");
// An upload is in progress and has not failed.
420 if (m_put_op && !m_put_op->HasFailed()) {
421 auto put_size = m_put_offset.load(std::memory_order_relaxed);
// NOTE(review): the outer condition already requires put_size == m_asize, so the
// inner test below is always true and the "partial size" branch that follows it
// appears unreachable. Confirm whether the outer test should be only `m_asize >= 0`.
422 if (m_asize >= 0 && put_size == m_asize) {
423 if (put_size == m_asize) {
424 m_logger->Debug(
kLogXrdClHttp,
"Closing a finished file %s", m_url.c_str());
426 m_logger->Debug(
kLogXrdClHttp,
"Closing a file %s with partial size (offset %llu, expected %lld)",
427 m_url.c_str(),
static_cast<unsigned long long>(put_size),
static_cast<long long>(m_asize));
429 0,
"Cannot close file with partial size"));
// Queue a zero-length write (null buffer, size 0) on the existing PUT handler.
432 m_logger->Debug(
kLogXrdClHttp,
"Flushing final write buffer on close");
433 auto put_handler = m_put_handler.load(std::memory_order_acquire);
435 return put_handler->QueueWrite(std::make_pair(
nullptr, 0), handler);
437 m_logger->Error(
kLogXrdClHttp,
"Internal state error - put operation ongoing without handle");
// Absolute deadline: current UTC time plus the caller's timeout in seconds.
443 timespec_get(&
ts, TIME_UTC);
444 ts.tv_sec += timeout;
// Publish a new PUT handler wrapping a CloseCreateHandler, then build a PUT
// operation with a null buffer of length 0.
446 auto handler_wrapper =
new PutResponseHandler(
new CloseCreateHandler(handler));
447 m_put_handler.store(handler_wrapper, std::memory_order_release);
449 handler_wrapper, m_default_put_handler, m_url,
nullptr, 0,
ts, m_logger,
450 GetConnCallout(), &m_default_header_callout
452 handler_wrapper->SetOp(m_put_op);
455 m_logger->Debug(
kLogXrdClHttp,
"Creating a zero-sized object at %s for close", m_url.c_str());
457 m_queue->Produce(m_put_op);
// Queueing failed: un-publish the handler again.
459 m_put_handler.store(
nullptr, std::memory_order_release);
460 m_logger->Warning(
kLogXrdClHttp,
"Failed to add put op to queue");
// Stat: derives the file size from the "ContentLength" property recorded for the
// open file; every failure mode is logged at Error level.
482 m_logger->Error(
kLogXrdClHttp,
"Cannot stat. URL isn't open");
486 std::string content_length_str;
487 int64_t content_length;
488 if (!
GetProperty(
"ContentLength", content_length_str)) {
489 m_logger->Error(
kLogXrdClHttp,
"Content length missing for %s", m_url.c_str());
// std::stoll throws on non-numeric input; the log below reports that case
// (the try/catch itself is on lines not shown in this excerpt).
493 content_length = std::stoll(content_length_str);
495 m_logger->Error(
kLogXrdClHttp,
"Content length not an integer for %s", m_url.c_str());
// A negative length is rejected as invalid.
498 if (content_length < 0) {
499 m_logger->Error(
kLogXrdClHttp,
"Content length negative for %s", m_url.c_str());
503 m_logger->Debug(
kLogXrdClHttp,
"Successful stat operation on %s (size %lld)", m_url.c_str(),
static_cast<long long>(content_length));
// Fcntl: builds a JSON object of cache-related attributes (ETag, revalidate flag and
// a value derived from max-age) for the open file, and answers unsupported query
// codes with a descriptive message.
518 m_logger->Error(
kLogXrdClHttp,
"Cannot run fcntl. URL isn't open");
533 xatt[
"ETag"] = etagRes;
// Cache-Control handling: "must-revalidate" sets the revalidate attribute.
538 if (cc.find(
"must-revalidate") != std::string::npos)
540 xatt[
"revalidate"] =
true;
// "max-age=<seconds>": scan forward to the first non-digit character.
542 size_t fm = cc.find(
"max-age=");
543 if (fm != std::string::npos)
546 for (
size_t i = fm; i < cc.length(); i++)
// std::isdigit requires a value representable as unsigned char; cast so that bytes
// with the high bit set do not invoke undefined behaviour.
548 if (!std::isdigit(static_cast<unsigned char>(cc[i])))
// substr takes (position, count): the digit run is i - fm characters long. The
// previous call passed the end index `i` as the count and copied trailing text.
550 std::string sa = cc.substr(fm, i - fm);
551 long int a = std::stol(sa);
// Absolute time obtained by adding max-age to the current time.
552 time_t t = time(NULL) + a;
560 m_logger->Debug(
kLogXrdClHttp,
"Fcntl content %s", xatt.dump().c_str());
// Messages returned for query codes that this plugin does not support.
572 msg =
"Server status query not supported.";
576 msg =
"Checksum query not supported.";
579 msg =
"Server configuration query not supported.";
582 msg =
"Local space stats query not supported.";
587 msg =
"Opaque query not supported.";
590 msg =
"Prepare status query not supported.";
593 msg =
"Invalid information query type code";
// Any exception raised while interpreting the query code is logged as a warning.
599 catch (
const std::exception& e)
601 m_logger->Warning(
kLogXrdClHttp,
"Failed to parse query code %s", e.what());
// Read: first offers the request to the prefetch machinery; if it is not taken
// there, a standalone CurlReadOp is queued.
617 m_logger->Error(
kLogXrdClHttp,
"Cannot read. URL isn't open");
// ReadPrefetch returns a (status, bool) pair; the two logs below cover the case
// where prefetch takes the read and the case where it reports a failure.
620 auto [status, ok] = ReadPrefetch(offset, size, buffer, handler, timeout,
false);
623 m_logger->Debug(
kLogXrdClHttp,
"Read %s (%d bytes at offset %lld) will be served from prefetch handler", m_url.c_str(), size,
static_cast<long long>(offset));
625 m_logger->Warning(
kLogXrdClHttp,
"Read %s (%d bytes at offset %lld) failed: %s", m_url.c_str(), size,
static_cast<long long>(offset), status.
GetErrorMessage().c_str());
628 }
else if (m_full_download.load(std::memory_order_relaxed)) {
// Full-download mode: under the prefetch mutex, test whether the prefetch operation
// has finished and the request starts exactly at the current prefetch offset.
629 std::unique_lock lock(m_default_prefetch_handler->m_prefetch_mutex);
630 if (m_prefetch_op && m_prefetch_op->IsDone() && (
static_cast<off_t
>(offset) == m_prefetch_offset.load(std::memory_order_acquire))) {
// Fallback: issue an independent ranged read against the current URL.
643 auto url = GetCurrentURL();
644 m_logger->Debug(
kLogXrdClHttp,
"Read %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), size,
static_cast<long long>(offset),
static_cast<long long>(
ts.tv_sec));
646 std::shared_ptr<XrdClHttp::CurlReadOp> readOp(
648 handler, m_default_prefetch_handler, url,
ts, std::make_pair(offset, size),
649 static_cast<char*
>(buffer), size, m_logger,
650 GetConnCallout(), &m_default_header_callout
654 m_queue->Produce(std::move(readOp));
656 m_logger->Warning(
kLogXrdClHttp,
"Failed to add read op to queue");
// Tries to satisfy a read through the prefetch machinery.
//
// Returns a (status, flag) tuple. On the visible paths the flag is false whenever the
// read is skipped (prefetch disabled, unknown size, prefetch already complete, or an
// out-of-order request) and true once a prefetch handler has been set up for it.
// Each skipped read bumps m_prefetch_reads_miss exactly once.
663std::tuple<XrdCl::XRootDStatus, bool>
664File::ReadPrefetch(uint64_t offset, uint64_t size,
void *buffer,
XrdCl::ResponseHandler *handler, time_t timeout,
bool isPgRead)
// Cheap unlocked check first.
667 auto prefetch_enabled = m_default_prefetch_handler->m_prefetch_enabled.load(std::memory_order_relaxed);
668 if (!prefetch_enabled) {
669 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
670 m_logger->
Dump(
kLogXrdClHttp,
"%sRead prefetch skipping due to prefetching being disabled", isPgRead ?
"Pg":
"");
// Everything below runs under the prefetch mutex.
673 std::unique_lock lock(m_default_prefetch_handler->m_prefetch_mutex);
// A prefetch size of -1 means the file size is unknown: disable prefetching. The miss
// is counted once by the re-check below; an extra increment here double-counted it.
674 if (m_prefetch_size == -1) {
675 m_logger->Debug(
kLogXrdClHttp,
"%sRead prefetch skipping due to unknown file size", isPgRead ?
"Pg":
"");
677 m_default_prefetch_handler->m_prefetch_enabled =
false;
// Re-check under the lock; the flag may have been cleared just above or concurrently.
679 prefetch_enabled = m_default_prefetch_handler->m_prefetch_enabled;
680 if (!prefetch_enabled) {
681 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
682 return std::make_tuple(XrdCl::XRootDStatus{},
false);
686 handler =
new PgReadResponseHandler(handler);
689 auto url = GetCurrentURL();
// No prefetch operation exists yet: start one beginning at this read.
690 if (!m_prefetch_op) {
692 if (m_prefetch_size == INT64_MAX) {
693 m_logger->Debug(
kLogXrdClHttp,
"%sRead %s (%llu bytes at offset %lld with timeout %lld; starting prefetch full object)", isPgRead ?
"Pg" :
"", url.c_str(),
static_cast<unsigned long long>(size),
static_cast<long long>(offset),
static_cast<long long>(
ts.tv_sec));
695 m_logger->Debug(
kLogXrdClHttp,
"%sRead %s (%llu bytes at offset %lld with timeout %lld; starting prefetch of size %lld)", isPgRead ?
"Pg" :
"", url.c_str(),
static_cast<unsigned long long>(size),
static_cast<long long>(offset),
static_cast<long long>(
ts.tv_sec),
static_cast<long long>(m_prefetch_size));
// A null lock pointer is passed: the handler constructor only continues an existing
// operation when it is given a lock.
700 new PrefetchResponseHandler(*
this, offset, size, &m_prefetch_offset,
static_cast<char *
>(buffer), handler,
nullptr, timeout);
701 }
catch (std::runtime_error &exc) {
702 m_logger->Warning(
kLogXrdClHttp,
"Failed to create prefetch response handler: %s", exc.what());
703 m_default_prefetch_handler->m_prefetch_enabled =
false;
704 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
// The operation covers (offset, m_prefetch_size) while the caller's buffer only
// receives the first `size` bytes.
712 new XrdClHttp::CurlReadOp(
713 m_last_prefetch_handler, m_default_prefetch_handler, url,
ts, std::make_pair(offset, m_prefetch_size),
714 static_cast<char*
>(buffer), size, m_logger,
715 GetConnCallout(), &m_default_header_callout
718 new XrdClHttp::CurlPrefetchOpenOp(
719 *
this, m_last_prefetch_handler, m_default_prefetch_handler, url,
ts,
720 std::make_pair(offset, m_prefetch_size),
static_cast<char*
>(buffer), size, m_logger,
721 GetConnCallout(), &m_default_header_callout
// Record the new prefetch and the position the next sequential read must start at.
725 m_prefetch_count.fetch_add(1, std::memory_order_relaxed);
726 m_prefetch_reads_hit.fetch_add(1, std::memory_order_relaxed);
727 m_prefetch_offset.store(offset + size, std::memory_order_release);
729 m_queue->Produce(m_prefetch_op);
731 m_logger->Warning(
kLogXrdClHttp,
"Failed to add prefetch read op to queue");
733 m_prefetch_op.reset();
734 m_default_prefetch_handler->m_prefetch_enabled =
false;
735 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
738 return std::make_tuple(XrdCl::XRootDStatus{},
true);
// A prefetch operation exists but has already finished: nothing more to serve.
740 if (m_prefetch_op->IsDone()) {
742 m_default_prefetch_handler->m_prefetch_enabled =
false;
743 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
744 m_logger->Dump(
kLogXrdClHttp,
"%sRead prefetch skipping due to prefetching being already complete", isPgRead ?
"Pg":
"");
745 return std::make_tuple(XrdCl::XRootDStatus{},
false);
// Only strictly sequential reads can use the ongoing prefetch: atomically advance the
// offset from `offset` to `offset + size`, or treat the read as a miss.
748 auto expected_offset =
static_cast<off_t
>(offset);
749 if (!m_prefetch_offset.compare_exchange_strong(expected_offset,
static_cast<off_t
>(offset + size), std::memory_order_acq_rel)) {
751 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
752 m_logger->Dump(
kLogXrdClHttp,
"%sRead prefetch skipping due to out-of-order reads (requested %lld; current offset %lld)", isPgRead ?
"Pg":
"",
static_cast<long long>(offset),
static_cast<long long>(expected_offset));
753 return std::make_tuple(XrdCl::XRootDStatus{},
false);
756 m_logger->Debug(
kLogXrdClHttp,
"%sRead %s (%llu bytes at offset %lld; using ongoing prefetch)", isPgRead ?
"Pg" :
"", GetCurrentURL().c_str(),
static_cast<unsigned long long>(size),
static_cast<long long>(offset));
// Passing &lock lets the handler constructor attach to the ongoing operation.
761 new PrefetchResponseHandler(*
this, offset, size, &m_prefetch_offset,
static_cast<char *
>(buffer), handler, &lock, timeout);
762 }
catch (std::runtime_error &exc) {
763 m_logger->Warning(
kLogXrdClHttp,
"Failed to create prefetch response handler: %s", exc.what());
764 m_default_prefetch_handler->m_prefetch_enabled =
false;
765 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
769 return std::make_tuple(XrdCl::XRootDStatus{},
true);
// VectorRead: reads several chunks in one CurlVectorReadOp.
779 m_logger->Error(
kLogXrdClHttp,
"Cannot do vector read: URL isn't open");
781 }
else if (m_full_download.load(std::memory_order_relaxed)) {
// An empty chunk list is answered with an empty VectorReadInfo, with no network
// operation visible on this path.
784 if (chunks.empty()) {
787 auto vr = std::make_unique<XrdCl::VectorReadInfo>();
790 obj->Set(vr.release());
797 auto url = GetCurrentURL();
// chunks[0] is safe here because the empty case was handled above.
798 m_logger->Debug(
kLogXrdClHttp,
"Read %s (%lld chunks; first chunk is %u bytes at offset %lld with timeout %lld)", url.c_str(),
static_cast<long long>(chunks.size()),
static_cast<unsigned>(chunks[0].GetLength()),
static_cast<long long>(chunks[0].GetOffset()),
static_cast<long long>(
ts.tv_sec));
800 std::shared_ptr<XrdClHttp::CurlVectorReadOp> readOp(
802 handler, url,
ts, chunks, m_logger, GetConnCallout(), &m_default_header_callout
806 m_queue->Produce(std::move(readOp));
808 m_logger->Warning(
kLogXrdClHttp,
"Failed to add vector read op to queue");
// Write (raw buffer overload): the first write starts a streaming PUT operation;
// later writes are queued on the same PUT handler and must be sequential.
823 m_logger->Error(
kLogXrdClHttp,
"Cannot write: URL isn't open");
825 }
else if (m_full_download.load(std::memory_order_relaxed)) {
// Prefetching is disabled on this write path.
828 m_default_prefetch_handler->DisablePrefetch();
831 auto url = GetCurrentURL();
832 m_logger->Debug(
kLogXrdClHttp,
"Write %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), size,
static_cast<long long>(offset),
static_cast<long long>(
ts.tv_sec));
834 auto handler_wrapper = m_put_handler.load(std::memory_order_relaxed);
// No PUT in progress: try to install a new handler with a compare-and-swap. If
// another thread installed one first, discard ours and queue on the winner's handler.
835 if (!handler_wrapper) {
836 handler_wrapper =
new PutResponseHandler(handler);
837 PutResponseHandler *expected_value =
nullptr;
838 if (!m_put_handler.compare_exchange_strong(expected_value, handler_wrapper, std::memory_order_acq_rel)) {
839 delete handler_wrapper;
840 return expected_value->QueueWrite(std::make_pair(buffer, size), handler);
// A new upload must begin at offset zero; otherwise roll back the installed handler.
844 m_put_handler.store(
nullptr, std::memory_order_release);
845 delete handler_wrapper;
846 m_logger->Warning(
kLogXrdClHttp,
"Cannot start PUT operation at non-zero offset");
850 handler_wrapper, m_default_put_handler, url,
static_cast<const char*
>(buffer), size,
ts, m_logger,
851 GetConnCallout(), &m_default_header_callout
853 handler_wrapper->SetOp(m_put_op);
854 m_put_offset.fetch_add(size, std::memory_order_acq_rel);
856 m_queue->Produce(m_put_op);
// Queueing failed: un-publish and free the handler.
858 m_put_handler.store(
nullptr, std::memory_order_release);
859 delete handler_wrapper;
860 m_logger->Warning(
kLogXrdClHttp,
"Failed to add put op to queue");
// PUT already in progress: claim [old_offset, old_offset + size) optimistically and
// undo the claim if the caller's offset does not match the current position.
866 auto old_offset = m_put_offset.fetch_add(size, std::memory_order_acq_rel);
867 if (
static_cast<off_t
>(offset) != old_offset) {
868 m_put_offset.fetch_sub(size, std::memory_order_acq_rel);
869 m_logger->Warning(
kLogXrdClHttp,
"Requested write offset at %lld does not match current file descriptor offset at %lld",
870 static_cast<long long>(offset),
static_cast<long long>(old_offset));
873 return handler_wrapper->QueueWrite(std::make_pair(buffer, size), handler);
// Write (XrdCl::Buffer overload): same flow as the raw-buffer overload, but the
// buffer is moved into the PUT operation or the write queue.
883 m_logger->Error(
kLogXrdClHttp,
"Cannot write: URL isn't open");
// Prefetching is disabled on this write path.
886 m_default_prefetch_handler->DisablePrefetch();
889 auto url = GetCurrentURL();
890 m_logger->Debug(
kLogXrdClHttp,
"Write %s (%d bytes at offset %lld with timeout %lld)", url.c_str(),
static_cast<int>(buffer.GetSize()),
static_cast<long long>(offset),
static_cast<long long>(
ts.tv_sec));
892 auto handler_wrapper = m_put_handler.load(std::memory_order_relaxed);
// No PUT in progress: install a new handler via compare-and-swap; if another thread
// won the race, discard ours and queue on the winner's handler.
893 if (!handler_wrapper) {
894 handler_wrapper =
new PutResponseHandler(handler);
895 PutResponseHandler *expected_value =
nullptr;
896 if (!m_put_handler.compare_exchange_strong(expected_value, handler_wrapper, std::memory_order_acq_rel)) {
897 delete handler_wrapper;
898 return expected_value->QueueWrite(std::move(buffer), handler);
// Roll back the installed handler (the triggering condition is on lines not shown).
902 m_put_handler.store(
nullptr, std::memory_order_release);
903 delete handler_wrapper;
907 handler_wrapper, m_default_put_handler, url, std::move(buffer),
ts, m_logger,
908 GetConnCallout(), &m_default_header_callout
910 handler_wrapper->SetOp(m_put_op);
// NOTE(review): `buffer` was passed through std::move just above; if the operation's
// constructor moves from it, GetSize() here may be 0 and the PUT offset would not
// advance. Consider reading the size before the move — confirm.
911 m_put_offset.fetch_add(buffer.GetSize(), std::memory_order_acq_rel);
913 m_queue->Produce(m_put_op);
// Queueing failed: un-publish and free the handler.
915 m_put_handler.store(
nullptr, std::memory_order_release);
916 delete handler_wrapper;
917 m_logger->Warning(
kLogXrdClHttp,
"Failed to add put op to queue");
// PUT already in progress: claim the byte range optimistically and undo the claim if
// the caller's offset does not match the current position.
923 auto old_offset = m_put_offset.fetch_add(buffer.GetSize(), std::memory_order_acq_rel);
924 if (
static_cast<off_t
>(offset) != old_offset) {
925 m_put_offset.fetch_sub(buffer.GetSize(), std::memory_order_acq_rel);
926 m_logger->Warning(
kLogXrdClHttp,
"Requested write offset at %lld does not match current file descriptor offset at %lld",
927 static_cast<long long>(offset),
static_cast<long long>(old_offset));
930 return handler_wrapper->QueueWrite(std::move(buffer), handler);
// PgRead: page-read variant of Read. It is first offered to the prefetch machinery
// (isPgRead = true); otherwise a standalone CurlPgReadOp is queued.
941 m_logger->Error(
kLogXrdClHttp,
"Cannot pgread. URL isn't open");
944 auto [status, ok] = ReadPrefetch(offset, size, buffer, handler, timeout,
true);
947 m_logger->Debug(
kLogXrdClHttp,
"PgRead %s (%d bytes at offset %lld) will be served from prefetch handler", m_url.c_str(), size,
static_cast<long long>(offset));
949 m_logger->Warning(
kLogXrdClHttp,
"PgRead %s (%d bytes at offset %lld) failed: %s", m_url.c_str(), size,
static_cast<long long>(offset), status.
GetErrorMessage().c_str());
952 }
else if (m_full_download.load(std::memory_order_relaxed)) {
// Fallback: issue an independent page read against the current URL.
957 auto url = GetCurrentURL();
958 m_logger->Debug(
kLogXrdClHttp,
"PgRead %s (%d bytes at offset %lld)", url.c_str(), size,
static_cast<long long>(offset));
960 std::shared_ptr<XrdClHttp::CurlPgReadOp> readOp(
962 handler, m_default_prefetch_handler, url,
ts, std::make_pair(offset, size),
963 static_cast<char*
>(buffer), size, m_logger,
964 GetConnCallout(), &m_default_header_callout
969 m_queue->Produce(std::move(readOp));
971 m_logger->Warning(
kLogXrdClHttp,
"Failed to add read op to queue");
// GetProperty: writes the value of the named property into `value`.
// "CurrentURL" and "IsPrefetching" are computed on demand; other names are looked up
// in the property map under a shared lock.
986 std::string &value)
const
// Computed property: the current URL.
988 if (name ==
"CurrentURL") {
989 value = GetCurrentURL();
// Computed property: prefetch state rendered as the string "true" or "false".
993 if (name ==
"IsPrefetching") {
994 value = m_default_prefetch_handler->IsPrefetching() ?
"true" :
"false";
// Stored properties: a shared lock on the properties mutex is taken for the lookup.
998 std::shared_lock lock(m_properties_mutex);
999 if (name ==
"LastURL") {
1004 const auto p = m_properties.find(name);
1005 if (p == std::end(m_properties)) {
// Const accessor returning a bool; Open passes its result to the open operation and
// to OpenFullDownloadResponseHandler. The body is not visible in this excerpt.
1013bool File::SendResponseInfo()
const {
// SetProperty: stores a named property and applies side effects for the names this
// plugin interprets itself.
1020 const std::string &value)
// The header callout is passed as a hexadecimal pointer value.
1022 if (name ==
"XrdClHttpHeaderCallout") {
1025 pointer = std::stoll(value,
nullptr, 16);
1030 }
else if (name ==
"XrdClHttpFullDownload") {
// "true" enables prefetching on the existing prefetch handler (if any, under its
// mutex) and switches the file into full-download mode.
1031 if (value ==
"true") {
1032 auto prefetch_handler = m_default_prefetch_handler;
1033 if (prefetch_handler) {
1034 std::unique_lock lock(prefetch_handler->m_prefetch_mutex);
1035 prefetch_handler->m_prefetch_enabled.store(
true, std::memory_order_relaxed);
1037 m_full_download.store(
true, std::memory_order_relaxed);
// The property map is updated under an exclusive lock.
1041 std::unique_lock lock(m_properties_mutex);
1043 m_properties[name] = value;
1044 if (name ==
"LastURL") {
// A changed query parameter requires recomputing the cached current URL.
1048 else if (name ==
"XrdClHttpQueryParam") {
1049 CalculateCurrentURL(value);
1051 else if (name ==
"XrdClHttpMaintenancePeriod") {
// Accepted only if the whole string parses and the period is positive.
1053 auto ec = std::from_chars(value.c_str(), value.c_str() + value.size(), period);
1054 if ((
ec.ec == std::errc()) && (
ec.ptr == value.c_str() + value.size()) && period > 0) {
1055 m_logger->Debug(
kLogXrdClHttp,
"Setting maintenance period to %u", period);
1059 else if (name ==
"XrdClHttpStallTimeout") {
1063 m_logger->Debug(
kLogXrdClHttp,
"Failed to parse timeout value (%s): %s", value.c_str(), errmsg.c_str());
1068 else if (name ==
"XrdClHttpPrefetchSize") {
// Accepted only if the whole string parses; m_prefetch_size is then updated under
// the prefetch mutex.
1070 auto ec = std::from_chars(value.c_str(), value.c_str() + value.size(), size);
1071 if ((
ec.ec == std::errc()) && (
ec.ptr == value.c_str() + value.size())) {
1073 std::unique_lock lock2(m_default_prefetch_handler->m_prefetch_mutex);
1074 m_prefetch_size = size;
1076 m_logger->Debug(
kLogXrdClHttp,
"XrdClHttpPrefetchSize value (%s) was not parseable", value.c_str());
// GetCurrentURL: returns the URL requests should be sent to. A cached value is
// returned under a shared lock when available; otherwise it is computed under an
// exclusive lock.
1083File::GetCurrentURL()
const {
// Read-only pass under a shared lock.
1085 std::shared_lock lock(m_properties_mutex);
1087 if (!m_url_current.empty()) {
1088 return m_url_current;
1089 }
else if (m_url.empty() && m_last_url.empty()) {
// Compute pass under an exclusive lock.
// NOTE(review): m_url_current is not re-checked on the visible lines after the lock
// is upgraded — confirm whether that matters.
1093 std::unique_lock lock(m_properties_mutex);
1095 auto iter = m_properties.find(
"XrdClHttpQueryParam");
// Without extra query parameters the last URL wins, falling back to the original URL.
1096 if (iter == m_properties.end()) {
1097 return m_last_url.empty() ? m_url : m_last_url;
1099 CalculateCurrentURL(iter->second);
1101 return m_url_current;
// CalculateCurrentURL: recomputes m_url_current from the base URL (m_last_url if
// set, else m_url) plus the extra query string `value`.
// The callers visible in this file hold m_properties_mutex exclusively when calling.
1105File::CalculateCurrentURL(
const std::string &value)
const {
1106 const auto &last_url = m_last_url.empty() ? m_url : m_last_url;
// No extra parameters: the current URL is just the base URL.
1107 if (value.empty()) {
1108 m_url_current = last_url;
1110 auto loc = last_url.find(
'?');
// The base URL has no query string: append the extra parameters directly.
1111 if (loc == std::string::npos) {
1112 m_url_current = last_url +
'?' + value;
// Both have parameters: parse each set and merge them. An entry from the base URL is
// copied only when the update set does not define that key.
1114 XrdCl::URL url(last_url);
1115 auto map = url.GetParams();
1116 url.SetParams(value);
1117 auto update_map = url.GetParams();
1118 for (
const auto &entry : map) {
1119 if (update_map.find(entry.first) == update_map.end()) {
1120 update_map[entry.first] = entry.second;
// Re-serialize the merged parameters as ?k=v&k=v and attach them to the part of the
// base URL before its '?'.
1124 std::stringstream ss;
1125 for (
const auto &entry : update_map) {
1126 ss << (first ?
"?" :
"&") << entry.first <<
"=" << entry.second;
1129 m_url_current = last_url.substr(0, loc) + ss.str();
// Constructs a handler for one read served by the prefetch operation and appends it
// to the parent's list of pending prefetch handlers.
// Throws std::runtime_error when an ongoing prefetch operation cannot be continued
// with this read.
// NOTE(review): `lock` is only tested for null on the visible lines; callers pass the
// held prefetch mutex lock or nullptr.
1134File::PrefetchResponseHandler::PrefetchResponseHandler(
1135 File &
parent, off_t offset,
size_t size, std::atomic<off_t> *prefetch_offset,
char *buffer,
1136 XrdCl::ResponseHandler *handler, std::unique_lock<std::mutex> *lock, time_t timeout
1143 m_prefetch_offset(prefetch_offset),
// Append this handler to the tail of the parent's list.
1146 if (
parent.m_last_prefetch_handler) {
1147 parent.m_last_prefetch_handler->m_next = this;
1148 parent.m_last_prefetch_handler = this;
1150 m_parent.m_last_prefetch_handler = this;
// With a lock and an existing operation, hand the buffer to the ongoing prefetch.
1153 if (lock && m_parent.m_prefetch_op) {
1158 if (!parent.m_prefetch_op->Continue(parent.m_prefetch_op, this, buffer, size)) {
// Continue failed: clear the tail pointer if it refers to this handler, then report
// the failure to the caller.
1163 if (parent.m_last_prefetch_handler == this)
1164 parent.m_last_prefetch_handler = nullptr;
1165 throw std::runtime_error(
"Failed to continue prefetch operation");
// Callback for one prefetch-served read. It updates the statistics, hands the ongoing
// prefetch operation to the next waiting handler or resubmits the waiting reads on
// failure, and finally forwards the result to the caller's handler.
// `status` may be null, so every test on it is null-guarded, matching the later
// checks in this function.
1172File::PrefetchResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
// This handler deletes itself when the callback returns.
1174 std::unique_ptr<PrefetchResponseHandler> owner(
this);
1176 bool mismatched_size =
false;
1178 if (status && status->
IsOK() && response) {
1179 XrdCl::ChunkInfo *ci =
nullptr;
// Short read: fewer bytes arrived than requested, so move the shared prefetch offset
// back by the missing amount and remember the mismatch.
1182 auto missing_bytes = m_size - ci->
GetLength();
1183 if (missing_bytes) {
1184 mismatched_size =
true;
1185 m_prefetch_offset->fetch_sub(missing_bytes, std::memory_order_relaxed);
1187 m_prefetch_bytes_used.fetch_add(ci->
GetLength(), std::memory_order_relaxed);
1189 }
else if (!status || !status->
IsOK()) {
// A missing status is counted as a failure, as in the check further down.
1190 m_prefetch_failed_count.fetch_add(1, std::memory_order_relaxed);
1194 PrefetchResponseHandler *next;
1196 std::unique_lock lock(m_parent.m_default_prefetch_handler->m_prefetch_mutex);
// Success with the full size: let the next queued handler continue the operation.
1200 if (status && status->
IsOK() && !mismatched_size) {
1201 m_parent.m_prefetch_op->Continue(m_parent.m_prefetch_op, next, next->m_buffer, next->m_size);
// Otherwise stop prefetching and resubmit the waiting reads.
1208 m_parent.m_default_prefetch_handler->DisablePrefetch();
1209 next->ResubmitOperation();
1214 std::unique_lock lock(m_parent.m_default_prefetch_handler->m_prefetch_mutex);
// This was the tail of the list: the list is now empty.
1215 if (m_parent.m_last_prefetch_handler ==
this) {
1216 m_parent.m_last_prefetch_handler =
nullptr;
// On failure, drop the prefetch operation and disable further prefetching.
1218 if (!status || !status->
IsOK()) {
1219 m_parent.m_prefetch_op.reset();
1220 m_parent.m_default_prefetch_handler->m_prefetch_enabled =
false;
// Forward to the caller, or free the response if nobody is listening.
1224 if (m_handler) m_handler->HandleResponse(status, response);
1225 else delete response;
// Walks the chain of waiting prefetch handlers starting at this one and re-issues
// each as an ordinary Read on the parent file.
1229File::PrefetchResponseHandler::ResubmitOperation()
1231 m_parent.m_logger->Debug(
kLogXrdClHttp,
"Resubmitting waiting prefetch operations as new reads due to prefetch failure");
1232 PrefetchResponseHandler *next =
this;
// If the resubmitted read cannot even be started, deliver the error straight to that
// read's handler with a heap-allocated copy of the status.
1235 auto st = next->m_parent.Read(next->m_offset, next->m_size, next->m_buffer, next->m_handler, next->m_timeout);
1236 if (!st.IsOK() && next->m_handler) {
1237 next->m_handler->HandleResponse(
new XrdCl::XRootDStatus(st),
nullptr);
// The next-pointer is read under the prefetch mutex.
1240 std::unique_lock lock(next->m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1241 next = next->m_next;
// Default handler for the prefetch operation. It takes ownership of the response and
// status (both freed on return) and records why prefetching stopped.
1248File::PrefetchDefaultHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
1249 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
1250 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
// Only non-OK statuses are examined on the visible lines.
1251 if (status && !status->
IsOK()) {
// Case 1: prefetched data was never consumed; counted as "expired".
// NOTE(review): the condition separating the two cases is on a line not shown.
1253 m_prefetch_expired_count.fetch_add(1, std::memory_order_relaxed);
1254 m_logger->Debug(
kLogXrdClHttp,
"Prefetch data for %s went unused; disabling.", m_url.c_str());
// Case 2: a real error; counted as "failed".
1256 m_prefetch_failed_count.fetch_add(1, std::memory_order_relaxed);
1257 m_logger->Warning(
kLogXrdClHttp,
"Disabling prefetch of %s due to error: %s", m_url.c_str(), status->
ToStr().c_str());
// Default handler for the PUT operation; on the visible lines it logs a warning
// containing the status text.
// NOTE(review): `status` is dereferenced here; any null check or cleanup of `status`
// and `response` is on lines not shown.
1264File::PutDefaultHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
1267 m_logger->Warning(
kLogXrdClHttp,
"Failing future write calls due to error: %s", status->
ToStr().c_str());
// Produces the header list for a request. The user-registered callout, when present,
// computes the headers; otherwise the input headers are copied. For PUT requests with
// a known size (m_asize >= 0) a Content-Length header is added if none is present.
// The visible paths make sure the returned pointer is non-null.
1272std::shared_ptr<XrdClHttp::HeaderCallout::HeaderList>
1273File::HeaderCallout::GetHeaders(
const std::string &verb,
1274 const std::string &url,
1275 const HeaderList &headers)
// The acquire load observes the callout pointer stored on the parent file.
1277 auto parent_callout = m_parent.m_header_callout.load(std::memory_order_acquire);
1278 std::shared_ptr<std::vector<std::pair<std::string, std::string>>> result_headers;
1279 if (parent_callout !=
nullptr) {
1280 result_headers = parent_callout->GetHeaders(verb, url, headers);
// No callout registered: start from a copy of the input headers.
1282 result_headers.reset(
new std::vector<std::pair<std::string, std::string>>{});
1283 for (
const auto & info : headers) {
1284 result_headers->emplace_back(info.first, info.second);
1287 if (m_parent.m_asize >= 0 && verb ==
"PUT") {
// The callout may have returned a null list; allocate one before searching it.
1288 if (!result_headers) {
1289 result_headers.reset(
new std::vector<std::pair<std::string, std::string>>{});
// Header names are matched case-insensitively; an existing Content-Length is kept.
1291 auto iter = std::find_if(result_headers->begin(), result_headers->end(),
1292 [](
const auto &pair) { return !strcasecmp(pair.first.c_str(),
"Content-Length"); });
1293 if (iter == result_headers->end()) {
1294 result_headers->emplace_back(
"Content-Length", std::to_string(m_parent.m_asize));
1296 }
else if (!result_headers) {
1297 result_headers.reset(
new std::vector<std::pair<std::string, std::string>>{});
1299 return result_headers;
// Creates a PUT response handler whose initially active handler is the caller's
// handler for the first write.
1302File::PutResponseHandler::PutResponseHandler(XrdCl::ResponseHandler *handler)
1303 : m_active_handler(handler)
// Callback for one completed write of the streaming PUT. On failure every waiting
// writer is notified; on success the next queued write is started.
1307File::PutResponseHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw)
1309 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
1310 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
1321 XrdCl::ResponseHandler *current_handler =
nullptr;
// NOTE(review): `status` is dereferenced without a null check on the visible lines.
1322 if (!status->
IsOK()) {
// Failure: under the mutex, capture the active handler and the non-null handlers of
// all pending writes, then clear the queue. The callbacks are invoked further down,
// after this collection step.
1326 std::vector<XrdCl::ResponseHandler *> pending_handlers;
1328 std::lock_guard<std::mutex> lg(m_mutex);
1329 current_handler = m_active_handler;
1330 for (
auto &[_, h] : m_pending_writes) {
1331 if (h) pending_handlers.push_back(h);
1334 m_pending_writes.clear();
1336 m_active_handler =
nullptr;
// Keep a copy of the status: the original is released to the active handler, and each
// pending handler receives its own heap-allocated copy.
1340 XrdCl::XRootDStatus status_copy(*status);
1341 if (current_handler) {
1342 current_handler->
HandleResponse(status.release(), response.release());
1345 for (
auto *h : pending_handlers) {
1346 h->HandleResponse(
new XrdCl::XRootDStatus(status_copy),
nullptr);
// Success: remember who issued the finished write, start the next queued write, then
// notify the finished writer when ProcessQueue reports true.
1351 current_handler = m_active_handler;
1352 if (ProcessQueue() && current_handler) {
1353 current_handler->
HandleResponse(status.release(), response.release());
// Submit a write to the ongoing PUT operation, or queue it for later.
//
// @param buffer  Data to write: either a (pointer, size) pair or an XrdCl::Buffer.
// @param handler Callback associated with this write.
// @return An error status if the underlying operation has already failed;
//         a default-constructed XRootDStatus on the visible normal path.
// NOTE(review): the condition choosing between "send now" and "enqueue", and
// the handling of a false return from Continue(), are on lines missing from
// this listing.
1358File::PutResponseHandler::QueueWrite(std::variant<std::pair<const void *, size_t>, XrdCl::Buffer> buffer, XrdCl::ResponseHandler *handler)
// The operation has already failed: report the failure instead of writing.
1360 if (m_op->HasFailed()) {
1361 auto sc = m_op->GetStatusCode();
// Use the curl error message, falling back to the status message when empty.
1364 auto err_msg = m_op->GetCurlErrorMessage();
1365 if (err_msg.empty()) {
1366 err_msg = m_op->GetStatusMessage();
// NOTE(review): httpErr is defined on a line not shown here; presumably it is
// the result of HTTPStatusConvert(sc) -- confirm.
1368 return XrdCl::XRootDStatus(
XrdCl::stError, httpErr.first, httpErr.second, err_msg);
// Handler and queue state below is accessed while holding m_mutex.
1372 std::lock_guard<std::mutex> lg(m_mutex);
// Immediate-send path: this write becomes the active one and its data is
// passed to the operation's Continue(), using the overload matching the
// alternative held by the variant.
1375 m_active_handler = handler;
1376 if (std::holds_alternative<XrdCl::Buffer>(buffer)) {
1377 if (!m_op->Continue(m_op,
this, std::move(std::get<XrdCl::Buffer>(buffer)))) {
1383 auto buffer_info = std::get<std::pair<const void *, size_t>>(buffer);
1384 if (!m_op->Continue(m_op,
this,
static_cast<const char *
>(buffer_info.first), buffer_info.second)) {
// Queued path: store the data and its handler at the back of m_pending_writes.
1391 m_pending_writes.emplace_back(std::move(buffer), handler);
1393 return XrdCl::XRootDStatus{};
// Start the next queued write, if there is one.
//
// Holds m_mutex for the visible part of the function.  With an empty queue
// the active handler is cleared; otherwise the front entry becomes the active
// write, its data is handed to the operation's Continue(), and the entry is
// removed from the queue.
// NOTE(review): the declaration of `rv`, the return statements and the body
// of the trailing loop are on lines missing from this listing, so the exact
// return contract cannot be stated from here.
1398File::PutResponseHandler::ProcessQueue() {
1399 std::lock_guard<std::mutex> lg(m_mutex);
// Nothing queued: no write is active any more.
1400 if (m_pending_writes.empty()) {
1403 m_active_handler =
nullptr;
// `buffer` and `handler` are references into the front queue entry; they are
// only used before the pop_front() below.
1409 auto & [buffer, handler] = m_pending_writes.front();
1411 m_active_handler = handler;
// Dispatch to the Continue() overload matching the alternative held.
1412 if (std::holds_alternative<XrdCl::Buffer>(buffer)) {
1413 rv = m_op->Continue(m_op,
this, std::move(std::get<XrdCl::Buffer>(buffer)));
1415 auto buffer_info = std::get<std::pair<const void *, size_t>>(buffer);
1416 rv = m_op->Continue(m_op,
this,
static_cast<const char *
>(buffer_info.first), buffer_info.second);
1418 m_pending_writes.pop_front();
// NOTE(review): presumably the handling for a failed Continue() -- it visits
// the active handler and the handlers of the remaining queued writes, but the
// enclosing condition and loop body are not shown; confirm.
1422 if (m_active_handler) {
1425 for (
auto& [_, h] : m_pending_writes) {
// Block the calling thread until m_active is false.
// Waits on the condition variable m_cv with m_mutex held; the predicate form
// of wait() re-checks m_active after every wakeup.
// NOTE(review): the return type and the closing lines of this function are
// not visible in this listing.
1438File::PutResponseHandler::WaitForCompletion() {
1439 std::unique_lock lock(m_mutex);
1440 m_cv.wait(lock, [&]{
return !m_active;});
static std::string ts()
timestamp output for logging messages
#define ResponseInfoProperty
static void SetStallTimeout(int stall_interval)
static void SetMaintenancePeriod(unsigned maint)
virtual XrdCl::XRootDStatus Open(const std::string &url, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode mode, XrdCl::ResponseHandler *handler, time_t timeout) override
static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger)
File(std::shared_ptr< XrdClHttp::HandlerQueue > queue, XrdCl::Log *log)
static const struct timespec & GetDefaultHeaderTimeout()
static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout)
virtual bool SetProperty(const std::string &name, const std::string &value) override
virtual bool IsOpen() const override
virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
static std::string GetMonitoringJson()
virtual bool GetProperty(const std::string &name, std::string &value) const override
static const struct timespec & GetMinimumHeaderTimeout()
virtual XrdCl::XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Stat(bool force, XrdCl::ResponseHandler *handler, time_t timeout) override
struct timespec GetHeaderTimeout(time_t oper_timeout) const
virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler, time_t timeout) override
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void FromString(const std::string str)
Fill the buffer from a string.
std::string ToString() const
Convert the buffer to a string.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
@ DebugMsg
print debug info
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
@ IsReadable
Read access is allowed.
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
bool ParseTimeout(const std::string &duration, struct timespec &, std::string &errmsg)
bool HTTPStatusIsError(unsigned status)
const uint64_t kLogXrdClHttp
std::string MarshalDuration(const struct timespec &timeout)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t errInvalidOp
const uint16_t errOSError
const uint16_t errInvalidResponse
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Flags
Open flags, may be or'd when appropriate.
@ Write
Open only for writing.
Code
XRootD query request codes.
@ OpaqueFile
Implementation dependent.
@ XAttr
Query file extended attributes.
@ Opaque
Implementation dependent.
@ Config
Query server configuration.
@ Stats
Query server stats.
@ ChecksumCancel
Query file checksum cancellation.
@ Checksum
Query file checksum.
@ Space
Query logical space stats.
@ Prepare
Query prepare status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.