XRootD
Loading...
Searching...
No Matches
XrdOssCsiPagesUnaligned.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O s s C s i P a g e s U n a l i g n e d . c c */
4/* */
5/* (C) Copyright 2021 CERN. */
6/* */
7/* This file is part of the XRootD software suite. */
8/* */
9/* XRootD is free software: you can redistribute it and/or modify it under */
10/* the terms of the GNU Lesser General Public License as published by the */
11/* Free Software Foundation, either version 3 of the License, or (at your */
12/* option) any later version. */
13/* */
14/* In applying this licence, CERN does not waive the privileges and */
15/* immunities granted to it by virtue of its status as an Intergovernmental */
16/* Organization or submit itself to any jurisdiction. */
17/* */
18/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21/* License for more details. */
22/* */
23/* You should have received a copy of the GNU Lesser General Public License */
24/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26/* */
27/* The copyright holder's institutional names and contributor's names may not */
28/* be used to endorse or promote products derived from this software without */
29/* specific prior written permission of the institution or contributor. */
30/******************************************************************************/
31
32#include "XrdOssCsiTrace.hh"
33#include "XrdOssCsiPages.hh"
34#include "XrdOssCsiCrcUtils.hh"
35#include "XrdOuc/XrdOucCRC.hh"
37
38#include <vector>
39#include <assert.h>
40
43
44//
45// UpdateRangeHoleUntilPage
46//
47// Used pgWrite/Write (both aligned and unaligned cases) when extending a file
48// with implied zeros after then current end of file and the new one.
49// fd (data file descriptor pointer) required only when last page in file is partial.
50// current implementation does not use fd in this case, but requires it be set.
51//
52int XrdOssCsiPages::UpdateRangeHoleUntilPage(XrdOssDF *fd, const off_t until, const Sizes_t &sizes)
53{
54 EPNAME("UpdateRangeHoleUntilPage");
55
56 static const uint32_t crczero = CrcUtils.crc32c_extendwith_zero(0u, XrdSys::PageSize);
57 static const std::vector<uint32_t> crc32Vec(stsize_, crczero);
58
59 const off_t trackinglen = sizes.first;
60 const off_t tracked_page = trackinglen / XrdSys::PageSize;
61 if (until <= tracked_page) return 0;
62
63 const size_t tracked_off = trackinglen % XrdSys::PageSize;
64
65 // if last tracked page is before page "until" extend it
66 if (tracked_off>0)
67 {
68 if (fd == NULL)
69 {
70 TRACE(Warn, "Unexpected partially filled last page " << fn_);
71 return -EDOM;
72 }
73
74 uint32_t prevtag;
75 const ssize_t rret = ts_->ReadTags(&prevtag, tracked_page, 1);
76 if (rret < 0)
77 {
78 TRACE(Warn, TagsReadError(tracked_page, 1, rret));
79 return rret;
80 }
81
82 // extend prevtag up to PageSize. If there is a mismatch it will only be
83 // discovered during a later read (but this saves a read now).
84 const uint32_t crc32c = CrcUtils.crc32c_extendwith_zero(prevtag, XrdSys::PageSize - tracked_off);
85 const ssize_t wret = ts_->WriteTags(&crc32c, tracked_page, 1);
86 if (wret < 0)
87 {
88 TRACE(Warn, TagsWriteError(tracked_page, 1, wret) << " (prev)");
89 return wret;
90 }
91 }
92
93 if (!writeHoles_) return 0;
94
95 const off_t nAllEmpty = (tracked_off>0) ? (until - tracked_page - 1) : (until - tracked_page);
96 const off_t firstEmpty = (tracked_off>0) ? (tracked_page + 1) : tracked_page;
97
98 off_t towrite = nAllEmpty;
99 off_t nwritten = 0;
100 while(towrite>0)
101 {
102 const size_t nw = std::min(towrite, (off_t)crc32Vec.size());
103 const ssize_t wret = ts_->WriteTags(&crc32Vec[0], firstEmpty+nwritten, nw);
104 if (wret<0)
105 {
106 TRACE(Warn, TagsWriteError(firstEmpty+nwritten, nw, wret) << " (new)");
107 return wret;
108 }
109 towrite -= wret;
110 nwritten += wret;
111 }
112
113 return 0;
114}
115
116// UpdateRangeUnaligned
117//
118// Used by Write for various cases with mis-alignment that need checksum recalculation. See StoreRangeUnaligned for list of conditions.
119//
120int XrdOssCsiPages::UpdateRangeUnaligned(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
121{
122 return StoreRangeUnaligned(fd, buff, offset, blen, sizes, NULL);
123}
124
125//
126// used by StoreRangeUnaligned when the supplied data does not cover the whole of the first corresponding page in the file
127//
128// offset: offset in file for start of write
129// blen: length of write in first page
130//
131int XrdOssCsiPages::StoreRangeUnaligned_preblock(XrdOssDF *const fd, const void *const buff, const size_t blen,
132 const off_t offset, const off_t trackinglen,
133 const uint32_t *const csvec, uint32_t &prepageval)
134{
135 EPNAME("StoreRangeUnaligned_preblock");
136 const off_t p1 = offset / XrdSys::PageSize;
137 const size_t p1_off = offset % XrdSys::PageSize;
138
139 const off_t tracked_page = trackinglen / XrdSys::PageSize;
140 const size_t tracked_off = trackinglen % XrdSys::PageSize;
141
142 if (p1 > tracked_page)
143 {
144 // the start of will have a number of implied zero bytes
145 uint32_t crc32c = CrcUtils.crc32c_extendwith_zero(0u, p1_off);
146 if (csvec)
147 {
148 crc32c = CrcUtils.crc32c_combine(crc32c, csvec[0], blen);
149 }
150 else
151 {
152 crc32c = XrdOucCRC::Calc32C(buff, blen, crc32c);
153 }
154 prepageval = crc32c;
155 return 0;
156 }
157
158 // we're appending, or appending within the last page after a gap of zeros
159 if (p1 == tracked_page && p1_off >= tracked_off)
160 {
161 // appending: with or without some implied zeros.
162
163 // zero initialised value may be used
164 uint32_t crc32v = 0;
165 if (tracked_off > 0)
166 {
167 const ssize_t rret = ts_->ReadTags(&crc32v, p1, 1);
168 if (rret<0)
169 {
170 TRACE(Warn, TagsReadError(p1, 1, rret) << " (append)");
171 return rret;
172 }
173 }
174
175 uint32_t crc32c = 0;
176
177 // only do the loosewrite extending check one time for the page which was the
178 // last page according to the trackinglen at time the check was configured (open or size-resync).
179 // don't do the check every time because it needs an extra read compared to the non loose case;
180 // checklastpg_ is checked and modified here, but is protected from concurrent
181 // access because of the condition that p1==lastpgforloose_
182
184 {
185 checklastpg_ = false;
186 uint8_t b[XrdSys::PageSize];
187
188 // this will reissue read() until eof, or tracked_off bytes read but accept up to PageSize
189 const ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * p1, XrdSys::PageSize, tracked_off);
190
191 if (rlen<0)
192 {
193 TRACE(Warn, PageReadError(tracked_off, p1, rlen));
194 return rlen;
195 }
196 memset(&b[rlen], 0, XrdSys::PageSize - rlen);
197
198 // in the loose-write mode, the new crc is based on the crc of data
199 // read from file up to p1_off, not on the previously stored tag.
200 // However must check if the data read were consistent with stored tag (crc32v)
201
202 uint32_t crc32x = XrdOucCRC::Calc32C(b, tracked_off, 0u);
203 crc32c = XrdOucCRC::Calc32C(&b[tracked_off], p1_off-tracked_off, crc32x);
204
205 do
206 {
207 if (static_cast<size_t>(rlen) == tracked_off)
208 {
209 // this is the expected match
210 if (tracked_off==0 || crc32x == crc32v) break;
211 }
212
213 // any bytes on disk beyond p1_off+blan would not be included in the new crc.
214 // if tracked_off==0 we have no meaningful crc32v value.
215 if ((tracked_off>0 || p1_off==0) && static_cast<size_t>(rlen) <= p1_off+blen)
216 {
217
218 if (tracked_off != 0)
219 {
220 TRACE(Warn, CRCMismatchError(tracked_off, p1, crc32x, crc32v) << " (loose match, still trying)");
221 }
222
223 // there was no tag recorded for the page, and we're completely overwriting anything on disk in the page
224 if (tracked_off==0)
225 {
226 TRACE(Warn, "Recovered page with no tag at offset " << (XrdSys::PageSize * p1) <<
227 " of file " << fn_ << " rlen=" << rlen << " (append)");
228 break;
229 }
230
231 if (static_cast<size_t>(rlen) != tracked_off && rlen>0)
232 {
233 crc32x = XrdOucCRC::Calc32C(b, rlen, 0u);
234 if (crc32x == crc32v)
235 {
236 TRACE(Warn, "Recovered page at offset " << (XrdSys::PageSize * p1)+p1_off << " of file " << fn_ << " (append)");
237 break;
238 }
239 TRACE(Warn, CRCMismatchError(rlen, p1, crc32x, crc32v) << " (loose match, still trying)");
240 }
241
242 memcpy(&b[p1_off], buff, blen);
243 crc32x = XrdOucCRC::Calc32C(b, p1_off+blen, 0u);
244 if (crc32x == crc32v)
245 {
246 TRACE(Warn, "Recovered matching write at offset " << (XrdSys::PageSize * p1)+p1_off <<
247 " of file " << fn_ << " (append)");
248 break;
249 }
250 TRACE(Warn, CRCMismatchError(p1_off+blen, p1, crc32x, crc32v) << " (append)");
251 }
252 else
253 {
254 if (tracked_off>0)
255 {
256 TRACE(Warn, CRCMismatchError(tracked_off, p1, crc32x, crc32v) << " (append)");
257 }
258 else
259 {
260 TRACE(Warn, "Unexpected content, write at page at offset " << (XrdSys::PageSize * p1) <<
261 " of file " << fn_ << ", offset-in-page=" << p1_off << " rlen=" << rlen << " (append)");
262 }
263 }
264 return -EDOM;
265 } while(0);
266 }
267 else
268 {
269 // non-loose case;
270 // can recalc crc with new data without re-reading existing partial block's data
271 const size_t nz = p1_off - tracked_off;
273 }
274
275 // crc32c is crc up to p1_off. Now add the user's data.
276 if (csvec)
277 {
278 crc32c = CrcUtils.crc32c_combine(crc32c, csvec[0], blen);
279 }
280 else
281 {
282 crc32c = XrdOucCRC::Calc32C(buff, blen, crc32c);
283 }
284 prepageval = crc32c;
285 return 0;
286 }
287
288 const size_t bavail = (p1==tracked_page) ? tracked_off : XrdSys::PageSize;
289
290 // assert we're overwriting some (or all) of the previous data (other case was above)
291 assert(p1_off < bavail);
292
293 // case p1_off==0 && blen>=bavail is either handled by aligned case (p1==tracked_page)
294 // or not sent to preblock, so will need to read some preexisting data
295 assert(p1_off !=0 || blen<bavail);
296 uint8_t b[XrdSys::PageSize];
297
298 uint32_t crc32v;
299 ssize_t rret = ts_->ReadTags(&crc32v, p1, 1);
300 if (rret<0)
301 {
302 TRACE(Warn, TagsReadError(p1, 1, rret) << " (overwrite)");
303 return rret;
304 }
305
306 // in either loosewrite or non-loosewrite a read-modify-write sequence is done and the
307 // final crc is that of the modified block. The difference between loose and non-loose
308 // case if that the looser checks are done on the block.
309 //
310 // in either case there are implicit verification(s) (e.g. pgWrite may return EDOM without Verify requested)
311 // as it's not clear if there is a meaningful way to crc a mismatching page during a partial overwrite
312
313 if (loosewrite_)
314 {
315 // this will reissue read() until eof, or bavail bytes read but accept up to PageSize
316 const ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * p1, XrdSys::PageSize, bavail);
317 if (rlen<0)
318 {
319 TRACE(Warn, PageReadError(bavail, p1, rlen));
320 return rlen;
321 }
322 memset(&b[rlen], 0, XrdSys::PageSize - rlen);
323 do
324 {
325 uint32_t crc32c = XrdOucCRC::Calc32C(b, bavail, 0U);
326 // this is the expected case
327 if (static_cast<size_t>(rlen) == bavail && crc32c == crc32v) break;
328
329 // after this write there will be nothing changed between p1_off+blen
330 // and bavail; if there is nothing on disk in this range it will not
331 // be added by the write. So don't try to match crc with implied zero
332 // in this range. Beyond bavail bytes on disk will not be included
333 // in the new crc.
334 const size_t rmin = (p1_off+blen < bavail) ? bavail : 0;
335 if (static_cast<size_t>(rlen) >= rmin && static_cast<size_t>(rlen)<=bavail)
336 {
337 if (crc32c == crc32v)
338 {
339 TRACE(Warn, "Recovered page at offset " << (XrdSys::PageSize * p1) << " of file " << fn_ << " (overwrite)");
340 break;
341 }
342 TRACE(Warn, CRCMismatchError(bavail, p1, crc32c, crc32v) << " (loose match, still trying)");
343
344 if (static_cast<size_t>(rlen) != bavail && rlen > 0)
345 {
346 crc32c = XrdOucCRC::Calc32C(b, rlen, 0U);
347 if (crc32c == crc32v)
348 {
349 TRACE(Warn, "Recovered page (2) at offset " << (XrdSys::PageSize * p1) << " of file " << fn_ << " (overwrite)");
350 break;
351 }
352 TRACE(Warn, CRCMismatchError(rlen, p1, crc32c, crc32v) << " (loose match, still trying)");
353 }
354
355 memcpy(&b[p1_off], buff, blen);
356 const size_t vl = std::max(bavail, p1_off+blen);
357 crc32c = XrdOucCRC::Calc32C(b, vl, 0U);
358 if (crc32c == crc32v)
359 {
360 TRACE(Warn, "Recovered matching write at offset " << (XrdSys::PageSize * p1)+p1_off << " of file " << fn_ << " (overwrite)");
361 break;
362 }
363 TRACE(Warn, CRCMismatchError(vl, p1, crc32c, crc32v) << " (overwrite)");
364 }
365 else
366 {
367 TRACE(Warn, CRCMismatchError(bavail, p1, crc32c, crc32v) << " (overwrite)");
368 }
369 return -EDOM;
370 } while(0);
371 }
372 else
373 {
374 // non-loose case
375 rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize * p1, bavail);
376 if (rret<0)
377 {
378 TRACE(Warn, PageReadError(bavail, p1, rret));
379 return rret;
380 }
381 const uint32_t crc32c = XrdOucCRC::Calc32C(b, bavail, 0U);
382 if (crc32v != crc32c)
383 {
384 TRACE(Warn, CRCMismatchError(bavail, p1, crc32c, crc32v));
385 return -EDOM;
386 }
387 }
388
389 uint32_t crc32c = XrdOucCRC::Calc32C(b, p1_off, 0U);
390 if (csvec)
391 {
392 crc32c = CrcUtils.crc32c_combine(crc32c, csvec[0], blen);
393 }
394 else
395 {
396 crc32c = XrdOucCRC::Calc32C(buff, blen, crc32c);
397 }
398 if (p1_off+blen < bavail)
399 {
400 const uint32_t cl = XrdOucCRC::Calc32C(&b[p1_off+blen], bavail-p1_off-blen, 0U);
401 crc32c = CrcUtils.crc32c_combine(crc32c, cl, bavail-p1_off-blen);
402 }
403 prepageval = crc32c;
404 return 0;
405}
406
407//
408// used by StoreRangeUnaligned when the end of supplied data is not page aligned
409// and is before the end of file
410//
411// offset: first offset in file at which write is page aligned
412// blen: length of write after offset
413//
414int XrdOssCsiPages::StoreRangeUnaligned_postblock(XrdOssDF *const fd, const void *const buff, const size_t blen,
415 const off_t offset, const off_t trackinglen,
416 const uint32_t *const csvec, uint32_t &lastpageval)
417{
418 EPNAME("StoreRangeUnaligned_postblock");
419
420 const uint8_t *const p = (uint8_t*)buff;
421 const off_t p2 = (offset+blen) / XrdSys::PageSize;
422 const size_t p2_off = (offset+blen) % XrdSys::PageSize;
423
424 const off_t tracked_page = trackinglen / XrdSys::PageSize;
425 const size_t tracked_off = trackinglen % XrdSys::PageSize;
426
427 // we should not be called in this case
428 assert(p2_off != 0);
429
430 // how much existing data this last (p2) page
431 const size_t bavail = (p2==tracked_page) ? tracked_off : XrdSys::PageSize;
432
433 // how much of that data will not be overwritten
434 const size_t bremain = (p2_off < bavail) ? bavail-p2_off : 0;
435
436 // we should not be called if it is a complete overwrite
437 assert(bremain>0);
438
439 // need to use remaining data to calculate the crc of the new p2 page.
440 // read and verify it now.
441
442 uint32_t crc32v;
443 ssize_t rret = ts_->ReadTags(&crc32v, p2, 1);
444 if (rret<0)
445 {
446 TRACE(Warn, TagsReadError(p2, 1, rret));
447 return rret;
448 }
449
450 uint8_t b[XrdSys::PageSize];
451 rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize * p2, bavail);
452 if (rret<0)
453 {
454 TRACE(Warn, PageReadError(bavail, p2, rret));
455 return rret;
456 }
457
458 // calculate crc of new data with remaining data at the end:
459 uint32_t crc32c = 0;
460 if (csvec)
461 {
462 crc32c = csvec[(blen-1)/XrdSys::PageSize];
463 }
464 else
465 {
466 crc32c = XrdOucCRC::Calc32C(&p[blen-p2_off], p2_off, 0U);
467 }
468
469 const uint32_t cl = XrdOucCRC::Calc32C(&b[p2_off], bremain, 0U);
470 // crc of page with new data
471 crc32c = CrcUtils.crc32c_combine(crc32c, cl, bremain);
472 // crc of current page (before write)
473 const uint32_t crc32prev = XrdOucCRC::Calc32C(b, bavail, 0U);
474
475 // check(s) to see if remaining data was valid
476
477 // usual check; unmodified block is consistent with stored crc
478 // for loose write we allow case were the new crc has already been stored in the tagfile
479
480 // this may be an implicit verification (e.g. pgWrite may return EDOM without Verify requested)
481 // however, it's not clear if there is a meaningful way to crc a mismatching page during a partial overwrite
482 if (crc32v != crc32prev)
483 {
484 if (loosewrite_ && crc32c != crc32prev)
485 {
486 // log that we chceked if the tag was matching the previous data
487 TRACE(Warn, CRCMismatchError(bavail, p2, crc32prev, crc32v) << " (loose match, still trying)");
488 if (crc32c == crc32v)
489 {
490 TRACE(Warn, "Recovered matching write at offset " << (XrdSys::PageSize * p2) << " of file " << fn_);
491 lastpageval = crc32c;
492 return 0;
493 }
494 TRACE(Warn, CRCMismatchError(bavail, p2, crc32c, crc32v));
495 }
496 else
497 {
498 TRACE(Warn, CRCMismatchError(bavail, p2, crc32prev, crc32v));
499 }
500 return -EDOM;
501 }
502
503 lastpageval = crc32c;
504 return 0;
505}
506
507//
508// StoreRangeUnaligned
509//
510// Used by pgWrite or Write (via UpdateRangeUnaligned) where the start of this update is not page aligned within the file
511// OR where the end of this update is before the end of the file and is not page aligned
512// OR where end of the file is not page aligned and this update starts after it
513// i.e. where checksums of last current page of file, or the first or last pages after writing this buffer will need to be recomputed
514//
515int XrdOssCsiPages::StoreRangeUnaligned(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, const Sizes_t &sizes, const uint32_t *const csvec)
516{
517 EPNAME("StoreRangeUnaligned");
518 const off_t p1 = offset / XrdSys::PageSize;
519
520 const off_t trackinglen = sizes.first;
521 if (offset > trackinglen)
522 {
523 const int ret = UpdateRangeHoleUntilPage(fd, p1, sizes);
524 if (ret<0)
525 {
526 TRACE(Warn, "Error updating tags for holes, error=" << ret);
527 return ret;
528 }
529 }
530
531 const size_t p1_off = offset % XrdSys::PageSize;
532 const size_t p2_off = (offset+blen) % XrdSys::PageSize;
533
534 bool hasprepage = false;
535 uint32_t prepageval;
536
537 // deal with partial first page
538 if ( p1_off>0 || blen < static_cast<size_t>(XrdSys::PageSize) )
539 {
540 const size_t bavail = (XrdSys::PageSize-p1_off > blen) ? blen : (XrdSys::PageSize-p1_off);
541 const int ret = StoreRangeUnaligned_preblock(fd, buff, bavail, offset, trackinglen, csvec, prepageval);
542 if (ret<0)
543 {
544 return ret;
545 }
546 hasprepage = true;
547 }
548
549 // next page (if any)
550 const off_t np = hasprepage ? p1+1 : p1;
551 // next page starts at buffer offset
552 const size_t npoff = hasprepage ? (XrdSys::PageSize - p1_off) : 0;
553
554 // anything in next page?
555 if (blen <= npoff)
556 {
557 // only need to write the first, partial page
558 if (hasprepage)
559 {
560 const ssize_t wret = ts_->WriteTags(&prepageval, p1, 1);
561 if (wret<0)
562 {
563 TRACE(Warn, TagsWriteError(p1, 1, wret));
564 return wret;
565 }
566 }
567 return 0;
568 }
569
570 const uint8_t *const p = (uint8_t*)buff;
571 const uint32_t *csp = csvec;
572 if (csp && hasprepage) csp++;
573
574 // see if there will be no old data to account for in the last page
575 if (p2_off == 0 || (offset + blen >= static_cast<size_t>(trackinglen)))
576 {
577 // write any precomputed prepage, then write full pages and last partial page (computing or using supplied csvec)
578 const ssize_t aret = apply_sequential_aligned_modify(&p[npoff], np, blen-npoff, csp, hasprepage, false, prepageval, 0U);
579 if (aret<0)
580 {
581 TRACE(Warn, "Error updating tags, error=" << aret);
582 return aret;
583 }
584 return 0;
585 }
586
587 // last page contains existing data that has to be read to modify it
588
589 uint32_t lastpageval;
590 const int ret = StoreRangeUnaligned_postblock(fd, &p[npoff], blen-npoff, offset+npoff, trackinglen, csp, lastpageval);
591 if (ret<0)
592 {
593 return ret;
594 }
595
596 // write any precomputed prepage, then write full pages (computing or using supplied csvec) and finally write precomputed last page
597 const ssize_t aret = apply_sequential_aligned_modify(&p[npoff], np, blen-npoff, csp, hasprepage, true, prepageval, lastpageval);
598 if (aret<0)
599 {
600 TRACE(Warn, "Error updating tags, error=" << aret);
601 return aret;
602 }
603
604 return 0;
605}
606
607// VerifyRangeUnaligned
608//
609// Used by Read for various cases with mis-alignment. See FetchRangeUnaligned for list of conditions.
610//
611int XrdOssCsiPages::VerifyRangeUnaligned(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
612{
613 return FetchRangeUnaligned(fd, buff, offset, blen, sizes, NULL, XrdOssDF::Verify);
614}
615
616//
617// used by FetchRangeUnaligned when only part of the data in the first page is needed, or the page is short
618//
619// offset: offset in file for start of read
620// blen: total length of read
621//
622int XrdOssCsiPages::FetchRangeUnaligned_preblock(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen,
623 const off_t trackinglen, uint32_t *const tbuf, uint32_t *const csvec, const uint64_t opts)
624{
625 EPNAME("FetchRangeUnaligned_preblock");
626
627 const off_t p1 = offset / XrdSys::PageSize;
628 const size_t p1_off = offset % XrdSys::PageSize;
629
630 // bavail is length of data in this page
631 const size_t bavail = std::min(trackinglen - (XrdSys::PageSize*p1), (off_t)XrdSys::PageSize);
632
633 // bcommon is length of data in this page that user wants
634 const size_t bcommon = std::min(bavail - p1_off, blen);
635
636 uint8_t b[XrdSys::PageSize];
637 const uint8_t *ub = (uint8_t*)buff;
638 if (bavail>bcommon)
639 {
640 // will need more data to either verify or return crc of the user's data
641 // (in case of no verify and no csvec FetchRange() returns early)
642 const ssize_t rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize*p1, bavail);
643 if (rret<0)
644 {
645 TRACE(Warn, PageReadError(bavail, p1, rret));
646 return rret;
647 }
648 // if we're going to verify, make sure we just read the same overlapping data as that in the user's buffer
649 if ((opts & XrdOssDF::Verify))
650 {
651 if (memcmp(buff, &b[p1_off], bcommon))
652 {
653 size_t badoff;
654 for(badoff=0;badoff<bcommon;badoff++) { if (((uint8_t*)buff)[badoff] != b[p1_off+badoff]) break; }
655 badoff = (badoff < bcommon) ? badoff : 0; // may be possible with concurrent modification
656 TRACE(Warn, ByteMismatchError(bavail, XrdSys::PageSize*p1+p1_off+badoff, ((uint8_t*)buff)[badoff], b[p1_off+badoff]));
657 return -EDOM;
658 }
659 }
660 ub = b;
661 }
662 // verify; based on whole block, or user's buffer (if it contains the whole block)
663 if ((opts & XrdOssDF::Verify))
664 {
665 const uint32_t crc32calc = XrdOucCRC::Calc32C(ub, bavail, 0U);
666 if (tbuf[0] != crc32calc)
667 {
668 TRACE(Warn, CRCMismatchError(bavail, p1, crc32calc, tbuf[0]));
669 return -EDOM;
670 }
671 }
672
673 // if we're returning csvec values and this first block
674 // needs adjustment because user requested a subset..
675 if (bavail>bcommon && csvec)
676 {
677 // make sure csvec[0] corresponds to only the data the user wanted, not whole page.
678 // if we have already verified the page + common part matches user's, take checksum of common.
679 // (Use local copy of page, perhaps less chance of accidental concurrent modification than buffer)
680 // Otherwise base on saved checksum.
681 if ((opts & XrdOssDF::Verify))
682 {
683 csvec[0] = XrdOucCRC::Calc32C(&b[p1_off], bcommon, 0u);
684 }
685 else
686 {
687 // calculate expected user checksum based on block's recorded checksum, adjusting
688 // for data not included in user's request. If either the returned data or the
689 // data not included in the user's request are corrupt the returned checksum and
690 // returned data will (probably) mismatch.
691
692 // remove block data before p1_off from checksum
693 uint32_t crc32c = XrdOucCRC::Calc32C(b, p1_off, 0u);
694 csvec[0] = CrcUtils.crc32c_split2(csvec[0], crc32c, bavail-p1_off);
695
696 // remove block data after p1_off+bcommon upto bavail
697 crc32c = XrdOucCRC::Calc32C(&b[p1_off+bcommon], bavail-p1_off-bcommon, 0u);
698 csvec[0] = CrcUtils.crc32c_split1(csvec[0], crc32c, bavail-p1_off-bcommon);
699 }
700 }
701 return 0;
702}
703
704//
705// used by FetchRangeUnaligned when only part of a page of data is needed from the last page
706//
707// offset: offset in file for start of read
708// blen: total length of read
709//
710int XrdOssCsiPages::FetchRangeUnaligned_postblock(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen,
711 const off_t trackinglen, uint32_t *const tbuf, uint32_t *const csvec, const size_t tidx, const uint64_t opts)
712{
713 EPNAME("FetchRangeUnaligned_postblock");
714
715 const off_t p2 = (offset+blen) / XrdSys::PageSize;
716 const size_t p2_off = (offset+blen) % XrdSys::PageSize;
717
718 // length of data in last (p2) page
719 const size_t bavail = std::min(trackinglen - (XrdSys::PageSize*p2), (off_t)XrdSys::PageSize);
720
721 // how much of that data is not being returned
722 const size_t bremain = (p2_off < bavail) ? bavail-p2_off : 0;
723 uint8_t b[XrdSys::PageSize];
724 const uint8_t *ub = &((uint8_t*)buff)[blen-p2_off];
725 if (bremain>0)
726 {
727 const ssize_t rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize*p2, bavail);
728 if (rret<0)
729 {
730 TRACE(Warn, PageReadError(bavail, p2, rret));
731 return rret;
732 }
733 // if we're verifying make sure overlapping part of data just read matches user's buffer
734 if ((opts & XrdOssDF::Verify))
735 {
736 const uint8_t *const p = (uint8_t*)buff;
737 if (memcmp(&p[blen-p2_off], b, p2_off))
738 {
739 size_t badoff;
740 for(badoff=0;badoff<p2_off;badoff++) { if (p[blen-p2_off+badoff] != b[badoff]) break; }
741 badoff = (badoff < p2_off) ? badoff : 0; // may be possible with concurrent modification
742 TRACE(Warn, ByteMismatchError(bavail, XrdSys::PageSize*p2+badoff, p[blen-p2_off+badoff], b[badoff]));
743 return -EDOM;
744 }
745 }
746 ub = b;
747 }
748 if ((opts & XrdOssDF::Verify))
749 {
750 const uint32_t crc32calc = XrdOucCRC::Calc32C(ub, bavail, 0U);
751 if (tbuf[tidx] != crc32calc)
752 {
753 TRACE(Warn, CRCMismatchError(bavail, p2, crc32calc, tbuf[tidx]));
754 return -EDOM;
755 }
756 }
757 // if we're returning csvec and user only request part of page
758 // adjust the crc
759 if (csvec && bremain>0)
760 {
761 if ((opts & XrdOssDF::Verify))
762 {
763 // verified; calculate crc based on common part of page.
764 csvec[tidx] = XrdOucCRC::Calc32C(b, p2_off, 0u);
765 }
766 else
767 {
768 // recalculate crc based on recorded checksum and adjusting for part of data not returned.
769 // If either the returned data or the data not included in the user's request are
770 // corrupt the returned checksum and returned data will (probably) mismatch.
771
772 const uint32_t crc32c = XrdOucCRC::Calc32C(&b[p2_off], bremain, 0u);
773 csvec[tidx] = CrcUtils.crc32c_split1(csvec[tidx], crc32c, bremain);
774 }
775 }
776
777 return 0;
778}
779
780//
781// FetchRangeUnaligned
782//
783// Used by pgRead/Read when reading a range not starting at a page boundary within the file
784// OR when the length is not a multiple of the page-size and the read finishes not at the end of file.
785//
786int XrdOssCsiPages::FetchRangeUnaligned(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes, uint32_t *const csvec, const uint64_t opts)
787{
788 EPNAME("FetchRangeUnaligned");
789
790 const off_t p1 = offset / XrdSys::PageSize;
791 const size_t p1_off = offset % XrdSys::PageSize;
792 const off_t p2 = (offset+blen) / XrdSys::PageSize;
793 const size_t p2_off = (offset+blen) % XrdSys::PageSize;
794
795 const off_t trackinglen = sizes.first;
796
797 size_t ntagstoread = (p2_off>0) ? p2-p1+1 : p2-p1;
798 size_t ntagsbase = p1;
799 uint32_t tbufint[stsize_], *tbuf=0;
800 size_t tbufsz = 0;
801 if (!csvec)
802 {
803 tbuf = tbufint;
804 tbufsz = sizeof(tbufint)/sizeof(uint32_t);
805 }
806 else
807 {
808 tbuf = csvec;
809 tbufsz = ntagstoread;
810 }
811
812 size_t tcnt = std::min(ntagstoread, tbufsz);
813 ssize_t rret = ts_->ReadTags(tbuf, ntagsbase, tcnt);
814 if (rret<0)
815 {
816 TRACE(Warn, TagsReadError(ntagsbase, tcnt, rret) << " (first)");
817 return rret;
818 }
819 ntagstoread -= tcnt;
820
821 // deal with partial first page
822 if ( p1_off>0 || blen < static_cast<size_t>(XrdSys::PageSize) )
823 {
824 const int ret = FetchRangeUnaligned_preblock(fd, buff, offset, blen, trackinglen, tbuf, csvec, opts);
825 if (ret<0)
826 {
827 return ret;
828 }
829 }
830
831 // first (inclusive) and last (exclusive) full page
832 const off_t fp = (p1_off != 0) ? p1+1 : p1;
833 const off_t lp = p2;
834
835 // verify full pages if wanted
836 if (fp<lp && (opts & XrdOssDF::Verify))
837 {
838 const uint8_t *const p = (uint8_t*)buff;
839 uint32_t calcbuf[stsize_];
840 const size_t cbufsz = sizeof(calcbuf)/sizeof(uint32_t);
841 size_t toread = lp-fp;
842 size_t nread = 0;
843 while(toread>0)
844 {
845 const size_t ccnt = std::min(toread, cbufsz);
846 XrdOucCRC::Calc32C(&p[(p1_off ? XrdSys::PageSize-p1_off : 0)+XrdSys::PageSize*nread],ccnt*XrdSys::PageSize,calcbuf);
847 size_t tovalid = ccnt;
848 size_t nvalid = 0;
849 while(tovalid>0)
850 {
851 const size_t tidx=fp+nread+nvalid - ntagsbase;
852 const size_t nv = std::min(tovalid, tbufsz-tidx);
853 if (nv == 0)
854 {
855 assert(csvec == NULL);
856 ntagsbase += tbufsz;
857 tcnt = std::min(ntagstoread, tbufsz);
858 rret = ts_->ReadTags(tbuf, ntagsbase, tcnt);
859 if (rret<0)
860 {
861 TRACE(Warn, TagsReadError(ntagsbase, tcnt, rret) << " (mid)");
862 return rret;
863 }
864 ntagstoread -= tcnt;
865 continue;
866 }
867 if (memcmp(&calcbuf[nvalid], &tbuf[tidx], 4*nv))
868 {
869 size_t badpg;
870 for(badpg=0;badpg<nv;badpg++) { if (memcmp(&calcbuf[nvalid+badpg], &tbuf[tidx+badpg],4)) break; }
872 (ntagsbase+tidx+badpg),
873 calcbuf[nvalid+badpg], tbuf[tidx+badpg]));
874 return -EDOM;
875 }
876 tovalid -= nv;
877 nvalid += nv;
878 }
879 toread -= ccnt;
880 nread += ccnt;
881 }
882 }
883
884 // last partial page
885 if (p2>p1 && p2_off > 0)
886 {
887 // make sure we have last tag;
888 // (should already have all of them if we're returning them in csvec)
889 size_t tidx = p2 - ntagsbase;
890 if (tidx >= tbufsz)
891 {
892 assert(csvec == NULL);
893 tidx = 0;
894 ntagsbase = p2;
895 rret = ts_->ReadTags(tbuf, ntagsbase, 1);
896 if (rret<0)
897 {
898 TRACE(Warn, TagsReadError(ntagsbase, 1, rret) << " (last)");
899 return rret;
900 }
901 ntagstoread = 0;
902 }
903
904 const int ret = FetchRangeUnaligned_postblock(fd, buff, offset, blen, trackinglen, tbuf, csvec, tidx, opts);
905 if (ret<0)
906 {
907 return ret;
908 }
909 }
910
911 return 0;
912}
#define EPNAME(x)
XrdOucTrace OssCsiTrace
static XrdOssCsiCrcUtils CrcUtils
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
struct myOpts opts
#define TRACE(act, x)
Definition XrdTrace.hh:63
static uint32_t crc32c_extendwith_zero(uint32_t crc, size_t len)
static uint32_t crc32c_combine(uint32_t crc1, uint32_t crc2, size_t len2)
static uint32_t crc32c_split1(uint32_t crctot, uint32_t crc2, size_t len2)
static uint32_t crc32c_split2(uint32_t crctot, uint32_t crc1, size_t len2)
int StoreRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, const uint32_t *)
ssize_t apply_sequential_aligned_modify(const void *, off_t, size_t, const uint32_t *, bool, bool, uint32_t, uint32_t)
std::string ByteMismatchError(size_t blen, off_t off, uint8_t user, uint8_t page)
static ssize_t maxread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz, size_t tg=0)
std::string TagsReadError(off_t start, size_t n, int ret)
std::unique_ptr< XrdOssCsiTagstore > ts_
int UpdateRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
std::string TagsWriteError(off_t start, size_t n, int ret)
int FetchRangeUnaligned_preblock(XrdOssDF *, const void *, off_t, size_t, off_t, uint32_t *, uint32_t *, uint64_t)
int UpdateRangeHoleUntilPage(XrdOssDF *, off_t, const Sizes_t &)
static ssize_t fullread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz)
std::pair< off_t, off_t > Sizes_t
int FetchRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, uint32_t *, uint64_t)
int FetchRangeUnaligned_postblock(XrdOssDF *, const void *, off_t, size_t, off_t, uint32_t *, uint32_t *, size_t, uint64_t)
int VerifyRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
std::string CRCMismatchError(size_t blen, off_t pgnum, uint32_t got, uint32_t expected)
int StoreRangeUnaligned_preblock(XrdOssDF *, const void *, size_t, off_t, off_t, const uint32_t *, uint32_t &)
std::string PageReadError(size_t blen, off_t pgnum, int ret)
int StoreRangeUnaligned_postblock(XrdOssDF *, const void *, size_t, off_t, off_t, const uint32_t *, uint32_t &)
const std::string fn_
static const size_t stsize_
static const uint64_t Verify
all: Verify checksums
Definition XrdOss.hh:223
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static const int PageSize