--- ray/src/rt/rc3.c 2012/06/09 07:16:47 2.1 +++ ray/src/rt/rc3.c 2012/06/11 05:07:55 2.4 @@ -1,5 +1,5 @@ #ifndef lint -static const char RCSid[] = "$Id: rc3.c,v 2.1 2012/06/09 07:16:47 greg Exp $"; +static const char RCSid[] = "$Id: rc3.c,v 2.4 2012/06/11 05:07:55 greg Exp $"; #endif /* * Accumulate ray contributions for a set of materials @@ -14,7 +14,7 @@ static const char RCSid[] = "$Id: rc3.c,v 2.1 2012/06/ /* Modifier contribution queue (results waiting to be output) */ typedef struct s_binq { int ndx; /* index for this entry */ - int n2add; /* number left to add */ + int nadded; /* accumulated so far */ struct s_binq *next; /* next in queue */ MODCONT *mca[1]; /* contrib. array (extends struct) */ } BINQ; @@ -23,23 +23,25 @@ static BINQ *out_bq = NULL; /* output bin queue */ static BINQ *free_bq = NULL; /* free queue entries */ static SUBPROC kida[MAXPROCESS]; /* child processes */ +static FILE *inq_fp[MAXPROCESS]; /* input streams */ -/* Get new (empty) bin queue entry */ +/* Get new bin queue entry */ static BINQ * new_binq() { - BINQ *bp = free_bq; + BINQ *bp; int i; - if (bp != NULL) { /* something already available? */ + if (free_bq != NULL) { /* something already available? */ + bp = free_bq; free_bq = bp->next; bp->next = NULL; - bp->n2add = accumulate-1; + bp->nadded = 1; return(bp); } /* else allocate fresh */ - bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *)); + bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1)); if (bp == NULL) goto memerr; for (i = nmods; i--; ) { @@ -52,7 +54,7 @@ new_binq() /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */ } bp->ndx = 0; - bp->n2add = accumulate-1; + bp->nadded = 1; bp->next = NULL; return(bp); memerr: @@ -107,7 +109,7 @@ queue_modifiers() } -/* Sum one modifier record into another (doesn't update n2add) */ +/* Sum one modifier record into another (updates nadded) */ static void add_modbin(BINQ *dst, BINQ *src) { @@ -119,16 +121,15 @@ add_modbin(BINQ *dst, BINQ *src) for (j = mpout->nbins; j--; ) addcolor(mpout->cbin[j], mpin->cbin[j]); } + dst->nadded += src->nadded; } -/* Queue output, catching up with and freeing FIFO entries when possible */ -static int +/* Queue values for later output */ +static void queue_output(BINQ *bp) { - int nout = 0; BINQ *b_last, *b_cur; - int i; if (accumulate <= 0) { /* just accumulating? */ if (out_bq == NULL) { @@ -138,12 +139,13 @@ queue_output(BINQ *bp) add_modbin(out_bq, bp); free_binq(bp); } - return(0); + return; } b_last = NULL; /* else insert in output queue */ for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx; b_cur = b_cur->next) b_last = b_cur; + if (b_last != NULL) { bp->next = b_cur; b_last->next = bp; @@ -151,32 +153,66 @@ queue_output(BINQ *bp) bp->next = out_bq; out_bq = bp; } - if (accumulate > 1) { /* merge accumulation entries */ - b_cur = out_bq; - while (b_cur->next != NULL) { - if (b_cur->n2add <= 0 || - (b_cur->ndx-1)/accumulate != - (b_cur->next->ndx-1)/accumulate) { - b_cur = b_cur->next; - continue; - } - add_modbin(b_cur, b_cur->next); - b_cur->n2add--; - b_last = b_cur->next; - b_cur->next = b_last->next; - b_last->next = NULL; - free_binq(b_last); + if (accumulate == 1) /* no accumulation? */ + return; + b_cur = out_bq; /* else merge accumulation entries */ + while (b_cur->next != NULL) { + if (b_cur->nadded >= accumulate || + (b_cur->ndx-1)/accumulate != + (b_cur->next->ndx-1)/accumulate) { + b_cur = b_cur->next; + continue; } + add_modbin(b_cur, b_cur->next); + b_last = b_cur->next; + b_cur->next = b_last->next; + b_last->next = NULL; + free_binq(b_last); } - /* output ready results */ - while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) { - b_cur = out_bq; /* pop off first entry */ - out_bq = b_cur->next; - b_cur->next = NULL; +} + + +/* Count number of records ready for output */ +static int +queue_ready() +{ + int nready = 0; + BINQ *bp; + + if (accumulate <= 0) /* just accumulating? */ + return(0); + + for (bp = out_bq; bp != NULL && bp->nadded >= accumulate && + bp->ndx == lastdone+nready*accumulate+1; + bp = bp->next) + ++nready; + + return(nready); +} + + +/* Catch up with output queue by producing ready results */ +static int +output_catchup(int nmax) +{ + int nout = 0; + BINQ *bp; + int i; + + if (accumulate <= 0) /* just accumulating? */ + return(0); + /* else output ready results */ + while (out_bq != NULL && out_bq->nadded >= accumulate + && out_bq->ndx == lastdone+1) { + if ((nmax > 0) & (nout >= nmax)) + break; + bp = out_bq; /* pop off first entry */ + out_bq = bp->next; + bp->next = NULL; for (i = 0; i < nmods; i++) /* output record */ - mod_output(b_cur->mca[i]); + mod_output(bp->mca[i]); end_record(); - free_binq(b_cur); /* free this entry */ + free_binq(bp); /* free this entry */ lastdone += accumulate; ++nout; } @@ -184,9 +220,9 @@ queue_output(BINQ *bp) } -/* Put a zero record in results queue */ +/* Put a zero record in results queue & output */ void -zero_record(int ndx) +put_zero_record(int ndx) { BINQ *bp = new_binq(); int i; @@ -195,6 +231,7 @@ zero_record(int ndx) memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins); bp->ndx = ndx; queue_output(bp); + output_catchup(0); } @@ -221,11 +258,18 @@ in_rchild() int p0[2], p1[2]; int pid; /* prepare i/o pipes */ + errno = 0; if (pipe(p0) < 0 || pipe(p1) < 0) error(SYSTEM, "pipe() call failed!"); pid = fork(); /* fork parent process */ if (pid == 0) { /* if in child, set up & return true */ close(p0[1]); close(p1[0]); + lu_doall(&modconttab, set_stdout, NULL); + lu_done(&ofiletab); + while (nchild--) { + close(kida[nchild].w); + fclose(inq_fp[nchild]); + } dup2(p0[0], 0); close(p0[0]); dup2(p1[1], 1); close(p1[1]); inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f'; @@ -235,18 +279,22 @@ in_rchild() yres = 0; raysleft = 0; account = accumulate = 1; - lu_doall(&modconttab, set_stdout, NULL); - nchild = -1; return(1); /* child return value */ } if (pid < 0) error(SYSTEM, "fork() call failed!"); - /* connect our pipes */ + /* connect parent's pipes */ close(p0[0]); close(p1[1]); kida[nchild].r = p1[0]; kida[nchild].w = p0[1]; kida[nchild].pid = pid; kida[nchild].running = -1; + inq_fp[nchild] = fdopen(p1[0], "rb"); + if (inq_fp[nchild] == NULL) + error(SYSTEM, "out of memory in in_rchild()"); +#ifdef getc_unlocked + flockfile(inq_fp[nchild]); /* avoid mutex overhead */ +#endif ++nchild; } return(0); /* parent return value */ @@ -260,27 +308,44 @@ end_children() { int status; - while (nchild-- > 0) + while (nchild-- > 0) { + kida[nchild].r = -1; /* close(-1) error is ignored */ if ((status = close_process(&kida[nchild])) > 0) { sprintf(errmsg, "rendering process returned bad status (%d)", status); error(WARNING, errmsg); } + fclose(inq_fp[nchild]); /* performs actual close() */ + } } -/* Wait for the next available child */ +/* Wait for the next available child, managing output queue as well */ static int next_child_nq(int force_wait) { - fd_set readset, errset; - int i, j, n, nr; + static struct timeval polling; + struct timeval *pmode; + fd_set readset, errset; + int i, j, n, nr, nqr; if (!force_wait) /* see if there's one free */ for (i = nchild; i--; ) if (kida[i].running < 0) return(i); + + nqr = queue_ready(); /* wait mode or polling? */ + if (!nqr | force_wait | (accumulate <= 0)) + pmode = NULL; + else + pmode = &polling; +tryagain: + n = 0; /* catch up with output? */ + if ((pmode == &polling) & (nqr > nchild)) + n = nqr - nchild; + if ((pmode == NULL) & (nqr > 0) | (n > 0)) + nqr -= output_catchup(n); /* prepare select() call */ FD_ZERO(&readset); FD_ZERO(&errset); n = nr = 0; @@ -293,12 +358,17 @@ next_child_nq(int force_wait) if (kida[i].r >= n) n = kida[i].r + 1; } - if (!nr) /* nothing going on */ + if (!nr) /* nothing to wait for? */ return(-1); - if (nr > 1) { /* call select if multiple busy */ + if ((nr > 1) | (pmode == &polling)) { errno = 0; - if (select(n, &readset, NULL, &errset, NULL) < 0) - error(SYSTEM, "select call error in next_child_nq()"); + i = select(n, &readset, NULL, &errset, pmode); + if (!i) { + pmode = NULL; /* try again, blocking this time */ + goto tryagain; + } + if (i < 0) + error(SYSTEM, "select() error in next_child_nq()"); } else FD_ZERO(&errset); n = -1; /* read results from child(ren) */ @@ -312,16 +382,16 @@ next_child_nq(int force_wait) bq->ndx = kida[i].running; /* read from child */ for (j = 0; j < nmods; j++) { - n = sizeof(DCOLOR)*bq->mca[j]->nbins; - nr = readbuf(kida[i].r, (char *)bq->mca[j]->cbin, n); - if (nr != n) + nr = bq->mca[j]->nbins; + if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), nr, + inq_fp[i]) != nr) error(SYSTEM, "read error from render process"); } - queue_output(bq); /* put results in output queue */ + queue_output(bq); /* add results to output queue */ kida[i].running = -1; /* mark child as available */ n = i; } - return(n); /* last available child */ + return(n); /* first available child */ } @@ -332,6 +402,7 @@ parental_loop() static int ignore_warning_given = 0; FVECT orgdir[2]; double d; + int i; /* load rays from stdin & process */ #ifdef getc_unlocked flockfile(stdin); /* avoid lock/unlock overhead */ @@ -347,19 +418,19 @@ parental_loop() } if ((d == 0.0) | (lastray+1 < lastray)) { while (next_child_nq(1) >= 0) - ; + ; /* clear the queue */ lastdone = lastray = 0; } if (d == 0.0) { if ((yres <= 0) | (xres <= 0)) waitflush = 1; /* flush right after */ - zero_record(++lastray); - } else { /* else assign */ - int avail = next_child_nq(0); - if (writebuf(kida[avail].w, (char *)orgdir, - sizeof(FVECT)*2) != sizeof(FVECT)*2) + put_zero_record(++lastray); + } else { /* else assign ray */ + i = next_child_nq(0); + if (writebuf(kida[i].w, (char *)orgdir, + sizeof(orgdir)) != sizeof(orgdir)) error(SYSTEM, "pipe write error"); - kida[avail].running = ++lastray; + kida[i].running = ++lastray; } if (raysleft && !--raysleft) break; /* preemptive EOI */ @@ -368,7 +439,6 @@ parental_loop() ; /* output accumulated record */ if (accumulate <= 0 || account < accumulate) { - int i; if (account < accumulate) { error(WARNING, "partial accumulation in final record"); accumulate -= account; @@ -376,6 +446,8 @@ parental_loop() for (i = 0; i < nmods; i++) mod_output(out_bq->mca[i]); end_record(); + free_binq(out_bq); + out_bq = NULL; } if (raysleft) error(USER, "unexpected EOF on input");