XRootD
Loading...
Searching...
No Matches
XrdOfsEvs.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O f s E v s . c c */
4/* */
5/* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/* Based on code developed by Derek Feichtinger, CERN. */
30/******************************************************************************/
31
32#include <cctype>
33#include <cstdarg>
34#include <stddef.h>
35#include <cstdlib>
36#include <cstdio>
37#include <cstring>
38#include <sys/stat.h>
39
40#include "XrdOfs/XrdOfsEvs.hh"
41#include "XrdSys/XrdSysError.hh"
42#include "XrdOuc/XrdOucProg.hh"
44#include "XrdNet/XrdNetOpts.hh"
47
48/******************************************************************************/
49/* L o c a l C l a s s e s */
50/******************************************************************************/
51
53{
54public:
55
57char *text;
58int tlen;
60
61 XrdOfsEvsMsg(char *tval=0, int big=0)
62 {text = tval; tlen=0; isBig = big; next=0;}
63
64 ~XrdOfsEvsMsg() {if (text) free(text);}
65};
66
67/******************************************************************************/
68/* E x t e r n a l L i n k a g e s */
69/******************************************************************************/
70
71void *XrdOfsEvsSend(void *pp)
72{
73 XrdOfsEvs *evs = (XrdOfsEvs *)pp;
74 evs->sendEvents();
75 return (void *)0;
76}
77
78/******************************************************************************/
79/* S t a t i c D e f i n i t i o n s */
80/******************************************************************************/
81
// Definition of the static table of message formats, one entry per event
// number (nCount entries). The constructor and Parse() fill it in.
//
82XrdOfsEvsFormat XrdOfsEvs::MsgFmt[XrdOfsEvs::nCount];
83
// Namespace-scope definitions of the static const message size limits.
//
84const int XrdOfsEvs::minMsgSize;
85const int XrdOfsEvs::maxMsgSize;
86
87/******************************************************************************/
88/* X r d E v s F o r m a t : : D e f */
89/******************************************************************************/
90
91void XrdOfsEvsFormat::Def(evFlags theFlags, const char *Fmt, ...)
92{
93 va_list ap;
94 int theVal, i = 0;
95
96// Return if already defined
97//
98 if (Format) return;
99
100// Set flags and format. Prepare the arg vector
101//
102 Flags = theFlags;
103 Format = Fmt;
104 memset(Args, 0, sizeof(Args));
105
106// Pick up all arguments
107//
108 va_start(ap, Fmt);
109 while((theVal = va_arg(ap, int)) >= 0)
110 Args[i++] = static_cast<XrdOfsEvsInfo::evArg>(theVal);
111 va_end(ap);
112}
113
114/******************************************************************************/
115/* C o n s t r u c t o r */
116/******************************************************************************/
117
118XrdOfsEvs::XrdOfsEvs(Event theEvents, const char *Target, int minq, int maxq)
119{
120
121// Set common variables
122//
123 enEvents = static_cast<Event>(theEvents & enMask);
124 endIT = 0;
125 theTarget = strdup(Target);
126 eDest = 0;
127 theProg = 0;
128 maxMin = minq; maxMax = maxq;
129 msgFirst = msgLast = msgFreeMax = msgFreeMin = 0;
130 numMax = numMin = 0;
131 tid = 0;
132 msgFD = -1;
133
134// Initialize all static format entries that have not been initialized yet.
135// Note that format may be specified prior to this object being created!
136//
137// <tid> chmod <mode> <path>
138//
139 MsgFmt[Chmod & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s chmod %s %s\n",
142// <tid> closer <path>
143//
144 MsgFmt[Closer & Mask].Def(XrdOfsEvsFormat::Null, "%s closer %s\n",
146
147// <tid> closew <path>
148//
149 MsgFmt[Closew & Mask].Def(XrdOfsEvsFormat::Null, "%s closew %s\n",
151
152// <tid> create <mode> <path>
153//
154 MsgFmt[Create & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s create %s %s\n",
157// <tid> mkdir <mode> <path>
158//
159 MsgFmt[Mkdir & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s mkdir %s %s\n",
162// <tid> mv <path> <path>
163//
164 MsgFmt[Mv & Mask].Def(XrdOfsEvsFormat::Null, "%s mv %s %s\n",
167// <tid> openr <path>
168//
169 MsgFmt[Openr & Mask].Def(XrdOfsEvsFormat::Null, "%s openr %s\n",
171
172// <tid> openw <path>
173//
174 MsgFmt[Openw & Mask].Def(XrdOfsEvsFormat::Null, "%s openw %s\n",
176
177// <tid> rm <path>
178//
179 MsgFmt[Rm & Mask].Def(XrdOfsEvsFormat::Null, "%s rm %s\n",
181
182// <tid> rmdir <path>
183//
184 MsgFmt[Rmdir & Mask].Def(XrdOfsEvsFormat::Null, "%s rmdir %s\n",
186
187// <tid> trunc <size>
188//
189 MsgFmt[Trunc & Mask].Def(XrdOfsEvsFormat::cvtFSize,"%s trunc %s\n",
191
192// <tid> fwrite <path>
193//
194 MsgFmt[Fwrite & Mask].Def(XrdOfsEvsFormat::Null, "%s fwrite %s\n",
196}
197
198/******************************************************************************/
199/* D e s t r u c t o r */
200/******************************************************************************/
201
203{
204 XrdOfsEvsMsg *tp;
205
206// Kill the notification thread. This may cause a msg block to be orphaned
207// but, in practice, this object does not really get deleted after being
208// started. So, the problem is moot.
209//
210 endIT = 1;
211 if (tid) XrdSysThread::Kill(tid);
212
213// Release all queued message blocks
214//
215 qMut.Lock();
216 while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
217 if (theTarget) free(theTarget);
218 if (msgFD >= 0)close(msgFD);
219 if (theProg) delete theProg;
220 qMut.UnLock();
221
222// Release all free message blocks
223//
224 fMut.Lock();
225 while ((tp = msgFreeMax)) {msgFreeMax = tp->next; delete tp;}
226 while ((tp = msgFreeMin)) {msgFreeMin = tp->next; delete tp;}
227 fMut.UnLock();
228}
229
230/******************************************************************************/
231/* N o t i f y */
232/******************************************************************************/
233
235{
236 static int warnings = 0;
237 XrdOfsEvsFormat *fP;
238 XrdOfsEvsMsg *tp;
239 char modebuff[8], sizebuff[16];
240 int eNum, isBig = (eID & Mv), msgSize = (isBig ? maxMsgSize : minMsgSize);
241
242// Validate event number and set event name
243//
244 eNum = eID & Mask;
245 if (eNum < 0 || eNum >= nCount) return;
246
247// Check if we need to do any conversions
248//
249 fP = &MsgFmt[eNum];
251 {sprintf(modebuff, "%o", static_cast<int>((Info.FMode() & S_IAMB)));
252 Info.Set(XrdOfsEvsInfo::evFMODE, modebuff);
253 } else Info.Set(XrdOfsEvsInfo::evFMODE, "$FMODE");
255 {sprintf(sizebuff, "%lld", Info.FSize());
256 Info.Set(XrdOfsEvsInfo::evFSIZE, sizebuff);
257 } else Info.Set(XrdOfsEvsInfo::evFSIZE, "$FSIZE");
258
259// Get a message block
260//
261 if (!(tp = getMsg(isBig)))
262 {if ((++warnings & 0xff) == 1)
263 {eDest->Emsg("Notify", "Ran out of message objects;", eName(eNum),
264 "event notification not sent.");
265 }
266 return;
267 }
268
269// Format the message
270//
271 tp->tlen = fP->SNP(Info, tp->text, msgSize);
272
273// Put the message on the queue and return
274//
275 tp->next = 0;
276 qMut.Lock();
277 if (msgLast) {msgLast->next = tp; msgLast = tp;}
278 else msgFirst = msgLast = tp;
279 qMut.UnLock();
280 qSem.Post();
281}
282
283/******************************************************************************/
284/* P a r s e */
285/******************************************************************************/
286
287int XrdOfsEvs::Parse(XrdSysError &Eroute, XrdOfsEvs::Event eNum, char *mText)
288{
289 static struct valVar {const char *vname;
292 Vars[] = {
302 };
303 int numvars = sizeof(Vars)/sizeof(struct valVar);
304 char parms[1024], *pP = parms;
305 char *pE = parms+sizeof(parms)-((XrdOfsEvsInfo::evARGS*2)-8);
306 char varbuff[16], *bVar, *eVar;
307 int i, j, aNum = 0, Args[XrdOfsEvsInfo::evARGS] = {0};
309
310// Parse the text
311//
312 parms[0] = '\0';
313 while(*mText && pP < pE)
314 {if (*mText == '\\' && *(mText+1) == '$')
315 {*pP++ = '$'; mText += 2; continue;}
316 else if (*mText != '$') {*pP++ = *mText++; continue;}
317 bVar = mText+1;
318 if (*mText == '{') {eVar = index(mText, '}'); j = 1;}
319 else if (*mText == '[') {eVar = index(mText, ']'); j = 1;}
320 else {eVar = bVar; while(isalpha(*eVar)) eVar++; j = 0;}
321 i = eVar - bVar;
322 if (i < 1 || i >= (int)sizeof(varbuff))
323 {Eroute.Emsg("Parse","Invalid notifymsg variable starting at",mText);
324 return 1;
325 }
326 strncpy(varbuff, bVar, i); varbuff[i] = '\0';
327 for (i = 0; i < numvars; i++)
328 if (!strcmp(varbuff, Vars[i].vname)) break;
329 if (i >= numvars)
330 {Eroute.Emsg("Parse", "Unknown notifymsg variable -",varbuff);
331 return 1;
332 }
333 if (aNum >= XrdOfsEvsInfo::evARGS)
334 {Eroute.Say("Parse", "Too many notifymsg variables"); return 1;}
335 strcpy(pP, "%s"); pP += 2;
336 Args[aNum++] = Vars[i].vnum;
337 ArgOpts = static_cast<XrdOfsEvsFormat::evFlags>(ArgOpts|Vars[i].vopt);
338 mText = eVar+j;
339 }
340
341// Check if we overran the buffer or didn't have any text
342//
343 if (pP >= pE)
344 {Eroute.Emsg("Parse","notifymsg text too long");return 1;}
345 if (!parms[0])
346 {Eroute.Emsg("Parse","notifymsg text not specified");return 1;}
347
348// Set the format
349//
350 strcpy(pP, "\n");
351 eNum = static_cast<Event>(eNum & Mask);
352 MsgFmt[eNum].Set(ArgOpts, strdup(parms), Args);
353
354// All done
355//
356 return 0;
357}
358
359/******************************************************************************/
360/* s e n d E v e n t s */
361/******************************************************************************/
362
364{
365 XrdOfsEvsMsg *tp;
366 const char *theData[2] = {0,0};
367 int theDlen[2] = {0,0};
368
369// This is an endless loop that just gets things off the event queue and
370// sends them out. This allows us to only hang a single thread should the
371// receiver get blocked, instead of the whole process.
372//
373 while(1)
374 {qSem.Wait();
375 qMut.Lock();
376 if (endIT) break;
377 if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
378 qMut.UnLock();
379 if (tp)
380 {if (!theProg) Feed(tp->text, tp->tlen);
381 else {theData[0] = tp->text; theDlen[0] = tp->tlen;
382 theProg->Feed(theData, theDlen);
383 }
384 retMsg(tp);
385 }
386 }
387 qMut.UnLock();
388}
389
390/******************************************************************************/
391/* S t a r t */
392/******************************************************************************/
393
395{
396 int rc;
397
398// Set the error object pointer
399//
400 eDest = eobj;
401
402// Check if we need to create a socket to a path
403//
404 if (*theTarget == '>')
405 {XrdNetSocket *msgSock;
406 if (!(msgSock = XrdNetSocket::Create(eobj,theTarget+1,0,0660,XRDNET_FIFO)))
407 return -1;
408 msgFD = msgSock->Detach();
409 delete msgSock;
410
411 } else {
412
413 // Allocate a new program object if we don't have one
414 //
415 if (theProg) return 0;
416 theProg = new XrdOucProg(eobj);
417
418 // Setup the program
419 //
420 if (theProg->Setup(theTarget, eobj)) return -1;
421 if ((rc = theProg->Start()))
422 {eobj->Emsg("Evs", rc, "start event collector"); return -1;}
423 }
424
425// Now start a thread to get messages and send them to the collector
426//
427 if ((rc = XrdSysThread::Run(&tid, XrdOfsEvsSend, static_cast<void *>(this),
428 0, "Event notification sender")))
429 {eobj->Emsg("Evs", rc, "create event notification thread");
430 return -1;
431 }
432
433// All done
434//
435 return 0;
436}
437
438/******************************************************************************/
439/* P r i v a t e M e t h o d s */
440/******************************************************************************/
441/******************************************************************************/
442/* e N a m e */
443/******************************************************************************/
444
445const char *XrdOfsEvs::eName(int eNum)
446{
447 static const char *eventName[] = {"Chmod", "closer", "closew", "create",
448 "fwrite", "mkdir", "mv", "openr",
449 "opnw", "rm", "rmdir", "trunc"};
450
451 eNum = (eNum & Mask);
452 return (eNum < 0 || eNum >= nCount ? "?" : eventName[eNum]);
453}
454
455/******************************************************************************/
456/* F e e d */
457/******************************************************************************/
458
459int XrdOfsEvs::Feed(const char *data, int dlen)
460{
461 int retc;
462
463// Write the data. ince this is a udp socket all the data goes or none does
464//
465 do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
466 while (retc < 0 && errno == EINTR);
467 if (retc < 0)
468 {eDest->Emsg("EvsFeed", errno, "write to event socket", theTarget);
469 return -1;
470 }
471
472// All done
473//
474 return 0;
475}
476
477/******************************************************************************/
478/* g e t M s g */
479/******************************************************************************/
480
481XrdOfsEvsMsg *XrdOfsEvs::getMsg(int bigmsg)
482{
483 XrdOfsEvsMsg *tp;
484 int msz = 0;
485
486// Lock the free queue
487//
488 fMut.Lock();
489
490// Get a free element from the big or small queue, as needed
491//
492 if (bigmsg)
493 if ((tp = msgFreeMax)) msgFreeMax = tp->next;
494 else msz = maxMsgSize;
495 else if ((tp = msgFreeMin)) msgFreeMin = tp->next;
496 else msz = minMsgSize;
497
498// Check if we have to allocate a new item
499//
500 if (!tp && (numMax + numMin) < (maxMax + maxMin))
501 {if ((tp = new XrdOfsEvsMsg((char *)malloc(msz), bigmsg)))
502 {if (!(tp->text)) {delete tp; tp = 0;}
503 else if (bigmsg) numMax++;
504 else numMin++;
505 }
506 }
507
508// Unlock and return result
509//
510 fMut.UnLock();
511 return tp;
512}
513
514/******************************************************************************/
515/* r e t M s g */
516/******************************************************************************/
517
518void XrdOfsEvs::retMsg(XrdOfsEvsMsg *tp)
519{
520
521// Lock the free queue
522//
523 fMut.Lock();
524
525// Check if we exceeded the hold quotax
526//
527 if (tp->isBig)
528 if (numMax > maxMax) {delete tp; numMax--;}
529 else {tp->next = msgFreeMax; msgFreeMax = tp;}
530 else
531 if (numMin > maxMin) {delete tp; numMin--;}
532 else {tp->next = msgFreeMin; msgFreeMin = tp;}
533
534// Unlock and return
535//
536 fMut.UnLock();
537}
#define S_IAMB
Definition XrdConfig.cc:159
#define XRDNET_FIFO
Definition XrdNetOpts.hh:83
void * XrdOfsEvsSend(void *pp)
Definition XrdOfsEvs.cc:71
#define close(a)
Definition XrdPosix.hh:43
#define write(a, b, c)
Definition XrdPosix.hh:110
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
int SNP(XrdOfsEvsInfo &Info, char *buff, int blen)
Definition XrdOfsEvs.hh:97
XrdOfsEvsInfo::evArg Args[XrdOfsEvsInfo::evARGS]
Definition XrdOfsEvs.hh:95
void Def(evFlags theFlags, const char *Fmt,...)
Definition XrdOfsEvs.cc:91
const char * Format
Definition XrdOfsEvs.hh:93
void Set(evFlags theFlags, const char *Fmt, int *fullArgs)
Definition XrdOfsEvs.hh:106
XrdOfsEvsMsg * next
Definition XrdOfsEvs.cc:56
XrdOfsEvsMsg(char *tval=0, int big=0)
Definition XrdOfsEvs.cc:61
char * text
Definition XrdOfsEvs.cc:57
static int Parse(XrdSysError &Eroute, Event eNum, char *mText)
Definition XrdOfsEvs.cc:287
XrdOfsEvs(Event theEvents, const char *Target, int minq=90, int maxq=10)
Definition XrdOfsEvs.cc:118
static const int maxMsgSize
Definition XrdOfsEvs.hh:137
void sendEvents(void)
Definition XrdOfsEvs.cc:363
int Start(XrdSysError *eobj)
Definition XrdOfsEvs.cc:394
static const int minMsgSize
Definition XrdOfsEvs.hh:136
void Notify(Event eNum, XrdOfsEvsInfo &Info)
Definition XrdOfsEvs.cc:234
int Start(void)
int Feed(const char *data[], const int dlen[])
Definition XrdOucProg.cc:63
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static int Kill(pthread_t tid)