XRootD
Loading...
Searching...
No Matches
XrdClTaskManager.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
20#include "XrdCl/XrdClLog.hh"
21#include "XrdCl/XrdClUtils.hh"
24#include "XrdSys/XrdSysE2T.hh"
25#include "XrdSys/XrdSysTimer.hh"
26
27#include <iostream>
28
29//------------------------------------------------------------------------------
30// The thread
31//------------------------------------------------------------------------------
32extern "C"
33{
34 static void *RunRunnerThread( void *arg )
35 {
36 using namespace XrdCl;
37 TaskManager *mgr = (TaskManager*)arg;
38 mgr->RunTasks();
39 return 0;
40 }
41}
42
43namespace XrdCl
44{
45 //----------------------------------------------------------------------------
46 // Constructor
47 //----------------------------------------------------------------------------
48 TaskManager::TaskManager(): pResolution(1), pRunnerThread(0), pRunning(false)
49 {}
50
51 //----------------------------------------------------------------------------
52 // Destructor
53 //----------------------------------------------------------------------------
55 {
56 TaskSet::iterator it, itE;
57 for( it = pTasks.begin(); it != pTasks.end(); ++it )
58 if( it->own )
59 delete it->task;
60 }
61
62 //----------------------------------------------------------------------------
63 // Start the manager
64 //----------------------------------------------------------------------------
66 {
67 XrdSysMutexHelper scopedLock( pOpMutex );
68 Log *log = DefaultEnv::GetLog();
69 log->Debug( TaskMgrMsg, "Starting the task manager..." );
70
71 if( pRunning )
72 {
73 log->Error( TaskMgrMsg, "The task manager is already running" );
74 return false;
75 }
76
77 int ret = ::pthread_create( &pRunnerThread, 0, ::RunRunnerThread, this );
78 if( ret != 0 )
79 {
80 log->Error( TaskMgrMsg, "Unable to spawn the task runner thread: %s",
81 XrdSysE2T( errno ) );
82 return false;
83 }
84 pRunning = true;
85 log->Debug( TaskMgrMsg, "Task manager started" );
86 return true;
87 }
88
89 //----------------------------------------------------------------------------
90 // Stop the manager
91 //----------------------------------------------------------------------------
93 {
94 XrdSysMutexHelper scopedLock( pOpMutex );
95 Log *log = DefaultEnv::GetLog();
96 log->Debug( TaskMgrMsg, "Stopping the task manager..." );
97 if( !pRunning )
98 {
99 log->Error( TaskMgrMsg, "The task manager is not running" );
100 return false;
101 }
102
103 if( ::pthread_cancel( pRunnerThread ) != 0 )
104 {
105 log->Error( TaskMgrMsg, "Unable to cancel the task runner thread: %s",
106 XrdSysE2T( errno ) );
107 return false;
108 }
109
110 void *threadRet;
111 int ret = pthread_join( pRunnerThread, (void **)&threadRet );
112 if( ret != 0 )
113 {
114 log->Error( TaskMgrMsg, "Failed to join the task runner thread: %s",
115 XrdSysE2T( errno ) );
116 return false;
117 }
118
119 pRunning = false;
120 log->Debug( TaskMgrMsg, "Task manager stopped" );
121 return true;
122 }
123
124 //----------------------------------------------------------------------------
125 // Run the given task at the given time
126 //----------------------------------------------------------------------------
127 void TaskManager::RegisterTask( Task *task, time_t time, bool own )
128 {
129 Log *log = DefaultEnv::GetLog();
130
131 log->Debug( TaskMgrMsg, "Registering task: \"%s\" to be run at: [%s]",
132 task->GetName().c_str(), Utils::TimeToString(time).c_str() );
133
134 XrdSysMutexHelper scopedLock( pMutex );
135 pTasks.insert( TaskHelper( task, time, own ) );
136 }
137
138 //--------------------------------------------------------------------------
139 // Remove a task if it hasn't run yet
140 //--------------------------------------------------------------------------
142 {
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( TaskMgrMsg, "Requesting unregistration of: \"%s\"",
145 task->GetName().c_str() );
146 XrdSysMutexHelper scopedLock( pMutex );
147 pToBeUnregistered.push_back( task );
148 }
149
150 //----------------------------------------------------------------------------
151 // Run tasks
152 //----------------------------------------------------------------------------
154 {
155 Log *log = DefaultEnv::GetLog();
156
157 //--------------------------------------------------------------------------
158 // We want the thread to be cancelable only when we sleep between tasks
159 // execution
160 //--------------------------------------------------------------------------
161 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
162
163 for(;;)
164 {
165 pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
166 pMutex.Lock();
167
168 //------------------------------------------------------------------------
169 // Remove the tasks from the active set - super inefficient,
170 // but, hopefully, never really necessary. We first need to build a list
171 // of iterators because it is impossible to remove elements from
172 // a multiset when iterating over it
173 //------------------------------------------------------------------------
174 TaskList::iterator listIt = pToBeUnregistered.begin();
175 TaskSet::iterator it, itE;
176 std::list<TaskSet::iterator> iteratorList;
177 std::list<TaskSet::iterator>::iterator itRem;
178 for( ; listIt != pToBeUnregistered.end(); ++listIt )
179 {
180 for( it = pTasks.begin(); it != pTasks.end(); ++it )
181 {
182 if( it->task == *listIt )
183 iteratorList.push_back( it );
184 }
185 }
186
187 for( itRem = iteratorList.begin(); itRem != iteratorList.end(); ++itRem )
188 {
189 Task *tsk = (*itRem)->task;
190 bool own = (*itRem)->own;
191 log->Debug( TaskMgrMsg, "Removing task: \"%s\"", tsk->GetName().c_str() );
192 pTasks.erase( *itRem );
193 if( own )
194 delete tsk;
195 }
196
197 pToBeUnregistered.clear();
198
199 //------------------------------------------------------------------------
200 // Select the tasks to be run
201 //------------------------------------------------------------------------
202 time_t now = time(0);
203 std::list<TaskHelper> toRun;
204 std::list<TaskHelper>::iterator trIt;
205
206 it = pTasks.begin();
207 itE = pTasks.upper_bound( TaskHelper( 0, now ) );
208
209 for( ; it != itE; ++it )
210 toRun.push_back( TaskHelper( it->task, 0, it->own ) );
211
212 pTasks.erase( pTasks.begin(), itE );
213 pMutex.UnLock();
214
215 //------------------------------------------------------------------------
216 // Run the tasks and reinsert them if necessary
217 //------------------------------------------------------------------------
218 for( trIt = toRun.begin(); trIt != toRun.end(); ++trIt )
219 {
220 log->Dump( TaskMgrMsg, "Running task: \"%s\"",
221 trIt->task->GetName().c_str() );
222 time_t schedule = trIt->task->Run( now );
223 if( schedule )
224 {
225 log->Dump( TaskMgrMsg, "Will rerun task \"%s\" at [%s]",
226 trIt->task->GetName().c_str(),
227 Utils::TimeToString(schedule).c_str() );
228 pMutex.Lock();
229 pTasks.insert( TaskHelper( trIt->task, schedule, trIt->own ) );
230 pMutex.UnLock();
231 }
232 else
233 {
234 log->Debug( TaskMgrMsg, "Done with task: \"%s\"",
235 trIt->task->GetName().c_str() );
236 if( trIt->own )
237 delete trIt->task;
238 }
239 }
240
241 //------------------------------------------------------------------------
242 // Enable the cancellation and go to sleep
243 //------------------------------------------------------------------------
244 pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
245 pthread_testcancel();
246 XrdSysTimer::Wait( pResolution*1000 );
247 }
248 }
249}
static void * RunRunnerThread(void *arg)
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
void RegisterTask(Task *task, time_t time, bool own=true)
void RunTasks()
Run the tasks - this loops infinitely.
TaskManager()
Constructor.
bool Start()
Start the manager.
~TaskManager()
Destructor.
void UnregisterTask(Task *task)
Interface for a task to be run by the TaskManager.
const std::string & GetName() const
Name of the task.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static void Wait(int milliseconds)
const uint64_t TaskMgrMsg