// XrdPfcFile.cc -- XRootD Proxy File Cache (PFC) File implementation.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfc.hh"
23#include "XrdPfcIO.hh"
24#include "XrdPfcTrace.hh"
25
27#include "XrdSys/XrdSysTimer.hh"
28#include "XrdOss/XrdOss.hh"
29#include "XrdOuc/XrdOucEnv.hh"
30#include "XrdOuc/XrdOucJson.hh"
32
33#include "XrdCl/XrdClURL.hh"
35
36#include <cassert>
37#include <cstdio>
38#include <sstream>
39#include <unordered_map>
40
41#include <fcntl.h>
42
43using namespace XrdPfc;
44
45namespace
46{
47
48const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49
50Cache* cache() { return &Cache::GetInstance(); }
51
52}
53
54const char *File::m_traceID = "File";
55
56//------------------------------------------------------------------------------
57
// Construct a File for the cache-local 'path'. iOffset/iFileSize describe the
// (possibly partial) region of the remote file this object represents.
// No I/O happens here -- files are created/opened in Open().
File::File(const std::string& path, long long iOffset, long long iFileSize) :
   m_ref_cnt(0),
   m_data_file(0),
   m_info_file(0),
   m_cfi(Cache::TheOne().GetTrace(), Cache::TheOne().is_prefetch_enabled()),
   m_filename(path),
   m_offset(iOffset),
   m_file_size(iFileSize),
   m_current_io(m_io_set.end()),
   m_ios_in_detach(0),
   m_non_flushed_cnt(0),
   m_in_sync(false),
   m_detach_time_logged(false),
   m_in_shutdown(false),
   m_state_cond(0),
   m_block_size(0),        // set from cinfo in Open()
   m_num_blocks(0),        // set from cinfo in Open()
   m_resmon_token(-1),     // -1 until register_file_open() in Open()
   m_prefetch_state(kOff),
   m_prefetch_bytes(0),
   m_prefetch_read_cnt(0),
   m_prefetch_hit_cnt(0),
   m_prefetch_score(0)
{}
82
File::~File()
{
   // Data and info files are closed / deleted in Close(); nothing to release here.
   TRACEF(Debug, "~File() for ");
}
87
// Close the info and data files and send the final statistics update /
// close notification to the resource monitor.
void File::Close()
{
   // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
   // A stat is called after close to re-check that m_stat_blocks have been reported correctly
   // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
   // in report_and_merge_delta_stats() below.
   //
   // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
   // get reported in as stat.st_blocks.
   // The reported number is correct in a stat immediately following a close.
   // If one starts off by writing the last byte of the file, this pre-allocation does not get
   // triggered up to that point. But comes back with a vengeance right after.
   //
   // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.

   // Close and release the cinfo (metadata) file first.
   if (m_info_file)
   {
      TRACEF(Debug, "Close() closing info-file ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = nullptr;
   }

   // Then the data file itself.
   if (m_data_file)
   {
      TRACEF(Debug, "Close() closing data-file ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = nullptr;
   }

   // Only talk to the resource monitor if we successfully registered in Open().
   if (m_resmon_token >= 0)
   {
      // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
      // but in this case the file will get unlinked by the cache and reported as purge event.
      // We check if the reported st_blocks so far is correct.
      if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
         struct stat s;
         int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
         if (sr == 0 && s.st_blocks != m_st_blocks) {
            // Send a correction delta for the difference in allocated blocks.
            Stats stats;
            stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
            m_st_blocks = s.st_blocks;
            Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
         }
      }

      Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
   }

   TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
}
140
141//------------------------------------------------------------------------------
142
143File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
144{
145 File *file = new File(path, offset, fileSize);
146 if ( ! file->Open(inputIO))
147 {
148 delete file;
149 file = 0;
150 }
151 return file;
152}
153
154//------------------------------------------------------------------------------
155
{
   // Called from Cache::Unlink() when the file is currently open.
   // Cache::Unlink is also called on FSync error and when wrong number of bytes
   // is received from a remote read.
   //
   // From this point onward the file will not be written to, cinfo file will
   // not be updated, and all new read requests will return -ENOENT.
   //
   // File's entry in the Cache's active map is set to nullptr and will be
   // removed from there shortly, in any case, well before this File object
   // shuts down. Cache::Unlink() also reports the appropriate purge event.

   XrdSysCondVarHelper _lck(m_state_cond);

   m_in_shutdown = true;

   // Stop prefetching unless it already finished or was stopped.
   if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
   {
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   // Flush outstanding delta statistics so the resource monitor has final numbers.
   report_and_merge_delta_stats();

   // Return the last-reported block count so the caller can account the purge.
   return m_st_blocks;
}
183
184//------------------------------------------------------------------------------
185
// Push accumulated per-file deltas to the resource monitor once the traffic
// since the last report exceeds the reporting threshold. Skipped during
// shutdown (final stats are flushed through the shutdown path instead).
void File::check_delta_stats()
{
   // Called under m_state_cond lock.
   // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
   if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
      report_and_merge_delta_stats();
}
193
// Send the current delta statistics (including disk-block growth) to the
// resource monitor, fold them into the lifetime stats, and reset the deltas.
void File::report_and_merge_delta_stats()
{
   // Called under m_state_cond lock.
   struct stat s;
   m_data_file->Fstat(&s);
   // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
   // aggressive pre-allocation in this field (XFS, 4GB).
   // st_blocks counts 512-byte units: if m_file_size is not 4 kB aligned, round it
   // up to the next 4 kB page and convert pages to 512-byte units (<< 3 == * 8);
   // otherwise just divide the size by 512 (>> 9).
   long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
                                                             : m_file_size >> 9;
   long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
   // Report only the growth since the last report, then remember the new value.
   m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
   m_st_blocks = st_blocks_to_report;
   Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
   // Accumulate the delta into lifetime stats and start a fresh delta window.
   m_stats.AddUp(m_delta_stats);
   m_delta_stats.Reset();
}
210
211//------------------------------------------------------------------------------
212
{
   // The write queue no longer references this block -- drop the reference it
   // held, under the state-condition lock.
   TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);

   XrdSysCondVarHelper _lck(m_state_cond);
   dec_ref_count(b);
}
220
221void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
222{
223 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
224
225 XrdSysCondVarHelper _lck(m_state_cond);
226
227 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
228 {
229 dec_ref_count(*i);
230 }
231}
232
233//------------------------------------------------------------------------------
234
{
   // Record this IO's current remote location in the set kept for the cinfo
   // access log; done under the state-condition lock.
   std::string loc(io->GetLocation());
   XrdSysCondVarHelper _lck(m_state_cond);
   insert_remote_location(loc);
}
241
242//------------------------------------------------------------------------------
243
{
   // Returns true if delay is needed.

   TRACEF(Debug, "ioActive start for io " << io);

   std::string loc(io->GetLocation());

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      IoSet_i mi = m_io_set.find(io);

      if (mi != m_io_set.end())
      {
         unsigned int n_active_reads = io->m_active_read_reqs;

         TRACE(Info, "ioActive for io " << io <<
               ", active_reads " << n_active_reads <<
               ", active_prefetches " << io->m_active_prefetches <<
               ", allow_prefetching " << io->m_allow_prefetching <<
               ", ios_in_detach " << m_ios_in_detach);
         TRACEF(Info,
                "\tio_map.size() " << m_io_set.size() <<
                ", block_map.size() " << m_block_map.size() << ", file");

         insert_remote_location(loc);

         // This IO is retreating: no further prefetching through it, and mark
         // it as being in detach.
         io->m_allow_prefetching = false;
         io->m_in_detach = true;

         // Check if any IO is still available for prefetching. If not, stop it.
         if (m_prefetch_state == kOn || m_prefetch_state == kHold)
         {
            if ( ! select_current_io_or_disable_prefetching(false) )
            {
               TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
            }
         }

         // On last IO, consider write queue blocks. Note, this also contains
         // blocks being prefetched.

         bool io_active_result;

         if (n_active_reads > 0)
         {
            // Reads still in flight on this IO -- definitely active.
            io_active_result = true;
         }
         else if (m_io_set.size() - m_ios_in_detach == 1)
         {
            // This is the last non-detaching IO: stay active while any blocks
            // are still in memory / on the write queue.
            io_active_result = ! m_block_map.empty();
         }
         else
         {
            // Other IOs remain; only this IO's own prefetches keep it active.
            io_active_result = io->m_active_prefetches > 0;
         }

         if ( ! io_active_result)
         {
            ++m_ios_in_detach;
         }

         TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");

         return io_active_result;
      }
      else
      {
         TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
         return false;
      }
   }
}
318
319//------------------------------------------------------------------------------
320
{
   // Clear the detach-time-logged flag so the next sync writes detach
   // statistics again (see the m_detach_time_logged check below).
   XrdSysCondVarHelper _lck(m_state_cond);
   m_detach_time_logged = false;
}
326
{
   // Returns true if sync is required.
   // This method is called after corresponding IO is detached from PosixCache.

   XrdSysCondVarHelper _lck(m_state_cond);
   if ( ! m_in_shutdown)
   {
      // A final sync is needed if writes are pending from the last sync, some
      // blocks were never flushed, or the detach stats were not yet written.
      if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
      {
         report_and_merge_delta_stats();
         m_cfi.WriteIOStatDetach(m_stats);
         m_detach_time_logged = true;
         m_in_sync            = true;
         TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
         return true;
      }
   }
   TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
   return false;
}
348
349//------------------------------------------------------------------------------
350
{
   // Called from Cache::GetFile() when a new IO asks for the file.

   TRACEF(Debug, "AddIO() io = " << (void*)io);

   time_t now = time(0);
   std::string loc(io->GetLocation());

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi == m_io_set.end())
   {
      // New IO: register it, stamp the attach time, and count the attach.
      m_io_set.insert(io);
      io->m_attach_time = now;
      m_delta_stats.IoAttach();

      insert_remote_location(loc);

      // Re-enable prefetching if it had been stopped (e.g. after last IO left).
      if (m_prefetch_state == kStopped)
      {
         m_prefetch_state = kOn;
         cache()->RegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
   }

   m_state_cond.UnLock();
}
385
386//------------------------------------------------------------------------------
387
{
   // Called from Cache::ReleaseFile.

   TRACEF(Debug, "RemoveIO() io = " << (void*)io);

   time_t now = time(0);

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi != m_io_set.end())
   {
      // Keep m_current_io valid: if it points at the IO being removed,
      // advance it before erasing.
      if (mi == m_current_io)
      {
         ++m_current_io;
      }

      // Account the detach (duration since attach) and drop the IO.
      m_delta_stats.IoDetach(now - io->m_attach_time);
      m_io_set.erase(mi);
      --m_ios_in_detach;

      // By the time the last IO leaves, prefetching should already be stopped
      // or complete; if not, stop it now and log the inconsistency.
      if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
   }

   m_state_cond.UnLock();
}
425
426//------------------------------------------------------------------------------
427
428bool File::Open(XrdOucCacheIO *inputIO)
429{
430 // Sets errno accordingly.
431
432 static const char *tpfx = "Open() ";
433
434 TRACEF(Dump, tpfx << "entered");
435
436 // Before touching anything, check with ResourceMonitor if a scan is in progress.
437 // This function will wait internally if needed until it is safe to proceed.
438 Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
439
441
442 XrdOss &myOss = * Cache::GetInstance().GetOss();
443 const char *myUser = conf.m_username.c_str();
444 XrdOucEnv myEnv;
445 struct stat data_stat, info_stat;
446
447 std::string ifn = m_filename + Info::s_infoExtension;
448
449 bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
450 bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
451
452 // Create the data file itself.
453 char size_str[32]; sprintf(size_str, "%lld", m_file_size);
454 myEnv.Put("oss.asize", size_str);
455 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
456
457 int res;
458
459 if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
460 {
461 TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
462 errno = -res;
463 return false;
464 }
465
466 m_data_file = myOss.newFile(myUser);
467 if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
468 {
469 TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
470 errno = -res;
471 delete m_data_file; m_data_file = 0;
472 return false;
473 }
474
475 myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
476 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
477 if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
478 {
479 TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
480 errno = -res;
481 m_data_file->Close(); delete m_data_file; m_data_file = 0;
482 return false;
483 }
484
485 m_info_file = myOss.newFile(myUser);
486 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
487 {
488 TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
489 errno = -res;
490 delete m_info_file; m_info_file = 0;
491 m_data_file->Close(); delete m_data_file; m_data_file = 0;
492 return false;
493 }
494
495 bool initialize_info_file = true;
496
497 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
498 {
499 TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
500 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
501 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
502 ", block_size=" << (m_cfi.GetBufferSize() >> 10) << "k)");
503
504 // Check if data file exists and is of reasonable size.
505 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
506 {
507 initialize_info_file = false;
508 } else {
509 TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
510 m_cfi.ResetAllAccessStats();
511 m_data_file->Ftruncate(0);
512 // data-file might not have existed at entry -- data_stat is then undefined
513 if (data_existed)
514 Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
515 }
516 }
517
518 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
519 {
520 if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
521 conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
522 {
523 TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
524 initialize_info_file = true;
525 m_cfi.ResetAllAccessStats();
526 m_data_file->Ftruncate(0);
527 // data-file is known to exist due to checks in the previous if block
528 Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
529 } else {
530 // TODO: If the file is complete, we don't need to reset net cksums.
531 m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
532 }
533 }
534
535 // Check if we have pfc url arguments.
536 long long pfc_blocksize = conf.m_bufferSize;
537 int pfc_prefetch = conf.m_prefetch_max_blocks;
539 {
540 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
541 }
542
543 if (initialize_info_file)
544 {
545 m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
546 m_cfi.SetCkSumState(conf.get_cs_Chk());
547 m_cfi.ResetNoCkSumTime();
548 m_cfi.Write(m_info_file, ifn.c_str());
549 m_info_file->Fsync();
550 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
551
552 if (cache()->RefConfiguration().m_httpcc)
553 {
554 std::string responseFctl;
555 int resFctl = inputIO->Fcntl(XrdOucCacheOp::Code::QFinfo, "head", responseFctl);
556 if (resFctl == 0)
557 {
558 std::string cc_str = responseFctl;
559 nlohmann::json cc_json = nlohmann::json::parse(cc_str);
560 if (cc_json.contains("max-age"))
561 {
562 time_t ma = cc_json["max-age"];
563 ma += time(NULL);
564 cc_json["expire"] = ma;
565 cc_str = cc_json.dump();
566 }
567 TRACE(Error, "GetFile() XrdCl::File::Fcntl value " << cc_str);
568 cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, cc_str);
569 }
570 else if (resFctl != kXR_Unsupported)
571 {
572 TRACE(Error, "GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::FInfo failed " << inputIO->Path());
573 }
574 }
575
576 TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size <<
577 " num blocks = " << m_cfi.GetNBlocks() <<
578 " block size = " << pfc_blocksize);
579 }
580 else
581 {
582 if (futimens(m_info_file->getFD(), NULL)) {
583 TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
584 }
585 if (pfc_blocksize != conf.m_bufferSize) {
586 TRACEF(Info, tpfx << "URL CGI pfc.blocksize ignored for an already existing file");
587 }
588 }
589
590 m_cfi.WriteIOStatAttach();
591 m_state_cond.Lock();
592 m_block_size = m_cfi.GetBufferSize();
593 m_num_blocks = m_cfi.GetNBlocks();
594 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
595 m_prefetch_max_blocks_in_flight = pfc_prefetch;
596 if (pfc_prefetch != conf.m_prefetch_max_blocks)
597 TRACEF(Debug, tpfx << "pfc.prefetch set to " << pfc_prefetch << " via CGI parameter");
598
599 m_data_file->Fstat(&data_stat);
600 m_st_blocks = data_stat.st_blocks;
601
602 m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
603 constexpr long long MB = 1024 * 1024;
604 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
605 // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
606 // actual threshold based on return values from register_file_update_stats().
607
608 m_state_cond.UnLock();
609
610 return true;
611}
612
613void File::parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const
614{
615 const Configuration &conf = Cache::TheOne().RefConfiguration();
616
617 XrdCl::URL url(inputIO->Path());
618 auto const & urlp = url.GetParams();
619
620 auto extract = [&](const std::string &key, std::string &value) -> bool {
621 auto it = urlp.find(key);
622 if (it != urlp.end()) {
623 value = it->second;
624 return true;
625 } else {
626 value.clear();
627 return false;
628 }
629 };
630
631 std::string val;
632 if (conf.m_cgi_blocksize_allowed && extract("pfc.blocksize", val))
633 {
634 const char *tpfx = "File::Open::urlcgi pfc.blocksize ";
635 long long bsize;
636 if (Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
638 {
639 pfc_blocksize = bsize;
640 } else {
641 TRACEF(Error, tpfx << "Error processing the parameter.");
642 }
643 }
644 if (conf.m_cgi_prefetch_allowed && extract("pfc.prefetch", val))
645 {
646 const char *tpfx = "File::Open::urlcgi pfc.prefetch ";
647 int pref;
648 if (Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
650 {
651 pfc_prefetch = pref;
652 } else {
653 TRACEF(Error, tpfx << "Error processing the parameter.");
654 }
655 }
656}
657
658//------------------------------------------------------------------------------
659
660int File::Fstat(struct stat &sbuff)
661{
662 // Stat on an open file.
663 // Corrects size to actual full size of the file.
664 // Sets atime to 0 if the file is only partially downloaded, in accordance
665 // with pfc.onlyifcached settings.
666 // Called from IO::Fstat() and Cache::Stat() when the file is active.
667 // Returns 0 on success, -errno on error.
668
669 int res;
670
671 if ((res = m_data_file->Fstat(&sbuff))) return res;
672
673 sbuff.st_size = m_file_size;
674
675 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
676 if ( ! is_cached)
677 sbuff.st_atime = 0;
678
679 return 0;
680}
681
682//==============================================================================
683// Read and helpers
684//==============================================================================
685
686bool File::overlap(int blk, // block to query
687 long long blk_size, //
688 long long req_off, // offset of user request
689 int req_size, // size of user request
690 // output:
691 long long &off, // offset in user buffer
692 long long &blk_off, // offset in block
693 int &size) // size to copy
694{
695 const long long beg = blk * blk_size;
696 const long long end = beg + blk_size;
697 const long long req_end = req_off + req_size;
698
699 if (req_off < end && req_end > beg)
700 {
701 const long long ovlp_beg = std::max(beg, req_off);
702 const long long ovlp_end = std::min(end, req_end);
703
704 off = ovlp_beg - req_off;
705 blk_off = ovlp_beg - beg;
706 size = (int) (ovlp_end - ovlp_beg);
707
708 assert(size <= blk_size);
709 return true;
710 }
711 else
712 {
713 return false;
714 }
715}
716
717//------------------------------------------------------------------------------
718
// Allocate RAM and create a Block object for block index 'i', registering it
// in m_block_map. Returns the new Block, or nullptr when RAM could not be
// obtained or Block allocation failed (caller then reads directly).
Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
{
   // Must be called w/ state_cond locked.
   // Checks on size etc should be done before.
   //
   // Reference count is 0 so increase it in calling function if you want to
   // catch the block while still in memory.

   const long long off = i * m_block_size;
   const int last_block = m_num_blocks - 1;
   const bool cs_net = cache()->RefConfiguration().is_cschk_net();

   int blk_size, req_size;
   if (i == last_block) {
      // Last block may be shorter than m_block_size.
      blk_size = req_size = m_file_size - off;
      // With network checksums, round the request up to a whole 4 kB page --
      // presumably so the pgRead checksum vector covers full pages (see
      // ProcessBlockRequest); TODO confirm.
      if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
   } else {
      blk_size = req_size = m_block_size;
   }

   Block *b = 0;
   // May return nullptr when the cache's RAM budget is exhausted.
   char *buf = cache()->RequestRAM(req_size);

   if (buf)
   {
      b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);

      if (b)
      {
         m_block_map[i] = b;

         // Actual Read request is issued in ProcessBlockRequests().

         // Too many blocks in flight -- pause prefetching until some drain.
         if (m_prefetch_state == kOn && (int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
         {
            m_prefetch_state = kHold;
            cache()->DeRegisterPrefetchFile(this);
         }
      }
      else
      {
         TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
      }
   }

   return b;
}
766
767void File::ProcessBlockRequest(Block *b)
768{
769 // This *must not* be called with block_map locked.
770
772
773 if (XRD_TRACE What >= TRACE_Dump) {
774 char buf[256];
775 snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
776 b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
777 TRACEF(Dump, "ProcessBlockRequest() " << buf);
778 }
779
780 if (b->req_cksum_net())
781 {
782 b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
783 b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
784 } else {
785 b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
786 }
787}
788
789void File::ProcessBlockRequests(BlockList_t& blks)
790{
791 // This *must not* be called with block_map locked.
792
793 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
794 {
795 ProcessBlockRequest(*bi);
796 }
797}
798
799//------------------------------------------------------------------------------
800
801void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
802{
803 int n_chunks = ioVec.size();
804 int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
805
806 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
807 ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
808
809 DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
810
811 int pos = 0;
812 while (n_chunks > XrdProto::maxRvecsz) {
813 io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
814 pos += XrdProto::maxRvecsz;
815 n_chunks -= XrdProto::maxRvecsz;
816 }
817 io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
818}
819
820//------------------------------------------------------------------------------
821
822int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
823{
824 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
825
826 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
827
828 if (rs < 0)
829 {
830 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
831 return rs;
832 }
833
834 if (rs != expected_size)
835 {
836 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
837 return -EIO;
838 }
839
840 return (int) rs;
841}
842
843//------------------------------------------------------------------------------
844
// Single-range read entry point. Returns bytes read, a negative errno, or
// -EWOULDBLOCK when the request was queued for asynchronous completion.
int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // rrc_func is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
   // This streamlines implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   // Refuse reads on a file being unlinked or an IO that is detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) {
         // Account the bytes as a cache hit; may trigger a stats report.
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   // General case: hand off to the shared Read/ReadV coalescing machinery
   // (which also releases m_state_cond).
   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}
879
880//------------------------------------------------------------------------------
881
// Vector-read entry point; same contract as Read() but for multiple ranges.
int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   // Refuse reads on a file being unlinked or an IO that is detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      // const_cast needed by the XrdOss ReadV signature.
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) {
         // Account the bytes as a cache hit; may trigger a stats report.
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   // General case: hand off to the shared Read/ReadV coalescing machinery
   // (which also releases m_state_cond).
   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}
910
911//------------------------------------------------------------------------------
912
913int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
914 ReadReqRH *rh, const char *tpfx)
915{
916 // Non-trivial processing for Read and ReadV.
917 // Entered under lock.
918 //
919 // loop over reqired blocks:
920 // - if on disk, ok;
921 // - if in ram or incoming, inc ref-count
922 // - otherwise request and inc ref count (unless RAM full => request direct)
923 // unlock
924
925 int prefetch_cnt = 0;
926
927 ReadRequest *read_req = nullptr;
928 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
929
930 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
931
932 std::vector<XrdOucIOVec> iovec_disk;
933 std::vector<XrdOucIOVec> iovec_direct;
934 int iovec_disk_total = 0;
935 int iovec_direct_total = 0;
936
937 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
938 {
939 const XrdOucIOVec &iov = readV[iov_idx];
940 long long iUserOff = iov.offset;
941 int iUserSize = iov.size;
942 char *iUserBuff = iov.data;
943
944 const int idx_first = iUserOff / m_block_size;
945 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
946
947 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
948
949 enum LastBlock_e { LB_other, LB_disk, LB_direct };
950
951 LastBlock_e lbe = LB_other;
952
953 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
954 {
955 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
956 BlockMap_i bi = m_block_map.find(block_idx);
957
958 // overlap and read
959 long long off; // offset in user buffer
960 long long blk_off; // offset in block
961 int size; // size to copy
962
963 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
964
965 // In RAM or incoming?
966 if (bi != m_block_map.end())
967 {
968 inc_ref_count(bi->second);
969 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
970
971 if (bi->second->is_finished())
972 {
973 // note, blocks with error should not be here !!!
974 // they should be either removed or reissued in ProcessBlockResponse()
975 assert(bi->second->is_ok());
976
977 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
978
979 if (bi->second->m_prefetch)
980 ++prefetch_cnt;
981 }
982 else
983 {
984 if ( ! read_req)
985 read_req = new ReadRequest(io, rh);
986
987 // We have a lock on state_cond --> as we register the request before releasing the lock,
988 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
989
990 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
991 ++read_req->m_n_chunk_reqs;
992 }
993
994 lbe = LB_other;
995 }
996 // On disk?
997 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
998 {
999 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
1000
1001 if (lbe == LB_disk)
1002 iovec_disk.back().size += size;
1003 else
1004 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
1005 iovec_disk_total += size;
1006
1007 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
1008 ++prefetch_cnt;
1009
1010 lbe = LB_disk;
1011 }
1012 // Neither ... then we have to go get it ...
1013 else
1014 {
1015 if ( ! read_req)
1016 read_req = new ReadRequest(io, rh);
1017
1018 // Is there room for one more RAM Block?
1019 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
1020 if (b)
1021 {
1022 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
1023 inc_ref_count(b);
1024 blks_to_request.push_back(b);
1025
1026 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
1027 ++read_req->m_n_chunk_reqs;
1028
1029 lbe = LB_other;
1030 }
1031 else // Nope ... read this directly without caching.
1032 {
1033 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
1034
1035 iovec_direct_total += size;
1036 read_req->m_direct_done = false;
1037
1038 // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
1039 // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
1040 // is determined in the RequestBlocksDirect().
1041 if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
1042 iovec_direct.back().size += size;
1043 } else {
1044 long long in_offset = block_idx * m_block_size + blk_off;
1045 char *out_pos = iUserBuff + off;
1046 while (size > XrdProto::maxRVdsz) {
1047 iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
1048 in_offset += XrdProto::maxRVdsz;
1049 out_pos += XrdProto::maxRVdsz;
1050 size -= XrdProto::maxRVdsz;
1051 }
1052 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1053 }
1054
1055 lbe = LB_direct;
1056 }
1057 }
1058 } // end for over blocks in an IOVec
1059 } // end for over readV IOVec
1060
1061 inc_prefetch_hit_cnt(prefetch_cnt);
1062
1063 m_state_cond.UnLock();
1064
1065 // First, send out remote requests for new blocks.
1066 if ( ! blks_to_request.empty())
1067 {
1068 ProcessBlockRequests(blks_to_request);
1069 blks_to_request.clear();
1070 }
1071
1072 // Second, send out remote direct read requests.
1073 if ( ! iovec_direct.empty())
1074 {
1075 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1076
1077 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
1078 }
1079
1080 // Begin synchronous part where we process data that is already in RAM or on disk.
1081
1082 long long bytes_read = 0;
1083 int error_cond = 0; // to be set to -errno
1084
1085 // Third, process blocks that are available in RAM.
1086 if ( ! blks_ready.empty())
1087 {
1088 for (auto &bvi : blks_ready)
1089 {
1090 for (auto &cr : bvi.second)
1091 {
1092 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
1093 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1094 bytes_read += cr.m_size;
1095 }
1096 }
1097 }
1098
1099 // Fourth, read blocks from disk.
1100 if ( ! iovec_disk.empty())
1101 {
1102 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1103 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1104 if (rc >= 0)
1105 {
1106 bytes_read += rc;
1107 }
1108 else
1109 {
1110 error_cond = rc;
1111 TRACEF(Error, tpfx << "failed read from disk");
1112 }
1113 }
1114
1115 // End synchronous part -- update with sync stats and determine actual state of this read.
1116 // Note: remote reads might have already finished during disk-read!
1117
1118 m_state_cond.Lock();
1119
1120 for (auto &bvi : blks_ready)
1121 dec_ref_count(bvi.first, (int) bvi.second.size());
1122
1123 if (read_req)
1124 {
1125 read_req->m_bytes_read += bytes_read;
1126 if (error_cond)
1127 read_req->update_error_cond(error_cond);
1128 read_req->m_stats.m_BytesHit += bytes_read;
1129 read_req->m_sync_done = true;
1130
1131 if (read_req->is_complete())
1132 {
1133 // Almost like FinalizeReadRequest(read_req) -- but no callout!
1134 m_delta_stats.AddReadStats(read_req->m_stats);
1135 check_delta_stats();
1136 m_state_cond.UnLock();
1137
1138 int ret = read_req->return_value();
1139 delete read_req;
1140 return ret;
1141 }
1142 else
1143 {
1144 m_state_cond.UnLock();
1145 return -EWOULDBLOCK;
1146 }
1147 }
1148 else
1149 {
1150 m_delta_stats.m_BytesHit += bytes_read;
1151 check_delta_stats();
1152 m_state_cond.UnLock();
1153
1154 // !!! No callout.
1155
1156 return error_cond ? error_cond : bytes_read;
1157 }
1158}
1159
1160
1161//==============================================================================
1162// WriteBlock and Sync
1163//==============================================================================
1164
1166{
1167 // write block buffer into disk file
1168 long long offset = b->m_offset - m_offset;
1169 long long size = b->get_size();
1170 ssize_t retval;
1171
1172 if (m_cfi.IsCkSumCache())
1173 if (b->has_cksums())
1174 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1175 else
1176 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1177 else
1178 retval = m_data_file->Write(b->get_buff(), offset, size);
1179
1180 if (retval < size)
1181 {
1182 if (retval < 0) {
1183 TRACEF(Error, "WriteToDisk() write error " << retval);
1184 } else {
1185 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1186 }
1187
1188 XrdSysCondVarHelper _lck(m_state_cond);
1189
1190 dec_ref_count(b);
1191
1192 return;
1193 }
1194
1195 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1196
1197 // Set written bit.
1198 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1199
1200 bool schedule_sync = false;
1201 {
1202 XrdSysCondVarHelper _lck(m_state_cond);
1203
1204 m_cfi.SetBitWritten(blk_idx);
1205
1206 if (b->m_prefetch)
1207 {
1208 m_cfi.SetBitPrefetch(blk_idx);
1209 }
1210 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1211 {
1212 m_cfi.ResetCkSumNet();
1213 }
1214
1215 // Set synced bit or stash block index if in actual sync.
1216 // Synced state is only written out to cinfo file when data file is synced.
1217 if (m_in_sync)
1218 {
1219 m_writes_during_sync.push_back(blk_idx);
1220 }
1221 else
1222 {
1223 m_cfi.SetBitSynced(blk_idx);
1224 ++m_non_flushed_cnt;
1225 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1226 ! m_in_shutdown)
1227 {
1228 schedule_sync = true;
1229 m_in_sync = true;
1230 m_non_flushed_cnt = 0;
1231 }
1232 }
1233 // As soon as the reference count is decreased on the block, the
1234 // file object may be deleted. Thus, to avoid holding both locks at a time,
1235 // we defer the ref count decrease until later if a sync is needed
1236 if (!schedule_sync) {
1237 dec_ref_count(b);
1238 }
1239 }
1240
1241 if (schedule_sync)
1242 {
1243 cache()->ScheduleFileSync(this);
1244 XrdSysCondVarHelper _lck(m_state_cond);
1245 dec_ref_count(b);
1246 }
1247}
1248
1249//------------------------------------------------------------------------------
1250
1252{
1253 TRACEF(Dump, "Sync()");
1254
1255 int ret = m_data_file->Fsync();
1256 bool errorp = false;
1257 if (ret == XrdOssOK)
1258 {
1259 Stats loc_stats;
1260 {
1261 XrdSysCondVarHelper _lck(&m_state_cond);
1262 report_and_merge_delta_stats();
1263 loc_stats = m_stats;
1264 }
1265 m_cfi.WriteIOStat(loc_stats);
1266 m_cfi.Write(m_info_file, m_filename.c_str());
1267 int cret = m_info_file->Fsync();
1268 if (cret != XrdOssOK)
1269 {
1270 TRACEF(Error, "Sync cinfo file sync error " << cret);
1271 errorp = true;
1272 }
1273 }
1274 else
1275 {
1276 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1277 errorp = true;
1278 }
1279
1280 if (errorp)
1281 {
1282 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1283
1284 // Unlink will also call this->initiate_emergency_shutdown()
1285 Cache::GetInstance().UnlinkFile(m_filename, false);
1286
1287 XrdSysCondVarHelper _lck(&m_state_cond);
1288
1289 m_writes_during_sync.clear();
1290 m_in_sync = false;
1291
1292 return;
1293 }
1294
1295 int written_while_in_sync;
1296 bool resync = false;
1297 {
1298 XrdSysCondVarHelper _lck(&m_state_cond);
1299 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1300 {
1301 m_cfi.SetBitSynced(*i);
1302 }
1303 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1304 m_writes_during_sync.clear();
1305
1306 // If there were writes during sync and the file is now complete,
1307 // let us call Sync again without resetting the m_in_sync flag.
1308 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1309 resync = true;
1310 else
1311 m_in_sync = false;
1312 }
1313 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1314
1315 if (resync)
1316 Sync();
1317}
1318
1319
1320//==============================================================================
1321// Block processing
1322//==============================================================================
1323
1324void File::free_block(Block* b)
1325{
1326 // Method always called under lock.
1327 int i = b->m_offset / m_block_size;
1328 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1329 size_t ret = m_block_map.erase(i);
1330 if (ret != 1)
1331 {
1332 // assert might be a better option than a warning
1333 TRACEF(Error, "free_block did not erase " << i << " from map");
1334 }
1335 else
1336 {
1337 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1338 delete b;
1339 }
1340
1341 if (m_prefetch_state == kHold && (int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1342 {
1343 m_prefetch_state = kOn;
1344 cache()->RegisterPrefetchFile(this);
1345 }
1346}
1347
1348//------------------------------------------------------------------------------
1349
bool File::select_current_io_or_disable_prefetching(bool skip_current)
{
   // Method always called under lock. It also expects prefetch to be active.
   //
   // Selects an IO object that still allows prefetching and stores it in
   // m_current_io -- round-robin over m_io_set, optionally stepping past the
   // current one first. If no usable IO exists, prefetching on this File is
   // stopped (state -> kStopped, file deregistered from the prefetch list).
   // Returns true iff a usable IO was selected.

   int io_size = (int) m_io_set.size();
   bool io_ok = false;

   if (io_size == 1)
   {
      // Single IO -- usable only if it still allows prefetching.
      io_ok = (*m_io_set.begin())->m_allow_prefetching;
      if (io_ok)
      {
         m_current_io = m_io_set.begin();
      }
   }
   else if (io_size > 1)
   {
      // Round-robin scan starting at (or just after) the current IO.
      IoSet_i mi = m_current_io;
      if (skip_current && mi != m_io_set.end()) ++mi;

      for (int i = 0; i < io_size; ++i)
      {
         if (mi == m_io_set.end()) mi = m_io_set.begin();  // wrap around

         if ((*mi)->m_allow_prefetching)
         {
            m_current_io = mi;
            io_ok = true;
            break;
         }
         ++mi;
      }
   }

   if ( ! io_ok)
   {
      // No IO can prefetch any longer -- shut prefetching down for this file.
      m_current_io = m_io_set.end();
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   return io_ok;
}
1393
1394//------------------------------------------------------------------------------
1395
1396void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1397{
1398 // Called from DirectResponseHandler.
1399 // NOT under lock.
1400
1401 if (error_cond)
1402 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1403
1404 m_state_cond.Lock();
1405
1406 if (error_cond)
1407 rreq->update_error_cond(error_cond);
1408 else {
1409 rreq->m_stats.m_BytesBypassed += bytes_read;
1410 rreq->m_bytes_read += bytes_read;
1411 }
1412
1413 rreq->m_direct_done = true;
1414
1415 bool rreq_complete = rreq->is_complete();
1416
1417 m_state_cond.UnLock();
1418
1419 if (rreq_complete)
1420 FinalizeReadRequest(rreq);
1421}
1422
1423void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1424{
1425 // Called from ProcessBlockResponse().
1426 // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1427 // Does not manage m_read_req.
1428 // Will not complete the request.
1429
1430 TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1431 " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1432
1433 rreq->update_error_cond(b->get_error());
1434 --rreq->m_n_chunk_reqs;
1435
1436 dec_ref_count(b);
1437}
1438
1439void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1440{
1441 // Called from ProcessBlockResponse().
1442 // NOT under lock as it does memcopy ofor exisf block data.
1443 // Acquires lock for block, m_read_req and rreq state update.
1444
1445 ReadRequest *rreq = creq.m_read_req;
1446
1447 TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1448 memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1449
1450 m_state_cond.Lock();
1451
1452 rreq->m_bytes_read += creq.m_size;
1453
1454 if (b->get_req_id() == (void*) rreq)
1455 rreq->m_stats.m_BytesMissed += creq.m_size;
1456 else
1457 rreq->m_stats.m_BytesHit += creq.m_size;
1458
1459 --rreq->m_n_chunk_reqs;
1460
1461 if (b->m_prefetch)
1462 inc_prefetch_hit_cnt(1);
1463
1464 dec_ref_count(b);
1465
1466 bool rreq_complete = rreq->is_complete();
1467
1468 m_state_cond.UnLock();
1469
1470 if (rreq_complete)
1471 FinalizeReadRequest(rreq);
1472}
1473
1474void File::FinalizeReadRequest(ReadRequest *rreq)
1475{
1476 // called from ProcessBlockResponse()
1477 // NOT under lock -- does callout
1478 {
1479 XrdSysCondVarHelper _lck(m_state_cond);
1480 m_delta_stats.AddReadStats(rreq->m_stats);
1481 check_delta_stats();
1482 }
1483
1484 rreq->m_rh->Done(rreq->return_value());
1485 delete rreq;
1486}
1487
void File::ProcessBlockResponse(Block *b, int res)
{
   // Handle completion of a remote block fetch; res is the number of bytes
   // received or a negative errno. Called from BlockResponseHandler::Done(),
   // NOT under lock on entry. On success the block is queued for writing and
   // all subscribed chunk requests are served; on failure requests on the same
   // IO are errored out while requests from other IOs trigger a reissue.

   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received, apparently size of the file on the remote
      // is different than what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      // Success: mark downloaded, queue for disk write, notify subscribers.
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         // No check for writes, report-and-merge forced during Sync().
         cache()->AddWriteTask(b, true);
      }

      // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      // Failure (negative res) or short read -- log appropriately and store
      // a proper error code on the block.
      if (res < 0) {
         bool new_error = b->get_io()->register_block_error(res);
         int tlvl = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int tlvl = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
// EREMOTEIO is not available on all platforms.
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over Block's chunk-reqs vector, error out ones with the same IO.
      // Collect others with a different IO, the first of them will be used to reissue the request.
      // This is then done outside of lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for(ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}
1634
1635//------------------------------------------------------------------------------
1636
1637const char* File::lPath() const
1638{
1639 return m_filename.c_str();
1640}
1641
1642//------------------------------------------------------------------------------
1643
1644int File::offsetIdx(int iIdx) const
1645{
1646 return iIdx - m_offset/m_block_size;
1647}
1648
1649
1650//------------------------------------------------------------------------------
1651
1653{
1654 // Check that block is not on disk and not in RAM.
1655 // TODO: Could prefetch several blocks at once!
1656 // blks_max could be an argument
1657
1658 BlockList_t blks;
1659
1660 TRACEF(DumpXL, "Prefetch() entering.");
1661 {
1662 XrdSysCondVarHelper _lck(m_state_cond);
1663
1664 if (m_prefetch_state != kOn)
1665 {
1666 return;
1667 }
1668
1669 if ( ! select_current_io_or_disable_prefetching(true) )
1670 {
1671 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1672 return;
1673 }
1674
1675 // Select block(s) to fetch.
1676 for (int f = 0; f < m_num_blocks; ++f)
1677 {
1678 if ( ! m_cfi.TestBitWritten(f))
1679 {
1680 int f_act = f + m_offset / m_block_size;
1681
1682 BlockMap_i bi = m_block_map.find(f_act);
1683 if (bi == m_block_map.end())
1684 {
1685 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1686 if (b)
1687 {
1688 TRACEF(Dump, "Prefetch take block " << f_act);
1689 blks.push_back(b);
1690 // Note: block ref_cnt not increased, it will be when placed into write queue.
1691
1692 inc_prefetch_read_cnt(1);
1693 }
1694 else
1695 {
1696 // This shouldn't happen as prefetching stops when RAM is 70% full.
1697 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1698 }
1699 break;
1700 }
1701 }
1702 }
1703
1704 if (blks.empty())
1705 {
1706 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1707 m_prefetch_state = kComplete;
1708 cache()->DeRegisterPrefetchFile(this);
1709 }
1710 else
1711 {
1712 (*m_current_io)->m_active_prefetches += (int) blks.size();
1713 }
1714 }
1715
1716 if ( ! blks.empty())
1717 {
1718 ProcessBlockRequests(blks);
1719 }
1720}
1721
1722
1723//------------------------------------------------------------------------------
1724
1726{
1727 return m_prefetch_score;
1728}
1729
1731{
1732 return Cache::TheOne().GetLog();
1733}
1734
1736{
1737 return Cache::TheOne().GetTrace();
1738}
1739
1740void File::insert_remote_location(const std::string &loc)
1741{
1742 if ( ! loc.empty())
1743 {
1744 size_t p = loc.find_first_of('@');
1745 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1746 }
1747}
1748
1749std::string File::GetRemoteLocations() const
1750{
1751 std::string s;
1752 if ( ! m_remote_locations.empty())
1753 {
1754 size_t sl = 0;
1755 int nl = 0;
1756 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1757 {
1758 sl += i->size();
1759 }
1760 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1761 s = '[';
1762 int j = 1;
1763 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1764 {
1765 s += '"'; s += *i; s += '"';
1766 if (j < nl) s += ',';
1767 }
1768 s += ']';
1769 }
1770 else
1771 {
1772 s = "[]";
1773 }
1774 return s;
1775}
1776
1777//==============================================================================
1778//======================= RESPONSE HANDLERS ==============================
1779//==============================================================================
1780
1782{
1783 m_block->m_file->ProcessBlockResponse(m_block, res);
1784 delete this;
1785}
1786
1787//------------------------------------------------------------------------------
1788
1790{
1791 m_mutex.Lock();
1792
1793 int n_left = --m_to_wait;
1794
1795 if (res < 0) {
1796 if (m_errno == 0) m_errno = res; // store first reported error
1797 } else {
1798 m_bytes_read += res;
1799 }
1800
1801 m_mutex.UnLock();
1802
1803 if (n_left == 0)
1804 {
1805 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1806 delete this;
1807 }
1808}
@ kXR_Unsupported
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:54
#define XRDOSS_mkpath
Definition XrdOss.hh:526
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
#define stat(a, b)
Definition XrdPosix.hh:101
#define XRD_TRACE
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fstat(struct stat *buf)
Definition XrdOss.hh:164
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:228
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fcntl(XrdOucCacheOp::Code opc, const std::string &args, std::string &resp)
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:169
XrdSysError * GetLog() const
Definition XrdPfc.hh:304
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:225
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:139
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:136
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:305
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1264
static const Cache & TheOne()
Definition XrdPfc.cc:137
XrdOss * GetOss() const
Definition XrdPfc.hh:290
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
bool register_incomplete_read()
Definition XrdPfcIO.hh:87
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
bool register_block_error(int res)
Definition XrdPfcIO.hh:90
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:67
const char * GetLocation()
Definition XrdPfcIO.hh:41
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
XrdPosixStats Stats
static const int maxRVdsz
Definition XProtocol.hh:724
static const int maxRvecsz
Definition XProtocol.hh:722
long long offset
ReadRequest * m_read_req
Definition XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:66
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:118
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:119
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:82
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:122
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:75
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:115
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:84
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:91
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:110
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:117
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:92
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:120
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:90
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:121
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition XrdPfcFile.hh:81
bool is_complete() const
Definition XrdPfcFile.hh:83
int return_value() const
Definition XrdPfcFile.hh:84
long long m_bytes_read
Definition XrdPfcFile.hh:68