ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/oocsort.c
Revision: 2.3
Committed: Tue May 17 17:39:47 2016 UTC (8 years ago) by rschregle
Content type: text/plain
Branch: MAIN
Changes since 2.2: +412 -297 lines
Log Message:
Initial import of ooC photon map

File Contents

# User Rev Content
1 greg 2.1 /*
2 rschregle 2.3 =========================================================================
3     N-way out-of-core merge sort for records with 3D keys. Recursively
4     subdivides input into N blocks until these are of sufficiently small size
5     to be sorted in-core according to Z-order (Morton code), then N-way
6     merging them out-of-core using a priority queue as the stack unwinds.
7    
8     Roland Schregle (roland.schregle@{hslu.ch, gmail.com})
9     (c) Lucerne University of Applied Sciences and Arts,
10     supported by the Swiss National Science Foundation (SNSF, #147053)
11     ==========================================================================
12 greg 2.1
13 rschregle 2.3 $Id: oocsort.c,v 1.34 2015/09/15 13:40:41 taschreg Exp taschreg $
14 greg 2.1 */
15    
16    
17 rschregle 2.3
18 greg 2.1 #include "oocsort.h"
19 rschregle 2.3 #include "oocmorton.h"
20 greg 2.1 #include <stdio.h>
21     #include <stdlib.h>
22 rschregle 2.3 #include <unistd.h>
23     #include <fcntl.h>
24     #include <sys/wait.h>
25    
26 greg 2.1
27    
28     /* Priority queue node */
29     typedef struct {
30 rschregle 2.3 OOC_MortonIdx pri; /* Record's priority (sort key) */
31     unsigned blk; /* Block containing record */
32     } OOC_SortQueueNode;
33    
34 greg 2.1
35 rschregle 2.3 /* Priority queue */
36 greg 2.1 typedef struct {
37 rschregle 2.3 OOC_SortQueueNode *node;
38     unsigned len, tail;
39     } OOC_SortQueue;
40 greg 2.1
41 rschregle 2.3
42     /* Additional data for qsort() compare function. We resort to instancing
43     * this as a global variable instead of passing it to the compare func via
44     * qsort_r(), since the latter is a non-portable GNU extension. */
45 greg 2.1 typedef struct {
46 rschregle 2.3 RREAL *(*key)(const void*); /* Callback to access 3D key in record */
47     FVECT bbOrg; /* Origin of bbox containing keys */
48     RREAL mortonScale; /* Scale for generating Morton code */
49     } OOC_KeyData;
50 greg 2.1
51 rschregle 2.3 static OOC_KeyData keyData;
52 greg 2.1
53    
54    
55 rschregle 2.3 static int OOC_KeyCompare (const void *rec1, const void *rec2)
56     /* Comparison function for in-core quicksort */
57 greg 2.1 {
58 rschregle 2.3 OOC_MortonIdx pri1, pri2;
59 greg 2.1
60     if (!rec1 || !rec2)
61     return 0;
62 rschregle 2.3
63     pri1 = OOC_Key2Morton(keyData.key(rec1), keyData.bbOrg,
64     keyData.mortonScale);
65     pri2 = OOC_Key2Morton(keyData.key(rec2), keyData.bbOrg,
66     keyData.mortonScale);
67 greg 2.1
68     if (pri1 < pri2)
69     return -1;
70     else if (pri1 > pri2)
71     return 1;
72     else
73     return 0;
74     }
75    
76    
77 rschregle 2.3
78     static int OOC_SortRead (FILE *file, unsigned recSize, char *rec)
79     /* Read next record from file; return 0 and record in rec on success,
80     * else -1 */
81 greg 2.1 {
82 rschregle 2.3 if (!rec || feof(file) || !fread(rec, recSize, 1, file))
83     return -1;
84    
85     return 0;
86 greg 2.1 }
87    
88    
89 rschregle 2.3
90     static int OOC_SortPeek (FILE *file, unsigned recSize, char *rec)
91     /* Return next record from file WITHOUT advancing file position;
92     * return 0 and record in rec on success, else -1 */
93 greg 2.1 {
94 rschregle 2.3 const unsigned long filePos = ftell(file);
95    
96     if (OOC_SortRead(file, recSize, rec))
97     return -1;
98    
99     /* Workaround; fseek(file, -recSize, SEEK_CUR) causes subsequent
100     * fread()'s to fail until rewind() */
101     rewind(file);
102     if (fseek(file, filePos, SEEK_SET) < 0)
103     return -1;
104    
105     return 0;
106 greg 2.1 }
107    
108    
109 rschregle 2.3
110     static int OOC_SortWrite (FILE *file, unsigned recSize, const char *rec)
111     /* Output record to file; return 0 on success, else -1 */
112 greg 2.1 {
113 rschregle 2.3 if (!rec || !fwrite(rec, recSize, 1, file))
114     return -1;
115 greg 2.1
116 rschregle 2.3 return 0;
117 greg 2.1 }
118    
119    
120 rschregle 2.3
121     #ifdef DEBUG_OOC_SORT
122     static int OOC_SortQueueCheck (OOC_SortQueue *pq, unsigned root)
123 greg 2.1 /* Priority queue sanity check */
124     {
125     unsigned kid;
126    
127     if (root < pq -> tail) {
128 rschregle 2.3 if ((kid = (root << 1) + 1) < pq -> tail) {
129 greg 2.1 if (pq -> node [kid].pri < pq -> node [root].pri)
130     return -1;
131 rschregle 2.3 else return OOC_SortQueueCheck(pq, kid);
132     }
133 greg 2.1
134 rschregle 2.3 if ((kid = kid + 1) < pq -> tail) {
135 greg 2.1 if (pq -> node [kid].pri < pq -> node [root].pri)
136     return -1;
137 rschregle 2.3 else return OOC_SortQueueCheck(pq, kid);
138     }
139 greg 2.1 }
140    
141     return 0;
142     }
143     #endif
144    
145    
146 rschregle 2.3
147     static int OOC_SortPush (OOC_SortQueue *pq, OOC_MortonIdx pri, unsigned blk)
148 greg 2.1 /* Insert specified block index into priority queue; return block index
149     * or -1 if queue is full */
150     {
151 rschregle 2.3 OOC_SortQueueNode *pqn = pq -> node;
152     unsigned kid, root;
153 greg 2.1
154     if (pq -> tail >= pq -> len)
155     /* Queue full */
156     return -1;
157    
158     /* Append node at tail and re-sort */
159     kid = pq -> tail++;
160    
161     while (kid) {
162 rschregle 2.3 root = (kid - 1) >> 1;
163 greg 2.1
164     /* Compare with parent node and swap if necessary, else terminate */
165     if (pri < pqn [root].pri) {
166     pqn [kid].pri = pqn [root].pri;
167     pqn [kid].blk = pqn [root].blk;
168     kid = root;
169     }
170     else break;
171     }
172    
173     pqn [kid].pri = pri;
174     pqn [kid].blk = blk;
175    
176 rschregle 2.3 #ifdef DEBUG_OOC_SORT
177     if (OOC_SortQueueCheck(pq, 0) < 0) {
178 greg 2.1 fprintf(stderr, "OOC_Sort: priority queue inconsistency\n");
179     return -1;
180     }
181     #endif
182    
183     return blk;
184     }
185    
186    
187 rschregle 2.3
188     static int OOC_SortPop (OOC_SortQueue *pq)
189 greg 2.1 /* Remove head of priority queue and return its block index
190 rschregle 2.3 or -1 if queue empty */
191 greg 2.1 {
192 rschregle 2.3 OOC_SortQueueNode *pqn = pq -> node;
193     OOC_MortonIdx pri;
194     unsigned kid, kid2, root = 0, blk, res;
195 greg 2.1
196     if (!pq -> tail)
197     /* Queue empty */
198     return -1;
199    
200     res = pqn -> blk;
201     pri = pqn [--pq -> tail].pri;
202     blk = pqn [pq -> tail].blk;
203    
204     /* Replace head with tail node and re-sort */
205     while ((kid = (root << 1) + 1) < pq -> tail) {
206 rschregle 2.3 /* Compare with smaller kid and swap if necessary, else terminate */
207 greg 2.1 if ((kid2 = (kid + 1)) < pq -> tail && pqn [kid2].pri < pqn [kid].pri)
208     kid = kid2;
209    
210     if (pri > pqn [kid].pri) {
211     pqn [root].pri = pqn [kid].pri;
212     pqn [root].blk = pqn [kid].blk;
213     }
214     else break;
215    
216     root = kid;
217     }
218    
219     pqn [root].pri = pri;
220     pqn [root].blk = blk;
221    
222 rschregle 2.3 #ifdef DEBUG_OOC_SORT
223     if (OOC_SortQueueCheck(pq, 0) < 0) {
224 greg 2.1 fprintf(stderr, "OOC_Sort: priority queue inconsistency\n");
225     return -1;
226     }
227     #endif
228    
229     return res;
230     }
231    
232 rschregle 2.3
233    
234     /* Active subprocess counter updated by parent process; must persist when
235     * recursing into OOC_SortRecurse(), hence global */
236     static unsigned procCnt = 0;
237    
238     static int OOC_SortRecurse (FILE *in, unsigned long blkLo,
239     unsigned long blkHi, FILE *out,
240     unsigned numBlk, unsigned long maxBlkSize,
241     unsigned numProc, unsigned recSize,
242     char *sortBuf, OOC_SortQueue *pqueue,
243     const OOC_KeyData *keyData)
244     /* Recursive part of OOC_Sort(). Reads block of records from input file
245     * within the interval [blkLo, blkHi] and divides it into numBlk blocks
246     * until the size (in bytes) does not exceed maxBlkSize, then quicksorts
247     * each block into a temporary file. These files are then mergesorted via a
248     * priority queue to the output file as the stack unwinds. NOTE: Critical
249     * sections are prepended with '!!!'
250     *
251     * Parameters are as follows:
252     * in Input file containing unsorted records (assumed to be open)
253     * blkLo Start of current block in input file, in number of records
254     * blkHi End of current block in input file, in number of records
255     * out Output file containing sorted records (assumed to be open)
256     * numBlk Number of blocks to divide into / merge from
257     * maxBlkSize Max block size and size of in-core sort buffer, in bytes
258     * numProc Number of parallel processes for in-core sort
259     * recSize Size of input records in bytes
260     * sortBuf Preallocated in-core sort buffer of size maxBlkSize
261     * pqueue Preallocated priority queue of length numBlk for block merge
262     * keyData Aggregate data for Morton code generation and comparison
263     */
264 greg 2.1 {
265 rschregle 2.3 const unsigned long blkLen = blkHi - blkLo + 1,
266     blkSize = blkLen * recSize;
267     int stat;
268 greg 2.1
269 rschregle 2.3 if (!blkLen || blkHi < blkLo)
270     return 0;
271    
272     if (blkSize > maxBlkSize) {
273     unsigned long blkLo2 = blkLo, blkHi2 = blkLo, blkSize2;
274     const double blkLen2 = (double)blkLen / numBlk;
275     FILE *blkFile [numBlk]; /* Violates C89! */
276     char rec [recSize]; /* Ditto */
277     OOC_MortonIdx pri;
278     int b, pid;
279     #ifdef DEBUG_OOC_SORT
280     unsigned long pqCnt = 0;
281     #endif
282    
283     /* ======================================================
284     * Block too large for in-core sort -> divide into numBlk
285     * subblocks and recurse
286     * ====================================================== */
287    
288     #ifdef DEBUG_OOC_SORT
289     fprintf(stderr, "OOC_Sort: splitting block [%lu - %lu]\n",
290     blkLo, blkHi);
291     #endif
292    
293     for (b = 0; b < numBlk; b++) {
294     /* Open temporary file as output for subblock */
295     if (!(blkFile [b] = tmpfile())) {
296     perror("OOC_Sort: failed opening temporary block file");
297     return -1;
298     }
299    
300     /* Subblock interval [blkLo2, blkHi2] of size blkSize2 */
301     blkHi2 = blkLo + (b + 1) * blkLen2 - 1;
302     blkSize2 = (blkHi2 - blkLo2 + 1) * recSize;
303    
304     if (blkSize2 <= maxBlkSize) {
305     /* !!! Will be in-core sorted on recursion -> fork kid process,
306     * !!! but don't fork more than numProc kids; wait if necessary */
307     while (procCnt >= numProc && wait(&stat) >= 0) {
308     if (!WIFEXITED(stat) || WEXITSTATUS(stat))
309     return -1;
310    
311     procCnt--;
312     }
313    
314     /* Now fork kid process */
315     if (!(pid = fork())) {
316     /* Recurse on subblocks with new input filehandle; */
317     if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk,
318     maxBlkSize, numProc, recSize, sortBuf,
319     pqueue, keyData))
320     exit(-1);
321    
322     /* !!! Apparently the parent's tmpfile isn't deleted when the
323     * !!! child exits (which is what we want), though some
324     * !!! sources suggest using _Exit() instead; is this
325     * !!! implementation specific? */
326     exit(0);
327     }
328     else if (pid < 0) {
329     fprintf(stderr, "OOC_Sort: failed to fork subprocess\n");
330     return -1;
331     }
332    
333     #ifdef DEBUG_OOC_FORK
334     fprintf(stderr, "OOC_Sort: forking kid %d for block %d\n",
335     procCnt, b);
336     #endif
337    
338     /* Parent continues here */
339     procCnt++;
340     }
341     else {
342     /* Recurse on subblock; without forking */
343     if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk,
344     maxBlkSize, numProc, recSize, sortBuf,
345     pqueue, keyData))
346     return -1;
347     }
348    
349     /* Prepare for next block */
350     blkLo2 = blkHi2 + 1;
351     }
352    
353     /* !!! Wait for any forked processes to terminate */
354     while (procCnt && wait(&stat) >= 0) {
355     if (!WIFEXITED(stat) || WEXITSTATUS(stat))
356     return -1;
357    
358     procCnt--;
359     }
360    
361     /* ===============================================================
362     * Subblocks are now sorted; prepare priority queue by peeking and
363     * enqueueing first record from corresponding temporary file
364     * =============================================================== */
365    
366     #ifdef DEBUG_OOC_SORT
367     fprintf(stderr, "OOC_Sort: merging block [%lu - %lu]\n", blkLo, blkHi);
368     #endif
369 greg 2.1
370 rschregle 2.3 for (b = 0; b < numBlk; b++) {
371     #ifdef DEBUG_OOC_SORT
372     fseek(blkFile [b], 0, SEEK_END);
373     if (ftell(blkFile [b]) % recSize) {
374     fprintf(stderr, "OOC_Sort: truncated record in tmp block "
375     "file %d\n", b);
376     return -1;
377     }
378    
379     fprintf(stderr, "OOC_Sort: tmp block file %d contains %ld rec\n",
380     b, ftell(blkFile [b]) / recSize);
381 greg 2.1 #endif
382    
383 rschregle 2.3 rewind(blkFile [b]);
384    
385     if (OOC_SortPeek(blkFile [b], recSize, rec)) {
386     fprintf(stderr, "OOC_Sort: error reading from block file\n");
387     return -1;
388     }
389    
390     /* Enqueue record along with its Morton code as priority */
391     pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg,
392     keyData -> mortonScale);
393    
394     if (OOC_SortPush(pqueue, pri, b) < 0) {
395     fprintf(stderr, "OOC_Sort: failed priority queue insertion\n");
396     return -1;
397     }
398 greg 2.1 }
399    
400 rschregle 2.3 /* ==========================================================
401     * Subblocks now sorted and priority queue filled; merge from
402     * temporary files
403     * ========================================================== */
404    
405     do {
406     /* Get next node in priority queue, read next record in corresponding
407     * block, and send to output */
408     b = OOC_SortPop(pqueue);
409    
410     if (b >= 0) {
411     /* Priority queue non-empty */
412     if (OOC_SortRead(blkFile [b], recSize, rec)) {
413     /* Corresponding record should still be in the input block */
414     fprintf(stderr, "OOC_Sort: unexpected end reading block file\n");
415     return -1;
416     }
417    
418     if (OOC_SortWrite(out, recSize, rec)) {
419     fprintf(stderr, "OOC_Sort; error writing output file\n");
420     return -1;
421     }
422    
423     #ifdef DEBUG_OOC_SORT
424     pqCnt++;
425     #endif
426    
427     /* Peek next record from same block and insert in priority queue */
428     if (!OOC_SortPeek(blkFile [b], recSize, rec)) {
429     /* Block not exhausted */
430     pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg,
431     keyData -> mortonScale);
432    
433     if (OOC_SortPush(pqueue, pri, b) < 0) {
434     fprintf(stderr, "OOC_Sort: failed priority queue insert\n");
435     return -1;
436     }
437     }
438     }
439     } while (b >= 0);
440    
441     #ifdef DEBUG_OOC_SORT
442     fprintf(stderr, "OOC_Sort: dequeued %lu rec\n", pqCnt);
443     fprintf(stderr, "OOC_Sort: merged file contains %lu rec\n",
444     ftell(out) / recSize);
445     #endif
446 greg 2.1
447 rschregle 2.3 /* Priority queue now empty -> done; close temporary subblock files,
448     * causing them to be automatically deleted. */
449     for (b = 0; b < numBlk; b++) {
450     fclose(blkFile [b]);
451 greg 2.1 }
452    
453 rschregle 2.3 /* !!! Commit output file to disk before caller reads it; omitting
454     * !!! this apparently leads to corrupted files (race condition?) when
455     * !!! the caller tries to read them! */
456     fflush(out);
457     fsync(fileno(out));
458     }
459    
460     else {
461     /* ======================================
462     * Block is small enough for in-core sort
463     * ====================================== */
464     int ifd = fileno(in), ofd = fileno(out);
465    
466     #ifdef DEBUG_OOC_SORT
467     fprintf(stderr, "OOC_Sort: Proc %d (%d/%d) sorting block [%lu - %lu]\n",
468     getpid(), procCnt, numProc - 1, blkLo, blkHi);
469    
470     #endif
471    
472     /* Atomically seek and read block into in-core sort buffer */
473     /* !!! Unbuffered I/O via pread() avoids potential race conditions
474     * !!! and buffer corruption which can occur with lseek()/fseek()
475     * !!! followed by read()/fread(). */
476     if (pread(ifd, sortBuf, blkSize, blkLo * recSize) != blkSize) {
477     perror("OOC_Sort: error reading from input file");
478     return -1;
479     }
480    
481     /* Quicksort block in-core and write to output file */
482     qsort(sortBuf, blkLen, recSize, OOC_KeyCompare);
483    
484     if (write(ofd, sortBuf, blkSize) != blkSize) {
485     perror("OOC_Sort: error writing to block file");
486 greg 2.1 return -1;
487     }
488    
489 rschregle 2.3 fsync(ofd);
490    
491     #ifdef DEBUG_OOC_SORT
492     fprintf(stderr, "OOC_Sort: proc %d wrote %ld records\n",
493     getpid(), lseek(ofd, 0, SEEK_END) / recSize);
494     #endif
495     }
496    
497     return 0;
498     }
499    
500    
501 greg 2.1
502 rschregle 2.3 int OOC_Sort (FILE *in, FILE *out, unsigned numBlk,
503     unsigned long blkSize, unsigned numProc, unsigned recSize,
504     FVECT bbOrg, RREAL bbSize, RREAL *(*key)(const void*))
505     /* Sort records in inFile and append to outFile by subdividing inFile into
506     * small blocks, sorting these in-core, and merging them out-of-core via a
507     * priority queue.
508     *
509     * This is implemented as a recursive (numBlk)-way sort; the input is
510     * successively split into numBlk smaller blocks until these are of size <=
511     * blkSize, i.e. small enough for in-core sorting, then merging the sorted
512     * blocks as the stack unwinds. The in-core sort is parallelised over
513     * numProx processes.
514     *
515     * Parameters are as follows:
516     * inFile Opened input file containing unsorted records
517     * outFile Opened output file containing sorted records
518     * numBlk Number of blocks to divide into / merge from
519     * blkSize Max block size and size of in-core sort buffer, in bytes
520     * numProc Number of parallel processes for in-core sort
521     * recSize Size of input records in bytes
522     * bbOrg Origin of bounding box containing record keys for Morton code
523     * bbSize Extent of bounding box containing record keys for Morton code
524     * key Callback to access 3D coords from records for Morton code
525     */
526     {
527     unsigned long numRec;
528     OOC_SortQueue pqueue;
529     char *sortBuf = NULL;
530     int stat;
531    
532     if (numBlk < 1)
533     numBlk = 1;
534 greg 2.1
535 rschregle 2.3 /* Open input file & get size in number of records */
536     if (fseek(in, 0, SEEK_END) < 0) {
537     fputs("OOC_Sort: failed seek in input file\n", stderr);
538 greg 2.1 return -1;
539     }
540    
541 rschregle 2.3 if (!(numRec = ftell(in) / recSize)) {
542     fputs("OOC_Sort: empty input file\n", stderr);
543     return -1;
544 greg 2.1 }
545 rschregle 2.3
546     /* Allocate & init priority queue */
547     if (!(pqueue.node = calloc(numBlk, sizeof(OOC_SortQueueNode)))) {
548     fputs("OOC_Sort: failure allocating priority queue\n", stderr);
549 greg 2.1 return -1;
550     }
551 rschregle 2.3 pqueue.tail = 0;
552     pqueue.len = numBlk;
553    
554     /* Allocate in-core sort buffa */
555     if (!(sortBuf = malloc(blkSize))) {
556     fprintf(stderr, "OOC_Sort: failure allocating in-core sort buffer");
557 greg 2.1 return -1;
558     }
559    
560 rschregle 2.3 /* Set up key data to pass to qsort()'s comparison func */
561     keyData.key = key;
562     keyData.mortonScale = OOC_MORTON_MAX / bbSize;
563     VCOPY(keyData.bbOrg, bbOrg);
564    
565     stat = OOC_SortRecurse(in, 0, numRec - 1, out, numBlk, blkSize, numProc,
566     recSize, sortBuf, &pqueue, &keyData);
567 greg 2.1
568 rschregle 2.3 /* Cleanup */
569 greg 2.1 free(pqueue.node);
570 rschregle 2.3 free(sortBuf);
571 greg 2.1
572 rschregle 2.3 return stat;
573     }