/*
   =========================================================================
   N-way out-of-core merge sort for records with 3D keys.  Recursively
   subdivides input into N blocks until these are of sufficiently small size
   to be sorted in-core according to Z-order (Morton code), then N-way
   merging them out-of-core using a priority queue as the stack unwinds.

   Roland Schregle (roland.schregle@{hslu.ch, gmail.com})
   (c) Lucerne University of Applied Sciences and Arts,
       supported by the Swiss National Science Foundation (SNSF, #147053)
   ==========================================================================

   $Id: oocsort.c,v 2.3 2016/05/17 17:39:47 rschregle Exp $
*/


#include "oocsort.h"
#include "oocmorton.h"
/* NOTE(review): the names of the five system headers were missing from the
 * source under review; the set below covers every library call made in this
 * file (stdio, qsort/malloc/exit, fork/pread/write/fsync, wait) -- confirm
 * against the repository copy. */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>


/* Priority queue node */
typedef struct {
   OOC_MortonIdx  pri;     /* Record's priority (sort key) */
   unsigned       blk;     /* Block containing record */
} OOC_SortQueueNode;


/* Priority queue; node is an array of len entries organised as a binary
 * min-heap, of which the first tail entries are in use */
typedef struct {
   OOC_SortQueueNode *node;
   unsigned          len, tail;
} OOC_SortQueue;


/* Additional data for qsort() compare function. We resort to instancing
 * this as a global variable instead of passing it to the compare func via
 * qsort_r(), since the latter is a non-portable GNU extension. */
typedef struct {
   RREAL *(*key)(const void*);   /* Callback to access 3D key in record */
   FVECT bbOrg;                  /* Origin of bbox containing keys */
   RREAL mortonScale;            /* Scale for generating Morton code */
} OOC_KeyData;

static OOC_KeyData keyData;


static int OOC_KeyCompare (const void *rec1, const void *rec2)
/* Comparison function for in-core quicksort; orders two records by the
 * Morton code of their 3D keys, using the global keyData.  Returns -1, 0 or
 * 1 as required by qsort(); NULL records compare equal. */
{
   OOC_MortonIdx pri1, pri2;

   if (!rec1 || !rec2)
      return 0;

   pri1 = OOC_Key2Morton(keyData.key(rec1), keyData.bbOrg,
                         keyData.mortonScale);
   pri2 = OOC_Key2Morton(keyData.key(rec2), keyData.bbOrg,
                         keyData.mortonScale);

   if (pri1 < pri2)
      return -1;
   else if (pri1 > pri2)
      return 1;
   else
      return 0;
}


static int OOC_SortRead (FILE *file, unsigned recSize, char *rec)
/* Read next record of recSize bytes from file; return 0 and record in rec
 * on success, else -1 (NULL buffer, end of file, or short read) */
{
   if (!rec || feof(file) || fread(rec, recSize, 1, file) != 1)
      return -1;

   return 0;
}


static int OOC_SortPeek (FILE *file, unsigned recSize, char *rec)
/* Return next record from file WITHOUT advancing file position;
 * return 0 and record in rec on success, else -1 */
{
   /* ftell() returns a (signed) long and -1 on failure; keep the sign so
    * that a failure is detected instead of becoming a bogus seek offset */
   const long filePos = ftell(file);

   if (filePos < 0)
      return -1;

   if (OOC_SortRead(file, recSize, rec))
      return -1;

   /* Workaround; fseek(file, -recSize, SEEK_CUR) causes subsequent
    * fread()'s to fail until rewind().
    * NOTE(review): recSize is unsigned, so -recSize wraps to a large
    * positive value rather than a negative offset, which presumably is the
    * cause; an absolute seek sidesteps that. */
   rewind(file);

   if (fseek(file, filePos, SEEK_SET) < 0)
      return -1;

   return 0;
}


static int OOC_SortWrite (FILE *file, unsigned recSize, const char *rec)
/* Output record of recSize bytes to file; return 0 on success, else -1 */
{
   if (!rec || fwrite(rec, recSize, 1, file) != 1)
      return -1;

   return 0;
}


