XRootD
Loading...
Searching...
No Matches
XrdHttpTpcState.cc
Go to the documentation of this file.
1
2#include <algorithm>
3#include <sstream>
4#include <stdexcept>
5
6#include "XrdVersion.hh"
10
11#include <curl/curl.h>
12
13#include "XrdHttpTpcState.hh"
14#include "XrdHttpTpcStream.hh"
15
17
18using namespace TPC;
19
20
22 if (m_headers) {
23 curl_slist_free_all(m_headers);
24 m_headers = NULL;
25 if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);}
26 }
27}
28
29
30void State::Move(State &other)
31{
32 m_push = other.m_push;
33 m_recv_status_line = other.m_recv_status_line;
34 m_recv_all_headers = other.m_recv_all_headers;
35 m_offset = other.m_offset;
36 m_start_offset = other.m_start_offset;
37 m_status_code = other.m_status_code;
38 m_content_length = other.m_content_length;
39 m_push_length = other.m_push_length;
40 m_stream = other.m_stream;
41 m_curl = other.m_curl;
42 m_headers = other.m_headers;
43 m_headers_copy = other.m_headers_copy;
44 m_resp_protocol = other.m_resp_protocol;
45 m_is_transfer_state = other.m_is_transfer_state;
46 curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
47 if (m_is_transfer_state) {
48 if (m_push) {
49 curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
50 } else {
51 curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
52 }
53 }
54 tpcForwardCreds = other.tpcForwardCreds;
55 other.m_headers_copy.clear();
56 other.m_curl = NULL;
57 other.m_headers = NULL;
58 other.m_stream = NULL;
59 other.m_repr_digests = m_repr_digests;
60}
61
62
63bool State::InstallHandlers(CURL *curl) {
64 curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
65 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
66 curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
67 if(m_is_transfer_state) {
68 if (m_push) {
69 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
70 curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
71 curl_easy_setopt(curl, CURLOPT_READDATA, this);
72 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::PushRespCB);
73 curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
74 struct stat buf;
75 if (SFS_OK == m_stream->Stat(&buf)) {
76 m_push_length = buf.st_size;
77 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
78 }
79 } else {
80 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
81 curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
82 }
83 }
84 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
85 if(tpcForwardCreds) {
86 curl_easy_setopt(curl,CURLOPT_UNRESTRICTED_AUTH,1L);
87 }
88
89 // Only use low-speed limits with libcurl v7.38 or later.
90 // Older versions have poor transfer performance, corrected in curl commit cacdc27f.
91 curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
92 if (curl_ver->age > 0 && curl_ver->version_num >= 0x072600) {
93 // Require a minimum speed from the transfer: 2 minute average must at least 10KB/s
94 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60);
95 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 10*1024);
96 }
97 return true;
98}
99
108 struct curl_slist *list = NULL;
109 for (const auto & [header,value]: req.headers) {
110 if (!strncasecmp(header.c_str(),"copy-header", 11)) {
111 list = curl_slist_append(list, value.c_str());
112 m_headers_copy.emplace_back(value);
113 }
114 // Note: len("TransferHeader") == 14
115 if (!strncasecmp(header.c_str(),"transferheader",14)) {
116 std::stringstream ss;
117 ss << header.substr(14) << ": " << value;
118 list = curl_slist_append(list, ss.str().c_str());
119 m_headers_copy.emplace_back(ss.str());
120 }
121 }
122
123 if(m_is_transfer_state && !m_push && !req.mReprDigest.empty()) {
124 size_t reprDigestSize = req.mReprDigest.size();
125 std::stringstream ss;
126 ss << "Want-Repr-Digest: ";
127 size_t cpt = 1;
128 for (const auto &kv: req.mReprDigest) {
129 // We put the same weight for the digest names as we do not have any way, according to the specs,
130 // to give priority to a digest name in particular
131 ss << kv.first << '=' << 5;
132 if(cpt < reprDigestSize) {
133 ss << ',';
134 }
135 cpt++;
136 }
137 list = curl_slist_append(list, ss.str().c_str());
138 m_headers_copy.emplace_back(ss.str());
139 }
140
141 if (m_is_transfer_state && m_push && m_push_length > 0) {
142 // On libcurl 8.5.0 - 8.9.1, we've observed bugs causing failures whenever
143 // `Expect: 100-continue` is not used. Older versions of libcurl unconditionally
144 // set `Expect` whenever PUT is used (likely an older bug). To workaround the issue,
145 // we force `Expect` to be set, triggering the older libcurl behavior.
146 // See: https://github.com/xrootd/xrootd/issues/2470
147 // See: https://github.com/curl/curl/issues/17004
148 list = curl_slist_append(list, "Expect: 100-continue");
149 // Add Repr-Digest header to PUT request (PUSH)
150 auto reprDigest = XrdOucTUtils::caseInsensitiveFind(req.headers,"repr-digest");
151 if(reprDigest != req.headers.end()) {
152 std::string reprDigestHeader {"Repr-Digest: " + reprDigest->second};
153 curl_slist_append(list,reprDigestHeader.c_str());
154 }
155 }
156
157 if (list != nullptr) {
158 curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
159 m_headers = list;
160 }
161}
162
164 struct curl_slist *list = NULL;
165 for (const auto & [header,value]: req.headers) {
166 if (!strncasecmp(header.c_str(),"copy-header", 11)) {
167 list = curl_slist_append(list, value.c_str());
168 }
169 // Note: len("TransferHeader") == 14
170 if (!strncasecmp(header.c_str(),"transferheader",14)) {
171 std::stringstream ss;
172 ss << header.substr(14) << ": " << value;
173 list = curl_slist_append(list, ss.str().c_str());
174 }
175 }
176 if(!req.mReprDigest.empty()) {
177 size_t reprDigestSize = req.mReprDigest.size();
178 std::stringstream ss;
179 ss << "Want-Repr-Digest: ";
180 size_t cpt = 1;
181 for (const auto &kv: req.mReprDigest) {
182 // We put the same weight for the digest names as we do not have any way, according to the specs,
183 // to give priority to a digest name in particular
184 ss << kv.first << '=' << 5;
185 if(cpt < reprDigestSize) {
186 ss << ',';
187 }
188 cpt++;
189 }
190 list = curl_slist_append(list, ss.str().c_str());
191 }
192
193 if (list != nullptr) {
194 curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
195 }
196}
197
199 m_offset = 0;
200 m_status_code = -1;
201 m_content_length = -1;
202 m_push_length = -1;
203 m_recv_all_headers = false;
204 m_recv_status_line = false;
205 m_repr_digests.clear();
206}
207
208size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata)
209{
210 State *obj = static_cast<State*>(userdata);
211 std::string header(buffer, size*nitems);
212 return obj->Header(header);
213}
214
215int State::Header(const std::string &header) {
216 //printf("Received remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
217 if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
218 m_recv_all_headers = false;
219 m_recv_status_line = false;
220 }
221 if (!m_recv_status_line) {
222 std::stringstream ss(header);
223 std::string item;
224 if (!std::getline(ss, item, ' ')) return 0;
225 m_resp_protocol = item;
226 //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
227 if (!std::getline(ss, item, ' ')) return 0;
228 try {
229 m_status_code = std::stol(item);
230 } catch (...) {
231 return 0;
232 }
233 m_recv_status_line = true;
234 } else if (header.size() == 0 || header == "\n" || header == "\r\n") {
235 m_recv_all_headers = true;
236 }
237 else if (header != "\r\n") {
238 // Parse the header
239 std::size_t found = header.find(":");
240 if (found != std::string::npos) {
241 std::string header_name = header.substr(0, found);
242 std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
243 std::string header_value = header.substr(found+1);
244 if (header_name == "content-length")
245 {
246 try {
247 m_content_length = std::stoll(header_value);
248 } catch (...) {
249 // Header unparseable -- not a great sign, fail request.
250 //printf("Content-length header unparseable\n");
251 return 0;
252 }
253 }
254 if(header_name == "repr-digest") {
255 XrdHttpHeaderUtils::parseReprDigest(header_value,m_repr_digests);
256 }
257 } else {
258 // Non-empty header that isn't the status line, but no ':' present --
259 // malformed request?
260 //printf("Malformed header: %s\n", header.c_str());
261 return 0;
262 }
263 }
264 return header.size();
265}
266
267size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
268 State *obj = static_cast<State*>(userdata);
269 if (obj->GetStatusCode() < 0) {
270 return 0;
271 } // malformed request - got body before headers.
272 if (obj->GetStatusCode() >= 400) {
273 obj->m_error_buf += std::string(static_cast<char*>(buffer),
274 std::min(static_cast<size_t>(1024), size*nitems));
275 // Record error messages until we hit a KB; at that point, fail out.
276 if (obj->m_error_buf.size() >= 1024)
277 return 0;
278 else
279 return size*nitems;
280 } // Status indicates failure.
281 return obj->Write(static_cast<char*>(buffer), size*nitems);
282}
283
289size_t State::PushRespCB(void *buffer, size_t size, size_t nitems, void *userdata) {
290 State *obj = static_cast<State*>(userdata);
291 // Note: The obj's status code is set by the HeaderCB once there's a reply from the passive server
292 if (obj->GetStatusCode() < 0) {
293 return 0;
294 } // malformed request - got body before headers.
295 if (obj->GetStatusCode() >= 400) {
296 obj->m_error_buf += std::string(static_cast<char*>(buffer),
297 std::min(static_cast<size_t>(1024), size*nitems));
298 // Record error messages until we hit a KB; at that point, fail out.
299 if (obj->m_error_buf.size() >= 1024)
300 return 0;
301 else
302 return size*nitems;
303 }
304 return size*nitems;
305}
306
307ssize_t State::Write(char *buffer, size_t size) {
308 ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
309 if (retval == SFS_ERROR) {
310 m_error_buf = m_stream->GetErrorMessage();
311 m_error_code = 1;
312 return -1;
313 }
314 m_offset += retval;
315 return retval;
316}
317
319 if (m_push) {
320 return 0;
321 }
322
323 ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
324 if (retval == SFS_ERROR) {
325 m_error_buf = m_stream->GetErrorMessage();
326 m_error_code = 2;
327 return -1;
328 }
329 m_offset += retval;
330 return retval;
331}
332
333size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
334 State *obj = static_cast<State*>(userdata);
335 if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
336 if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
337 return obj->Read(static_cast<char*>(buffer), size*nitems);
338}
339
340int State::Read(char *buffer, size_t size) {
341 int retval = m_stream->Read(m_start_offset + m_offset, buffer, size);
342 if (retval == SFS_ERROR) {
343 return -1;
344 }
345 m_offset += retval;
346 //printf("Read a total of %ld bytes.\n", m_offset);
347 return retval;
348}
349
351 CURL *curl = curl_easy_duphandle(m_curl);
352 if (!curl) {
353 throw std::runtime_error("Failed to duplicate existing curl handle.");
354 }
355
356 State *state = new State(0, *m_stream, curl, m_push, tpcForwardCreds);
357
358 if (m_headers) {
359 state->m_headers_copy.reserve(m_headers_copy.size());
360 for (std::vector<std::string>::const_iterator header_iter = m_headers_copy.begin();
361 header_iter != m_headers_copy.end();
362 header_iter++) {
363 state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str());
364 state->m_headers_copy.push_back(*header_iter);
365 }
366 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
367 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers);
368 }
369
370 return state;
371}
372
373void State::SetTransferParameters(off_t offset, size_t size) {
374 m_start_offset = offset;
375 m_offset = 0;
376 m_content_length = size;
377 std::stringstream ss;
378 ss << offset << "-" << (offset+size-1);
379 curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str());
380}
381
383{
384 return m_stream->AvailableBuffers();
385}
386
388{
389 m_stream->DumpBuffers();
390}
391
393{
394 if (!m_stream->Finalize()) {
395 m_error_buf = m_stream->GetErrorMessage();
396 m_error_code = 3;
397 return false;
398 }
399 return true;
400}
401
403{
404 // CURLINFO_PRIMARY_PORT is only defined for 7.21.0 or later; on older
405 // library versions, simply omit this information.
406#if LIBCURL_VERSION_NUM >= 0x071500
407 char *curl_ip = NULL;
408 CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
409 if ((rc != CURLE_OK) || !curl_ip) {
410 return "";
411 }
412 long curl_port = 0;
413 rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
414 if ((rc != CURLE_OK) || !curl_port) {
415 return "";
416 }
417 std::stringstream ss;
418 // libcurl returns IPv6 addresses of the form:
419 // 2600:900:6:1301:5054:ff:fe0b:9cba:8000
420 // However the HTTP-TPC spec says to use the form
421 // [2600:900:6:1301:5054:ff:fe0b:9cba]:8000
422 // Hence, we add '[' and ']' whenever a ':' is seen.
423 if (NULL == strchr(curl_ip, ':'))
424 ss << "tcp:" << curl_ip << ":" << curl_port;
425 else
426 ss << "tcp:[" << curl_ip << "]:" << curl_port;
427 return ss.str();
428#else
429 return "";
430#endif
431}
void CURL
#define stat(a, b)
Definition XrdPosix.hh:101
#define SFS_ERROR
#define SFS_OK
State * Duplicate()
void Move(State &other)
int GetStatusCode() const
void DumpBuffers() const
void ResetAfterRequest()
void SetTransferParameters(off_t offset, size_t size)
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetupHeadersForHEAD(XrdHttpExtReq &req)
int AvailableBuffers() const
int Stat(struct stat *)
std::map< std::string, std::string > & headers
std::map< std::string, std::string > mReprDigest
Repr-Digest map where the key is the digest name and the value is the base64 encoded digest value.
static void parseReprDigest(const std::string &value, std::map< std::string, std::string > &output)
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)