--- ray/src/rt/rc3.c 2012/06/09 16:47:27 2.2 +++ ray/src/rt/rc3.c 2012/11/15 19:41:03 2.18 @@ -1,20 +1,21 @@ #ifndef lint -static const char RCSid[] = "$Id: rc3.c,v 2.2 2012/06/09 16:47:27 greg Exp $"; +static const char RCSid[] = "$Id: rc3.c,v 2.18 2012/11/15 19:41:03 greg Exp $"; #endif /* * Accumulate ray contributions for a set of materials * Controlling process for multiple children */ +#include <signal.h> #include "rcontrib.h" -#include "platform.h" -#include "rtprocess.h" #include "selcall.h" +#define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2)) + /* Modifier contribution queue (results waiting to be output) */ typedef struct s_binq { - int ndx; /* index for this entry */ - int n2add; /* number left to add */ + RNUMBER ndx; /* index for this entry */ + RNUMBER nadded; /* accumulated so far */ struct s_binq *next; /* next in queue */ MODCONT *mca[1]; /* contrib. array (extends struct) */ } BINQ; @@ -22,25 +23,30 @@ typedef struct s_binq { static BINQ *out_bq = NULL; /* output bin queue */ static BINQ *free_bq = NULL; /* free queue entries */ -static SUBPROC kida[MAXPROCESS]; /* child processes */ -static FILE *inq_fp[MAXPROCESS]; /* input streams */ +static struct { + RNUMBER r1; /* assigned ray starting index */ + SUBPROC pr; /* PID, i/o descriptors */ + FILE *infp; /* file pointer to read from process */ + int nr; /* number of rays to sum (0 if free) */ +} kida[MAXPROCESS]; /* our child processes */ -/* Get new (empty) bin queue entry */ +/* Get new bin queue entry */ static BINQ * new_binq() { - BINQ *bp = free_bq; + BINQ *bp; int i; - if (bp != NULL) { /* something already available? */ + if (free_bq != NULL) { /* something already available? 
*/ + bp = free_bq; free_bq = bp->next; bp->next = NULL; - bp->n2add = accumulate-1; + bp->nadded = 0; return(bp); } /* else allocate fresh */ - bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *)); + bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1)); if (bp == NULL) goto memerr; for (i = nmods; i--; ) { @@ -53,7 +59,7 @@ new_binq() /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */ } bp->ndx = 0; - bp->n2add = accumulate-1; + bp->nadded = 0; bp->next = NULL; return(bp); memerr: @@ -89,7 +95,7 @@ free_binq(BINQ *bp) /* Add modifier values to accumulation record in queue and clear */ -void +static void queue_modifiers() { MODCONT *mpin, *mpout; @@ -105,10 +111,11 @@ queue_modifiers() addcolor(mpout->cbin[j], mpin->cbin[j]); memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins); } + out_bq->nadded++; } -/* Sum one modifier record into another (doesn't update n2add) */ +/* Sum one modifier record into another (updates nadded) */ static void add_modbin(BINQ *dst, BINQ *src) { @@ -120,6 +127,7 @@ add_modbin(BINQ *dst, BINQ *src) for (j = mpout->nbins; j--; ) addcolor(mpout->cbin[j], mpin->cbin[j]); } + dst->nadded += src->nadded; } @@ -139,10 +147,11 @@ queue_output(BINQ *bp) } return; } - b_last = NULL; /* else insert in output queue */ + b_last = NULL; /* insert in output queue */ for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx; b_cur = b_cur->next) b_last = b_cur; + if (b_last != NULL) { bp->next = b_cur; b_last->next = bp; @@ -150,18 +159,17 @@ queue_output(BINQ *bp) bp->next = out_bq; out_bq = bp; } - if (accumulate <= 1) /* no accumulating? */ + if (accumulate == 1) /* no accumulation? 
*/ return; b_cur = out_bq; /* else merge accumulation entries */ while (b_cur->next != NULL) { - if (b_cur->n2add <= 0 || + if (b_cur->nadded >= accumulate || (b_cur->ndx-1)/accumulate != (b_cur->next->ndx-1)/accumulate) { b_cur = b_cur->next; continue; } add_modbin(b_cur, b_cur->next); - b_cur->n2add--; b_last = b_cur->next; b_cur->next = b_last->next; b_last->next = NULL; @@ -170,18 +178,34 @@ queue_output(BINQ *bp) } -/* Get current with output FIFO by producing ready results */ +/* Count number of records ready for output */ static int -output_catchup() +queue_ready() { + int nready = 0; + BINQ *bp; + + for (bp = out_bq; bp != NULL && bp->nadded >= accumulate && + bp->ndx == lastdone+nready*accumulate+1; + bp = bp->next) + ++nready; + + return(nready); +} + + +/* Catch up with output queue by producing ready results */ +static int +output_catchup(int nmax) +{ int nout = 0; BINQ *bp; int i; - - if (accumulate <= 0) /* just accumulating? */ - return(0); - /* else output ready results */ - while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) { + /* output ready results */ + while (out_bq != NULL && out_bq->nadded >= accumulate + && out_bq->ndx == lastdone+1) { + if ((nmax > 0) & (nout >= nmax)) + break; bp = out_bq; /* pop off first entry */ out_bq = bp->next; bp->next = NULL; @@ -196,9 +220,9 @@ output_catchup() } -/* Put a zero record in results queue */ +/* Put a zero record in results queue & output */ void -zero_record(int ndx) +put_zero_record(int ndx) { BINQ *bp = new_binq(); int i; @@ -206,10 +230,32 @@ zero_record(int ndx) for (i = nmods; i--; ) memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins); bp->ndx = ndx; + bp->nadded = 1; queue_output(bp); + output_catchup(0); } +/* Get results from child process and add to queue */ +static void +queue_results(int k) +{ + BINQ *bq = new_binq(); /* get results holder */ + int j; + + bq->ndx = kida[k].r1; + bq->nadded = kida[k].nr; + /* read from child */ + for (j = 0; j < nmods; j++) + 
if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins, + kida[k].infp) != bq->mca[j]->nbins) + error(SYSTEM, "read error from render process"); + + queue_output(bq); /* put results in output queue */ + kida[k].nr = 0; /* mark child as available */ +} + + /* callback to set output spec to NULL (stdout) */ static int set_stdout(const LUENT *le, void *p) @@ -223,134 +269,130 @@ set_stdout(const LUENT *le, void *p) int in_rchild() { -#ifdef _WIN32 - error(WARNING, "multiprocessing unsupported -- running solo"); - nproc = 1; - return(1); -#else - /* try to fork ourselves */ - while (nchild < nproc) { - int p0[2], p1[2]; - int pid; - /* prepare i/o pipes */ - if (pipe(p0) < 0 || pipe(p1) < 0) - error(SYSTEM, "pipe() call failed!"); - pid = fork(); /* fork parent process */ - if (pid == 0) { /* if in child, set up & return true */ - close(p0[1]); close(p1[0]); - dup2(p0[0], 0); close(p0[0]); - dup2(p1[1], 1); close(p1[1]); + int rval; + + while (nchild < nproc) { /* fork until target reached */ + errno = 0; + rval = open_process(&kida[nchild].pr, NULL); + if (rval < 0) + error(SYSTEM, "open_process() call failed"); + if (rval == 0) { /* if in child, set up & return true */ + lu_doall(&modconttab, &set_stdout, NULL); + lu_done(&ofiletab); + while (nchild--) { /* don't share other pipes */ + close(kida[nchild].pr.w); + fclose(kida[nchild].infp); + } inpfmt = (sizeof(RREAL)==sizeof(double)) ? 
'd' : 'f'; outfmt = 'd'; header = 0; - waitflush = xres = 1; yres = 0; raysleft = 0; - account = accumulate = 1; - lu_doall(&modconttab, set_stdout, NULL); - nchild = -1; - return(1); /* child return value */ + if (accumulate == 1) { + waitflush = xres = 1; + account = accumulate = 1; + } else { /* parent controls accumulation */ + waitflush = xres = 0; + account = accumulate = 0; + } + return(1); /* return "true" in child */ } - if (pid < 0) - error(SYSTEM, "fork() call failed!"); - /* connect our pipes */ - close(p0[0]); close(p1[1]); - kida[nchild].r = p1[0]; - kida[nchild].w = p0[1]; - kida[nchild].pid = pid; - kida[nchild].running = -1; - inq_fp[nchild] = fdopen(p1[0], "rb"); - if (inq_fp[nchild] == NULL) + if (rval != PIPE_BUF) + error(CONSISTENCY, "bad value from open_process()"); + /* connect to child's output */ + kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb"); + if (kida[nchild].infp == NULL) error(SYSTEM, "out of memory in in_rchild()"); - ++nchild; - } - return(0); /* parent return value */ +#ifdef getc_unlocked + flockfile(kida[nchild].infp); /* avoid mutex overhead */ #endif + kida[nchild++].nr = 0; /* mark as available */ + } + return(0); /* return "false" in parent */ } /* Close child processes */ void -end_children() +end_children(int immed) { int status; - while (nchild-- > 0) { - kida[nchild].r = -1; /* close(-1) error is ignored */ - if ((status = close_process(&kida[nchild])) > 0) { + while (nchild > 0) { + nchild--; +#ifdef SIGKILL + if (immed) /* error mode -- quick exit */ + kill(kida[nchild].pr.pid, SIGKILL); +#endif + if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) { sprintf(errmsg, "rendering process returned bad status (%d)", status); error(WARNING, errmsg); } - fclose(inq_fp[nchild]); /* performs actual close() */ + fclose(kida[nchild].infp); } } -/* Wait for the next available child, managing output queue as well */ +/* Wait for the next available child, managing output queue simultaneously */ static int 
-next_child_nq(int force_wait) +next_child_nq(int flushing) { static struct timeval polling; - struct timeval *pmode = force_wait | (accumulate <= 0) ? - (struct timeval *)NULL : &polling; + struct timeval *pmode; fd_set readset, errset; - int i, j, n, nr; + int i, n, nr, nqr; - if (!force_wait) /* see if there's one free */ + if (!flushing) /* see if there's one free */ for (i = nchild; i--; ) - if (kida[i].running < 0) + if (!kida[i].nr) return(i); + + nqr = queue_ready(); /* choose blocking mode or polling */ + if ((nqr > 0) & !flushing) + pmode = &polling; + else + pmode = NULL; +tryagain: /* catch up with output? */ + if (pmode == &polling) { + if (nqr > nchild) /* don't get too far behind */ + nqr -= output_catchup(nqr-nchild); + } else if (nqr > 0) /* clear output before blocking */ + nqr -= output_catchup(0); /* prepare select() call */ FD_ZERO(&readset); FD_ZERO(&errset); n = nr = 0; for (i = nchild; i--; ) { - if (kida[i].running > 0) { - FD_SET(kida[i].r, &readset); + if (kida[i].nr) { + FD_SET(kida[i].pr.r, &readset); ++nr; } - FD_SET(kida[i].r, &errset); - if (kida[i].r >= n) - n = kida[i].r + 1; + FD_SET(kida[i].pr.r, &errset); + if (kida[i].pr.r >= n) + n = kida[i].pr.r + 1; } - if (!nr) /* nothing going on */ + if (!nr) /* nothing to wait for? 
*/ return(-1); -tryagain: - if (pmode == NULL) /* about to block, so catch up */ - output_catchup(); if ((nr > 1) | (pmode == &polling)) { errno = 0; nr = select(n, &readset, NULL, &errset, pmode); - if (!nr & (pmode == &polling)) { + if (!nr) { pmode = NULL; /* try again, blocking this time */ goto tryagain; } - if (nr <= 0) - error(SYSTEM, "select call error in next_child_nq()"); + if (nr < 0) + error(SYSTEM, "select() error in next_child_nq()"); } else FD_ZERO(&errset); n = -1; /* read results from child(ren) */ for (i = nchild; i--; ) { - BINQ *bq; - if (FD_ISSET(kida[i].r, &errset)) + if (FD_ISSET(kida[i].pr.r, &errset)) error(USER, "rendering process died"); - if (!FD_ISSET(kida[i].r, &readset)) - continue; - bq = new_binq(); /* get results holder */ - bq->ndx = kida[i].running; - /* read from child */ - for (j = 0; j < nmods; j++) { - n = bq->mca[j]->nbins; - nr = fread(bq->mca[j]->cbin,sizeof(DCOLOR),n,inq_fp[i]); - if (nr != n) - error(SYSTEM, "read error from render process"); - } - queue_output(bq); /* put results in output queue */ - kida[i].running = -1; /* mark child as available */ - n = i; + if (FD_ISSET(kida[i].pr.r, &readset)) + queue_results(n = i); } - return(n); /* last available child */ + return(n); /* first available child */ } @@ -358,56 +400,161 @@ tryagain: void parental_loop() { + const int qlimit = (accumulate == 1) ? 1 : MAXIQ-1; + int ninq = 0; + FVECT orgdir[2*MAXIQ]; + int i, n; + /* load rays from stdin & process */ +#ifdef getc_unlocked + flockfile(stdin); /* avoid lock/unlock overhead */ +#endif + while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) { + const int zero_ray = orgdir[2*ninq+1][0] == 0.0 && + (orgdir[2*ninq+1][1] == 0.0) & + (orgdir[2*ninq+1][2] == 0.0); + ninq += !zero_ray; + /* Zero ray cannot go in input queue */ + if (zero_ray ? 
ninq : ninq >= qlimit || + lastray/accumulate != (lastray+ninq)/accumulate) { + i = next_child_nq(0); /* manages output */ + n = ninq; + if (accumulate > 1) /* need terminator? */ + memset(orgdir[2*n++], 0, sizeof(FVECT)*2); + n *= sizeof(FVECT)*2; /* send assignment */ + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) + error(SYSTEM, "pipe write error"); + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; /* mark as busy */ + if (lastray < lastdone) { /* RNUMBER wrapped? */ + while (next_child_nq(1) >= 0) + ; + lastray -= ninq; + lastdone = lastray %= accumulate; + } + ninq = 0; + } + if (zero_ray) { /* put bogus record? */ + if ((yres <= 0) | (xres <= 1) && + (lastray+1) % accumulate == 0) { + while (next_child_nq(1) >= 0) + ; /* clear the queue */ + lastdone = lastray = accumulate-1; + waitflush = 1; /* flush next */ + } + put_zero_record(++lastray); + } + if (raysleft && !--raysleft) + break; /* preemptive EOI */ + } + while (next_child_nq(1) >= 0) /* empty results queue */ + ; + if (account < accumulate) { + error(WARNING, "partial accumulation in final record"); + free_binq(out_bq); /* XXX just ignore it */ + out_bq = NULL; + } + free_binq(NULL); /* clean up */ + lu_done(&ofiletab); + if (raysleft) + error(USER, "unexpected EOF on input"); +} + + +/* Wait for the next available child by monitoring "to" pipes */ +static int +next_child_ready() +{ + fd_set writeset, errset; + int i, n; + + for (i = nchild; i--; ) /* see if there's one free first */ + if (!kida[i].nr) + return(i); + /* prepare select() call */ + FD_ZERO(&writeset); FD_ZERO(&errset); + n = 0; + for (i = nchild; i--; ) { + FD_SET(kida[i].pr.w, &writeset); + FD_SET(kida[i].pr.r, &errset); + if (kida[i].pr.w >= n) + n = kida[i].pr.w + 1; + if (kida[i].pr.r >= n) + n = kida[i].pr.r + 1; + } + errno = 0; + n = select(n, NULL, &writeset, &errset, NULL); + if (n < 0) + error(SYSTEM, "select() error in next_child_ready()"); + n = -1; /* identify waiting child */ + for (i = nchild; i--; ) { + if 
(FD_ISSET(kida[i].pr.r, &errset)) + error(USER, "rendering process died"); + if (FD_ISSET(kida[i].pr.w, &writeset)) + kida[n = i].nr = 0; + } + return(n); /* first available child */ +} + + +/* Modified parental loop for full accumulation mode (-c 0) */ +void +feeder_loop() +{ static int ignore_warning_given = 0; - FVECT orgdir[2]; - double d; + int ninq = 0; + FVECT orgdir[2*MAXIQ]; + int i, n; /* load rays from stdin & process */ #ifdef getc_unlocked flockfile(stdin); /* avoid lock/unlock overhead */ #endif - while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) { - d = normalize(orgdir[1]); - /* asking for flush? */ - if ((d == 0.0) & (accumulate != 1)) { + while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) { + if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */ + (orgdir[2*ninq+1][1] == 0.0) & + (orgdir[2*ninq+1][2] == 0.0)) { if (!ignore_warning_given++) error(WARNING, "dummy ray(s) ignored during accumulation\n"); continue; - } - if ((d == 0.0) | (lastray+1 < lastray)) { - while (next_child_nq(1) >= 0) - ; - lastdone = lastray = 0; - } - if (d == 0.0) { - if ((yres <= 0) | (xres <= 0)) - waitflush = 1; /* flush right after */ - zero_record(++lastray); - } else { /* else assign */ - int avail = next_child_nq(0); - if (writebuf(kida[avail].w, (char *)orgdir, - sizeof(FVECT)*2) != sizeof(FVECT)*2) + } + if (++ninq >= MAXIQ) { + i = next_child_ready(); /* get eager child */ + n = sizeof(FVECT)*2 * ninq; /* give assignment */ + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) error(SYSTEM, "pipe write error"); - kida[avail].running = ++lastray; + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; + if (lastray < lastdone) /* RNUMBER wrapped? 
*/ + lastdone = lastray = 0; + ninq = 0; } if (raysleft && !--raysleft) break; /* preemptive EOI */ } - while (next_child_nq(1) >= 0) /* empty results queue */ - ; - /* output accumulated record */ - if (accumulate <= 0 || account < accumulate) { - int i; - if (account < accumulate) { - error(WARNING, "partial accumulation in final record"); - accumulate -= account; - } - for (i = 0; i < nmods; i++) - mod_output(out_bq->mca[i]); - end_record(); + if (ninq) { /* polish off input */ + i = next_child_ready(); + n = sizeof(FVECT)*2 * ninq; + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) + error(SYSTEM, "pipe write error"); + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; + ninq = 0; } + memset(orgdir, 0, sizeof(FVECT)*2); /* get results */ + for (i = nchild; i--; ) { + writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2); + queue_results(i); + } + if (recover) /* and from before? */ + queue_modifiers(); + end_children(0); /* free up file descriptors */ + for (i = 0; i < nmods; i++) + mod_output(out_bq->mca[i]); /* output accumulated record */ + end_record(); + free_binq(out_bq); /* clean up */ + out_bq = NULL; + free_binq(NULL); + lu_done(&ofiletab); if (raysleft) error(USER, "unexpected EOF on input"); - free_binq(NULL); /* clean up */ - lu_done(&ofiletab); }