--- ray/src/rt/rc3.c 2012/06/11 05:07:55 2.4 +++ ray/src/rt/rc3.c 2012/06/12 17:20:44 2.6 @@ -1,5 +1,5 @@ #ifndef lint -static const char RCSid[] = "$Id: rc3.c,v 2.4 2012/06/11 05:07:55 greg Exp $"; +static const char RCSid[] = "$Id: rc3.c,v 2.6 2012/06/12 17:20:44 greg Exp $"; #endif /* * Accumulate ray contributions for a set of materials @@ -13,8 +13,8 @@ static const char RCSid[] = "$Id: rc3.c,v 2.4 2012/06/ /* Modifier contribution queue (results waiting to be output) */ typedef struct s_binq { - int ndx; /* index for this entry */ - int nadded; /* accumulated so far */ + RNUMBER ndx; /* index for this entry */ + RNUMBER nadded; /* accumulated so far */ struct s_binq *next; /* next in queue */ MODCONT *mca[1]; /* contrib. array (extends struct) */ } BINQ; @@ -22,8 +22,12 @@ typedef struct s_binq { static BINQ *out_bq = NULL; /* output bin queue */ static BINQ *free_bq = NULL; /* free queue entries */ -static SUBPROC kida[MAXPROCESS]; /* child processes */ -static FILE *inq_fp[MAXPROCESS]; /* input streams */ +static struct { + RNUMBER r1; /* assigned ray starting index */ + SUBPROC pr; /* PID, i/o descriptors */ + FILE *infp; /* file pointer to read from process */ + int nr; /* number rays to sum (0 if free) */ +} kida[MAXPROCESS]; /* our child processes */ /* Get new bin queue entry */ @@ -37,7 +41,7 @@ new_binq() bp = free_bq; free_bq = bp->next; bp->next = NULL; - bp->nadded = 1; + bp->nadded = 0; return(bp); } /* else allocate fresh */ @@ -54,7 +58,7 @@ new_binq() /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */ } bp->ndx = 0; - bp->nadded = 1; + bp->nadded = 0; bp->next = NULL; return(bp); memerr: @@ -106,6 +110,7 @@ queue_modifiers() addcolor(mpout->cbin[j], mpin->cbin[j]); memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins); } + out_bq->nadded++; } @@ -230,11 +235,32 @@ put_zero_record(int ndx) for (i = nmods; i--; ) memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins); bp->ndx = ndx; + bp->nadded = 1; queue_output(bp); output_catchup(0); } +/* Get results from child process and add to queue */ +static void +queue_results(int k) +{ + BINQ *bq = new_binq(); /* get results holder */ + int j; + + bq->ndx = kida[k].r1; + bq->nadded = kida[k].nr; + /* read from child */ + for (j = 0; j < nmods; j++) + if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins, + kida[k].infp) != bq->mca[j]->nbins) + error(SYSTEM, "read error from render process"); + + queue_output(bq); /* put results in output queue */ + kida[k].nr = 0; /* mark child as available */ +} + + /* callback to set output spec to NULL (stdout) */ static int set_stdout(const LUENT *le, void *p) @@ -266,36 +292,41 @@ in_rchild() close(p0[1]); close(p1[0]); lu_doall(&modconttab, set_stdout, NULL); lu_done(&ofiletab); - while (nchild--) { - close(kida[nchild].w); - fclose(inq_fp[nchild]); + while (nchild--) { /* don't share other pipes */ + close(kida[nchild].pr.w); + fclose(kida[nchild].infp); } dup2(p0[0], 0); close(p0[0]); dup2(p1[1], 1); close(p1[1]); inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f'; outfmt = 'd'; header = 0; - waitflush = xres = 1; yres = 0; raysleft = 0; - account = accumulate = 1; + if (accumulate == 1) { + waitflush = xres = 1; + account = accumulate = 1; + } else { /* parent controls accumulation */ + waitflush = xres = 0; + account = accumulate = 0; + } return(1); /* child return value */ } if (pid < 0) error(SYSTEM, "fork() call failed!"); /* connect parent's pipes */ close(p0[0]); close(p1[1]); - kida[nchild].r = p1[0]; - kida[nchild].w = p0[1]; - kida[nchild].pid = pid; - kida[nchild].running = -1; - inq_fp[nchild] = fdopen(p1[0], "rb"); - if (inq_fp[nchild] == NULL) + kida[nchild].pr.r = p1[0]; + kida[nchild].pr.w = p0[1]; + kida[nchild].pr.pid = pid; + kida[nchild].pr.running = 1; + kida[nchild].infp = fdopen(p1[0], "rb"); + if (kida[nchild].infp == NULL) error(SYSTEM, "out of memory in in_rchild()"); #ifdef getc_unlocked - flockfile(inq_fp[nchild]); /* avoid mutex overhead */ + flockfile(kida[nchild].infp); /* avoid mutex overhead */ #endif - ++nchild; + kida[nchild++].nr = 0; /* mark as available */ } return(0); /* parent return value */ #endif @@ -308,88 +339,76 @@ end_children() { int status; - while (nchild-- > 0) { - kida[nchild].r = -1; /* close(-1) error is ignored */ - if ((status = close_process(&kida[nchild])) > 0) { + while (nchild > 0) { + nchild--; + fclose(kida[nchild].infp); + kida[nchild].pr.r = -1; /* close(-1) error is ignored */ + if ((status = close_process(&kida[nchild].pr)) > 0) { sprintf(errmsg, "rendering process returned bad status (%d)", status); error(WARNING, errmsg); } - fclose(inq_fp[nchild]); /* performs actual close() */ } } -/* Wait for the next available child, managing output queue as well */ +/* Wait for the next available child, managing output queue simultaneously */ static int -next_child_nq(int force_wait) +next_child_nq(int flushing) { static struct timeval polling; struct timeval *pmode; fd_set readset, errset; - int i, j, n, nr, nqr; + int i, n, nr, nqr; - if (!force_wait) /* see if there's one free */ + if (!flushing) /* see if there's one free */ for (i = nchild; i--; ) - if (kida[i].running < 0) + if (!kida[i].nr) return(i); - nqr = queue_ready(); /* wait mode or polling? */ - if (!nqr | force_wait | (accumulate <= 0)) - pmode = NULL; - else + nqr = queue_ready(); /* choose blocking mode or polling */ + if ((nqr > 0) & !flushing) pmode = &polling; -tryagain: - n = 0; /* catch up with output? */ - if ((pmode == &polling) & (nqr > nchild)) - n = nqr - nchild; - if ((pmode == NULL) & (nqr > 0) | (n > 0)) - nqr -= output_catchup(n); + else + pmode = NULL; +tryagain: /* catch up with output? */ + if (pmode == &polling) { + if (nqr > nchild) /* don't get too far behind */ + nqr -= output_catchup(nqr-nchild); + } else if (nqr > 0) /* clear output before blocking */ + nqr -= output_catchup(0); /* prepare select() call */ FD_ZERO(&readset); FD_ZERO(&errset); n = nr = 0; for (i = nchild; i--; ) { - if (kida[i].running > 0) { - FD_SET(kida[i].r, &readset); + if (kida[i].nr) { + FD_SET(kida[i].pr.r, &readset); ++nr; } - FD_SET(kida[i].r, &errset); - if (kida[i].r >= n) - n = kida[i].r + 1; + FD_SET(kida[i].pr.r, &errset); + if (kida[i].pr.r >= n) + n = kida[i].pr.r + 1; } if (!nr) /* nothing to wait for? */ return(-1); if ((nr > 1) | (pmode == &polling)) { errno = 0; - i = select(n, &readset, NULL, &errset, pmode); - if (!i) { + nr = select(n, &readset, NULL, &errset, pmode); + if (!nr) { pmode = NULL; /* try again, blocking this time */ goto tryagain; } - if (i < 0) + if (nr < 0) error(SYSTEM, "select() error in next_child_nq()"); } else FD_ZERO(&errset); n = -1; /* read results from child(ren) */ for (i = nchild; i--; ) { - BINQ *bq; - if (FD_ISSET(kida[i].r, &errset)) + if (FD_ISSET(kida[i].pr.r, &errset)) error(USER, "rendering process died"); - if (!FD_ISSET(kida[i].r, &readset)) - continue; - bq = new_binq(); /* get results holder */ - bq->ndx = kida[i].running; - /* read from child */ - for (j = 0; j < nmods; j++) { - nr = bq->mca[j]->nbins; - if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), nr, - inq_fp[i]) != nr) - error(SYSTEM, "read error from render process"); - } - queue_output(bq); /* add results to output queue */ - kida[i].running = -1; /* mark child as available */ - n = i; + if (FD_ISSET(kida[i].pr.r, &readset)) + queue_results(n = i); } return(n); /* first available child */ } @@ -399,38 +418,49 @@ tryagain: void parental_loop() { +#define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2)) static int ignore_warning_given = 0; - FVECT orgdir[2]; + int qlimit = (accumulate == 1) ? 1 : MAXIQ-1; + int ninq = 0; + FVECT orgdir[2*MAXIQ]; double d; - int i; + int i, n; /* load rays from stdin & process */ #ifdef getc_unlocked flockfile(stdin); /* avoid lock/unlock overhead */ #endif - while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) { + while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) { d = normalize(orgdir[1]); - /* asking for flush? */ - if ((d == 0.0) & (accumulate != 1)) { - if (!ignore_warning_given++) - error(WARNING, + if (d == 0.0) { /* asking for flush? */ + if (accumulate != 1) { + if (!ignore_warning_given++) + error(WARNING, "dummy ray(s) ignored during accumulation\n"); - continue; - } - if ((d == 0.0) | (lastray+1 < lastray)) { + continue; + } while (next_child_nq(1) >= 0) ; /* clear the queue */ lastdone = lastray = 0; - } - if (d == 0.0) { if ((yres <= 0) | (xres <= 0)) - waitflush = 1; /* flush right after */ + waitflush = 1; /* flush next */ put_zero_record(++lastray); - } else { /* else assign ray */ - i = next_child_nq(0); - if (writebuf(kida[i].w, (char *)orgdir, - sizeof(orgdir)) != sizeof(orgdir)) + } else if (++ninq >= qlimit || accumulate > 1 && + lastray/accumulate != (lastray+ninq)/accumulate) { + i = next_child_nq(0); /* manages output */ + n = ninq; + if (accumulate != 1) /* request flush? */ + memset(orgdir[2*n++], 0, sizeof(FVECT)*2); + n *= sizeof(FVECT)*2; /* send assignment */ + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) error(SYSTEM, "pipe write error"); - kida[i].running = ++lastray; + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; /* mark as busy */ + ninq = 0; + if (lastray < lastdone) { /* RNUMBER wrapped? */ + while (next_child_nq(1) >= 0) + ; + lastdone = lastray = 0; + } } if (raysleft && !--raysleft) break; /* preemptive EOI */ @@ -439,6 +469,7 @@ parental_loop() ; /* output accumulated record */ if (accumulate <= 0 || account < accumulate) { + end_children(); /* frees up file descriptors */ if (account < accumulate) { error(WARNING, "partial accumulation in final record"); accumulate -= account; @@ -453,4 +484,5 @@ parental_loop() error(USER, "unexpected EOF on input"); free_binq(NULL); /* clean up */ lu_done(&ofiletab); +#undef MAXIQ }