#ifdef DEBUG_OOC_SORT
static int OOC_SortQueueCheck (OOC_SortQueue *pq, unsigned root)
/* Priority queue sanity check; verifies the min-heap property (no kid has
 * a lower priority value than its parent) for the subtree at root.
 * Returns 0 if consistent, else -1.
 *
 * BOTH kids and their subtrees are visited; descending into the left kid
 * only would leave the right half of the heap unverified. */
{
   unsigned kid, k;

   if (root >= pq -> tail)
      return 0;

   /* Left kid is at 2 * root + 1, right kid immediately after it */
   for (k = 0, kid = (root << 1) + 1; k < 2 && kid < pq -> tail;
        k++, kid++) {
      if (pq -> node [kid].pri < pq -> node [root].pri)
         return -1;

      if (OOC_SortQueueCheck(pq, kid) < 0)
         return -1;
   }

   return 0;
}
#endif static int OOC_SortPush (OOC_SortQueue *pq, OOC_MortonIdx pri, unsigned blk) /* Insert specified block index into priority queue; return block index * or -1 if queue is full */ { OOC_SortQueueNode *pqn = pq -> node; unsigned kid, root; if (pq -> tail >= pq -> len) /* Queue full */ return -1; /* Append node at tail and re-sort */ kid = pq -> tail++; while (kid) { root = (kid - 1) >> 1; /* Compare with parent node and swap if necessary, else terminate */ if (pri < pqn [root].pri) { pqn [kid].pri = pqn [root].pri; pqn [kid].blk = pqn [root].blk; kid = root; } else break; } pqn [kid].pri = pri; pqn [kid].blk = blk; #ifdef DEBUG_OOC_SORT if (OOC_SortQueueCheck(pq, 0) < 0) { fprintf(stderr, "OOC_Sort: priority queue inconsistency\n"); return -1; } #endif return blk; } static int OOC_SortPop (OOC_SortQueue *pq) /* Remove head of priority queue and return its block index or -1 if queue empty */ { OOC_SortQueueNode *pqn = pq -> node; OOC_MortonIdx pri; unsigned kid, kid2, root = 0, blk, res; if (!pq -> tail) /* Queue empty */ return -1; res = pqn -> blk; pri = pqn [--pq -> tail].pri; blk = pqn [pq -> tail].blk; /* Replace head with tail node and re-sort */ while ((kid = (root << 1) + 1) < pq -> tail) { /* Compare with smaller kid and swap if necessary, else terminate */ if ((kid2 = (kid + 1)) < pq -> tail && pqn [kid2].pri < pqn [kid].pri) kid = kid2; if (pri > pqn [kid].pri) { pqn [root].pri = pqn [kid].pri; pqn [root].blk = pqn [kid].blk; } else break; root = kid; } pqn [root].pri = pri; pqn [root].blk = blk; #ifdef DEBUG_OOC_SORT if (OOC_SortQueueCheck(pq, 0) < 0) { fprintf(stderr, "OOC_Sort: priority queue inconsistency\n"); return -1; } #endif return res; } /* Active subprocess counter updated by parent process; must persist when * recursing into OOC_SortRecurse(), hence global */ static unsigned procCnt = 0; static int OOC_SortRecurse (FILE *in, unsigned long blkLo, unsigned long blkHi, FILE *out, unsigned numBlk, unsigned long maxBlkSize, unsigned numProc, 
unsigned recSize, char *sortBuf, OOC_SortQueue *pqueue, const OOC_KeyData *keyData) /* Recursive part of OOC_Sort(). Reads block of records from input file * within the interval [blkLo, blkHi] and divides it into numBlk blocks * until the size (in bytes) does not exceed maxBlkSize, then quicksorts * each block into a temporary file. These files are then mergesorted via a * priority queue to the output file as the stack unwinds. NOTE: Critical * sections are prepended with '!!!' * * Parameters are as follows: * in Input file containing unsorted records (assumed to be open) * blkLo Start of current block in input file, in number of records * blkHi End of current block in input file, in number of records * out Output file containing sorted records (assumed to be open) * numBlk Number of blocks to divide into / merge from * maxBlkSize Max block size and size of in-core sort buffer, in bytes * numProc Number of parallel processes for in-core sort * recSize Size of input records in bytes * sortBuf Preallocated in-core sort buffer of size maxBlkSize * pqueue Preallocated priority queue of length numBlk for block merge * keyData Aggregate data for Morton code generation and comparison */ { const unsigned long blkLen = blkHi - blkLo + 1, blkSize = blkLen * recSize; int stat; if (!blkLen || blkHi < blkLo) return 0; if (blkSize > maxBlkSize) { unsigned long blkLo2 = blkLo, blkHi2 = blkLo, blkSize2; const double blkLen2 = (double)blkLen / numBlk; FILE *blkFile [numBlk]; /* Violates C89! 
*/ char rec [recSize]; /* Ditto */ OOC_MortonIdx pri; int b, pid; #ifdef DEBUG_OOC_SORT unsigned long pqCnt = 0; #endif /* ====================================================== * Block too large for in-core sort -> divide into numBlk * subblocks and recurse * ====================================================== */ #ifdef DEBUG_OOC_SORT fprintf(stderr, "OOC_Sort: splitting block [%lu - %lu]\n", blkLo, blkHi); #endif for (b = 0; b < numBlk; b++) { /* Open temporary file as output for subblock */ if (!(blkFile [b] = tmpfile())) { perror("OOC_Sort: failed opening temporary block file"); return -1; } /* Subblock interval [blkLo2, blkHi2] of size blkSize2 */ blkHi2 = blkLo + (b + 1) * blkLen2 - 1; blkSize2 = (blkHi2 - blkLo2 + 1) * recSize; if (blkSize2 <= maxBlkSize) { /* !!! Will be in-core sorted on recursion -> fork kid process, * !!! but don't fork more than numProc kids; wait if necessary */ while (procCnt >= numProc && wait(&stat) >= 0) { if (!WIFEXITED(stat) || WEXITSTATUS(stat)) return -1; procCnt--; } /* Now fork kid process */ if (!(pid = fork())) { /* Recurse on subblocks with new input filehandle; */ if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk, maxBlkSize, numProc, recSize, sortBuf, pqueue, keyData)) exit(-1); /* !!! Apparently the parent's tmpfile isn't deleted when the * !!! child exits (which is what we want), though some * !!! sources suggest using _Exit() instead; is this * !!! implementation specific? */ exit(0); } else if (pid < 0) { fprintf(stderr, "OOC_Sort: failed to fork subprocess\n"); return -1; } #ifdef DEBUG_OOC_FORK fprintf(stderr, "OOC_Sort: forking kid %d for block %d\n", procCnt, b); #endif /* Parent continues here */ procCnt++; } else { /* Recurse on subblock; without forking */ if (OOC_SortRecurse(in, blkLo2, blkHi2, blkFile [b], numBlk, maxBlkSize, numProc, recSize, sortBuf, pqueue, keyData)) return -1; } /* Prepare for next block */ blkLo2 = blkHi2 + 1; } /* !!! 
Wait for any forked processes to terminate */ while (procCnt && wait(&stat) >= 0) { if (!WIFEXITED(stat) || WEXITSTATUS(stat)) return -1; procCnt--; } /* =============================================================== * Subblocks are now sorted; prepare priority queue by peeking and * enqueueing first record from corresponding temporary file * =============================================================== */ #ifdef DEBUG_OOC_SORT fprintf(stderr, "OOC_Sort: merging block [%lu - %lu]\n", blkLo, blkHi); #endif for (b = 0; b < numBlk; b++) { #ifdef DEBUG_OOC_SORT fseek(blkFile [b], 0, SEEK_END); if (ftell(blkFile [b]) % recSize) { fprintf(stderr, "OOC_Sort: truncated record in tmp block " "file %d\n", b); return -1; } fprintf(stderr, "OOC_Sort: tmp block file %d contains %ld rec\n", b, ftell(blkFile [b]) / recSize); #endif rewind(blkFile [b]); if (OOC_SortPeek(blkFile [b], recSize, rec)) { fprintf(stderr, "OOC_Sort: error reading from block file\n"); return -1; } /* Enqueue record along with its Morton code as priority */ pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg, keyData -> mortonScale); if (OOC_SortPush(pqueue, pri, b) < 0) { fprintf(stderr, "OOC_Sort: failed priority queue insertion\n"); return -1; } } /* ========================================================== * Subblocks now sorted and priority queue filled; merge from * temporary files * ========================================================== */ do { /* Get next node in priority queue, read next record in corresponding * block, and send to output */ b = OOC_SortPop(pqueue); if (b >= 0) { /* Priority queue non-empty */ if (OOC_SortRead(blkFile [b], recSize, rec)) { /* Corresponding record should still be in the input block */ fprintf(stderr, "OOC_Sort: unexpected end reading block file\n"); return -1; } if (OOC_SortWrite(out, recSize, rec)) { fprintf(stderr, "OOC_Sort; error writing output file\n"); return -1; } #ifdef DEBUG_OOC_SORT pqCnt++; #endif /* Peek next record from same block and 
insert in priority queue */ if (!OOC_SortPeek(blkFile [b], recSize, rec)) { /* Block not exhausted */ pri = OOC_Key2Morton(keyData -> key(rec), keyData -> bbOrg, keyData -> mortonScale); if (OOC_SortPush(pqueue, pri, b) < 0) { fprintf(stderr, "OOC_Sort: failed priority queue insert\n"); return -1; } } } } while (b >= 0); #ifdef DEBUG_OOC_SORT fprintf(stderr, "OOC_Sort: dequeued %lu rec\n", pqCnt); fprintf(stderr, "OOC_Sort: merged file contains %lu rec\n", ftell(out) / recSize); #endif /* Priority queue now empty -> done; close temporary subblock files, * causing them to be automatically deleted. */ for (b = 0; b < numBlk; b++) { fclose(blkFile [b]); } /* !!! Commit output file to disk before caller reads it; omitting * !!! this apparently leads to corrupted files (race condition?) when * !!! the caller tries to read them! */ fflush(out); fsync(fileno(out)); } else { /* ====================================== * Block is small enough for in-core sort * ====================================== */ int ifd = fileno(in), ofd = fileno(out); #ifdef DEBUG_OOC_SORT fprintf(stderr, "OOC_Sort: Proc %d (%d/%d) sorting block [%lu - %lu]\n", getpid(), procCnt, numProc - 1, blkLo, blkHi); #endif /* Atomically seek and read block into in-core sort buffer */ /* !!! Unbuffered I/O via pread() avoids potential race conditions * !!! and buffer corruption which can occur with lseek()/fseek() * !!! followed by read()/fread(). 
*/ if (pread(ifd, sortBuf, blkSize, blkLo * recSize) != blkSize) { perror("OOC_Sort: error reading from input file"); return -1; } /* Quicksort block in-core and write to output file */ qsort(sortBuf, blkLen, recSize, OOC_KeyCompare); if (write(ofd, sortBuf, blkSize) != blkSize) { perror("OOC_Sort: error writing to block file"); return -1; } fsync(ofd); #ifdef DEBUG_OOC_SORT fprintf(stderr, "OOC_Sort: proc %d wrote %ld records\n", getpid(), lseek(ofd, 0, SEEK_END) / recSize); #endif } return 0; } int OOC_Sort (FILE *in, FILE *out, unsigned numBlk, unsigned long blkSize, unsigned numProc, unsigned recSize, FVECT bbOrg, RREAL bbSize, RREAL *(*key)(const void*)) /* Sort records in inFile and append to outFile by subdividing inFile into * small blocks, sorting these in-core, and merging them out-of-core via a * priority queue. * * This is implemented as a recursive (numBlk)-way sort; the input is * successively split into numBlk smaller blocks until these are of size <= * blkSize, i.e. small enough for in-core sorting, then merging the sorted * blocks as the stack unwinds. The in-core sort is parallelised over * numProx processes. 
* * Parameters are as follows: * inFile Opened input file containing unsorted records * outFile Opened output file containing sorted records * numBlk Number of blocks to divide into / merge from * blkSize Max block size and size of in-core sort buffer, in bytes * numProc Number of parallel processes for in-core sort * recSize Size of input records in bytes * bbOrg Origin of bounding box containing record keys for Morton code * bbSize Extent of bounding box containing record keys for Morton code * key Callback to access 3D coords from records for Morton code */ { unsigned long numRec; OOC_SortQueue pqueue; char *sortBuf = NULL; int stat; if (numBlk < 1) numBlk = 1; /* Open input file & get size in number of records */ if (fseek(in, 0, SEEK_END) < 0) { fputs("OOC_Sort: failed seek in input file\n", stderr); return -1; } if (!(numRec = ftell(in) / recSize)) { fputs("OOC_Sort: empty input file\n", stderr); return -1; } /* Allocate & init priority queue */ if (!(pqueue.node = calloc(numBlk, sizeof(OOC_SortQueueNode)))) { fputs("OOC_Sort: failure allocating priority queue\n", stderr); return -1; } pqueue.tail = 0; pqueue.len = numBlk; /* Allocate in-core sort buffa */ if (!(sortBuf = malloc(blkSize))) { fprintf(stderr, "OOC_Sort: failure allocating in-core sort buffer"); return -1; } /* Set up key data to pass to qsort()'s comparison func */ keyData.key = key; keyData.mortonScale = OOC_MORTON_MAX / bbSize; VCOPY(keyData.bbOrg, bbOrg); stat = OOC_SortRecurse(in, 0, numRec - 1, out, numBlk, blkSize, numProc, recSize, sortBuf, &pqueue, &keyData); /* Cleanup */ free(pqueue.node); free(sortBuf); return stat; }