ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/oocsort.c
Revision: 2.3
Committed: Tue May 17 17:39:47 2016 UTC (8 years ago) by rschregle
Content type: text/plain
Branch: MAIN
Changes since 2.2: +412 -297 lines
Log Message:
Initial import of ooC photon map

File Contents

# Content
1 /*
2 =========================================================================
3 N-way out-of-core merge sort for records with 3D keys. Recursively
4 subdivides input into N blocks until these are of sufficiently small size
5 to be sorted in-core according to Z-order (Morton code), then N-way
6 merging them out-of-core using a priority queue as the stack unwinds.
7
8 Roland Schregle (roland.schregle@{hslu.ch, gmail.com})
9 (c) Lucerne University of Applied Sciences and Arts,
10 supported by the Swiss National Science Foundation (SNSF, #147053)
11 ==========================================================================
12
13 $Id: oocsort.c,v 1.34 2015/09/15 13:40:41 taschreg Exp taschreg $
14 */
15
16
17
18 #include "oocsort.h"
19 #include "oocmorton.h"
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <fcntl.h>
24 #include <sys/wait.h>
25
26
27
28 /* Priority queue node */
29 typedef struct {
30 OOC_MortonIdx pri; /* Record's priority (sort key) */
31 unsigned blk; /* Block containing record */
32 } OOC_SortQueueNode;
33
34
35 /* Priority queue */
36 typedef struct {
37 OOC_SortQueueNode *node;
38 unsigned len, tail;
39 } OOC_SortQueue;
40
41
42 /* Additional data for qsort() compare function. We resort to instancing
43 * this as a global variable instead of passing it to the compare func via
44 * qsort_r(), since the latter is a non-portable GNU extension. */
45 typedef struct {
46 RREAL *(*key)(const void*); /* Callback to access 3D key in record */
47 FVECT bbOrg; /* Origin of bbox containing keys */
48 RREAL mortonScale; /* Scale for generating Morton code */
49 } OOC_KeyData;
50
51 static OOC_KeyData keyData;
52
53
54
55 static int OOC_KeyCompare (const void *rec1, const void *rec2)
56 /* Comparison function for in-core quicksort */
57 {
58 OOC_MortonIdx pri1, pri2;
59
60 if (!rec1 || !rec2)
61 return 0;
62
63 pri1 = OOC_Key2Morton(keyData.key(rec1), keyData.bbOrg,
64 keyData.mortonScale);
65 pri2 = OOC_Key2Morton(keyData.key(rec2), keyData.bbOrg,
66 keyData.mortonScale);
67
68 if (pri1 < pri2)
69 return -1;
70 else if (pri1 > pri2)
71 return 1;
72 else
73 return 0;
74 }
75
76
77
78 static int OOC_SortRead (FILE *file, unsigned recSize, char *rec)
79 /* Read next record from file; return 0 and record in rec on success,
80 * else -1 */
81 {
82 if (!rec || feof(file) || !fread(rec, recSize, 1, file))
83 return -1;
84
85 return 0;
86 }
87
88
89
90 static int OOC_SortPeek (FILE *file, unsigned recSize, char *rec)
91 /* Return next record from file WITHOUT advancing file position;
92 * return 0 and record in rec on success, else -1 */
93 {
94 const unsigned long filePos = ftell(file);
95
96 if (OOC_SortRead(file, recSize, rec))
97 return -1;
98
99 /* Workaround; fseek(file, -recSize, SEEK_CUR) causes subsequent
100 * fread()'s to fail until rewind() */
101 rewind(file);
102 if (fseek(file, filePos, SEEK_SET) < 0)
103 return -1;
104
105 return 0;
106 }
107
108
109
110 static int OOC_SortWrite (FILE *file, unsigned recSize, const char *rec)
111 /* Output record to file; return 0 on success, else -1 */
112 {
113 if (!rec || !fwrite(rec, recSize, 1, file))
114 return -1;
115
116 return 0;
117 }
118
119
120
121 #ifdef DEBUG_OOC_SORT
122 static int OOC_SortQueueCheck (OOC_SortQueue *pq, unsigned root)
123 /* Priority queue sanity check */
124 {
125 unsigned kid;
126
127 if (root < pq -> tail) {
128 if ((kid = (root << 1) + 1) < pq -> tail) {
129 if (pq -> node [kid].pri < pq -> node [root].pri)
130 return -1;
131 else return OOC_SortQueueCheck(pq, kid);
132 }
133
134 if ((kid = kid + 1) < pq -> tail) {
135 if (pq -> node [kid].pri < pq -> node [root].pri)
136 return -1;
137 else return OOC_SortQueueCheck(pq, kid);
138 }
139 }
140
141 return 0;
142 }
143 #endif
144
145
146
147 static int OOC_SortPush (OOC_SortQueue *pq, OOC_MortonIdx pri, unsigned blk)
148 /* Insert specified block index into priority queue; return block index
149 * or -1 if queue is full */
150 {
151 OOC_SortQueueNode *pqn = pq -> node;
152 unsigned kid, root;
153
154 if (pq -> tail >= pq -> len)
155 /* Queue full */
156 return -1;
157
158 /* Append node at tail and re-sort */
159 kid = pq -> tail++;
160
161 while (kid) {
162 root = (kid - 1) >> 1;
163
164 /* Compare with parent node and swap if necessary, else terminate */
165 if (pri < pqn [root].pri) {
166 pqn [kid].pri = pqn [root].pri;
167 pqn [kid].blk = pqn [root].blk;
168 kid = root;
169 }
170 else break;
171 }
172
173 pqn [kid].pri = pri;
174 pqn [kid].blk = blk;
175
176 #ifdef DEBUG_OOC_SORT
177 if (OOC_SortQueueCheck(pq, 0) < 0) {
178 fprintf(stderr, "OOC_Sort: priority queue inconsistency\n");
179 return -1;
180 }
181 #endif
182
183 return blk;
184 }
185
186
187
188 static int OOC_SortPop (OOC_SortQueue *pq)
189 /* Remove head of priority queue and return its block index
190 or -1 if queue empty */
191 {
192 OOC_SortQueueNode *pqn = pq -> node;
193 OOC_MortonIdx pri;
194 unsigned kid, kid2, root = 0, blk, res;
195
196 if (!pq -> tail)
197 /* Queue empty */
198 return -1;
199
200 res = pqn -> blk;
201 pri = pqn [--pq -> tail].pri;
202 blk = pqn [pq -> tail].blk;
203
204 /* Replace head with tail node and re-sort */
205 while ((kid = (root << 1) + 1) < pq -> tail) {
206 /* Compare with smaller kid and swap if necessary, else terminate */
207 if ((kid2 = (kid + 1)) < pq -> tail && pqn [kid2].pri < pqn [kid].pri)
208 kid = kid2;
209
210 if (pri > pqn [kid].pri) {
211 pqn [root].pri = pqn [kid].pri;
212 pqn [root].blk = pqn [kid].blk;
213 }
214 else break;
215
216 root = kid;
217 }
218
219 pqn [root].pri = pri;
220 pqn [root].blk = blk;
221
222 #ifdef DEBUG_OOC_SORT
223 if (OOC_SortQueueCheck(pq, 0) < 0) {
224 fprintf(stderr, "OOC_Sort: priority queue inconsistency\n");
225 return -1;
226 }
227 #endif
228
229 return res;
230 }
231
232
233
234 /* Active subprocess counter updated by parent process; must persist when
235 * recursing into OOC_SortRecurse(), hence global */
236 static unsigned procCnt = 0;
237
238 static int OOC_SortRecurse (FILE *in, unsigned long blkLo,
239 unsigned long blkHi, FILE *out,
240 unsigned numBlk, unsigned long maxBlkSize,
241 unsigned numProc, unsigned recSize,
242 char *sortBuf, OOC_SortQueue *pqueue,
243 const OOC_KeyData *keyData)
244 /* Recursive part of OOC_Sort(). Reads block of records from input file
245 * within the interval [blkLo, blkHi] and divides it into numBlk blocks
246 * until the size (in bytes) does not exceed maxBlkSize, then quicksorts
247 * each block into a temporary file. These files are then mergesorted via a
248 * priority queue to the output file as the stack unwinds. NOTE: Critical
249 * sections are prepended with '!!!'
250 *
251 * Parameters are as follows:
252 * in Input file containing unsorted records (assumed to be open)
253 * blkLo Start of current block in input file, in number of records
254 * blkHi End of current block in input file, in number of records
255 * out Output file containing sorted records (assumed to be open)
256 * numBlk Number of blocks to divide into / merge from
257 * maxBlkSize Max block size and size of in-core sort buffer, in bytes
258 * numProc Number of parallel processes for in-core sort
259 * recSize Size of input records in bytes
260 * sortBuf Preallocated in-core sort buffer of size maxBlkSize
261 * pqueue Preallocated priority queue of length numBlk for block merge
262 * keyData Aggregate data for Morton code generation and comparison
263 */
264 {
265 const unsigned long blkLen = blkHi - blkLo + 1,
266 blkSize = blkLen * recSize;
267 int stat;
268
269 if (!blkLen || blkHi < blkLo)
270 return 0;
271
272 if (blkSize > maxBlkSize) {
273 unsigned long blkLo2 = blkLo, blkHi2 = blkLo, blkSize2;
274 const double blkLen2 = (double)blkLen / numBlk;
275 FILE *blkFile [numBlk]; /* Violates C89! */
276 char rec [recSize]; /* Ditto */
277 OOC_MortonIdx pri;
278 int b, pid;
279 #ifdef DEBUG_OOC_SORT
280 unsigned long pqCnt = 0;
281 #endif
282
283 /* ======================================================
284 * Block too large for in-core sort -> divide into numBlk
285 * subblocks and recurse
286 * ====================================================== */
287
288 #ifdef DEBUG_OOC_SORT
289 fprintf(stderr, "OOC_Sort: splitting block [%lu - %lu]\n",
290 blkLo, blkHi);
291 #endif
292
293 for (b = 0; b < numBlk; b++) {
294 /* Open temporary file as output for subblock */
295 if (!(blkFile [b] = tmpfile())) {
296 perror("OOC_Sort: failed opening temporary block file");
297 return -1;
298 }
299
300 /* Subblock interval [blkLo2, blkHi2] of size blkSize2 */
301 blkHi2 = blkLo + (b + 1) * blkLen2 - 1;
302 blkSize2 = (blkHi2 - blkLo2 + 1) * recSize;
303
304 if (blkSize2 <= maxBlkSize) {
305 /* !!! Will be in-core sorted on recursion -> fork kid process,
306 * !!! but don't fork more than numProc kids; wait if necessary */
307 while (procCnt >= numProc && wait(&stat) >= 0) {
308 if (!WIFEXITED(stat) || WEXITSTATUS(stat))
309 return -1;
310
311 procCnt--;
312 }
313
314 /* Now fork kid process */
315 if (!(pid = fork())) {
316 /* Recurse on subblocks with new input filehandle; */
317 if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk,
318 maxBlkSize, numProc, recSize, sortBuf,
319 pqueue, keyData))
320 exit(-1);
321
322 /* !!! Apparently the parent's tmpfile isn't deleted when the
323 * !!! child exits (which is what we want), though some
324 * !!! sources suggest using _Exit() instead; is this
325 * !!! implementation specific? */
326 exit(0);
327 }
328 else if (pid < 0) {
329 fprintf(stderr, "OOC_Sort: failed to fork subprocess\n");
330 return -1;
331 }
332
333 #ifdef DEBUG_OOC_FORK
334 fprintf(stderr, "OOC_Sort: forking kid %d for block %d\n",
335 procCnt, b);
336 #endif
337
338 /* Parent continues here */
339 procCnt++;
340 }
341 else {
342 /* Recurse on subblock; without forking */
343 if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk,
344 maxBlkSize, numProc, recSize, sortBuf,
345 pqueue, keyData))
346 return -1;
347 }
348
349 /* Prepare for next block */
350 blkLo2 = blkHi2 + 1;
351 }
352
353 /* !!! Wait for any forked processes to terminate */
354 while (procCnt && wait(&stat) >= 0) {
355 if (!WIFEXITED(stat) || WEXITSTATUS(stat))
356 return -1;
357
358 procCnt--;
359 }
360
361 /* ===============================================================
362 * Subblocks are now sorted; prepare priority queue by peeking and
363 * enqueueing first record from corresponding temporary file
364 * =============================================================== */
365
366 #ifdef DEBUG_OOC_SORT
367 fprintf(stderr, "OOC_Sort: merging block [%lu - %lu]\n", blkLo, blkHi);
368 #endif
369
370 for (b = 0; b < numBlk; b++) {
371 #ifdef DEBUG_OOC_SORT
372 fseek(blkFile [b], 0, SEEK_END);
373 if (ftell(blkFile [b]) % recSize) {
374 fprintf(stderr, "OOC_Sort: truncated record in tmp block "
375 "file %d\n", b);
376 return -1;
377 }
378
379 fprintf(stderr, "OOC_Sort: tmp block file %d contains %ld rec\n",
380 b, ftell(blkFile [b]) / recSize);
381 #endif
382
383 rewind(blkFile [b]);
384
385 if (OOC_SortPeek(blkFile [b], recSize, rec)) {
386 fprintf(stderr, "OOC_Sort: error reading from block file\n");
387 return -1;
388 }
389
390 /* Enqueue record along with its Morton code as priority */
391 pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg,
392 keyData -> mortonScale);
393
394 if (OOC_SortPush(pqueue, pri, b) < 0) {
395 fprintf(stderr, "OOC_Sort: failed priority queue insertion\n");
396 return -1;
397 }
398 }
399
400 /* ==========================================================
401 * Subblocks now sorted and priority queue filled; merge from
402 * temporary files
403 * ========================================================== */
404
405 do {
406 /* Get next node in priority queue, read next record in corresponding
407 * block, and send to output */
408 b = OOC_SortPop(pqueue);
409
410 if (b >= 0) {
411 /* Priority queue non-empty */
412 if (OOC_SortRead(blkFile [b], recSize, rec)) {
413 /* Corresponding record should still be in the input block */
414 fprintf(stderr, "OOC_Sort: unexpected end reading block file\n");
415 return -1;
416 }
417
418 if (OOC_SortWrite(out, recSize, rec)) {
419 fprintf(stderr, "OOC_Sort; error writing output file\n");
420 return -1;
421 }
422
423 #ifdef DEBUG_OOC_SORT
424 pqCnt++;
425 #endif
426
427 /* Peek next record from same block and insert in priority queue */
428 if (!OOC_SortPeek(blkFile [b], recSize, rec)) {
429 /* Block not exhausted */
430 pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg,
431 keyData -> mortonScale);
432
433 if (OOC_SortPush(pqueue, pri, b) < 0) {
434 fprintf(stderr, "OOC_Sort: failed priority queue insert\n");
435 return -1;
436 }
437 }
438 }
439 } while (b >= 0);
440
441 #ifdef DEBUG_OOC_SORT
442 fprintf(stderr, "OOC_Sort: dequeued %lu rec\n", pqCnt);
443 fprintf(stderr, "OOC_Sort: merged file contains %lu rec\n",
444 ftell(out) / recSize);
445 #endif
446
447 /* Priority queue now empty -> done; close temporary subblock files,
448 * causing them to be automatically deleted. */
449 for (b = 0; b < numBlk; b++) {
450 fclose(blkFile [b]);
451 }
452
453 /* !!! Commit output file to disk before caller reads it; omitting
454 * !!! this apparently leads to corrupted files (race condition?) when
455 * !!! the caller tries to read them! */
456 fflush(out);
457 fsync(fileno(out));
458 }
459
460 else {
461 /* ======================================
462 * Block is small enough for in-core sort
463 * ====================================== */
464 int ifd = fileno(in), ofd = fileno(out);
465
466 #ifdef DEBUG_OOC_SORT
467 fprintf(stderr, "OOC_Sort: Proc %d (%d/%d) sorting block [%lu - %lu]\n",
468 getpid(), procCnt, numProc - 1, blkLo, blkHi);
469
470 #endif
471
472 /* Atomically seek and read block into in-core sort buffer */
473 /* !!! Unbuffered I/O via pread() avoids potential race conditions
474 * !!! and buffer corruption which can occur with lseek()/fseek()
475 * !!! followed by read()/fread(). */
476 if (pread(ifd, sortBuf, blkSize, blkLo * recSize) != blkSize) {
477 perror("OOC_Sort: error reading from input file");
478 return -1;
479 }
480
481 /* Quicksort block in-core and write to output file */
482 qsort(sortBuf, blkLen, recSize, OOC_KeyCompare);
483
484 if (write(ofd, sortBuf, blkSize) != blkSize) {
485 perror("OOC_Sort: error writing to block file");
486 return -1;
487 }
488
489 fsync(ofd);
490
491 #ifdef DEBUG_OOC_SORT
492 fprintf(stderr, "OOC_Sort: proc %d wrote %ld records\n",
493 getpid(), lseek(ofd, 0, SEEK_END) / recSize);
494 #endif
495 }
496
497 return 0;
498 }
499
500
501
502 int OOC_Sort (FILE *in, FILE *out, unsigned numBlk,
503 unsigned long blkSize, unsigned numProc, unsigned recSize,
504 FVECT bbOrg, RREAL bbSize, RREAL *(*key)(const void*))
505 /* Sort records in inFile and append to outFile by subdividing inFile into
506 * small blocks, sorting these in-core, and merging them out-of-core via a
507 * priority queue.
508 *
509 * This is implemented as a recursive (numBlk)-way sort; the input is
510 * successively split into numBlk smaller blocks until these are of size <=
511 * blkSize, i.e. small enough for in-core sorting, then merging the sorted
512 * blocks as the stack unwinds. The in-core sort is parallelised over
513 * numProx processes.
514 *
515 * Parameters are as follows:
516 * inFile Opened input file containing unsorted records
517 * outFile Opened output file containing sorted records
518 * numBlk Number of blocks to divide into / merge from
519 * blkSize Max block size and size of in-core sort buffer, in bytes
520 * numProc Number of parallel processes for in-core sort
521 * recSize Size of input records in bytes
522 * bbOrg Origin of bounding box containing record keys for Morton code
523 * bbSize Extent of bounding box containing record keys for Morton code
524 * key Callback to access 3D coords from records for Morton code
525 */
526 {
527 unsigned long numRec;
528 OOC_SortQueue pqueue;
529 char *sortBuf = NULL;
530 int stat;
531
532 if (numBlk < 1)
533 numBlk = 1;
534
535 /* Open input file & get size in number of records */
536 if (fseek(in, 0, SEEK_END) < 0) {
537 fputs("OOC_Sort: failed seek in input file\n", stderr);
538 return -1;
539 }
540
541 if (!(numRec = ftell(in) / recSize)) {
542 fputs("OOC_Sort: empty input file\n", stderr);
543 return -1;
544 }
545
546 /* Allocate & init priority queue */
547 if (!(pqueue.node = calloc(numBlk, sizeof(OOC_SortQueueNode)))) {
548 fputs("OOC_Sort: failure allocating priority queue\n", stderr);
549 return -1;
550 }
551 pqueue.tail = 0;
552 pqueue.len = numBlk;
553
554 /* Allocate in-core sort buffa */
555 if (!(sortBuf = malloc(blkSize))) {
556 fprintf(stderr, "OOC_Sort: failure allocating in-core sort buffer");
557 return -1;
558 }
559
560 /* Set up key data to pass to qsort()'s comparison func */
561 keyData.key = key;
562 keyData.mortonScale = OOC_MORTON_MAX / bbSize;
563 VCOPY(keyData.bbOrg, bbOrg);
564
565 stat = OOC_SortRecurse(in, 0, numRec - 1, out, numBlk, blkSize, numProc,
566 recSize, sortBuf, &pqueue, &keyData);
567
568 /* Cleanup */
569 free(pqueue.node);
570 free(sortBuf);
571
572 return stat;
573 }