1 |
|
/* |
2 |
< |
================================================================== |
3 |
< |
N-way hybrid out-of-core merge sort |
4 |
< |
Sorts N blocks internally using quicksort, followed by N-way |
5 |
< |
merge using priority queue. |
2 |
> |
========================================================================= |
3 |
> |
N-way out-of-core merge sort for records with 3D keys. Recursively |
4 |
> |
subdivides input into N blocks until these are of sufficiently small size |
5 |
> |
to be sorted in-core according to Z-order (Morton code), then N-way |
6 |
> |
merging them out-of-core using a priority queue as the stack unwinds. |
7 |
|
|
8 |
< |
Roland Schregle (roland.schregle@{hslu.ch, gmail.com}) |
9 |
< |
(c) Fraunhofer Institute for Solar Energy Systems, |
10 |
< |
Lucerne University of Applied Sciences & Arts |
11 |
< |
================================================================== |
8 |
> |
Roland Schregle (roland.schregle@{hslu.ch, gmail.com}) |
9 |
> |
(c) Lucerne University of Applied Sciences and Arts, |
10 |
> |
supported by the Swiss National Science Foundation (SNSF, #147053) |
11 |
> |
========================================================================== |
12 |
|
|
13 |
< |
$Id $ |
13 |
> |
$Id$ |
14 |
|
*/ |
15 |
|
|
16 |
|
|
17 |
+ |
|
18 |
|
#include "oocsort.h" |
19 |
+ |
#include "oocmorton.h" |
20 |
|
#include <stdio.h> |
21 |
|
#include <stdlib.h> |
22 |
< |
#include <string.h> |
22 |
> |
#include <unistd.h> |
23 |
> |
#include <fcntl.h> |
24 |
> |
#include <sys/wait.h> |
25 |
|
|
26 |
|
|
27 |
+ |
|
28 |
|
/* Priority queue node */ |
29 |
|
typedef struct { |
30 |
< |
OOC_Sort_Key pri; /* Record's priority (sort key) */ |
31 |
< |
unsigned blk; /* Block containing record */ |
32 |
< |
} OOC_Sort_PQNode; |
30 |
> |
OOC_MortonIdx pri; /* Record's priority (sort key) */ |
31 |
> |
unsigned blk; /* Block containing record */ |
32 |
> |
} OOC_SortQueueNode; |
33 |
|
|
34 |
+ |
|
35 |
+ |
/* Priority queue */ |
36 |
|
typedef struct { |
37 |
< |
OOC_Sort_PQNode *node; |
38 |
< |
unsigned len, tail; |
39 |
< |
} OOC_Sort_PQueue; |
37 |
> |
OOC_SortQueueNode *node; |
38 |
> |
unsigned len, tail; |
39 |
> |
} OOC_SortQueue; |
40 |
|
|
41 |
< |
/* Block descriptor */ |
41 |
> |
|
42 |
> |
/* Additional data for qsort() compare function. We resort to instancing |
43 |
> |
* this as a global variable instead of passing it to the compare func via |
44 |
> |
* qsort_r(), since the latter is a non-portable GNU extension. */ |
45 |
|
typedef struct { |
46 |
< |
FILE *file; /* Temporary file */ |
47 |
< |
char *buf, *head, *tail; /* Associated I/O buffa, |
48 |
< |
head/tail pointers */ |
49 |
< |
} OOC_Sort_Block; |
46 |
> |
RREAL *(*key)(const void*); /* Callback to access 3D key in record */ |
47 |
> |
FVECT bbOrg; /* Origin of bbox containing keys */ |
48 |
> |
RREAL mortonScale; /* Scale for generating Morton code */ |
49 |
> |
} OOC_KeyData; |
50 |
|
|
51 |
+ |
static OOC_KeyData keyData; |
52 |
|
|
41 |
– |
/* Record priority evaluation function */ |
42 |
– |
static OOC_Sort_Key (*pri)(const void *); |
53 |
|
|
54 |
|
|
55 |
< |
static int OOC_Sort_Compare (const void *rec1, const void *rec2) |
56 |
< |
/* Comparison function for internal sorting */ |
55 |
> |
static int OOC_KeyCompare (const void *rec1, const void *rec2) |
56 |
> |
/* Comparison function for in-core quicksort */ |
57 |
|
{ |
58 |
< |
const OOC_Sort_Key pri1 = pri(rec1), pri2 = pri(rec2); |
58 |
> |
OOC_MortonIdx pri1, pri2; |
59 |
|
|
60 |
|
if (!rec1 || !rec2) |
61 |
|
return 0; |
62 |
+ |
|
63 |
+ |
pri1 = OOC_Key2Morton(keyData.key(rec1), keyData.bbOrg, |
64 |
+ |
keyData.mortonScale); |
65 |
+ |
pri2 = OOC_Key2Morton(keyData.key(rec2), keyData.bbOrg, |
66 |
+ |
keyData.mortonScale); |
67 |
|
|
68 |
|
if (pri1 < pri2) |
69 |
|
return -1; |
74 |
|
} |
75 |
|
|
76 |
|
|
77 |
< |
static char *OOC_Sort_Read (OOC_Sort_Block *blk, unsigned recSize) |
78 |
< |
/* Read next record from specified block; return pointer to buffa entry |
79 |
< |
or NULL on error */ |
77 |
> |
|
78 |
> |
static int OOC_SortRead (FILE *file, unsigned recSize, char *rec) |
79 |
> |
/* Read next record from file; return 0 and record in rec on success, |
80 |
> |
* else -1 */ |
81 |
|
{ |
82 |
< |
char *rec = NULL; |
83 |
< |
|
68 |
< |
if (blk -> head >= blk -> tail) { |
69 |
< |
/* Input buffa empty; read next block (potentially truncated if last), |
70 |
< |
* where tail marks end of buffa */ |
71 |
< |
const unsigned long bufSize = blk -> tail - blk -> buf; |
72 |
< |
|
73 |
< |
if (feof(blk -> file)) |
74 |
< |
return NULL; |
82 |
> |
if (!rec || feof(file) || !fread(rec, recSize, 1, file)) |
83 |
> |
return -1; |
84 |
|
|
85 |
< |
blk -> head = blk -> buf; |
77 |
< |
blk -> tail = blk -> buf + fread(blk -> buf, 1, bufSize, blk -> file); |
78 |
< |
|
79 |
< |
if (blk -> tail == blk -> head) |
80 |
< |
return NULL; |
81 |
< |
} |
82 |
< |
|
83 |
< |
rec = blk -> head; |
84 |
< |
blk -> head += recSize; |
85 |
< |
|
86 |
< |
return rec; |
85 |
> |
return 0; |
86 |
|
} |
87 |
|
|
88 |
|
|
89 |
< |
static char *OOC_Sort_Peek (OOC_Sort_Block *blk, unsigned recSize) |
90 |
< |
/* Return next record from specified block WITHOUT removing from buffa */ |
89 |
> |
|
90 |
> |
static int OOC_SortPeek (FILE *file, unsigned recSize, char *rec) |
91 |
> |
/* Return next record from file WITHOUT advancing file position; |
92 |
> |
* return 0 and record in rec on success, else -1 */ |
93 |
|
{ |
94 |
< |
char *rec = NULL; |
95 |
< |
|
96 |
< |
if (rec = OOC_Sort_Read(blk, recSize)) { |
97 |
< |
/* Restore buffa head */ |
98 |
< |
blk -> head -= recSize; |
99 |
< |
} |
100 |
< |
|
101 |
< |
return rec; |
94 |
> |
const unsigned long filePos = ftell(file); |
95 |
> |
|
96 |
> |
if (OOC_SortRead(file, recSize, rec)) |
97 |
> |
return -1; |
98 |
> |
|
99 |
> |
/* Workaround; fseek(file, -recSize, SEEK_CUR) causes subsequent |
100 |
> |
* fread()'s to fail until rewind() */ |
101 |
> |
rewind(file); |
102 |
> |
if (fseek(file, filePos, SEEK_SET) < 0) |
103 |
> |
return -1; |
104 |
> |
|
105 |
> |
return 0; |
106 |
|
} |
107 |
|
|
108 |
|
|
109 |
< |
static char *OOC_Sort_Write (OOC_Sort_Block *blk, unsigned recSize, |
110 |
< |
const char *rec) |
111 |
< |
/* Output record to specified block and return pointer to buffa entry |
107 |
< |
or NULL on error */ |
109 |
> |
|
110 |
> |
static int OOC_SortWrite (FILE *file, unsigned recSize, const char *rec) |
111 |
> |
/* Output record to file; return 0 on success, else -1 */ |
112 |
|
{ |
113 |
< |
char *res = NULL; |
113 |
> |
if (!rec || !fwrite(rec, recSize, 1, file)) |
114 |
> |
return -1; |
115 |
|
|
116 |
< |
if (blk -> head >= blk -> tail) { |
112 |
< |
/* Flush output buffa (tail marks end of buffa) */ |
113 |
< |
const unsigned long bufSize = blk -> tail - blk -> buf; |
114 |
< |
|
115 |
< |
blk -> head = blk -> buf; |
116 |
< |
if (fwrite(blk -> buf, 1, bufSize, blk -> file) != bufSize) |
117 |
< |
return NULL; |
118 |
< |
} |
119 |
< |
|
120 |
< |
if (!rec) |
121 |
< |
return NULL; |
122 |
< |
|
123 |
< |
memcpy(blk -> head, rec, recSize); |
124 |
< |
res = blk -> head; |
125 |
< |
blk -> head += recSize; |
126 |
< |
|
127 |
< |
return res; |
116 |
> |
return 0; |
117 |
|
} |
118 |
|
|
119 |
|
|
120 |
< |
#ifdef DEBUG |
121 |
< |
static int OOC_Sort_PQCheck (OOC_Sort_PQueue *pq, unsigned root) |
120 |
> |
|
121 |
> |
#ifdef DEBUG_OOC_SORT |
122 |
> |
static int OOC_SortQueueCheck (OOC_SortQueue *pq, unsigned root) |
123 |
|
/* Priority queue sanity check */ |
124 |
|
{ |
125 |
|
unsigned kid; |
126 |
|
|
127 |
|
if (root < pq -> tail) { |
128 |
< |
if ((kid = (root << 1) + 1) < pq -> tail) |
128 |
> |
if ((kid = (root << 1) + 1) < pq -> tail) { |
129 |
|
if (pq -> node [kid].pri < pq -> node [root].pri) |
130 |
|
return -1; |
131 |
< |
else return OOC_Sort_PQCheck(pq, kid); |
131 |
> |
else return OOC_SortQueueCheck(pq, kid); |
132 |
> |
} |
133 |
|
|
134 |
< |
if ((kid = kid + 1) < pq -> tail) |
134 |
> |
if ((kid = kid + 1) < pq -> tail) { |
135 |
|
if (pq -> node [kid].pri < pq -> node [root].pri) |
136 |
|
return -1; |
137 |
< |
else return OOC_Sort_PQCheck(pq, kid); |
138 |
< |
|
137 |
> |
else return OOC_SortQueueCheck(pq, kid); |
138 |
> |
} |
139 |
|
} |
140 |
|
|
141 |
|
return 0; |
143 |
|
#endif |
144 |
|
|
145 |
|
|
146 |
< |
static int OOC_Sort_Push (OOC_Sort_PQueue *pq, OOC_Sort_Key pri, |
147 |
< |
unsigned blk) |
146 |
> |
|
147 |
> |
static int OOC_SortPush (OOC_SortQueue *pq, OOC_MortonIdx pri, unsigned blk) |
148 |
|
/* Insert specified block index into priority queue; return block index |
149 |
|
* or -1 if queue is full */ |
150 |
|
{ |
151 |
< |
OOC_Sort_PQNode *pqn = pq -> node; |
152 |
< |
unsigned kid, root; |
151 |
> |
OOC_SortQueueNode *pqn = pq -> node; |
152 |
> |
unsigned kid, root; |
153 |
|
|
154 |
|
if (pq -> tail >= pq -> len) |
155 |
|
/* Queue full */ |
159 |
|
kid = pq -> tail++; |
160 |
|
|
161 |
|
while (kid) { |
162 |
< |
root = kid - 1 >> 1; |
162 |
> |
root = (kid - 1) >> 1; |
163 |
|
|
164 |
|
/* Compare with parent node and swap if necessary, else terminate */ |
165 |
|
if (pri < pqn [root].pri) { |
173 |
|
pqn [kid].pri = pri; |
174 |
|
pqn [kid].blk = blk; |
175 |
|
|
176 |
< |
#ifdef DEBUG |
177 |
< |
if (OOC_Sort_PQCheck(pq, 0) < 0) { |
176 |
> |
#ifdef DEBUG_OOC_SORT |
177 |
> |
if (OOC_SortQueueCheck(pq, 0) < 0) { |
178 |
|
fprintf(stderr, "OOC_Sort: priority queue inconsistency\n"); |
179 |
|
return -1; |
180 |
|
} |
184 |
|
} |
185 |
|
|
186 |
|
|
187 |
< |
static int OOC_Sort_Pop (OOC_Sort_PQueue *pq) |
187 |
> |
|
188 |
> |
static int OOC_SortPop (OOC_SortQueue *pq) |
189 |
|
/* Remove head of priority queue and return its block index |
190 |
< |
* or -1 if queue is empty */ |
190 |
> |
or -1 if queue empty */ |
191 |
|
{ |
192 |
< |
OOC_Sort_PQNode *pqn = pq -> node; |
193 |
< |
OOC_Sort_Key pri; |
194 |
< |
unsigned kid, kid2, root = 0, blk, res; |
192 |
> |
OOC_SortQueueNode *pqn = pq -> node; |
193 |
> |
OOC_MortonIdx pri; |
194 |
> |
unsigned kid, kid2, root = 0, blk, res; |
195 |
|
|
196 |
|
if (!pq -> tail) |
197 |
|
/* Queue empty */ |
203 |
|
|
204 |
|
/* Replace head with tail node and re-sort */ |
205 |
|
while ((kid = (root << 1) + 1) < pq -> tail) { |
206 |
< |
/* Compare with with smaller kid and swap if necessary, else |
215 |
< |
* terminate */ |
206 |
> |
/* Compare with smaller kid and swap if necessary, else terminate */ |
207 |
|
if ((kid2 = (kid + 1)) < pq -> tail && pqn [kid2].pri < pqn [kid].pri) |
208 |
|
kid = kid2; |
209 |
|
|
219 |
|
pqn [root].pri = pri; |
220 |
|
pqn [root].blk = blk; |
221 |
|
|
222 |
< |
#ifdef DEBUG |
223 |
< |
if (OOC_Sort_PQCheck(pq, 0) < 0) { |
222 |
> |
#ifdef DEBUG_OOC_SORT |
223 |
> |
if (OOC_SortQueueCheck(pq, 0) < 0) { |
224 |
|
fprintf(stderr, "OOC_Sort: priority queue inconsistency\n"); |
225 |
|
return -1; |
226 |
|
} |
228 |
|
|
229 |
|
return res; |
230 |
|
} |
240 |
– |
|
241 |
– |
#if 0 |
242 |
– |
int OOC_Sort (const char *inFile, const char *outFile, |
243 |
– |
unsigned long blkSize, unsigned recSize, |
244 |
– |
OOC_Sort_Key (*priority)(const void *)) |
245 |
– |
/* Sort records in inFile and output to outFile by (a) internally |
246 |
– |
* quicksorting block of blkSize bytes at a time, then (b) merging them |
247 |
– |
* via a priority queue. RecSize specifies the size in bytes of each data |
248 |
– |
* record. The priority() callback evaluates a record's priority and must be |
249 |
– |
* supplied by the caller. */ |
250 |
– |
{ |
251 |
– |
FILE *in = NULL; |
252 |
– |
OOC_Sort_PQueue pqueue; /* Priority queue */ |
253 |
– |
OOC_Sort_Block *iBlk = NULL, oBlk; /* Block descriptors */ |
254 |
– |
char *rec, *sortBuf = NULL; /* Internal sort buffa */ |
255 |
– |
unsigned bufSize, numBlk; |
256 |
– |
int b; |
257 |
– |
|
258 |
– |
/* Set record priority evaluation callback */ |
259 |
– |
pri = priority; |
260 |
– |
|
261 |
– |
/* Round block and buffa size down to nearest multiple of record size */ |
262 |
– |
blkSize = (blkSize / recSize) * recSize; |
263 |
– |
bufSize = (OOC_SORT_BUFSIZE / recSize) * recSize; |
231 |
|
|
265 |
– |
if (!(in = fopen(inFile, "rb"))) { |
266 |
– |
fprintf(stderr, "OOC_Sort: failure opening input file %s\n", inFile); |
267 |
– |
return -1; |
268 |
– |
} |
269 |
– |
|
270 |
– |
/* Get input file size and number of blocks (rounded up to nearest |
271 |
– |
* multiple of block size) */ |
272 |
– |
fseek(in, 0, SEEK_END); |
273 |
– |
numBlk = (ftell(in) + blkSize - 1) / blkSize; |
274 |
– |
rewind(in); |
275 |
– |
#else |
232 |
|
|
233 |
< |
int OOC_Sort (const char *inFile, const char *outFile, |
234 |
< |
unsigned numBlk, unsigned recSize, |
235 |
< |
OOC_Sort_Key (*priority)(const void *)) |
236 |
< |
/* Sort records in inFile and output to outFile by (a) internally |
237 |
< |
* quicksorting numBlk blocks at a time, then (b) merging them via a priority |
238 |
< |
* queue. RecSize specifies the size in bytes of each data record. The |
239 |
< |
* priority() callback evaluates a record's priority (ordinal index) and |
240 |
< |
* must be supplied by the caller. */ |
233 |
> |
|
234 |
> |
/* Active subprocess counter updated by parent process; must persist when |
235 |
> |
* recursing into OOC_SortRecurse(), hence global */ |
236 |
> |
static unsigned procCnt = 0; |
237 |
> |
|
238 |
> |
static int OOC_SortRecurse (FILE *in, unsigned long blkLo, |
239 |
> |
unsigned long blkHi, FILE *out, |
240 |
> |
unsigned numBlk, unsigned long maxBlkSize, |
241 |
> |
unsigned numProc, unsigned recSize, |
242 |
> |
char *sortBuf, OOC_SortQueue *pqueue, |
243 |
> |
const OOC_KeyData *keyData) |
244 |
> |
/* Recursive part of OOC_Sort(). Reads block of records from input file |
245 |
> |
* within the interval [blkLo, blkHi] and divides it into numBlk blocks |
246 |
> |
* until the size (in bytes) does not exceed maxBlkSize, then quicksorts |
247 |
> |
* each block into a temporary file. These files are then mergesorted via a |
248 |
> |
* priority queue to the output file as the stack unwinds. NOTE: Critical |
249 |
> |
* sections are prepended with '!!!' |
250 |
> |
* |
251 |
> |
* Parameters are as follows: |
252 |
> |
* in Input file containing unsorted records (assumed to be open) |
253 |
> |
* blkLo Start of current block in input file, in number of records |
254 |
> |
* blkHi End of current block in input file, in number of records |
255 |
> |
* out Output file containing sorted records (assumed to be open) |
256 |
> |
* numBlk Number of blocks to divide into / merge from |
257 |
> |
* maxBlkSize Max block size and size of in-core sort buffer, in bytes |
258 |
> |
* numProc Number of parallel processes for in-core sort |
259 |
> |
* recSize Size of input records in bytes |
260 |
> |
* sortBuf Preallocated in-core sort buffer of size maxBlkSize |
261 |
> |
* pqueue Preallocated priority queue of length numBlk for block merge |
262 |
> |
* keyData Aggregate data for Morton code generation and comparison |
263 |
> |
*/ |
264 |
|
{ |
265 |
< |
FILE *in = NULL; |
266 |
< |
OOC_Sort_PQueue pqueue; /* Priority queue */ |
267 |
< |
OOC_Sort_Block *iBlk = NULL, oBlk; /* Block descriptors */ |
289 |
< |
char *rec, *sortBuf = NULL; /* Internal sort buffa */ |
290 |
< |
unsigned long blkSize; |
291 |
< |
unsigned bufSize; |
292 |
< |
int b; |
265 |
> |
const unsigned long blkLen = blkHi - blkLo + 1, |
266 |
> |
blkSize = blkLen * recSize; |
267 |
> |
int stat; |
268 |
|
|
269 |
< |
/* Set record priority evaluation callback */ |
270 |
< |
pri = priority; |
271 |
< |
|
272 |
< |
/* Round buffa size down to nearest multiple of record size */ |
273 |
< |
bufSize = (OOC_SORT_BUFSIZE / recSize) * recSize; |
269 |
> |
if (!blkLen || blkHi < blkLo) |
270 |
> |
return 0; |
271 |
> |
|
272 |
> |
if (blkSize > maxBlkSize) { |
273 |
> |
unsigned long blkLo2 = blkLo, blkHi2 = blkLo, blkSize2; |
274 |
> |
const double blkLen2 = (double)blkLen / numBlk; |
275 |
> |
FILE *blkFile [numBlk]; /* Violates C89! */ |
276 |
> |
char rec [recSize]; /* Ditto */ |
277 |
> |
OOC_MortonIdx pri; |
278 |
> |
int b, pid; |
279 |
> |
#ifdef DEBUG_OOC_SORT |
280 |
> |
unsigned long pqCnt = 0; |
281 |
> |
#endif |
282 |
> |
|
283 |
> |
/* ====================================================== |
284 |
> |
* Block too large for in-core sort -> divide into numBlk |
285 |
> |
* subblocks and recurse |
286 |
> |
* ====================================================== */ |
287 |
|
|
288 |
< |
if (!(in = fopen(inFile, "rb"))) { |
289 |
< |
fprintf(stderr, "OOC_Sort: failure opening input file %s\n", inFile); |
290 |
< |
return -1; |
303 |
< |
} |
304 |
< |
|
305 |
< |
/* Get input file size and block size (in number of records) rounded up |
306 |
< |
* to nearest multiple of number of blocks */ |
307 |
< |
fseek(in, 0, SEEK_END); |
308 |
< |
blkSize = (ftell(in) / recSize + numBlk - 1) / numBlk; |
309 |
< |
rewind(in); |
288 |
> |
#ifdef DEBUG_OOC_SORT |
289 |
> |
fprintf(stderr, "OOC_Sort: splitting block [%lu - %lu]\n", |
290 |
> |
blkLo, blkHi); |
291 |
|
#endif |
292 |
|
|
293 |
< |
/* Allocate buffa for internal sorting */ |
294 |
< |
if (!(sortBuf = malloc(blkSize))) { |
295 |
< |
fprintf(stderr, "OOC_Sort: failure allocating internal sort buffer\n"); |
296 |
< |
return -1; |
297 |
< |
} |
298 |
< |
|
299 |
< |
/* Allocate input blocks */ |
300 |
< |
if (!(iBlk = calloc(numBlk, sizeof(OOC_Sort_Block)))) { |
301 |
< |
fprintf(stderr, "OOC_Sort: failure allocating input blocks\n"); |
302 |
< |
return -1; |
322 |
< |
} |
323 |
< |
|
324 |
< |
/* =================================================================== |
325 |
< |
* Pass 1: Internal quicksort of each input block in sortBuf |
326 |
< |
* =================================================================== */ |
293 |
> |
for (b = 0; b < numBlk; b++) { |
294 |
> |
/* Open temporary file as output for subblock */ |
295 |
> |
if (!(blkFile [b] = tmpfile())) { |
296 |
> |
perror("OOC_Sort: failed opening temporary block file"); |
297 |
> |
return -1; |
298 |
> |
} |
299 |
> |
|
300 |
> |
/* Subblock interval [blkLo2, blkHi2] of size blkSize2 */ |
301 |
> |
blkHi2 = blkLo + (b + 1) * blkLen2 - 1; |
302 |
> |
blkSize2 = (blkHi2 - blkLo2 + 1) * recSize; |
303 |
|
|
304 |
< |
for (b = 0; b < numBlk; b++) { |
305 |
< |
unsigned long numRec; |
304 |
> |
if (blkSize2 <= maxBlkSize) { |
305 |
> |
/* !!! Will be in-core sorted on recursion -> fork kid process, |
306 |
> |
* !!! but don't fork more than numProc kids; wait if necessary */ |
307 |
> |
while (procCnt >= numProc && wait(&stat) >= 0) { |
308 |
> |
if (!WIFEXITED(stat) || WEXITSTATUS(stat)) |
309 |
> |
return -1; |
310 |
> |
|
311 |
> |
procCnt--; |
312 |
> |
} |
313 |
> |
|
314 |
> |
/* Now fork kid process */ |
315 |
> |
if (!(pid = fork())) { |
316 |
> |
/* Recurse on subblocks with new input filehandle; */ |
317 |
> |
if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk, |
318 |
> |
maxBlkSize, numProc, recSize, sortBuf, |
319 |
> |
pqueue, keyData)) |
320 |
> |
exit(-1); |
321 |
> |
|
322 |
> |
/* !!! Apparently the parent's tmpfile isn't deleted when the |
323 |
> |
* !!! child exits (which is what we want), though some |
324 |
> |
* !!! sources suggest using _Exit() instead; is this |
325 |
> |
* !!! implementation specific? */ |
326 |
> |
exit(0); |
327 |
> |
} |
328 |
> |
else if (pid < 0) { |
329 |
> |
fprintf(stderr, "OOC_Sort: failed to fork subprocess\n"); |
330 |
> |
return -1; |
331 |
> |
} |
332 |
> |
|
333 |
> |
#ifdef DEBUG_OOC_FORK |
334 |
> |
fprintf(stderr, "OOC_Sort: forking kid %d for block %d\n", |
335 |
> |
procCnt, b); |
336 |
> |
#endif |
337 |
|
|
338 |
< |
/* Open temporary file associated with block */ |
339 |
< |
if (!(iBlk [b].file = tmpfile())) { |
340 |
< |
/* fprintf(stderr, "OOC_Sort: failure opening block file\n"); */ |
341 |
< |
perror("OOC_Sort: failure opening block file"); |
342 |
< |
return -1; |
338 |
> |
/* Parent continues here */ |
339 |
> |
procCnt++; |
340 |
> |
} |
341 |
> |
else { |
342 |
> |
/* Recurse on subblock; without forking */ |
343 |
> |
if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk, |
344 |
> |
maxBlkSize, numProc, recSize, sortBuf, |
345 |
> |
pqueue, keyData)) |
346 |
> |
return -1; |
347 |
> |
} |
348 |
> |
|
349 |
> |
/* Prepare for next block */ |
350 |
> |
blkLo2 = blkHi2 + 1; |
351 |
|
} |
352 |
< |
|
353 |
< |
if (feof(in)) { |
354 |
< |
fprintf(stderr, "OOC_Sort: unexpected end of input file %s\n", |
355 |
< |
inFile); |
356 |
< |
return -1; |
352 |
> |
|
353 |
> |
/* !!! Wait for any forked processes to terminate */ |
354 |
> |
while (procCnt && wait(&stat) >= 0) { |
355 |
> |
if (!WIFEXITED(stat) || WEXITSTATUS(stat)) |
356 |
> |
return -1; |
357 |
> |
|
358 |
> |
procCnt--; |
359 |
|
} |
360 |
< |
|
361 |
< |
/* Read next block (potentially truncated if last) */ |
362 |
< |
if (!(numRec = fread(sortBuf, 1, blkSize, in) / recSize)) { |
363 |
< |
fprintf(stderr, "OOC_Sort: error reading from input file %s\n", |
364 |
< |
inFile); |
365 |
< |
return -1; |
366 |
< |
} |
360 |
> |
|
361 |
> |
/* =============================================================== |
362 |
> |
* Subblocks are now sorted; prepare priority queue by peeking and |
363 |
> |
* enqueueing first record from corresponding temporary file |
364 |
> |
* =============================================================== */ |
365 |
> |
|
366 |
> |
#ifdef DEBUG_OOC_SORT |
367 |
> |
fprintf(stderr, "OOC_Sort: merging block [%lu - %lu]\n", blkLo, blkHi); |
368 |
> |
#endif |
369 |
> |
|
370 |
> |
for (b = 0; b < numBlk; b++) { |
371 |
> |
#ifdef DEBUG_OOC_SORT |
372 |
> |
fseek(blkFile [b], 0, SEEK_END); |
373 |
> |
if (ftell(blkFile [b]) % recSize) { |
374 |
> |
fprintf(stderr, "OOC_Sort: truncated record in tmp block " |
375 |
> |
"file %d\n", b); |
376 |
> |
return -1; |
377 |
> |
} |
378 |
> |
|
379 |
> |
fprintf(stderr, "OOC_Sort: tmp block file %d contains %ld rec\n", |
380 |
> |
b, ftell(blkFile [b]) / recSize); |
381 |
> |
#endif |
382 |
> |
|
383 |
> |
rewind(blkFile [b]); |
384 |
> |
|
385 |
> |
if (OOC_SortPeek(blkFile [b], recSize, rec)) { |
386 |
> |
fprintf(stderr, "OOC_Sort: error reading from block file\n"); |
387 |
> |
return -1; |
388 |
> |
} |
389 |
|
|
390 |
< |
/* Quicksort block internally and write out to temp file */ |
391 |
< |
qsort(sortBuf, numRec, recSize, OOC_Sort_Compare); |
392 |
< |
|
393 |
< |
if (fwrite(sortBuf, recSize, numRec, iBlk [b].file) != numRec) { |
394 |
< |
fprintf(stderr, "OOC_Sort: error writing to block file\n"); |
395 |
< |
return -1; |
390 |
> |
/* Enqueue record along with its Morton code as priority */ |
391 |
> |
pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg, |
392 |
> |
keyData -> mortonScale); |
393 |
> |
|
394 |
> |
if (OOC_SortPush(pqueue, pri, b) < 0) { |
395 |
> |
fprintf(stderr, "OOC_Sort: failed priority queue insertion\n"); |
396 |
> |
return -1; |
397 |
> |
} |
398 |
|
} |
399 |
|
|
400 |
< |
rewind(iBlk [b].file); |
401 |
< |
} |
400 |
> |
/* ========================================================== |
401 |
> |
* Subblocks now sorted and priority queue filled; merge from |
402 |
> |
* temporary files |
403 |
> |
* ========================================================== */ |
404 |
|
|
405 |
< |
/* Close input file and free sort buffa */ |
406 |
< |
fclose(in); |
407 |
< |
free(sortBuf); |
405 |
> |
do { |
406 |
> |
/* Get next node in priority queue, read next record in corresponding |
407 |
> |
* block, and send to output */ |
408 |
> |
b = OOC_SortPop(pqueue); |
409 |
> |
|
410 |
> |
if (b >= 0) { |
411 |
> |
/* Priority queue non-empty */ |
412 |
> |
if (OOC_SortRead(blkFile [b], recSize, rec)) { |
413 |
> |
/* Corresponding record should still be in the input block */ |
414 |
> |
fprintf(stderr, "OOC_Sort: unexpected end reading block file\n"); |
415 |
> |
return -1; |
416 |
> |
} |
417 |
|
|
418 |
< |
/* Allocate priority queue with numBlk nodes */ |
419 |
< |
pqueue.tail = 0; |
420 |
< |
pqueue.len = numBlk; |
421 |
< |
if (!(pqueue.node = calloc(numBlk, sizeof(OOC_Sort_PQNode)))) { |
422 |
< |
fprintf(stderr, "OOC_Sort: failure allocating priority queue\n"); |
423 |
< |
return -1; |
418 |
> |
if (OOC_SortWrite(out, recSize, rec)) { |
419 |
> |
fprintf(stderr, "OOC_Sort; error writing output file\n"); |
420 |
> |
return -1; |
421 |
> |
} |
422 |
> |
|
423 |
> |
#ifdef DEBUG_OOC_SORT |
424 |
> |
pqCnt++; |
425 |
> |
#endif |
426 |
> |
|
427 |
> |
/* Peek next record from same block and insert in priority queue */ |
428 |
> |
if (!OOC_SortPeek(blkFile [b], recSize, rec)) { |
429 |
> |
/* Block not exhausted */ |
430 |
> |
pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg, |
431 |
> |
keyData -> mortonScale); |
432 |
> |
|
433 |
> |
if (OOC_SortPush(pqueue, pri, b) < 0) { |
434 |
> |
fprintf(stderr, "OOC_Sort: failed priority queue insert\n"); |
435 |
> |
return -1; |
436 |
> |
} |
437 |
> |
} |
438 |
> |
} |
439 |
> |
} while (b >= 0); |
440 |
> |
|
441 |
> |
#ifdef DEBUG_OOC_SORT |
442 |
> |
fprintf(stderr, "OOC_Sort: dequeued %lu rec\n", pqCnt); |
443 |
> |
fprintf(stderr, "OOC_Sort: merged file contains %lu rec\n", |
444 |
> |
ftell(out) / recSize); |
445 |
> |
#endif |
446 |
> |
|
447 |
> |
/* Priority queue now empty -> done; close temporary subblock files, |
448 |
> |
* causing them to be automatically deleted. */ |
449 |
> |
for (b = 0; b < numBlk; b++) { |
450 |
> |
fclose(blkFile [b]); |
451 |
> |
} |
452 |
> |
|
453 |
> |
/* !!! Commit output file to disk before caller reads it; omitting |
454 |
> |
* !!! this apparently leads to corrupted files (race condition?) when |
455 |
> |
* !!! the caller tries to read them! */ |
456 |
> |
fflush(out); |
457 |
> |
fsync(fileno(out)); |
458 |
|
} |
459 |
< |
|
460 |
< |
/* Prepare for pass 2: allocate buffa for each input block and initialise |
461 |
< |
* fill priority queue with single record from each block */ |
462 |
< |
for (b = 0; b < numBlk; b++) { |
463 |
< |
if (!(iBlk [b].buf = malloc(bufSize))) { |
464 |
< |
fprintf(stderr, "OOC_Sort: failure allocating input buffer\n"); |
459 |
> |
|
460 |
> |
else { |
461 |
> |
/* ====================================== |
462 |
> |
* Block is small enough for in-core sort |
463 |
> |
* ====================================== */ |
464 |
> |
int ifd = fileno(in), ofd = fileno(out); |
465 |
> |
|
466 |
> |
#ifdef DEBUG_OOC_SORT |
467 |
> |
fprintf(stderr, "OOC_Sort: Proc %d (%d/%d) sorting block [%lu - %lu]\n", |
468 |
> |
getpid(), procCnt, numProc - 1, blkLo, blkHi); |
469 |
> |
|
470 |
> |
#endif |
471 |
> |
|
472 |
> |
/* Atomically seek and read block into in-core sort buffer */ |
473 |
> |
/* !!! Unbuffered I/O via pread() avoids potential race conditions |
474 |
> |
* !!! and buffer corruption which can occur with lseek()/fseek() |
475 |
> |
* !!! followed by read()/fread(). */ |
476 |
> |
if (pread(ifd, sortBuf, blkSize, blkLo * recSize) != blkSize) { |
477 |
> |
perror("OOC_Sort: error reading from input file"); |
478 |
|
return -1; |
479 |
< |
} |
480 |
< |
|
481 |
< |
/* Peek first record in block file without modifying buffa */ |
482 |
< |
iBlk [b].head = iBlk [b].tail = iBlk [b].buf + bufSize; |
483 |
< |
if (!(rec = OOC_Sort_Peek(iBlk + b, recSize))) { |
484 |
< |
fprintf(stderr, "OOC_Sort: error reading from block file\n"); |
479 |
> |
} |
480 |
> |
|
481 |
> |
/* Quicksort block in-core and write to output file */ |
482 |
> |
qsort(sortBuf, blkLen, recSize, OOC_KeyCompare); |
483 |
> |
|
484 |
> |
if (write(ofd, sortBuf, blkSize) != blkSize) { |
485 |
> |
perror("OOC_Sort: error writing to block file"); |
486 |
|
return -1; |
487 |
|
} |
488 |
|
|
489 |
< |
/* Insert record into priority queue */ |
490 |
< |
if (OOC_Sort_Push(&pqueue, priority(rec), b) < 0) { |
491 |
< |
fprintf(stderr, "OOC_Sort: failed priority queue insertion\n"); |
492 |
< |
return -1; |
493 |
< |
} |
494 |
< |
} |
489 |
> |
fsync(ofd); |
490 |
> |
|
491 |
> |
#ifdef DEBUG_OOC_SORT |
492 |
> |
fprintf(stderr, "OOC_Sort: proc %d wrote %ld records\n", |
493 |
> |
getpid(), lseek(ofd, 0, SEEK_END) / recSize); |
494 |
> |
#endif |
495 |
> |
} |
496 |
> |
|
497 |
> |
return 0; |
498 |
> |
} |
499 |
> |
|
500 |
> |
|
501 |
> |
|
502 |
> |
int OOC_Sort (FILE *in, FILE *out, unsigned numBlk, |
503 |
> |
unsigned long blkSize, unsigned numProc, unsigned recSize, |
504 |
> |
FVECT bbOrg, RREAL bbSize, RREAL *(*key)(const void*)) |
505 |
> |
/* Sort records in inFile and append to outFile by subdividing inFile into |
506 |
> |
* small blocks, sorting these in-core, and merging them out-of-core via a |
507 |
> |
* priority queue. |
508 |
> |
* |
509 |
> |
* This is implemented as a recursive (numBlk)-way sort; the input is |
510 |
> |
* successively split into numBlk smaller blocks until these are of size <= |
511 |
> |
* blkSize, i.e. small enough for in-core sorting, then merging the sorted |
512 |
> |
* blocks as the stack unwinds. The in-core sort is parallelised over |
513 |
> |
* numProx processes. |
514 |
> |
* |
515 |
> |
* Parameters are as follows: |
516 |
> |
* inFile Opened input file containing unsorted records |
517 |
> |
* outFile Opened output file containing sorted records |
518 |
> |
* numBlk Number of blocks to divide into / merge from |
519 |
> |
* blkSize Max block size and size of in-core sort buffer, in bytes |
520 |
> |
* numProc Number of parallel processes for in-core sort |
521 |
> |
* recSize Size of input records in bytes |
522 |
> |
* bbOrg Origin of bounding box containing record keys for Morton code |
523 |
> |
* bbSize Extent of bounding box containing record keys for Morton code |
524 |
> |
* key Callback to access 3D coords from records for Morton code |
525 |
> |
*/ |
526 |
> |
{ |
527 |
> |
unsigned long numRec; |
528 |
> |
OOC_SortQueue pqueue; |
529 |
> |
char *sortBuf = NULL; |
530 |
> |
int stat; |
531 |
|
|
532 |
< |
/* Allocate output buffa and open output file */ |
533 |
< |
if (!(oBlk.file = fopen(outFile, "wb"))) { |
534 |
< |
fprintf(stderr, "OOC_Sort: failure opening output file %s\n", outFile); |
532 |
> |
if (numBlk < 1) |
533 |
> |
numBlk = 1; |
534 |
> |
|
535 |
> |
/* Open input file & get size in number of records */ |
536 |
> |
if (fseek(in, 0, SEEK_END) < 0) { |
537 |
> |
fputs("OOC_Sort: failed seek in input file\n", stderr); |
538 |
|
return -1; |
539 |
|
} |
540 |
|
|
541 |
< |
if (!(oBlk.head = oBlk.buf = malloc(bufSize))) { |
542 |
< |
fprintf(stderr, "OOC_Sort: failure allocating output buffer\n"); |
541 |
> |
if (!(numRec = ftell(in) / recSize)) { |
542 |
> |
fputs("OOC_Sort: empty input file\n", stderr); |
543 |
|
return -1; |
544 |
|
} |
545 |
< |
|
546 |
< |
/* tail marks end of output buffa */ |
547 |
< |
oBlk.tail = oBlk.buf + bufSize; |
548 |
< |
|
549 |
< |
/* =================================================================== |
411 |
< |
* Pass 2: External merge of all blocks using priority queue |
412 |
< |
* =================================================================== */ |
413 |
< |
|
414 |
< |
do { |
415 |
< |
/* Get next node in priority queue, read next record in corresponding |
416 |
< |
* block, and send to output */ |
417 |
< |
b = OOC_Sort_Pop(&pqueue); |
418 |
< |
|
419 |
< |
if (b >= 0) { |
420 |
< |
/* Priority queue non-empty */ |
421 |
< |
if (!(rec = OOC_Sort_Read(iBlk + b, recSize))) { |
422 |
< |
/* Corresponding record should still be in the buffa, so EOF |
423 |
< |
* should not happen */ |
424 |
< |
fprintf(stderr, "OOC_Sort: unexpected EOF in block file\n"); |
425 |
< |
return -1; |
426 |
< |
} |
427 |
< |
|
428 |
< |
if (!OOC_Sort_Write(&oBlk, recSize, rec)) { |
429 |
< |
fprintf(stderr, "OOC_Sort; error writing to output file %s\n", |
430 |
< |
outFile); |
431 |
< |
return -1; |
432 |
< |
} |
433 |
< |
|
434 |
< |
/* Peek next record from same block and insert in priority queue */ |
435 |
< |
if (rec = OOC_Sort_Peek(iBlk + b, recSize)) |
436 |
< |
/* Buffa not exhausted */ |
437 |
< |
if (OOC_Sort_Push(&pqueue, priority(rec), b) < 0) { |
438 |
< |
fprintf(stderr, "OOC_Sort: failed priority queue insertion\n"); |
439 |
< |
return -1; |
440 |
< |
} |
441 |
< |
} |
442 |
< |
} while (b >= 0); |
443 |
< |
|
444 |
< |
/* Priority queue now empty; flush output buffa & close file */ |
445 |
< |
oBlk.tail = oBlk.head; |
446 |
< |
OOC_Sort_Write(&oBlk, 0, NULL); |
447 |
< |
fclose(oBlk.file); |
448 |
< |
|
449 |
< |
for (b = 0; b < numBlk; b++) { |
450 |
< |
fclose(iBlk [b].file); |
451 |
< |
free(iBlk [b].buf); |
545 |
> |
|
546 |
> |
/* Allocate & init priority queue */ |
547 |
> |
if (!(pqueue.node = calloc(numBlk, sizeof(OOC_SortQueueNode)))) { |
548 |
> |
fputs("OOC_Sort: failure allocating priority queue\n", stderr); |
549 |
> |
return -1; |
550 |
|
} |
551 |
+ |
pqueue.tail = 0; |
552 |
+ |
pqueue.len = numBlk; |
553 |
|
|
554 |
< |
free(iBlk); |
554 |
> |
/* Allocate in-core sort buffa */ |
555 |
> |
if (!(sortBuf = malloc(blkSize))) { |
556 |
> |
fprintf(stderr, "OOC_Sort: failure allocating in-core sort buffer"); |
557 |
> |
return -1; |
558 |
> |
} |
559 |
> |
|
560 |
> |
/* Set up key data to pass to qsort()'s comparison func */ |
561 |
> |
keyData.key = key; |
562 |
> |
keyData.mortonScale = OOC_MORTON_MAX / bbSize; |
563 |
> |
VCOPY(keyData.bbOrg, bbOrg); |
564 |
> |
|
565 |
> |
stat = OOC_SortRecurse(in, 0, numRec - 1, out, numBlk, blkSize, numProc, |
566 |
> |
recSize, sortBuf, &pqueue, &keyData); |
567 |
> |
|
568 |
> |
/* Cleanup */ |
569 |
|
free(pqueue.node); |
570 |
+ |
free(sortBuf); |
571 |
|
|
572 |
< |
return 0; |
573 |
< |
} |
572 |
> |
return stat; |
573 |
> |
} |