XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue.
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO.
 
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file.
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
 
void DeRegisterPrefetchFile (File *)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStreamGetGStream ()
 
XrdSysErrorGetLog ()
 
FileGetNextFileToPrefetch ()
 
XrdOssGetOss () const
 
XrdSysTraceGetTrace ()
 
bool IsFileActiveOrPurgeProtected (const std::string &)
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
 
void Purge ()
 Thread function invoked to scan and purge files from disk when needed.
 
const ConfigurationRefConfiguration () const
 Reference XrdPfc configuration.
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
 
char * RequestRAM (long long size)
 
void ResourceMonitorHeartBeat ()
 Thread function checking resource usage periodically.
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor.
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *ioP, int opts=0)=0
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Unlink (const char *path)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const ConfigurationConf ()
 
static CacheCreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
 
static CacheGetInstance ()
 Singleton access.
 
static const CacheTheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.
 

Static Public Attributes

static XrdSchedulerschedP = 0
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file)
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write)
 
static const int optRW = 0x0004
 File is read/write (o/w read/only)
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 266 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger logger,
XrdOucEnv env 
)

Constructor.

Definition at line 185 of file XrdPfc.cc.

185 :
186 XrdOucCache("pfc"),
187 m_env(env),
188 m_log(logger, "XrdPfc_"),
189 m_trace(new XrdSysTrace("XrdPfc", logger)),
190 m_traceID("Cache"),
191 m_oss(0),
192 m_gstream(0),
193 m_prefetch_condVar(0),
194 m_prefetch_enabled(false),
195 m_RAM_used(0),
196 m_RAM_write_queue(0),
197 m_RAM_std_size(0),
198 m_isClient(false),
199 m_in_purge(false),
200 m_active_cond(0),
201 m_stats_n_purge_cond(0),
202 m_fs_state(0),
203 m_last_scan_duration(0),
204 m_last_purge_duration(0),
205 m_spt_state(SPTS_Idle)
206{
207 // Default log level is Warning.
208 m_trace->What = 2;
209}

References XrdSysTrace::What.

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block b,
bool  from_read 
)

Add downloaded block in write queue.

Definition at line 253 of file XrdPfc.cc.

254{
255 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
256
257 {
258 XrdSysMutexHelper lock(&m_RAM_mutex);
259 m_RAM_write_queue += b->get_size();
260 }
261
262 m_writeQ.condVar.Lock();
263 if (fromRead)
264 m_writeQ.queue.push_back(b);
265 else
266 m_writeQ.queue.push_front(b);
267 m_writeQ.size++;
268 m_writeQ.condVar.Signal();
269 m_writeQ.condVar.UnLock();
270}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
std::string & GetLocalPath()

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 211 of file XrdPfc.cc.

212{
213 const char* tpfx = "Attach() ";
214
215 if (Cache::GetInstance().Decide(io))
216 {
217 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
218
219 IO *cio;
220
221 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
222 {
223 cio = new IOFileBlock(io, *this);
224 }
225 else
226 {
227 IOFile *iof = new IOFile(io, *this);
228
229 if ( ! iof->HasFile())
230 {
231 delete iof;
232 // TODO - redirect instead. But this is kind of an awkward place for it.
233 // errno is set during IOFile construction.
234 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
235 return io;
236 }
237
238 cio = iof;
239 }
240
241 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
242 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
243
244 return cio;
245 }
246 else
247 {
248 TRACE(Info, tpfx << "decision decline " << io->Path());
249 }
250 return io;
251}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:315
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:160
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:164
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO * XrdOucCache::Attach ( XrdOucCacheIO ioP,
int  opts = 0 
)
virtual

Obtain a new IO object that fronts existing XrdOucCacheIO.

Implements XrdOucCache.

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 162 of file XrdPfc.cc.

162{ return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::FPurgeState::CheckFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters 
)

Parse configuration file.

Parameters
config_filenamepath to configuration file
parametersoptional parameters to be passed
Returns
parse status

Definition at line 273 of file XrdPfcConfiguration.cc.

274{
275 // Indicate whether or not we are a client instance
276 const char *theINS = getenv("XRDINSTANCE");
277 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
278
279 // Tell everyone else we are a caching proxy
280 XrdOucEnv::Export("XRDPFC", 1);
281
282 XrdOucEnv myEnv;
283 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
284
285 if (! config_filename || ! *config_filename)
286 {
287 TRACE(Error, "Config() configuration file not specified.");
288 return false;
289 }
290
291 int fd;
292 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
293 {
294 TRACE( Error, "Config() can't open configuration file " << config_filename);
295 return false;
296 }
297
298 Config.Attach(fd);
299 static const char *cvec[] = { "*** pfc plugin config:", 0 };
300 Config.Capture(cvec);
301
302 // Obtain OFS configurator for OSS plugin.
303 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
304 &XrdVERSIONINFOVAR(XrdOucGetCache));
305 if (! ofsCfg) return false;
306
307 TmpConfiguration tmpc;
308
309 // Adjust default parameters for client/serverless caching
310 if (m_isClient)
311 {
312 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
313 m_configuration.m_wqueue_blocks = 8;
314 m_configuration.m_wqueue_threads = 1;
315 }
316
317 // If network checksum processing is the default, indicate so.
318 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
319
320 // Actual parsing of the config file.
321 bool retval = true, aOK = true;
322 char *var;
323 while ((var = Config.GetMyFirstWord()))
324 {
325 if (! strcmp(var,"pfc.osslib"))
326 {
327 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
328 }
329 else if (! strcmp(var,"pfc.cschk"))
330 {
331 retval = xcschk(Config);
332 }
333 else if (! strcmp(var,"pfc.decisionlib"))
334 {
335 retval = xdlib(Config);
336 }
337 else if (! strcmp(var,"pfc.trace"))
338 {
339 retval = xtrace(Config);
340 }
341 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
342 {
343 m_configuration.m_allow_xrdpfc_command = true;
344 }
345 else if (! strncmp(var,"pfc.", 4))
346 {
347 retval = ConfigParameters(std::string(var+4), Config, tmpc);
348 }
349
350 if ( ! retval)
351 {
352 TRACE(Error, "Config() error in parsing");
353 aOK = false;
354 }
355 }
356
357 Config.Close();
358
359 // Load OSS plugin.
360 myEnv.Put("oss.runmode", "pfc");
361 if (m_configuration.is_cschk_cache())
362 {
363 char csi_conf[128];
364 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
365 {
366 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
367 } else {
368 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
369 return false;
370 }
371 }
372 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
373 {
374 ofsCfg->Plugin(m_oss);
375 }
376 else
377 {
378 TRACE(Error, "Config() Unable to create an OSS object");
379 return false;
380 }
381
382 // sets default value for disk usage
383 XrdOssVSInfo sP;
384 {
385 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
386 {
387 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
388 return false;
389 }
390 if (sP.Total < 10ll << 20)
391 {
392 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
393 m_configuration.m_data_space.c_str());
394 return false;
395 }
396
397 m_configuration.m_diskTotalSpace = sP.Total;
398
399 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
400 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
401 {
402 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
403 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
404 aOK = false;
405 }
406 }
407 else aOK = false;
408
409 if ( ! tmpc.m_fileUsageMax.empty())
410 {
411 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
412 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
413 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
414 {
415 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
416 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
417 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
418 {
419 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
420 aOK = false;
421 }
422 }
423 else aOK = false;
424 }
425 }
426 // sets flush frequency
427 if ( ! tmpc.m_flushRaw.empty())
428 {
429 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
430 {
431 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
432 &m_configuration.m_flushCnt,
433 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
434 {
435 return false;
436 }
437 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
438 }
439 else
440 {
441 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
442 &m_configuration.m_flushCnt, 100, 100000))
443 {
444 return false;
445 }
446 }
447 }
448
449 // get number of available RAM blocks after process configuration
450 if (m_configuration.m_RamAbsAvailable == 0)
451 {
452 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
453 char buff[1024];
454 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
455 m_log.Say("Config info: ", buff);
456 }
457 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
458 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
459
460
461 // Set tracing to debug if this is set in environment
462 char* cenv = getenv("XRDDEBUG");
463 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
464
465 if (aOK)
466 {
467 int loff = 0;
468// 000 001 010
469 const char *csc[] = {"off", "cache nonet", "nocache net notls",
470// 011
471 "cache net notls",
472// 100 101 110
473 "off", "cache nonet", "nocache net tls",
474// 111
475 "cache net tls"};
476 char buff[8192], uvk[32];
477 if (m_configuration.m_cs_UVKeep < 0)
478 strcpy(uvk, "lru");
479 else
480 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
481 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
482 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
483 " pfc.cschk %s uvkeep %s\n"
484 " pfc.blocksize %lld\n"
485 " pfc.prefetch %d\n"
486 " pfc.ram %.fg\n"
487 " pfc.writequeue %d %d\n"
488 " # Total available disk: %lld\n"
489 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
490 " pfc.spaces %s %s\n"
491 " pfc.trace %d\n"
492 " pfc.flush %lld\n"
493 " pfc.acchistorysize %d\n"
494 " pfc.onlyIfCachedMinBytes %lld\n"
495 " pfc.onlyIfCachedMinFrac %.2f\n",
496 config_filename,
497 csc[int(m_configuration.m_cs_Chk)], uvk,
498 m_configuration.m_bufferSize,
499 m_configuration.m_prefetch_max_blocks,
500 rg,
501 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
502 sP.Total,
503 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
504 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
505 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
506 m_configuration.m_data_space.c_str(),
507 m_configuration.m_meta_space.c_str(),
508 m_trace->What,
509 m_configuration.m_flushCnt,
510 m_configuration.m_accHistorySize,
511 m_configuration.m_onlyIfCachedMinSize,
512 m_configuration.m_onlyIfCachedMinFrac);
513
514 if (m_configuration.is_dir_stat_reporting_on())
515 {
516 loff += snprintf(buff + loff, sizeof(buff) - loff,
517 " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
518 m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
519 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
520 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
521 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
522 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
523 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
524 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
525 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
526 }
527
528 if (m_configuration.m_hdfsmode)
529 {
530 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
531 }
532
533 if (m_configuration.m_username.empty())
534 {
535 char unameBuff[256];
536 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
537 m_configuration.m_username = unameBuff;
538 }
539 else
540 {
541 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
542 }
543
544 m_log.Say(buff);
545
546 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
547 }
548
549 // Derived settings
550 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
551 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
552
553 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
554
555 m_log.Say("Config Proxy File Cache g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive");
556
557 m_log.Say("------ Proxy File Cache configuration parsing ", aOK ? "completed" : "failed");
558
559 if (ofsCfg) delete ofsCfg;
560
561 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
562 // Building of xrdpfc_print fails when this is enabled.
563#ifdef XRDPFC_CKSUM_TEST
564 {
565 int xxx = m_configuration.m_cs_Chk;
566
567 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
568 {
569 Info::TestCksumStuff();
570 }
571
572 m_configuration.m_cs_Chk = xxx;
573 }
574#endif
575
576 return aOK;
577}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:81
#define open
Definition XrdPosix.hh:71
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:108
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:102
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:109
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:94
int m_wqueue_threads
number of threads writing blocks to disk
Definition XrdPfc.hh:105
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:85
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:88
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:99
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:87
bool is_cschk_cache() const
Definition XrdPfc.hh:69
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition XrdPfc.hh:97
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool m_cs_ChkTLS
Allow TLS.
Definition XrdPfc.hh:113
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:89
int m_cs_Chk
Checksum check.
Definition XrdPfc.hh:112
bool m_hdfsmode
flag for enabling block-level operation
Definition XrdPfc.hh:78
int m_purgeColdFilesAge
purge files older than this age
Definition XrdPfc.hh:92
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition XrdPfc.hh:96
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
bool is_cschk_net() const
Definition XrdPfc.hh:70
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:116
time_t m_cs_UVKeep
unverified checksum cache keep
Definition XrdPfc.hh:111
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition XrdPfc.hh:98
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:91
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:115
bool is_dir_stat_reporting_on() const
Definition XrdPfc.hh:62
std::string m_diskUsageLWM
Definition XrdPfc.hh:123
std::string m_diskUsageHWM
Definition XrdPfc.hh:124
std::string m_fileUsageBaseline
Definition XrdPfc.hh:125
std::string m_fileUsageNominal
Definition XrdPfc.hh:126
std::string m_flushRaw
Definition XrdPfc.hh:128
std::string m_fileUsageMax
Definition XrdPfc.hh:127

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Error, XrdOucEnv::Export(), XrdOucEnv::GetPtr(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsMaxDepth, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Definition at line 914 of file XrdPfc.cc.

915{
916 TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
917
918 XrdCl::URL url(curl);
919 std::string f_name = url.GetPath();
920 std::string i_name = f_name + Info::s_infoExtension;
921
922 {
923 XrdSysCondVarHelper lock(&m_active_cond);
924 m_purge_delay_set.insert(f_name);
925 }
926
927 struct stat sbuff, sbuff2;
928 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
929 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
930 {
931 if (S_ISDIR(sbuff.st_mode))
932 {
933 TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
934 return -EISDIR;
935 }
936 else
937 {
938 bool read_ok = false;
939 bool is_cached = false;
940
941 // Lock and check if the file is active. If NOT, keep the lock
942 // and add dummy access after successful reading of info file.
943 // If it IS active, just release the lock, this ongoing access will
944 // assure the file continues to exist.
945
946 // XXXX How can I just loop over the cinfo file when active?
947 // Can I not get is_complete from the existing file?
948 // Do I still want to inject access record?
949 // Oh, it writes only if not active .... still let's try to use existing File.
950
951 m_active_cond.Lock();
952
953 bool is_active = m_active.find(f_name) != m_active.end();
954
955 if (is_active)
956 m_active_cond.UnLock();
957
958 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
959 XrdOucEnv myEnv;
960 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
961 if (res >= 0)
962 {
963 Info info(m_trace, 0);
964 if (info.Read(infoFile, i_name.c_str()))
965 {
966 read_ok = true;
967
968 if (info.IsComplete())
969 {
970 is_cached = true;
971 }
972 else if (info.GetFileSize() == 0)
973 {
974 is_cached = true;
975 }
976 else
977 {
978 long long fileSize = info.GetFileSize();
979 long long bytesRead = info.GetNDownloadedBytes();
980
981 if (fileSize < m_configuration.m_onlyIfCachedMinSize)
982 {
983 if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
984 is_cached = true;
985 }
986 else
987 {
988 if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
989 (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
990 is_cached = true;
991 }
992 }
993 }
994 infoFile->Close();
995 }
996 delete infoFile;
997
998 if (!is_active) m_active_cond.UnLock();
999
1000 if (read_ok)
1001 {
1002 TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
1003 return is_cached ? 0 : -EREMOTE;
1004 }
1005 }
1006 }
1007
1008 TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
1009 return -ENOENT;
1010}
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:96
URL representation.
Definition XrdClURL.hh:31
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
static const char * s_infoExtension

References XrdOssDF::Close(), Debug, XrdPfc::Info::GetFileSize(), XrdPfc::Info::GetNDownloadedBytes(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger logger,
XrdOucEnv env 
)
static

Singleton creation.

Definition at line 153 of file XrdPfc.cc.

154{
155 assert (m_instance == 0);
156 m_instance = new Cache(logger, env);
157 return *m_instance;
158}
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267

Referenced by XrdOucGetCache().

+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
&URL of file
Returns
decision if IO object will be cached.

Definition at line 164 of file XrdPfc.cc.

165{
166 if (! m_decisionpoints.empty())
167 {
168 XrdCl::URL url(io->Path());
169 std::string filename = url.GetPath();
170 std::vector<Decision*>::const_iterator it;
171 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
172 {
173 XrdPfc::Decision *d = *it;
174 if (! d) continue;
175 if (! d->Decide(filename, *m_oss))
176 {
177 return false;
178 }
179 }
180 }
181
182 return true;
183}
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File file)

Definition at line 703 of file XrdPfc.cc.

704{
705 // Can be called with other locks held.
706
707 if ( ! m_prefetch_enabled)
708 {
709 return;
710 }
711
712 m_prefetch_condVar.Lock();
713 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
714 {
715 if (*it == file)
716 {
717 m_prefetchList.erase(it);
718 break;
719 }
720 }
721 m_prefetch_condVar.UnLock();
722}

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 48 of file XrdPfcCommand.cc.

49{
50 static const char *top_epfx = "ExecuteCommandUrl ";
51
52 SplitParser cp(command_url, "/");
53
54 std::string token = cp.get_token();
55
56 if (token != "xrdpfc_command")
57 {
58 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
59 return;
60 }
61
62 // Get the command
63 token = cp.get_token();
64
65
66 //================================================================
67 // create_file
68 //================================================================
69
70 if (token == "create_file")
71 {
72 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
73 static const char* usage =
74 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
75 " Creates a cache file with given parameters. Data in file is random.\n"
76 " Useful for cache purge testing.\n"
77 "Notes:\n"
78 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
79 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
80 " . -t and -d can be given multiple times to record several accesses.\n"
81 " . Negative arguments given to -t are interpreted as relative to now.\n";
82
83 const Configuration &conf = m_configuration;
84
85 token = cp.get_token();
86
87 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
88
89 std::vector<char*> argv;
90 SplitParser ap(token, " ");
91 int argc = ap.fill_argv(argv);
92
93 long long file_size = ONE_GB;
94 long long block_size = conf.m_bufferSize;
95 int access_time [MAX_ACCESSES];
96 int access_duration[MAX_ACCESSES];
97 int at_count = 0, ad_count = 0;
98 XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
99 "help", 1, "h",
100 "verbose", 1, "v",
101 "size", 1, "s",
102 "blocksize", 1, "b",
103 "time", 1, "t",
104 "duration", 1, "d",
105 (const char *) 0);
106
107 time_t time_now = time(0);
108
109 Spec.Set(argc, &argv[0]);
110 char theOpt;
111
112 while ((theOpt = Spec.getopt()) != (char) -1)
113 {
114 switch (theOpt)
115 {
116 case 'h': {
117 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
118 return;
119 }
120 case 's': {
121 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
122 &file_size, 0ll, 32 * ONE_GB))
123 return;
124 break;
125 }
126 case 'b': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
128 &block_size, 0ll, 64 * ONE_MB))
129 return;
130 break;
131 }
132 case 't': {
133 if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
134 &access_time[at_count++], INT_MIN, INT_MAX))
135 return;
136 break;
137 }
138 case 'd': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
140 &access_duration[ad_count++], 0, 24 * 3600))
141 return;
142 break;
143 }
144 default: {
145 TRACE(Error, err_prefix << "Unhandled command argument.");
146 return;
147 }
148 }
149 }
150 if (Spec.getarg())
151 {
152 TRACE(Error, err_prefix << "Options must take up all the arguments.");
153 return;
154 }
155
156 if (at_count < 1) access_time [at_count++] = time_now - 10;
157 if (ad_count < 1) access_duration[ad_count++] = 10;
158
159 if (at_count != ad_count)
160 {
161 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
162 return;
163 }
164
165 std::string file_path (cp.get_reminder_with_delim());
166 std::string cinfo_path(file_path + Info::s_infoExtension);
167
168 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
169
170 // Check if cinfo exists ... bail out if it does.
171 {
172 struct stat infoStat;
173 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
174 {
175 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
176 return;
177 }
178 }
179
180 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
181
182 {
183 const char *myUser = conf.m_username.c_str();
184 XrdOucEnv myEnv;
185
186 // Create the data file.
187
188 char size_str[32]; sprintf(size_str, "%lld", file_size);
189 myEnv.Put("oss.asize", size_str);
190 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
191 int cret;
192 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
193 {
194 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
195 return;
196 }
197
198 XrdOssDF *myFile = GetOss()->newFile(myUser);
199 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
200 {
201 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
202 delete myFile;
203 return;
204 }
205
206 // Create the info file.
207
208 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
209 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
210 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
211 {
212 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
213 myFile->Close(); delete myFile;
214 return;
215 }
216
217 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
218 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
219 {
220 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
221 delete myInfoFile;
222 myFile->Close(); delete myFile;
223 return;
224 }
225
226 // Allocate space for the data file.
227
228 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
229 {
230 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
231 }
232
233 // Fill up cinfo.
234
235 Info myInfo(m_trace, false);
236 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
237 myInfo.SetAllBitsSynced();
238
239 for (int i = 0; i < at_count; ++i)
240 {
241 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
242
243 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
244 }
245
246 myInfo.Write(myInfoFile, cinfo_path.c_str());
247
248 myInfoFile->Close(); delete myInfoFile;
249 myFile->Close(); delete myFile;
250
251 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
252
253 {
254 XrdSysCondVarHelper lock(&m_writeQ.condVar);
255
256 m_writeQ.writes_between_purges += file_size;
257 }
258 }
259 }
260
261 //================================================================
262 // remove_file
263 //================================================================
264
265 else if (token == "remove_file")
266 {
267 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
268 static const char* usage =
269 "Usage: remove_file/ [-h] /<path>\n"
270 " Removes given file from the cache unless it is currently open.\n"
271 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
272 "Notes:\n"
273 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
274
275 token = cp.get_token();
276
277 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
278
279 std::vector<char*> argv;
280 SplitParser ap(token, " ");
281 int argc = ap.fill_argv(argv);
282
283 XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
284 "help", 1, "h",
285 (const char *) 0);
286
287 Spec.Set(argc, &argv[0]);
288 char theOpt;
289
290 while ((theOpt = Spec.getopt()) != (char) -1)
291 {
292 switch (theOpt)
293 {
294 case 'h': {
295 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
296 return;
297 }
298 default: {
299 TRACE(Error, err_prefix << "Unhandled command argument.");
300 return;
301 }
302 }
303 }
304 if (Spec.getarg())
305 {
306 TRACE(Error, err_prefix << "Options must take up all the arguments.");
307 return;
308 }
309
310 std::string f_name(cp.get_reminder());
311
312 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
313
314 int ret = UnlinkFile(f_name, true);
315
316 TRACE(Info, err_prefix << "returned with status " << ret);
317 }
318
319 //================================================================
320 // unknown command
321 //================================================================
322
323 else
324 {
325 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
326 }
327}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1133
XrdOss * GetOss() const
Definition XrdPfc.hh:385
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::fill_argv(), XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdOucArgs::getarg(), XrdOssDF::getFD(), XrdOucArgs::getopt(), GetOss(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_username, MAX_ACCESSES, XrdOss::newFile(), ONE_GB, ONE_MB, XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdOucArgs::Set(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File f,
bool  high_debug 
)

Definition at line 564 of file XrdPfc.cc.

565{
566 dec_ref_cnt(f, high_debug);
567}

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 412 of file XrdPfc.cc.

413{
414 // Called from virtual IO::Attach
415
416 TRACE(Debug, "GetFile " << path << ", io " << io);
417
418 ActiveMap_i it;
419
420 {
421 XrdSysCondVarHelper lock(&m_active_cond);
422
423 while (true)
424 {
425 it = m_active.find(path);
426
427 // File is not open or being opened. Mark it as being opened and
428 // proceed to opening it outside of while loop.
429 if (it == m_active.end())
430 {
431 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
432 break;
433 }
434
435 if (it->second != 0)
436 {
437 it->second->AddIO(io);
438 inc_ref_cnt(it->second, false, true);
439
440 return it->second;
441 }
442 else
443 {
444 // Wait for some change in m_active, then recheck.
445 m_active_cond.Wait();
446 }
447 }
448 }
449
450 if (filesize == 0)
451 {
452 struct stat st;
453 int res = io->Fstat(st);
454 if (res < 0) {
455 errno = res;
456 TRACE(Error, "GetFile, could not get valid stat");
457 } else if (res > 0) {
458 errno = ENOTSUP;
459 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
460 } else {
461 filesize = st.st_size;
462 }
463 }
464
465 File *file = 0;
466
467 if (filesize >= 0)
468 {
469 file = File::FileOpen(path, off, filesize);
470 }
471
472 {
473 XrdSysCondVarHelper lock(&m_active_cond);
474
475 if (file)
476 {
477 inc_ref_cnt(file, false, true);
478 it->second = file;
479
480 file->AddIO(io);
481 }
482 else
483 {
484 m_active.erase(it);
485 }
486
487 m_active_cond.Broadcast();
488 }
489
490 return file;
491}
virtual int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition XrdPfcFile.cc:99
void AddIO(IO *io)

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat, TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 400 of file XrdPfc.hh.

400{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 160 of file XrdPfc.cc.

160{ return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::GetTrace(), PrefetchThread(), ProcessWriteTaskThread(), PurgeThread(), ResourceMonitorHeartBeatThread(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( )
inline

Definition at line 397 of file XrdPfc.hh.

397{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 725 of file XrdPfc.cc.

726{
727 m_prefetch_condVar.Lock();
728 while (m_prefetchList.empty())
729 {
730 m_prefetch_condVar.Wait();
731 }
732
733 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
734
735 size_t l = m_prefetchList.size();
736 int idx = rand() % l;
737 File* f = m_prefetchList[idx];
738
739 m_prefetch_condVar.UnLock();
740 return f;
741}

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 385 of file XrdPfc.hh.

385{ return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( )
inline

Definition at line 398 of file XrdPfc.hh.

398{ return m_trace; }

Referenced by XrdPfc::File::GetTrace(), XrdPfc::IO::GetTrace(), and XrdPfc::GetTrace().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path)

Definition at line 674 of file XrdPfc.cc.

675{
676 XrdSysCondVarHelper lock(&m_active_cond);
677
678 return m_active.find(path) != m_active.end() ||
679 m_purge_delay_set.find(path) != m_purge_delay_set.end();
680}

Referenced by Purge().

+ Here is the caller graph for this function:

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 786 of file XrdPfc.cc.

788{
789 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
790 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
791 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
792
793 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
794
795 if (buff && blen > 0) buff[0] = 0;
796
797 XrdCl::URL url(curl);
798 std::string f_name = url.GetPath();
799 std::string i_name = f_name + Info::s_infoExtension;
800
801 if (why == ForPath)
802 {
803 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
804 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
805 return ret;
806 }
807
808 {
809 XrdSysCondVarHelper lock(&m_active_cond);
810 m_purge_delay_set.insert(f_name);
811 }
812
813 struct stat sbuff, sbuff2;
814 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
815 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
816 {
817 if (S_ISDIR(sbuff.st_mode))
818 {
819 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
820 return -EISDIR;
821 }
822 else
823 {
824 bool read_ok = false;
825 bool is_complete = false;
826
827 // Lock and check if the file is active. If NOT, keep the lock
828 // and add dummy access after successful reading of info file.
829 // If it IS active, just release the lock, this ongoing access will
830 // assure the file continues to exist.
831
832 // XXXX How can I just loop over the cinfo file when active?
833 // Can I not get is_complete from the existing file?
834 // Do I still want to inject access record?
835 // Oh, it writes only if not active .... still let's try to use existing File.
836
837 m_active_cond.Lock();
838
839 bool is_active = m_active.find(f_name) != m_active.end();
840
841 if (is_active) m_active_cond.UnLock();
842
843 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
844 XrdOucEnv myEnv;
845 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
846 if (res >= 0)
847 {
848 Info info(m_trace, 0);
849 if (info.Read(infoFile, i_name.c_str()))
850 {
851 read_ok = true;
852
853 is_complete = info.IsComplete();
854
855 // Add full-size access if reason is for access.
856 if ( ! is_active && is_complete && why == ForAccess)
857 {
858 info.WriteIOStatSingle(info.GetFileSize());
859 info.Write(infoFile, i_name.c_str());
860 }
861 }
862 infoFile->Close();
863 }
864 delete infoFile;
865
866 if ( ! is_active) m_active_cond.UnLock();
867
868 if (read_ok)
869 {
870 if ((is_complete || why == ForInfo) && buff != 0)
871 {
872 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
873 if (res2 < 0)
874 return res2;
875
876 // Normally, files are owned by us but when direct cache access
877 // is wanted and possible, make sure the file is world readable.
878 if (why == ForAccess)
879 {mode_t mode = (forall ? worldReadable : groupReadable);
880 if (((sbuff.st_mode & worldReadable) != mode)
881 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
882 {is_complete = false;
883 *buff = 0;
884 }
885 }
886 }
887
888 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
889 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
890
891 return is_complete ? 0 : -EREMOTE;
892 }
893 }
894 }
895
896 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
897 return -ENOENT;
898}
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition XrdOss.hh:873

References XrdOss::Chmod(), XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 744 of file XrdPfc.cc.

745{
746 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
747
748 while (true)
749 {
750 m_RAM_mutex.Lock();
751 bool doPrefetch = (m_RAM_used < limit_RAM);
752 m_RAM_mutex.UnLock();
753
754 if (doPrefetch)
755 {
757 f->Prefetch();
758 }
759 else
760 {
762 }
763 }
764}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:725
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Preapare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow defering an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1022 of file XrdPfc.cc.

1023{
1024 XrdCl::URL url(curl);
1025 std::string f_name = url.GetPath();
1026 std::string i_name = f_name + Info::s_infoExtension;
1027
1028 // Do not allow write access.
1029 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1030 {
1031 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1032 return -EROFS;
1033 }
1034
1035 // Intercept xrdpfc_command requests.
1036 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1037 {
1038 // Schedule a job to process command request.
1039 {
1040 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1041
1042 schedP->Schedule(ce);
1043 }
1044
1045 return -EAGAIN;
1046 }
1047
1048 {
1049 XrdSysCondVarHelper lock(&m_active_cond);
1050 m_purge_delay_set.insert(f_name);
1051 }
1052
1053 struct stat sbuff;
1054 int res = m_oss->Stat(i_name.c_str(), &sbuff);
1055 if (res == 0)
1056 {
1057 TRACE(Dump, "Prepare defer open " << f_name);
1058 return 1;
1059 }
1060 else
1061 {
1062 return 0;
1063 }
1064}
static XrdScheduler * schedP
Definition XrdPfc.hh:404
void Schedule(XrdJob *jp)

References XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat, XrdOss::Stat(), and TRACE.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 305 of file XrdPfc.cc.

306{
307 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
308
309 while (true)
310 {
311 m_writeQ.condVar.Lock();
312 while (m_writeQ.size == 0)
313 {
314 m_writeQ.condVar.Wait();
315 }
316
317 // MT -- optimize to pop several blocks if they are available (or swap the list).
318 // This makes sense especially for smallish block sizes.
319
320 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
321 long long sum_size = 0;
322
323 for (int bi = 0; bi < n_pushed; ++bi)
324 {
325 Block* block = m_writeQ.queue.front();
326 m_writeQ.queue.pop_front();
327 m_writeQ.writes_between_purges += block->get_size();
328 sum_size += block->get_size();
329
330 blks_to_write[bi] = block;
331
332 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
333 }
334 m_writeQ.size -= n_pushed;
335
336 m_writeQ.condVar.UnLock();
337
338 {
339 XrdSysMutexHelper lock(&m_RAM_mutex);
340 m_RAM_write_queue -= sum_size;
341 }
342
343 for (int bi = 0; bi < n_pushed; ++bi)
344 {
345 Block* block = blks_to_write[bi];
346
347 block->m_file->WriteBlockToDisk(block);
348 }
349 }
350}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Purge()

void XrdPfc::Cache::Purge ( )

Thread function invoked to scan and purge files from disk when needed.

Definition at line 698 of file XrdPfcPurge.cc.

699{
700 static const char *trc_pfx = "Purge() ";
701
702 XrdOucEnv env;
703 long long disk_usage;
704 long long estimated_file_usage = m_configuration.m_diskUsageHWM;
705
706 // Pause before initial run
707 sleep(1);
708
709 m_fs_state = new DataFsState;
710
711 // { PathTokenizer p("/a/b/c/f.root", 2, true); p.deboog(); }
712 // { PathTokenizer p("/a/b/f.root", 2, true); p.deboog(); }
713 // { PathTokenizer p("/a/f.root", 2, true); p.deboog(); }
714 // { PathTokenizer p("/f.root", 2, true); p.deboog(); }
715
716 int age_based_purge_countdown = 0; // enforce on first purge loop entry.
717 bool is_first = true;
718
719 while (true)
720 {
721 time_t purge_start = time(0);
722
723 {
724 XrdSysCondVarHelper lock(&m_active_cond);
725
726 m_in_purge = true;
727 }
728
729 TRACE(Info, trc_pfx << "Started.");
730
731 // Bytes to remove based on total disk usage (d) and file usage (f).
732 long long bytesToRemove_d = 0, bytesToRemove_f = 0;
733
734 // get amount of space to potentially erase based on total disk usage
735 XrdOssVSInfo sP; // Make sure we start when a clean slate in each loop
736 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
737 {
738 TRACE(Error, trc_pfx << "can't get StatVS for oss space " << m_configuration.m_data_space);
739 continue;
740 }
741 else
742 {
743 disk_usage = sP.Total - sP.Free;
744 TRACE(Debug, trc_pfx << "used disk space " << disk_usage << " bytes.");
745
746 if (disk_usage > m_configuration.m_diskUsageHWM)
747 {
748 bytesToRemove_d = disk_usage - m_configuration.m_diskUsageLWM;
749 }
750 }
751
752 // estimate amount of space to erase based on file usage
753 if (m_configuration.are_file_usage_limits_set())
754 {
755 long long estimated_writes_since_last_purge;
756 {
757 XrdSysCondVarHelper lock(&m_writeQ.condVar);
758
759 estimated_writes_since_last_purge = m_writeQ.writes_between_purges;
760 m_writeQ.writes_between_purges = 0;
761 }
762 estimated_file_usage += estimated_writes_since_last_purge;
763
764 TRACE(Debug, trc_pfx << "estimated usage by files " << estimated_file_usage << " bytes.");
765
766 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
767
768 // Here we estimate fractional usages -- to decide if full scan is necessary before actual purge.
769 double frac_du = 0, frac_fu = 0;
770 m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
771
772 if (frac_fu > 1.0 - frac_du)
773 {
774 bytesToRemove_f = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
775 }
776 }
777
778 long long bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
779
780 bool enforce_age_based_purge = false;
781 if (m_configuration.is_age_based_purge_in_effect() || m_configuration.is_uvkeep_purge_in_effect())
782 {
783 // XXXX ... I could collect those guys in larger vectors (maps?) and do traversal when
784 // they are empty.
785 if (--age_based_purge_countdown <= 0)
786 {
787 enforce_age_based_purge = true;
788 age_based_purge_countdown = m_configuration.m_purgeAgeBasedPeriod;
789 }
790 }
791
792 bool enforce_traversal_for_usage_collection = is_first;
793 // XXX Other conditions? Periodic checks?
794
795 copy_out_active_stats_and_update_data_fs_state();
796
797 TRACE(Debug, trc_pfx << "Precheck:");
798 TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
799 TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (" << (is_first ? "max possible for initial run" : "estimated") << ")");
800 TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
801 TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
802 is_first = false;
803
804 long long bytesToRemove_at_start = 0; // set after file scan
805 int deleted_file_count = 0;
806
807 bool purge_required = (bytesToRemove > 0 || enforce_age_based_purge);
808
809 // XXXX-PurgeOpt Need to retain this state between purges so I can avoid doing
810 // the traversal more often than really needed.
811 FPurgeState purgeState(2 * bytesToRemove, *m_oss); // prepare twice more volume than required
812
813 if (purge_required || enforce_traversal_for_usage_collection)
814 {
815 // Make a sorted map of file paths sorted by access time.
816
817 if (m_configuration.is_age_based_purge_in_effect())
818 {
819 purgeState.setMinTime(time(0) - m_configuration.m_purgeColdFilesAge);
820 }
821 if (m_configuration.is_uvkeep_purge_in_effect())
822 {
823 purgeState.setUVKeepMinTime(time(0) - m_configuration.m_cs_UVKeep);
824 }
825
826 XrdOssDF* dh = m_oss->newDir(m_configuration.m_username.c_str());
827 if (dh->Opendir("/", env) == XrdOssOK)
828 {
829 purgeState.begin_traversal(m_fs_state->get_root());
830
831 purgeState.TraverseNamespace(dh);
832
833 purgeState.end_traversal();
834
835 dh->Close();
836 }
837 delete dh; dh = 0;
838
839 estimated_file_usage = purgeState.getNBytesTotal();
840
841 TRACE(Debug, trc_pfx << "actual usage by files " << estimated_file_usage << " bytes.");
842
843 // Adjust bytesToRemove_f and then bytesToRemove based on actual file usage,
844 // possibly retreating below nominal file usage (but not below baseline file usage).
845 if (m_configuration.are_file_usage_limits_set())
846 {
847 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
848
849 double frac_du = 0, frac_fu = 0;
850 m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
851
852 if (frac_fu > 1.0 - frac_du)
853 {
854 bytesToRemove = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
855 bytesToRemove = std::min(bytesToRemove, estimated_file_usage - m_configuration.m_fileUsageBaseline);
856 }
857 else
858 {
859 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
860 }
861 }
862 else
863 {
864 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
865 }
866 bytesToRemove_at_start = bytesToRemove;
867
868 TRACE(Debug, trc_pfx << "After scan:");
869 TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
870 TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (measured)");
871 TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
872 TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
873 TRACE(Debug, "\tmin_time = " << purgeState.getMinTime());
874
875 if (enforce_age_based_purge)
876 {
877 purgeState.MoveListEntriesToMap();
878 }
879 }
880
881 // Dump statistcs before actual purging so maximum usage values get recorded.
882 // Should really go to gstream --- and should really go from Heartbeat.
883 if (m_configuration.is_dir_stat_reporting_on())
884 {
885 m_fs_state->dump_recursively();
886 }
887
888 if (purge_required)
889 {
890 // Loop over map and remove files with oldest values of access time.
891 struct stat fstat;
892 size_t info_ext_len = strlen(Info::s_infoExtension);
893 int protected_cnt = 0;
894 long long protected_sum = 0;
895 for (FPurgeState::map_i it = purgeState.m_fmap.begin(); it != purgeState.m_fmap.end(); ++it)
896 {
897 // Finish when enough space has been freed but not while age-based purging is in progress.
898 // Those files are marked with time-stamp = 0.
899 if (bytesToRemove <= 0 && ! (enforce_age_based_purge && it->first == 0))
900 {
901 break;
902 }
903
904 std::string &infoPath = it->second.path;
905 std::string dataPath = infoPath.substr(0, infoPath.size() - info_ext_len);
906
907 if (IsFileActiveOrPurgeProtected(dataPath))
908 {
909 ++protected_cnt;
910 protected_sum += it->second.nBytes;
911 TRACE(Debug, trc_pfx << "File is active or purge-protected: " << dataPath << " size: " << it->second.nBytes);
912 continue;
913 }
914
915 // remove info file
916 if (m_oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK)
917 {
918 // cinfo file can be on another oss.space, do not subtract for now.
919 // Could be relevant for very small block sizes.
920 // bytesToRemove -= fstat.st_size;
921 // estimated_file_usage -= fstat.st_size;
922 // ++deleted_file_count;
923
924 m_oss->Unlink(infoPath.c_str());
925 TRACE(Dump, trc_pfx << "Removed file: '" << infoPath << "' size: " << fstat.st_size);
926 }
927
928 // remove data file
929 if (m_oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK)
930 {
931 bytesToRemove -= it->second.nBytes;
932 estimated_file_usage -= it->second.nBytes;
933 ++deleted_file_count;
934
935 m_oss->Unlink(dataPath.c_str());
936 TRACE(Dump, trc_pfx << "Removed file: '" << dataPath << "' size: " << it->second.nBytes << ", time: " << it->first);
937
938 if (it->second.dirState != 0) // XXXX This should now always be true.
939 it->second.dirState->add_usage_purged(it->second.nBytes);
940 else
941 TRACE(Error, trc_pfx << "DirState not set for file '" << dataPath << "'.");
942 }
943 }
944 if (protected_cnt > 0)
945 {
946 TRACE(Info, trc_pfx << "Encountered " << protected_cnt << " protected files, sum of their size: " << protected_sum);
947 }
948
949 m_fs_state->upward_propagate_usage_purged();
950 }
951
952 {
953 XrdSysCondVarHelper lock(&m_active_cond);
954
955 m_purge_delay_set.clear();
956 m_in_purge = false;
957 }
958
959 int purge_duration = time(0) - purge_start;
960
961 TRACE(Info, trc_pfx << "Finished, removed " << deleted_file_count << " data files, total size " <<
962 bytesToRemove_at_start - bytesToRemove << ", bytes to remove at end " << bytesToRemove << ", purge duration " << purge_duration);
963
964 int sleep_time = m_configuration.m_purgeInterval - purge_duration;
965 if (sleep_time > 0)
966 {
967 sleep(sleep_time);
968 }
969 }
970}
#define fstat(a, b)
Definition XrdPosix.hh:57
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition XrdOss.hh:79
long long Free
Definition XrdOssVS.hh:91
virtual XrdOssDF * newDir(const char *tident)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:674
void upward_propagate_usage_purged()
DirState * get_root()
map_t::iterator map_i
bool is_uvkeep_purge_in_effect() const
Definition XrdPfc.hh:61
bool are_file_usage_limits_set() const
Definition XrdPfc.hh:59
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition XrdPfc.hh:93
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition XrdPfc.cc:132
bool is_age_based_purge_in_effect() const
Definition XrdPfc.hh:60

References XrdPfc::Configuration::are_file_usage_limits_set(), XrdPfc::FPurgeState::begin_traversal(), XrdPfc::Configuration::calculate_fractional_usages(), XrdOssDF::Close(), Debug, XrdPfc::DataFsState::dump_recursively(), XrdPfc::FPurgeState::end_traversal(), Error, XrdOssVSInfo::Free, fstat, XrdPfc::DataFsState::get_root(), XrdPfc::FPurgeState::getMinTime(), XrdPfc::FPurgeState::getNBytesTotal(), XrdPfc::Configuration::is_age_based_purge_in_effect(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdPfc::Configuration::is_uvkeep_purge_in_effect(), IsFileActiveOrPurgeProtected(), XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::FPurgeState::m_fmap, XrdPfc::Configuration::m_purgeAgeBasedPeriod, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_username, XrdPfc::FPurgeState::MoveListEntriesToMap(), XrdOss::newDir(), XrdOssDF::Opendir(), XrdPfc::Info::s_infoExtension, XrdPfc::FPurgeState::setMinTime(), XrdPfc::FPurgeState::setUVKeepMinTime(), stat, XrdOss::Stat(), XrdOss::StatVS(), XrdOssVSInfo::Total, TRACE, XrdPfc::FPurgeState::TraverseNamespace(), XrdOss::Unlink(), XrdPfc::DataFsState::upward_propagate_usage_purged(), and XrdOssOK.

Referenced by PurgeThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 315 of file XrdPfc.hh.

315{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File file)

Definition at line 687 of file XrdPfc.cc.

688{
689 // Can be called with other locks held.
690
691 if ( ! m_prefetch_enabled)
692 {
693 return;
694 }
695
696 m_prefetch_condVar.Lock();
697 m_prefetchList.push_back(file);
698 m_prefetch_condVar.Signal();
699 m_prefetch_condVar.UnLock();
700}

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File f,
IO io 
)

Definition at line 493 of file XrdPfc.cc.

494{
495 // Called from virtual IO::DetachFinalize.
496
497 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
498
499 {
500 XrdSysCondVarHelper lock(&m_active_cond);
501
502 f->RemoveIO(io);
503 }
504 dec_ref_cnt(f, true);
505}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 394 of file XrdPfc.cc.

395{
396 bool std_size = (size == m_configuration.m_bufferSize);
397 {
398 XrdSysMutexHelper lock(&m_RAM_mutex);
399
400 m_RAM_used -= size;
401
402 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
403 {
404 m_RAM_std_blocks.push_back(buf);
405 ++m_RAM_std_size;
406 return;
407 }
408 }
409 free(buf);
410}

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File f)

Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.

Definition at line 272 of file XrdPfc.cc.

273{
274 std::list<Block*> removed_blocks;
275 long long sum_size = 0;
276
277 m_writeQ.condVar.Lock();
278 std::list<Block*>::iterator i = m_writeQ.queue.begin();
279 while (i != m_writeQ.queue.end())
280 {
281 if ((*i)->m_file == file)
282 {
283 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
284 std::list<Block*>::iterator j = i++;
285 removed_blocks.push_back(*j);
286 sum_size += (*j)->get_size();
287 m_writeQ.queue.erase(j);
288 --m_writeQ.size;
289 }
290 else
291 {
292 ++i;
293 }
294 }
295 m_writeQ.condVar.UnLock();
296
297 {
298 XrdSysMutexHelper lock(&m_RAM_mutex);
299 m_RAM_write_queue -= sum_size;
300 }
301
302 file->BlocksRemovedFromWriteQ(removed_blocks);
303}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 354 of file XrdPfc.cc.

355{
356 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
357
358 bool std_size = (size == m_configuration.m_bufferSize);
359
360 m_RAM_mutex.Lock();
361
362 long long total = m_RAM_used + size;
363
364 if (total <= m_configuration.m_RamAbsAvailable)
365 {
366 m_RAM_used = total;
367 if (std_size && m_RAM_std_size > 0)
368 {
369 char *buf = m_RAM_std_blocks.back();
370 m_RAM_std_blocks.pop_back();
371 --m_RAM_std_size;
372
373 m_RAM_mutex.UnLock();
374
375 return buf;
376 }
377 else
378 {
379 m_RAM_mutex.UnLock();
380 char *buf;
381 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
382 {
383 // Report out of mem? Probably should report it at least the first time,
384 // then periodically.
385 return 0;
386 }
387 return buf;
388 }
389 }
390 m_RAM_mutex.UnLock();
391 return 0;
392}

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResourceMonitorHeartBeat()

void XrdPfc::Cache::ResourceMonitorHeartBeat ( )

Thread function checking resource usage periodically.

Definition at line 606 of file XrdPfcPurge.cc.

607{
608 // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
609
610 // Pause before initial run
611 sleep(1);
612
613 // XXXX Setup initial / constant stats (total RAM, total disk, ???)
614
617
618 S.Lock();
619
620 X.DiskSize = m_configuration.m_diskTotalSpace;
621
622 X.MemSize = m_configuration.m_RamAbsAvailable;
623
624 S.UnLock();
625
626 // XXXX Schedule initial disk scan, time it!
627 //
628 // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
629 // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
630 //
631 // bool scan_and_purge_running = true;
632
633 // XXXX Could we really hold last-usage for all files in memory?
634
635 // XXXX Think how to handle disk-full, scan/purge not finishing:
636 // - start dropping things out of write queue, but only when RAM gets near full;
637 // - monitoring this then becomes a high-priority job, inner loop with sleep of,
638 // say, 5 or 10 seconds.
639
640 while (true)
641 {
642 time_t heartbeat_start = time(0);
643
644 // TRACE(Info, trc_pfx << "HeartBeat starting ...");
645
646 // if sumary monitoring configured, pupulate OucCacheStats:
647 S.Lock();
648
649 // - available / used disk space (files usage calculated elsewhere (maybe))
650
651 // - RAM usage
652 { XrdSysMutexHelper lck(&m_RAM_mutex);
653 X.MemUsed = m_RAM_used;
654 X.MemWriteQ = m_RAM_write_queue;
655 }
656 // - files opened / closed etc
657
658 // do estimate of available space
659 S.UnLock();
660
661 // if needed, schedule purge in a different thread.
662 // purge is:
663 // - deep scan + gather FSPurgeState
664 // - actual purge
665 //
666 // this thread can continue running and, if needed, stop writing to disk
667 // if purge is taking too long.
668
669 // think how data is passed / synchronized between this and purge thread
670
671 // !!!! think how stat collection is done and propgated upwards;
672 // until now it was done once per purge-interval.
673 // now stats will be added up more often, but purge will be done
674 // only occasionally.
675 // also, do we report cumulative values or deltas? cumulative should
676 // be easier and consistent with summary data.
677 // still, some are state - like disk usage, num of files.
678
679 // Do we take care of directories that need to be newly added into DirState hierarchy?
680 // I.e., when user creates new directories and these are covered by either full
681 // spec or by root + depth declaration.
682
683 int heartbeat_duration = time(0) - heartbeat_start;
684
685 // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
686
687 // int sleep_time = m_configuration.m_purgeInterval - heartbeat_duration;
688 int sleep_time = 60 - heartbeat_duration;
689 if (sleep_time > 0)
690 {
691 sleep(sleep_time);
692 }
693 }
694}
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics

References XrdOucCacheStats::CacheStats::DiskSize, XrdOucCacheStats::Lock(), XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_RamAbsAvailable, XrdOucCacheStats::CacheStats::MemSize, XrdOucCacheStats::CacheStats::MemUsed, XrdOucCacheStats::CacheStats::MemWriteQ, XrdOucCache::Statistics, XrdOucCacheStats::UnLock(), and XrdOucCacheStats::X.

Referenced by ResourceMonitorHeartBeatThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File f)
inline

Definition at line 393 of file XrdPfc.hh.

393{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1074 of file XrdPfc.cc.

1075{
1076 XrdCl::URL url(curl);
1077 std::string f_name = url.GetPath();
1078
1079 {
1080 XrdSysCondVarHelper lock(&m_active_cond);
1081 m_purge_delay_set.insert(f_name);
1082 }
1083
1084 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1085 {
1086 if (S_ISDIR(sbuff.st_mode))
1087 {
1088 return 0;
1089 }
1090 else
1091 {
1092 bool success = false;
1093 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1094 XrdOucEnv myEnv;
1095
1096 f_name += Info::s_infoExtension;
1097 int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1098 if (res >= 0)
1099 {
1100 Info info(m_trace, 0);
1101 if (info.Read(infoFile, f_name.c_str()))
1102 {
1103 sbuff.st_size = info.GetFileSize();
1104 success = true;
1105 }
1106 }
1107 infoFile->Close();
1108 delete infoFile;
1109 return success ? 0 : 1;
1110 }
1111 }
1112
1113 return 1;
1114}

References XrdOssDF::Close(), XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), and XrdOssOK.

+ Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 161 of file XrdPfc.cc.

161{ return *m_instance; }

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information.

Reimplemented from XrdOucCache.

Definition at line 1123 of file XrdPfc.cc.

1124{
1125 XrdCl::URL url(curl);
1126 std::string f_name = url.GetPath();
1127
1128 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1129
1130 return UnlinkFile(f_name, false);
1131}

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1133 of file XrdPfc.cc.

1134{
1135 ActiveMap_i it;
1136 File *file = 0;
1137 {
1138 XrdSysCondVarHelper lock(&m_active_cond);
1139
1140 it = m_active.find(f_name);
1141
1142 if (it != m_active.end())
1143 {
1144 if (fail_if_open)
1145 {
1146 TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1147 return -EBUSY;
1148 }
1149
1150 // Null File* in m_active map means an operation is ongoing, probably
1151 // Attach() with possible File::Open(). Ask for retry.
1152 if (it->second == 0)
1153 {
1154 TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1155 return -EAGAIN;
1156 }
1157
1158 file = it->second;
1160 it->second = 0;
1161 }
1162 else
1163 {
1164 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1165 }
1166 }
1167
1168 if (file)
1169 {
1171 }
1172
1173 std::string i_name = f_name + Info::s_infoExtension;
1174
1175 // Unlink file & cinfo
1176 int f_ret = m_oss->Unlink(f_name.c_str());
1177 int i_ret = m_oss->Unlink(i_name.c_str());
1178
1179 TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1180
1181 {
1182 XrdSysCondVarHelper lock(&m_active_cond);
1183
1184 m_active.erase(it);
1185 }
1186
1187 return std::min(f_ret, i_ret);
1188}
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:272
void initiate_emergency_shutdown()

References Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, TRACE, and XrdOss::Unlink().

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 342 of file XrdPfc.hh.

342{ return true; }

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = 0
static

Definition at line 404 of file XrdPfc.hh.

Referenced by XrdPfc::IO::Detach(), Prepare(), and XrdOucGetCache().


The documentation for this class was generated from the following files: