XRootD
Loading...
Searching...
No Matches
XrdClCopyProcess.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
32#include "XrdCl/XrdClMonitor.hh"
33#include "XrdCl/XrdClCopyJob.hh"
34#include "XrdCl/XrdClUtils.hh"
39
40#include <sys/time.h>
41
42#include <memory>
43
44namespace
45{
46 class QueuedCopyJob: public XrdCl::Job
47 {
48 public:
49 QueuedCopyJob( XrdCl::CopyJob *job,
51 uint16_t currentJob,
52 uint16_t totalJobs,
53 XrdSysSemaphore *sem = 0 ):
54 pJob(job), pProgress(progress), pCurrentJob(currentJob),
55 pTotalJobs(totalJobs), pSem(sem),
56 pWrtRetryCnt( XrdCl::DefaultRetryWrtAtLBLimit ),
57 pRetryCnt( XrdCl::DefaultCpRetry ),
58 pRetryPolicy( XrdCl::DefaultCpRetryPolicy )
59 {
60 XrdCl::DefaultEnv::GetEnv()->GetInt( "RetryWrtAtLBLimit", pWrtRetryCnt );
61 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpRetry", pRetryCnt );
62 XrdCl::DefaultEnv::GetEnv()->GetString( "CpRetryPolicy", pRetryPolicy );
63 }
64
65 //------------------------------------------------------------------------
67 //------------------------------------------------------------------------
68 virtual void Run( void * )
69 {
71 timeval bTOD;
72
73 //----------------------------------------------------------------------
74 // Report beginning of the copy
75 //----------------------------------------------------------------------
76 if( pProgress )
77 pProgress->BeginJob( pCurrentJob, pTotalJobs,
78 &pJob->GetSource(),
79 &pJob->GetTarget() );
80
81 if( mon )
82 {
84 i.transfer.origin = &pJob->GetSource();
85 i.transfer.target = &pJob->GetTarget();
87 }
88
89 gettimeofday( &bTOD, 0 );
90
91 //----------------------------------------------------------------------
92 // Do the copy
93 //----------------------------------------------------------------------
95 while( true )
96 {
97 st = pJob->Run( pProgress );
98 //--------------------------------------------------------------------
99 // Retry due to write-recovery
100 //--------------------------------------------------------------------
101 if( !st.IsOK() && st.code == XrdCl::errRetry && pWrtRetryCnt > 0 )
102 {
103 std::string url;
104 pJob->GetResults()->Get( "LastURL", url );
105 XrdCl::URL lastURL( url );
106 XrdCl::URL::ParamsMap cgi = lastURL.GetParams();
107 auto itr = cgi.find( "tried" );
108 if( itr != cgi.end() )
109 {
110 std::string tried = itr->second;
111 if( tried[tried.size() - 1] != ',' ) tried += ',';
112 tried += lastURL.GetHostName();
113 cgi["tried"] = tried;
114 }
115 else
116 cgi["tried"] = lastURL.GetHostName();
117
118 std::string recoveryRedir;
119 pJob->GetResults()->Get( "WrtRecoveryRedir", recoveryRedir );
120 XrdCl::URL recRedirURL( recoveryRedir );
121
122 std::string target;
123 pJob->GetProperties()->Get( "target", target );
124 XrdCl::URL trgURL( target );
125 trgURL.SetHostName( recRedirURL.GetHostName() );
126 trgURL.SetPort( recRedirURL.GetPort() );
127 trgURL.SetProtocol( recRedirURL.GetProtocol() );
128 trgURL.SetParams( cgi );
129 pJob->GetProperties()->Set( "target", trgURL.GetURL() );
130 pJob->Init();
131
132 // we have a new job, let's try again
133 --pWrtRetryCnt;
134 continue;
135 }
136 //--------------------------------------------------------------------
137 // Copy job retry
138 //--------------------------------------------------------------------
139 if( !st.IsOK() && pRetryCnt > 0 &&
143 {
144 if( pRetryPolicy == "continue" )
145 {
146 pJob->GetProperties()->Set( "force", false );
147 pJob->GetProperties()->Set( "continue", true );
148 }
149 else
150 {
151 pJob->GetProperties()->Set( "force", true );
152 pJob->GetProperties()->Set( "continue", false );
153 }
154 --pRetryCnt;
155 continue;
156 }
157
158 // we only loop in case of retry error
159 break;
160 }
161
162 pJob->GetResults()->Set( "status", st );
163
164 //----------------------------------------------------------------------
165 // Report end of the copy
166 //----------------------------------------------------------------------
167 if( mon )
168 {
169 std::vector<std::string> sources;
170 pJob->GetResults()->Get( "sources", sources );
172 i.transfer.origin = &pJob->GetSource();
173 i.transfer.target = &pJob->GetTarget();
174 i.sources = sources.size();
175 i.bTOD = bTOD;
176 gettimeofday( &i.eTOD, 0 );
177 i.status = &st;
179 }
180
181 if( pProgress )
182 pProgress->EndJob( pCurrentJob, pJob->GetResults() );
183
184 if( pSem )
185 pSem->Post();
186 }
187
188 private:
189 XrdCl::CopyJob *pJob;
191 uint16_t pCurrentJob;
192 uint16_t pTotalJobs;
193 XrdSysSemaphore *pSem;
194 int pWrtRetryCnt;
195 int pRetryCnt;
196 std::string pRetryPolicy;
197 };
198};
199
200namespace XrdCl
201{
// Private state of CopyProcess, reached through its pImpl pointer.
202 struct CopyProcessImpl
203 {
// Property lists stored by the "Add job" step: one entry per copy job, plus
// any entry whose "jobType" is "configuration".
204 std::vector<PropertyList> pJobProperties;
// Caller-supplied result lists; pushed only for copy jobs (a configuration
// entry returns before the push).
// NOTE(review): the "Prepare the copy jobs" step indexes this vector with the
// position in pJobProperties, so a configuration entry that is not the last
// element would shift the indices - confirm callers always add it last.
205 std::vector<PropertyList*> pJobResults;
// Copy jobs allocated by the "Prepare the copy jobs" step; deleted and
// cleared by CleanUpJobs.
206 std::vector<CopyJob*> pJobs;
207 };
208
209 //----------------------------------------------------------------------------
210 // Constructor - allocates the CopyProcessImpl held in pImpl
211 //----------------------------------------------------------------------------
212 CopyProcess::CopyProcess() : pImpl( new CopyProcessImpl() )
213 {
214 }
215
216 //----------------------------------------------------------------------------
217 // Destructor
218 //----------------------------------------------------------------------------
220 {
221 CleanUpJobs();
222 delete pImpl;
223 }
224
225 //----------------------------------------------------------------------------
226 // Add job
227 //----------------------------------------------------------------------------
229 PropertyList *results )
230 {
231 Env *env = DefaultEnv::GetEnv();
232
233 //--------------------------------------------------------------------------
234 // Process a configuration job
235 //--------------------------------------------------------------------------
236 if( properties.HasProperty( "jobType" ) &&
237 properties.Get<std::string>( "jobType" ) == "configuration" )
238 {
239 if( pImpl->pJobProperties.size() > 0 &&
240 pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
241 pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
242 {
243 PropertyList &config = *pImpl->pJobProperties.rbegin();
244 PropertyList::PropertyMap::const_iterator it;
245 for( it = properties.begin(); it != properties.end(); ++it )
246 config.Set( it->first, it->second );
247 }
248 else
249 pImpl->pJobProperties.push_back( properties );
250 return XRootDStatus();
251 }
252
253 //--------------------------------------------------------------------------
254 // Validate properties
255 //--------------------------------------------------------------------------
256 if( !properties.HasProperty( "source" ) )
257 return XRootDStatus( stError, errInvalidArgs, 0, "source not specified" );
258
259 if( !properties.HasProperty( "target" ) )
260 return XRootDStatus( stError, errInvalidArgs, 0, "target not specified" );
261
262 pImpl->pJobProperties.push_back( properties );
263 PropertyList &p = pImpl->pJobProperties.back();
264
265 const char *bools[] = {"target", "force", "posc", "coerce", "makeDir",
266 "zipArchive", "xcp", "preserveXAttr", "rmOnBadCksum",
267 "continue", "zipAppend", "doServer", 0};
268 for( int i = 0; bools[i]; ++i )
269 if( !p.HasProperty( bools[i] ) )
270 p.Set( bools[i], false );
271
272 if( !p.HasProperty( "thirdParty" ) )
273 p.Set( "thirdParty", "none" );
274
275 if( !p.HasProperty( "checkSumMode" ) )
276 p.Set( "checkSumMode", "none" );
277 else
278 {
279 if( !p.HasProperty( "checkSumType" ) )
280 {
281 pImpl->pJobProperties.pop_back();
283 "checkSumType not specified" );
284 }
285 else
286 {
287 //----------------------------------------------------------------------
288 // Checksum type has to be case insensitive
289 //----------------------------------------------------------------------
290 std::string checkSumType;
291 p.Get( "checkSumType", checkSumType );
292 std::transform(checkSumType.begin(), checkSumType.end(),
293 checkSumType.begin(), ::tolower);
294 p.Set( "checkSumType", checkSumType );
295 }
296 }
297
298 if( !p.HasProperty( "parallelChunks" ) )
299 {
300 int val = DefaultCPParallelChunks;
301 env->GetInt( "CPParallelChunks", val );
302 p.Set( "parallelChunks", val );
303 }
304
305 if( !p.HasProperty( "chunkSize" ) )
306 {
307 int val = DefaultCPChunkSize;
308 env->GetInt( "CPChunkSize", val );
309 p.Set( "chunkSize", val );
310 }
311
312 if( !p.HasProperty( "xcpBlockSize" ) )
313 {
314 int val = DefaultXCpBlockSize;
315 env->GetInt( "XCpBlockSize", val );
316 p.Set( "xcpBlockSize", val );
317 }
318
319 if( !p.HasProperty( "initTimeout" ) )
320 {
321 int val = DefaultCPInitTimeout;
322 env->GetInt( "CPInitTimeout", val );
323 p.Set( "initTimeout", val );
324 }
325
326 if( !p.HasProperty( "tpcTimeout" ) )
327 {
328 int val = DefaultCPTPCTimeout;
329 env->GetInt( "CPTPCTimeout", val );
330 p.Set( "tpcTimeout", val );
331 }
332
333 if( !p.HasProperty( "cpTimeout" ) )
334 {
335 int val = DefaultCPTimeout;
336 env->GetInt( "CPTimeout", val );
337 p.Set( "cpTimeout", val );
338 }
339
340 if( !p.HasProperty( "dynamicSource" ) )
341 p.Set( "dynamicSource", false );
342
343 if( !p.HasProperty( "xrate" ) )
344 p.Set( "xrate", 0 );
345
346 if( !p.HasProperty( "xrateThreshold" ) || p.Get<long long>( "xrateThreshold" ) == 0 )
347 {
348 int val = DefaultXRateThreshold;
349 env->GetInt( "XRateThreshold", val );
350 p.Set( "xrateThreshold", val );
351 }
352
353 //--------------------------------------------------------------------------
354 // Insert the properties
355 //--------------------------------------------------------------------------
356 Log *log = DefaultEnv::GetLog();
357 Utils::LogPropertyList( log, UtilityMsg, "Adding job with properties: %s",
358 p );
359 pImpl->pJobResults.push_back( results );
360 return XRootDStatus();
361 }
362
363 //----------------------------------------------------------------------------
364 // Prepare the copy jobs
365 //----------------------------------------------------------------------------
367 {
368 Log *log = DefaultEnv::GetLog();
369 std::vector<PropertyList>::iterator it;
370
371 log->Debug( UtilityMsg, "CopyProcess: %llu jobs to prepare",
372 (unsigned long long) pImpl->pJobProperties.size() );
373
374 std::map<std::string, uint32_t> targetFlags;
375 int i = 0;
376 for( it = pImpl->pJobProperties.begin(); it != pImpl->pJobProperties.end(); ++it, ++i )
377 {
378 PropertyList &props = *it;
379
380 if( props.HasProperty( "jobType" ) &&
381 props.Get<std::string>( "jobType" ) == "configuration" )
382 continue;
383
384 PropertyList *res = pImpl->pJobResults[i];
385 std::string tmp;
386
387 props.Get( "source", tmp );
388 URL source = tmp;
389 if( !source.IsValid() )
390 return XRootDStatus( stError, errInvalidArgs, 0, "invalid source" );
391
392 //--------------------------------------------------------------------------
393 // Create a virtual redirector if it is a Metalink file
394 //--------------------------------------------------------------------------
395 if( source.IsMetalink() )
396 {
398 XRootDStatus st = registry.RegisterAndWait( source );
399 if( !st.IsOK() ) return st;
400 }
401
402 // handle UNZIP CGI
403 const URL::ParamsMap &cgi = source.GetParams();
404 URL::ParamsMap::const_iterator itr = cgi.find( "xrdcl.unzip" );
405 if( itr != cgi.end() )
406 {
407 props.Set( "zipArchive", true );
408 props.Set( "zipSource", itr->second );
409 }
410
411 props.Get( "target", tmp );
412 URL target = tmp;
413 if( !target.IsValid() )
414 return XRootDStatus( stError, errInvalidArgs, 0, "invalid target" );
415
416 if( target.GetProtocol() != "stdio" )
417 {
418 // handle directories
419 bool targetIsDir = false;
420 props.Get( "targetIsDir", targetIsDir );
421
422 if( targetIsDir )
423 {
424 std::string path = target.GetPath() + '/';
425 std::string fn;
426
427 bool isZip = false;
428 props.Get( "zipArchive", isZip );
429 if( isZip )
430 {
431 props.Get( "zipSource", fn );
432 }
433 else if( source.IsMetalink() )
434 {
436 VirtualRedirector *redirector = registry.Get( source );
437 fn = redirector->GetTargetName();
438 }
439 else
440 {
441 fn = source.GetPath();
442 }
443
444 size_t pos = fn.rfind( '/' );
445 if( pos != std::string::npos )
446 fn = fn.substr( pos + 1 );
447 path += fn;
448 target.SetPath( path );
449 props.Set( "target", target.GetURL() );
450 }
451 }
452
453 bool tpc = false;
454 props.Get( "thirdParty", tmp );
455 if( tmp != "none" )
456 tpc = true;
457
458 //------------------------------------------------------------------------
459 // Check if we have all we need
460 //------------------------------------------------------------------------
461 if( source.GetProtocol() != "stdio" && source.GetPath().empty() )
462 {
463 log->Debug( UtilityMsg, "CopyProcess (job #%d): no source specified.",
464 i );
465 CleanUpJobs();
467 res->Set( "status", st );
468 return st;
469 }
470
471 if( target.GetProtocol() != "stdio" && target.GetPath().empty() )
472 {
473 log->Debug( UtilityMsg, "CopyProcess (job #%d): no target specified.",
474 i );
475 CleanUpJobs();
477 res->Set( "status", st );
478 return st;
479 }
480
481 //------------------------------------------------------------------------
482 // Check what kind of job we should do
483 //------------------------------------------------------------------------
484 CopyJob *job = 0;
485
486 if( tpc == true )
487 {
488 MarkTPC( props );
489 job = new TPFallBackCopyJob( i+1, &props, res );
490 }
491 else
492 job = new ClassicCopyJob( i+1, &props, res );
493
494 pImpl->pJobs.push_back( job );
495 }
496 return XRootDStatus();
497 }
498
499 //----------------------------------------------------------------------------
500 // Run the copy jobs
501 //----------------------------------------------------------------------------
503 {
504 //--------------------------------------------------------------------------
505 // Get the configuration
506 //--------------------------------------------------------------------------
507 uint8_t parallelThreads = 1;
508 if( pImpl->pJobProperties.size() > 0 &&
509 pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
510 pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
511 {
512 PropertyList &config = *pImpl->pJobProperties.rbegin();
513 if( config.HasProperty( "parallel" ) )
514 parallelThreads = (uint8_t)config.Get<int>( "parallel" );
515 }
516
517 //--------------------------------------------------------------------------
518 // Run the show
519 //--------------------------------------------------------------------------
520 std::vector<CopyJob *>::iterator it;
521 uint16_t currentJob = 1;
522 uint16_t totalJobs = pImpl->pJobs.size();
523
524 //--------------------------------------------------------------------------
525 // Single thread
526 //--------------------------------------------------------------------------
527 if( parallelThreads == 1 )
528 {
529 XRootDStatus err;
530
531 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
532 {
533 QueuedCopyJob j( *it, progress, currentJob, totalJobs );
534 j.Run(0);
535
536 XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
537 if( err.IsOK() && !st.IsOK() )
538 {
539 err = st;
540 }
541 ++currentJob;
542 }
543
544 if( !err.IsOK() ) return err;
545 }
546 //--------------------------------------------------------------------------
547 // Multiple threads
548 //--------------------------------------------------------------------------
549 else
550 {
551 uint16_t workers = std::min( (uint16_t)parallelThreads,
552 (uint16_t)pImpl->pJobs.size() );
553 JobManager jm( workers );
554 jm.Initialize();
555 if( !jm.Start() )
556 return XRootDStatus( stError, errOSError, 0,
557 "Unable to start job manager" );
558
559 XrdSysSemaphore *sem = new XrdSysSemaphore(0);
560 std::vector<QueuedCopyJob*> queued;
561 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
562 {
563 QueuedCopyJob *j = new QueuedCopyJob( *it, progress, currentJob,
564 totalJobs, sem );
565
566 queued.push_back( j );
567 jm.QueueJob(j, 0);
568 ++currentJob;
569 }
570
571 std::vector<QueuedCopyJob*>::iterator itQ;
572 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
573 sem->Wait();
574 delete sem;
575
576 if( !jm.Stop() )
577 return XRootDStatus( stError, errOSError, 0,
578 "Unable to stop job manager" );
579 jm.Finalize();
580 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
581 delete *itQ;
582
583 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
584 {
585 XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
586 if( !st.IsOK() ) return st;
587 }
588 };
589 return XRootDStatus();
590 }
591
//----------------------------------------------------------------------------
// Clean up the jobs: for every CopyJob held in pImpl->pJobs, release the
// registry entry for its source when that source is a metalink, delete the
// job, and finally empty the vector.
//----------------------------------------------------------------------------
592 void CopyProcess::CleanUpJobs()
593 {
594 std::vector<CopyJob*>::iterator itJ;
595 for( itJ = pImpl->pJobs.begin(); itJ != pImpl->pJobs.end(); ++itJ )
596 {
597 CopyJob *job = *itJ;
// Copy of the job's source URL, taken before the job is deleted below.
598 URL src = job->GetSource();
599 if( src.IsMetalink() )
600 {
// NOTE(review): listing line 601, which declares `registry`, is absent from
// this extract - presumably a reference obtained from
// RedirectorRegistry::Instance(); confirm against the real file.
602 registry.Release( src );
603 }
604 delete job;
605 }
// Drop the now-dangling pointers so a second call does nothing.
606 pImpl->pJobs.clear();
607 }
608}
const URL & GetSource() const
Get source.
virtual ~CopyProcess()
Destructor.
CopyProcess()
Constructor.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
bool Stop()
Stop the workers.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
virtual void Run(void *arg)=0
The job logic.
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
An abstract class to describe the client-side monitoring plugin interface.
TransferInfo transfer
The transfer in question.
@ EvCopyBeg
CopyBInfo: Copy operation started.
@ EvCopyEnd
CopyEInfo: Copy operation ended.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
PropertyMap::const_iterator end() const
Get the end iterator.
bool Get(const std::string &name, Item &item) const
bool HasProperty(const std::string &name) const
Check if we know about the given name.
PropertyMap::const_iterator begin() const
Get the begin iterator.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
XRootDStatus RegisterAndWait(const URL &url)
Creates a new virtual redirector and registers it (sync).
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:458
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:445
static void LogPropertyList(Log *log, uint64_t topic, const char *format, const PropertyList &list)
Log property list.
An interface for metadata redirectors.
virtual std::string GetTargetName() const =0
Gets the file name as specified in the metalink.
const int DefaultCPInitTimeout
const int DefaultXRateThreshold
const uint16_t errOperationExpired
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
const int DefaultRetryWrtAtLBLimit
const int DefaultCPParallelChunks
const uint16_t errOSError
const int DefaultXCpBlockSize
const uint64_t UtilityMsg
const int DefaultCPTimeout
const uint16_t errInvalidArgs
const int DefaultCpRetry
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errThresholdExceeded
const char *const DefaultCpRetryPolicy
const int DefaultCPTPCTimeout
Describe an end of copy event.
TransferInfo transfer
The transfer in question.
int sources
Number of sources used for the copy.
timeval bTOD
Copy start time.
const XRootDStatus * status
Status of the copy.
timeval eTOD
Copy end time.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static bool IsSocketError(uint16_t code)