ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/oocsort.c
(Generate patch)

Comparing ray/src/rt/oocsort.c (file contents):
Revision 2.2 by greg, Wed Feb 25 17:19:52 2015 UTC vs.
Revision 2.3 by rschregle, Tue May 17 17:39:47 2016 UTC

# Line 1 | Line 1
1   /*
2 <   ==================================================================
3 <   N-way hybrid out-of-core merge sort
4 <   Sorts N blocks internally using quicksort, followed by N-way
5 <   merge using priority queue.
2 >   =========================================================================
3 >   N-way out-of-core merge sort for records with 3D keys.  Recursively
4 >   subdivides input into N blocks until these are of sufficiently small size
5 >   to be sorted in-core according to Z-order (Morton code), then N-way
6 >   merges them out-of-core using a priority queue as the stack unwinds.
7    
8 <   Roland Schregle (roland.schregle@{hslu.ch, gmail.com})
9 <   (c) Fraunhofer Institute for Solar Energy Systems,
10 <       Lucerne University of Applied Sciences & Arts  
11 <   ==================================================================
8 >   Roland Schregle (roland.schregle@{hslu.ch, gmail.com})
9 >   (c) Lucerne University of Applied Sciences and Arts,
10 >       supported by the Swiss National Science Foundation (SNSF, #147053)
11 >   ==========================================================================
12    
13 <   $Id $
13 >   $Id$
14   */
15  
16  
17 +
18   #include "oocsort.h"
19 + #include "oocmorton.h"
20   #include <stdio.h>
21   #include <stdlib.h>
22 < #include <string.h>
22 > #include <unistd.h>
23 > #include <fcntl.h>
24 > #include <sys/wait.h>
25  
26  
27 +
28   /* Priority queue node */
29   typedef struct {
30 <   OOC_Sort_Key pri;   /* Record's priority (sort key) */
31 <   unsigned blk;           /* Block containing record */
32 < } OOC_Sort_PQNode;
30 >   OOC_MortonIdx     pri;        /* Record's priority (sort key) */
31 >   unsigned          blk;        /* Block containing record */
32 > } OOC_SortQueueNode;
33  
34 +
35 + /* Priority queue */
36   typedef struct {
37 <   OOC_Sort_PQNode *node;
38 <   unsigned len, tail;
39 < } OOC_Sort_PQueue;
37 >   OOC_SortQueueNode *node;
38 >   unsigned          len, tail;
39 > } OOC_SortQueue;
40  
41 < /* Block descriptor */
41 >
42 > /* Additional data for qsort() compare function. We resort to instancing
43 > * this as a global variable instead of passing it to the compare func via
44 > * qsort_r(), since the latter is a non-portable GNU extension. */
45   typedef struct {
46 <   FILE *file;                /* Temporary file */
47 <   char *buf, *head, *tail;   /* Associated I/O buffa,
48 <                                 head/tail pointers */
49 < } OOC_Sort_Block;
46 >   RREAL    *(*key)(const void*);   /* Callback to access 3D key in record */
47 >   FVECT    bbOrg;                  /* Origin of bbox containing keys */
48 >   RREAL    mortonScale;            /* Scale for generating Morton code */
49 > } OOC_KeyData;
50  
51 + static OOC_KeyData keyData;
52  
41 /* Record priority evaluation function */
42 static OOC_Sort_Key (*pri)(const void *);
53  
54  
55 < static int OOC_Sort_Compare (const void *rec1, const void *rec2)
56 < /* Comparison function for internal sorting */
55 > static int OOC_KeyCompare (const void *rec1, const void *rec2)
56 > /* Comparison function for in-core quicksort */
57   {
58 <   const OOC_Sort_Key pri1 = pri(rec1), pri2 = pri(rec2);
58 >   OOC_MortonIdx  pri1, pri2;
59    
60     if (!rec1 || !rec2)
61        return 0;
62 +      
63 +   pri1 = OOC_Key2Morton(keyData.key(rec1), keyData.bbOrg,
64 +                         keyData.mortonScale);
65 +   pri2 = OOC_Key2Morton(keyData.key(rec2), keyData.bbOrg,
66 +                         keyData.mortonScale);
67    
68     if (pri1 < pri2)
69        return -1;
# Line 59 | Line 74 | static int OOC_Sort_Compare (const void *rec1, const v
74   }
75  
76  
77 < static char *OOC_Sort_Read (OOC_Sort_Block *blk, unsigned recSize)
78 < /* Read next record from specified block; return pointer to buffa entry
79 <   or NULL on error */
77 >
78 > static int OOC_SortRead (FILE *file, unsigned recSize, char *rec)
79 > /* Read next record from file; return 0 and record in rec on success,
80 > * else -1 */
81   {
82 <   char *rec = NULL;
83 <        
68 <   if (blk -> head >= blk -> tail) {
69 <      /* Input buffa empty; read next block (potentially truncated if last),
70 <       * where tail marks end of buffa */
71 <      const unsigned long bufSize = blk -> tail - blk -> buf;
72 <    
73 <      if (feof(blk -> file))
74 <         return NULL;
82 >   if (!rec || feof(file) || !fread(rec, recSize, 1, file))
83 >      return -1;
84  
85 <      blk -> head = blk -> buf;
77 <      blk -> tail = blk -> buf + fread(blk -> buf, 1, bufSize, blk -> file);
78 <                    
79 <      if (blk -> tail == blk -> head)
80 <         return NULL;
81 <   }
82 <  
83 <   rec = blk -> head;
84 <   blk -> head += recSize;
85 <  
86 <   return rec;
85 >   return 0;
86   }
87  
88  
89 < static char *OOC_Sort_Peek (OOC_Sort_Block *blk, unsigned recSize)
90 < /* Return next record from specified block WITHOUT removing from buffa */
89 >
90 > static int OOC_SortPeek (FILE *file, unsigned recSize, char *rec)
91 > /* Return next record from file WITHOUT advancing file position;
92 > * return 0 and record in rec on success, else -1 */
93   {
94 <   char *rec = NULL;
95 <  
96 <   if (rec = OOC_Sort_Read(blk, recSize)) {
97 <      /* Restore buffa head */
98 <      blk -> head -= recSize;
99 <   }
100 <  
101 <   return rec;
94 >   const unsigned long filePos = ftell(file);
95 >
96 >   if (OOC_SortRead(file, recSize, rec))
97 >      return -1;
98 >      
99 >   /* Workaround; fseek(file, -recSize, SEEK_CUR) causes subsequent
100 >    * fread()'s to fail until rewind() */
101 >   rewind(file);    
102 >   if (fseek(file, filePos, SEEK_SET) < 0)
103 >      return -1;
104 >
105 >   return 0;
106   }
107  
108  
109 < static char *OOC_Sort_Write (OOC_Sort_Block *blk, unsigned recSize,
110 <                             const char *rec)
111 < /* Output record to specified block and return pointer to buffa entry
107 <   or NULL on error */
109 >
110 > static int OOC_SortWrite (FILE *file, unsigned recSize, const char *rec)
111 > /* Output record to file; return 0 on success, else -1 */
112   {
113 <   char *res = NULL;
113 >   if (!rec || !fwrite(rec, recSize, 1, file))
114 >      return -1;
115    
116 <   if (blk -> head >= blk -> tail) {
112 <      /* Flush output buffa (tail marks end of buffa) */
113 <      const unsigned long bufSize = blk -> tail - blk -> buf;
114 <
115 <      blk -> head = blk -> buf;
116 <      if (fwrite(blk -> buf, 1, bufSize, blk -> file) != bufSize)
117 <         return NULL;
118 <   }
119 <  
120 <   if (!rec)
121 <      return NULL;
122 <      
123 <   memcpy(blk -> head, rec, recSize);
124 <   res = blk -> head;
125 <   blk -> head += recSize;
126 <  
127 <   return res;
116 >   return 0;
117   }
118  
119  
120 < #ifdef DEBUG
121 <   static int OOC_Sort_PQCheck (OOC_Sort_PQueue *pq, unsigned root)
120 >
121 > #ifdef DEBUG_OOC_SORT
122 >   static int OOC_SortQueueCheck (OOC_SortQueue *pq, unsigned root)
123     /* Priority queue sanity check */
124     {
125        unsigned kid;
126        
127        if (root < pq -> tail) {
128 <         if ((kid = (root << 1) + 1) < pq -> tail)
128 >         if ((kid = (root << 1) + 1) < pq -> tail) {
129              if (pq -> node [kid].pri < pq -> node [root].pri)
130                 return -1;
131 <            else return OOC_Sort_PQCheck(pq, kid);
131 >            else return OOC_SortQueueCheck(pq, kid);
132 >         }
133              
134 <         if ((kid = kid + 1) < pq -> tail)
134 >         if ((kid = kid + 1) < pq -> tail) {
135              if (pq -> node [kid].pri < pq -> node [root].pri)
136                 return -1;
137 <            else return OOC_Sort_PQCheck(pq, kid);
138 <        
137 >            else return OOC_SortQueueCheck(pq, kid);
138 >         }
139        }
140        
141        return 0;
# Line 152 | Line 143 | static char *OOC_Sort_Write (OOC_Sort_Block *blk, unsi
143   #endif
144  
145  
146 < static int OOC_Sort_Push (OOC_Sort_PQueue *pq, OOC_Sort_Key pri,
147 <                          unsigned blk)
146 >
147 > static int OOC_SortPush (OOC_SortQueue *pq, OOC_MortonIdx pri, unsigned blk)
148   /* Insert specified block index into priority queue; return block index
149   * or -1 if queue is full */
150   {
151 <   OOC_Sort_PQNode *pqn = pq -> node;
152 <   unsigned kid, root;
151 >   OOC_SortQueueNode *pqn = pq -> node;
152 >   unsigned          kid, root;
153    
154     if (pq -> tail >= pq -> len)
155        /* Queue full */
# Line 168 | Line 159 | static int OOC_Sort_Push (OOC_Sort_PQueue *pq, OOC_Sor
159     kid = pq -> tail++;
160    
161     while (kid) {
162 <      root = kid - 1 >> 1;
162 >      root = (kid - 1) >> 1;
163        
164        /* Compare with parent node and swap if necessary, else terminate */
165        if (pri < pqn [root].pri) {
# Line 182 | Line 173 | static int OOC_Sort_Push (OOC_Sort_PQueue *pq, OOC_Sor
173     pqn [kid].pri = pri;
174     pqn [kid].blk = blk;  
175  
176 < #ifdef DEBUG
177 <   if (OOC_Sort_PQCheck(pq, 0) < 0) {
176 > #ifdef DEBUG_OOC_SORT
177 >   if (OOC_SortQueueCheck(pq, 0) < 0) {
178        fprintf(stderr, "OOC_Sort: priority queue inconsistency\n");
179        return -1;
180     }
# Line 193 | Line 184 | static int OOC_Sort_Push (OOC_Sort_PQueue *pq, OOC_Sor
184   }
185  
186  
187 < static int OOC_Sort_Pop (OOC_Sort_PQueue *pq)
187 >
188 > static int OOC_SortPop (OOC_SortQueue *pq)
189   /* Remove head of priority queue and return its block index
190 < * or -1 if queue is empty */
190 >   or -1 if queue empty */
191   {
192 <   OOC_Sort_PQNode *pqn = pq -> node;
193 <   OOC_Sort_Key pri;
194 <   unsigned kid, kid2, root = 0, blk, res;
192 >   OOC_SortQueueNode *pqn = pq -> node;
193 >   OOC_MortonIdx     pri;
194 >   unsigned          kid, kid2, root = 0, blk, res;
195    
196     if (!pq -> tail)
197        /* Queue empty */
# Line 211 | Line 203 | static int OOC_Sort_Pop (OOC_Sort_PQueue *pq)
203  
204     /* Replace head with tail node and re-sort */
205     while ((kid = (root << 1) + 1) < pq -> tail) {
206 <      /* Compare with with smaller kid and swap if necessary, else
215 <       * terminate */
206 >      /* Compare with smaller kid and swap if necessary, else terminate */
207        if ((kid2 = (kid + 1)) < pq -> tail && pqn [kid2].pri < pqn [kid].pri)
208           kid = kid2;
209              
# Line 228 | Line 219 | static int OOC_Sort_Pop (OOC_Sort_PQueue *pq)
219     pqn [root].pri = pri;
220     pqn [root].blk = blk;
221  
222 < #ifdef DEBUG
223 <   if (OOC_Sort_PQCheck(pq, 0) < 0) {
222 > #ifdef DEBUG_OOC_SORT
223 >   if (OOC_SortQueueCheck(pq, 0) < 0) {
224        fprintf(stderr, "OOC_Sort: priority queue inconsistency\n");
225        return -1;
226     }
# Line 237 | Line 228 | static int OOC_Sort_Pop (OOC_Sort_PQueue *pq)
228    
229     return res;
230   }
240
241 #if 0
242 int OOC_Sort (const char *inFile, const char *outFile,
243              unsigned long blkSize, unsigned recSize,
244              OOC_Sort_Key (*priority)(const void *))
245 /* Sort records in inFile and output to outFile by (a) internally
246 * quicksorting block of blkSize bytes at a time, then (b) merging them
247 * via a priority queue. RecSize specifies the size in bytes of each data
248 * record. The priority() callback evaluates a record's priority and must be
249 * supplied by the caller. */
250 {
251   FILE *in = NULL;
252   OOC_Sort_PQueue pqueue;                         /* Priority queue */
253   OOC_Sort_Block *iBlk = NULL, oBlk;              /* Block descriptors */
254   char *rec, *sortBuf = NULL;                     /* Internal sort buffa */
255   unsigned bufSize, numBlk;
256   int b;
257  
258   /* Set record priority evaluation callback */                
259   pri = priority;
260  
261   /* Round block and buffa size down to nearest multiple of record size */
262   blkSize = (blkSize / recSize) * recSize;
263   bufSize = (OOC_SORT_BUFSIZE / recSize) * recSize;
231  
265   if (!(in = fopen(inFile, "rb"))) {
266      fprintf(stderr, "OOC_Sort: failure opening input file %s\n", inFile);
267      return -1;
268   }
269  
270   /* Get input file size and number of blocks (rounded up to nearest
271    * multiple of block size) */
272   fseek(in, 0, SEEK_END);
273   numBlk = (ftell(in) + blkSize - 1) / blkSize;
274   rewind(in);
275 #else
232  
233 < int OOC_Sort (const char *inFile, const char *outFile,
234 <              unsigned numBlk, unsigned recSize,
235 <              OOC_Sort_Key (*priority)(const void *))
236 < /* Sort records in inFile and output to outFile by (a) internally
237 < * quicksorting numBlk blocks at a time, then (b) merging them via a priority
238 < * queue.  RecSize specifies the size in bytes of each data record.  The
239 < * priority() callback evaluates a record's priority (ordinal index) and
240 < * must be supplied by the caller.  */
233 >
234 > /* Active subprocess counter updated by parent process; must persist when
235 > * recursing into OOC_SortRecurse(), hence global */
236 > static unsigned procCnt = 0;
237 >
238 > static int OOC_SortRecurse (FILE *in, unsigned long blkLo,
239 >                            unsigned long blkHi, FILE *out,
240 >                            unsigned numBlk, unsigned long maxBlkSize,
241 >                            unsigned numProc, unsigned recSize,
242 >                            char *sortBuf, OOC_SortQueue *pqueue,
243 >                            const OOC_KeyData *keyData)
244 > /* Recursive part of OOC_Sort(). Reads block of records from input file
245 > * within the interval [blkLo, blkHi] and divides it into numBlk blocks
246 > * until the size (in bytes) does not exceed maxBlkSize, then quicksorts
247 > * each block into a temporary file.  These files are then mergesorted via a
248 > * priority queue to the output file as the stack unwinds.  NOTE: Critical
249 > * sections are prepended with '!!!'
250 > *
251 > * Parameters are as follows:
252 > * in          Input file containing unsorted records (assumed to be open)
253 > * blkLo       Start of current block in input file, in number of records
254 > * blkHi       End of current block in input file, in number of records
255 > * out         Output file containing sorted records (assumed to be open)
256 > * numBlk      Number of blocks to divide into / merge from
257 > * maxBlkSize  Max block size and size of in-core sort buffer, in bytes
258 > * numProc     Number of parallel processes for in-core sort
259 > * recSize     Size of input records in bytes
260 > * sortBuf     Preallocated in-core sort buffer of size maxBlkSize
261 > * pqueue      Preallocated priority queue of length numBlk for block merge
262 > * keyData     Aggregate data for Morton code generation and comparison
263 > */
264   {
265 <   FILE *in = NULL;
266 <   OOC_Sort_PQueue pqueue;                         /* Priority queue */
267 <   OOC_Sort_Block *iBlk = NULL, oBlk;              /* Block descriptors */
289 <   char *rec, *sortBuf = NULL;                     /* Internal sort buffa */
290 <   unsigned long blkSize;
291 <   unsigned bufSize;
292 <   int b;
265 >   const unsigned long  blkLen = blkHi - blkLo + 1,
266 >                        blkSize = blkLen * recSize;
267 >   int                  stat;
268    
269 <   /* Set record priority evaluation callback */                
270 <   pri = priority;
271 <  
272 <   /* Round buffa size down to nearest multiple of record size */
273 <   bufSize = (OOC_SORT_BUFSIZE / recSize) * recSize;
269 >   if (!blkLen || blkHi < blkLo)
270 >      return 0;
271 >      
272 >   if (blkSize > maxBlkSize) {
273 >      unsigned long     blkLo2 = blkLo, blkHi2 = blkLo, blkSize2;
274 >      const double      blkLen2 = (double)blkLen / numBlk;
275 >      FILE              *blkFile [numBlk];         /* Violates C89!  */
276 >      char              rec [recSize];             /* Ditto          */
277 >      OOC_MortonIdx     pri;
278 >      int               b, pid;
279 > #ifdef DEBUG_OOC_SORT      
280 >      unsigned long     pqCnt = 0;
281 > #endif      
282 >      
283 >      /* ======================================================
284 >       * Block too large for in-core sort -> divide into numBlk
285 >       * subblocks and recurse
286 >       * ====================================================== */
287  
288 <   if (!(in = fopen(inFile, "rb"))) {
289 <      fprintf(stderr, "OOC_Sort: failure opening input file %s\n", inFile);
290 <      return -1;
303 <   }
304 <  
305 <   /* Get input file size and block size (in number of records) rounded up
306 <    * to nearest multiple of number of blocks */
307 <   fseek(in, 0, SEEK_END);
308 <   blkSize = (ftell(in) / recSize + numBlk - 1) / numBlk;  
309 <   rewind(in);
288 > #ifdef DEBUG_OOC_SORT
289 >      fprintf(stderr, "OOC_Sort: splitting block [%lu - %lu]\n",
290 >              blkLo, blkHi);
291   #endif
292  
293 <   /* Allocate buffa for internal sorting */
294 <   if (!(sortBuf = malloc(blkSize))) {
295 <      fprintf(stderr, "OOC_Sort: failure allocating internal sort buffer\n");
296 <      return -1;
297 <   }
298 <  
299 <   /* Allocate input blocks */
300 <   if (!(iBlk = calloc(numBlk, sizeof(OOC_Sort_Block)))) {
301 <      fprintf(stderr, "OOC_Sort: failure allocating input blocks\n");
302 <      return -1;
322 <   }
323 <  
324 <   /* ===================================================================
325 <    * Pass 1: Internal quicksort of each input block in sortBuf
326 <    * =================================================================== */
293 >      for (b = 0; b < numBlk; b++) {
294 >         /* Open temporary file as output for subblock */
295 >         if (!(blkFile [b] = tmpfile())) {
296 >            perror("OOC_Sort: failed opening temporary block file");
297 >            return -1;
298 >         }
299 >        
300 >         /* Subblock interval [blkLo2, blkHi2] of size blkSize2 */
301 >         blkHi2 = blkLo + (b + 1) * blkLen2 - 1;
302 >         blkSize2 = (blkHi2 - blkLo2 + 1) * recSize;
303  
304 <   for (b = 0; b < numBlk; b++) {
305 <      unsigned long numRec;
304 >         if (blkSize2 <= maxBlkSize) {
305 >            /* !!! Will be in-core sorted on recursion -> fork kid process,
306 >             * !!! but don't fork more than numProc kids; wait if necessary */
307 >            while (procCnt >= numProc && wait(&stat) >= 0) {
308 >               if (!WIFEXITED(stat) || WEXITSTATUS(stat))
309 >                  return -1;
310 >
311 >               procCnt--;
312 >            }
313 >
314 >            /* Now fork kid process */
315 >            if (!(pid = fork())) {
316 >               /* Kid process: recurse on subblock, passing on the inherited
316 >                * input filehandle */
317 >               if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk,
318 >                                   maxBlkSize, numProc, recSize, sortBuf,
319 >                                   pqueue, keyData))
320 >                  exit(-1);
321 >                                                                        
322 >               /* !!! Apparently the parent's tmpfile isn't deleted when the
323 >                * !!! child exits (which is what we want), though some
324 >                * !!! sources suggest using _Exit() instead; is this
325 >                * !!! implementation specific?  */
326 >               exit(0);
327 >            }
328 >            else if (pid < 0) {
329 >               fprintf(stderr, "OOC_Sort: failed to fork subprocess\n");
330 >               return -1;
331 >            }
332 >
333 > #ifdef DEBUG_OOC_FORK
334 >            fprintf(stderr, "OOC_Sort: forking kid %d for block %d\n",
335 >                    procCnt, b);
336 > #endif            
337        
338 <      /* Open temporary file associated with block */
339 <      if (!(iBlk [b].file = tmpfile())) {
340 <         /* fprintf(stderr, "OOC_Sort: failure opening block file\n"); */
341 <         perror("OOC_Sort: failure opening block file");
342 <         return -1;
338 >            /* Parent continues here */
339 >            procCnt++;
340 >         }
341 >         else {
342 >            /* Recurse on subblock; without forking */
343 >            if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk,
344 >                                maxBlkSize, numProc, recSize, sortBuf,
345 >                                pqueue, keyData))
346 >               return -1;
347 >         }
348 >
349 >         /* Prepare for next block */
350 >         blkLo2 = blkHi2 + 1;
351        }
352 <      
353 <      if (feof(in)) {
354 <         fprintf(stderr, "OOC_Sort: unexpected end of input file %s\n",
355 <                 inFile);
356 <         return -1;
352 >
353 >      /* !!! Wait for any forked processes to terminate */
354 >      while (procCnt && wait(&stat) >= 0) {
355 >         if (!WIFEXITED(stat) || WEXITSTATUS(stat))
356 >            return -1;
357 >
358 >         procCnt--;
359        }
360 <      
361 <      /* Read next block (potentially truncated if last) */
362 <      if (!(numRec = fread(sortBuf, 1, blkSize, in) / recSize)) {
363 <         fprintf(stderr, "OOC_Sort: error reading from input file %s\n",
364 <                 inFile);
365 <         return -1;
366 <      }
360 >
361 >      /* ===============================================================
362 >       * Subblocks are now sorted; prepare priority queue by peeking and
363 >       * enqueueing first record from corresponding temporary file
364 >       * =============================================================== */
365 >
366 > #ifdef DEBUG_OOC_SORT
367 >      fprintf(stderr, "OOC_Sort: merging block [%lu - %lu]\n", blkLo, blkHi);
368 > #endif
369 >
370 >      for (b = 0; b < numBlk; b++) {
371 > #ifdef DEBUG_OOC_SORT
372 >         fseek(blkFile [b], 0, SEEK_END);
373 >         if (ftell(blkFile [b]) % recSize) {
374 >            fprintf(stderr, "OOC_Sort: truncated record in tmp block "
375 >                    "file %d\n", b);
376 >            return -1;
377 >         }
378 >          
379 >         fprintf(stderr, "OOC_Sort: tmp block file %d contains %ld rec\n",
380 >                 b, ftell(blkFile [b]) / recSize);
381 > #endif
382 >
383 >         rewind(blkFile [b]);
384 >
385 >         if (OOC_SortPeek(blkFile [b], recSize, rec)) {
386 >            fprintf(stderr, "OOC_Sort: error reading from block file\n");
387 >            return -1;
388 >         }      
389          
390 <      /* Quicksort block internally and write out to temp file */
391 <      qsort(sortBuf, numRec, recSize, OOC_Sort_Compare);
392 <      
393 <      if (fwrite(sortBuf, recSize, numRec, iBlk [b].file) != numRec) {
394 <         fprintf(stderr, "OOC_Sort: error writing to block file\n");
395 <         return -1;
390 >         /* Enqueue record along with its Morton code as priority */
391 >         pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg,
392 >                              keyData -> mortonScale);
393 >                              
394 >         if (OOC_SortPush(pqueue, pri, b) < 0) {
395 >            fprintf(stderr, "OOC_Sort: failed priority queue insertion\n");
396 >            return -1;
397 >         }
398        }
399        
400 <      rewind(iBlk [b].file);
401 <   }
400 >      /* ==========================================================
401 >       * Subblocks now sorted and priority queue filled; merge from
402 >       * temporary files
403 >       * ========================================================== */
404  
405 <   /* Close input file and free sort buffa */
406 <   fclose(in);
407 <   free(sortBuf);
405 >      do {
406 >         /* Get next node in priority queue, read next record in corresponding
407 >          * block, and send to output */
408 >         b = OOC_SortPop(pqueue);
409 >              
410 >         if (b >= 0) {
411 >            /* Priority queue non-empty */
412 >            if (OOC_SortRead(blkFile [b], recSize, rec)) {
413 >               /* Corresponding record should still be in the input block */
414 >               fprintf(stderr, "OOC_Sort: unexpected end reading block file\n");
415 >               return -1;
416 >            }
417  
418 <   /* Allocate priority queue with numBlk nodes */  
419 <   pqueue.tail = 0;
420 <   pqueue.len = numBlk;
421 <   if (!(pqueue.node = calloc(numBlk, sizeof(OOC_Sort_PQNode)))) {
422 <      fprintf(stderr, "OOC_Sort: failure allocating priority queue\n");
423 <      return -1;
418 >            if (OOC_SortWrite(out, recSize, rec)) {
419 >               fprintf(stderr, "OOC_Sort; error writing output file\n");
420 >               return -1;
421 >            }
422 >
423 > #ifdef DEBUG_OOC_SORT
424 >            pqCnt++;
425 > #endif            
426 >            
427 >            /* Peek next record from same block and insert in priority queue */
428 >            if (!OOC_SortPeek(blkFile [b], recSize, rec)) {
429 >               /* Block not exhausted */
430 >               pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg,
431 >                                    keyData -> mortonScale);
432 >              
433 >               if (OOC_SortPush(pqueue, pri, b) < 0) {
434 >                  fprintf(stderr, "OOC_Sort: failed priority queue insert\n");
435 >                  return -1;
436 >               }
437 >            }
438 >         }
439 >      } while (b >= 0);
440 >      
441 > #ifdef DEBUG_OOC_SORT
442 >      fprintf(stderr, "OOC_Sort: dequeued %lu rec\n", pqCnt);
443 >      fprintf(stderr, "OOC_Sort: merged file contains %lu rec\n",
444 >              ftell(out) / recSize);
445 > #endif
446 >      
447 >      /* Priority queue now empty -> done; close temporary subblock files,
448 >       * causing them to be automatically deleted. */
449 >      for (b = 0; b < numBlk; b++) {
450 >         fclose(blkFile [b]);
451 >      }
452 >        
453 >      /* !!! Commit output file to disk before caller reads it; omitting
454 >       * !!! this apparently leads to corrupted files (race condition?) when
455 >       * !!! the caller tries to read them! */
456 >      fflush(out);
457 >      fsync(fileno(out));
458     }
459 <  
460 <   /* Prepare for pass 2: allocate buffa for each input block and initialise
461 <    * fill priority queue with single record from each block */
462 <   for (b = 0; b < numBlk; b++) {
463 <      if (!(iBlk [b].buf = malloc(bufSize))) {
464 <         fprintf(stderr, "OOC_Sort: failure allocating input buffer\n");
459 >
460 >   else {  
461 >      /* ======================================
462 >       * Block is small enough for in-core sort
463 >       * ====================================== */  
464 >      int   ifd = fileno(in), ofd = fileno(out);
465 >
466 > #ifdef DEBUG_OOC_SORT
467 >      fprintf(stderr, "OOC_Sort: Proc %d (%d/%d) sorting block [%lu - %lu]\n",
468 >              getpid(), procCnt, numProc - 1, blkLo, blkHi);
469 >
470 > #endif
471 >
472 >      /* Atomically seek and read block into in-core sort buffer */
473 >      /* !!! Unbuffered I/O via pread() avoids potential race conditions
474 >       * !!! and buffer corruption which can occur with lseek()/fseek()
475 >       * !!! followed by read()/fread(). */
476 >      if (pread(ifd, sortBuf, blkSize, blkLo * recSize) != blkSize) {
477 >         perror("OOC_Sort: error reading from input file");
478           return -1;
479 <      }
480 <      
481 <      /* Peek first record in block file without modifying buffa */
482 <      iBlk [b].head = iBlk [b].tail = iBlk [b].buf + bufSize;
483 <      if (!(rec = OOC_Sort_Peek(iBlk + b, recSize))) {
484 <         fprintf(stderr, "OOC_Sort: error reading from block file\n");
479 >      }            
480 >
481 >      /* Quicksort block in-core and write to output file */
482 >      qsort(sortBuf, blkLen, recSize, OOC_KeyCompare);
483 >
484 >      if (write(ofd, sortBuf, blkSize) != blkSize) {
485 >         perror("OOC_Sort: error writing to block file");
486           return -1;
487        }
488        
489 <      /* Insert record into priority queue */
490 <      if (OOC_Sort_Push(&pqueue, priority(rec), b) < 0) {
491 <         fprintf(stderr, "OOC_Sort: failed priority queue insertion\n");
492 <         return -1;
493 <      }
494 <   }
489 >      fsync(ofd);
490 >
491 > #ifdef DEBUG_OOC_SORT
492 >      fprintf(stderr, "OOC_Sort: proc %d wrote %ld records\n",
493 >              getpid(), lseek(ofd, 0, SEEK_END) / recSize);
494 > #endif      
495 >   }      
496 >
497 >   return 0;
498 > }  
499 >
500 >
501 >
502 > int OOC_Sort (FILE *in, FILE *out, unsigned numBlk,
503 >              unsigned long blkSize, unsigned numProc, unsigned recSize,
504 >              FVECT bbOrg, RREAL bbSize, RREAL *(*key)(const void*))
505 > /* Sort records in input file in and append to output file out by
506 > * subdividing the input into small blocks, sorting these in-core, and
507 > * merging them out-of-core via a priority queue.
508 > *
509 > * This is implemented as a recursive (numBlk)-way sort; the input is
510 > * successively split into numBlk smaller blocks until these are of size <=
511 > * blkSize, i.e. small enough for in-core sorting; the sorted blocks are
512 > * then merged as the stack unwinds. The in-core sort is parallelised over
513 > * numProc processes.
514 > *
515 > * Parameters are as follows:
516 > * in          Opened input file containing unsorted records
517 > * out         Opened output file containing sorted records
518 > * numBlk      Number of blocks to divide into / merge from
519 > * blkSize     Max block size and size of in-core sort buffer, in bytes
520 > * numProc     Number of parallel processes for in-core sort
521 > * recSize     Size of input records in bytes
522 > * bbOrg       Origin of bounding box containing record keys for Morton code
523 > * bbSize      Extent of bounding box containing record keys for Morton code
524 > * key         Callback to access 3D coords from records for Morton code
525 > */
526 > {
527 >   unsigned long     numRec;
528 >   OOC_SortQueue     pqueue;
529 >   char              *sortBuf = NULL;
530 >   int               stat;
531    
532 <   /* Allocate output buffa and open output file */
533 <   if (!(oBlk.file = fopen(outFile, "wb"))) {
534 <      fprintf(stderr, "OOC_Sort: failure opening output file %s\n", outFile);
532 >   if (numBlk < 1)
533 >      numBlk = 1;
534 >
535 >   /* Seek to end of input file to get its size in number of records */
536 >   if (fseek(in, 0, SEEK_END) < 0) {
537 >      fputs("OOC_Sort: failed seek in input file\n", stderr);
538        return -1;
539     }
540    
541 <   if (!(oBlk.head = oBlk.buf = malloc(bufSize))) {
542 <      fprintf(stderr, "OOC_Sort: failure allocating output buffer\n");
541 >   if (!(numRec = ftell(in) / recSize)) {
542 >      fputs("OOC_Sort: empty input file\n", stderr);
543        return -1;
544     }
545 <  
546 <   /* tail marks end of output buffa */
547 <   oBlk.tail = oBlk.buf + bufSize;
548 <  
549 <   /* ===================================================================
411 <    * Pass 2: External merge of all blocks using priority queue
412 <    * =================================================================== */
413 <      
414 <   do {
415 <      /* Get next node in priority queue, read next record in corresponding
416 <       * block, and send to output */
417 <      b = OOC_Sort_Pop(&pqueue);
418 <            
419 <      if (b >= 0) {
420 <         /* Priority queue non-empty */
421 <         if (!(rec = OOC_Sort_Read(iBlk + b, recSize))) {
422 <            /* Corresponding record should still be in the buffa, so EOF
423 <             * should not happen */
424 <            fprintf(stderr, "OOC_Sort: unexpected EOF in block file\n");
425 <            return -1;
426 <         }
427 <        
428 <         if (!OOC_Sort_Write(&oBlk, recSize, rec)) {
429 <            fprintf(stderr, "OOC_Sort; error writing to output file %s\n",
430 <                    outFile);
431 <            return -1;
432 <         }
433 <        
434 <         /* Peek next record from same block and insert in priority queue */
435 <         if (rec = OOC_Sort_Peek(iBlk + b, recSize))
436 <            /* Buffa not exhausted */
437 <            if (OOC_Sort_Push(&pqueue, priority(rec), b) < 0) {
438 <               fprintf(stderr, "OOC_Sort: failed priority queue insertion\n");
439 <               return -1;
440 <            }
441 <      }
442 <   } while (b >= 0);
443 <  
444 <   /* Priority queue now empty; flush output buffa & close file */
445 <   oBlk.tail = oBlk.head;
446 <   OOC_Sort_Write(&oBlk, 0, NULL);
447 <   fclose(oBlk.file);
448 <  
449 <   for (b = 0; b < numBlk; b++) {
450 <      fclose(iBlk [b].file);
451 <      free(iBlk [b].buf);
545 >
546 >   /* Allocate & init priority queue */
547 >   if (!(pqueue.node = calloc(numBlk, sizeof(OOC_SortQueueNode)))) {
548 >      fputs("OOC_Sort: failure allocating priority queue\n", stderr);
549 >      return -1;
550     }
551 +   pqueue.tail = 0;
552 +   pqueue.len = numBlk;
553  
554 <   free(iBlk);  
554 >   /* Allocate in-core sort buffer */
555 >   if (!(sortBuf = malloc(blkSize))) {
556 >      fprintf(stderr, "OOC_Sort: failure allocating in-core sort buffer");
557 >      return -1;
558 >   }
559 >  
560 >   /* Set up key data to pass to qsort()'s comparison func */
561 >   keyData.key = key;
562 >   keyData.mortonScale = OOC_MORTON_MAX / bbSize;
563 >   VCOPY(keyData.bbOrg, bbOrg);
564 >    
565 >   stat = OOC_SortRecurse(in, 0, numRec - 1, out, numBlk, blkSize, numProc,
566 >                          recSize, sortBuf, &pqueue, &keyData);
567 >
568 >   /* Cleanup */
569     free(pqueue.node);
570 +   free(sortBuf);
571    
572 <   return 0;          
573 < }  
572 >   return stat;          
573 > }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines