--- ray/src/rt/rc3.c 2012/06/12 22:46:45 2.8 +++ ray/src/rt/rc3.c 2012/11/15 15:26:52 2.17 @@ -1,16 +1,17 @@ #ifndef lint -static const char RCSid[] = "$Id: rc3.c,v 2.8 2012/06/12 22:46:45 greg Exp $"; +static const char RCSid[] = "$Id: rc3.c,v 2.17 2012/11/15 15:26:52 greg Exp $"; #endif /* * Accumulate ray contributions for a set of materials * Controlling process for multiple children */ +#include #include "rcontrib.h" -#include "platform.h" -#include "rtprocess.h" #include "selcall.h" +#define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2)) + /* Modifier contribution queue (results waiting to be output) */ typedef struct s_binq { RNUMBER ndx; /* index for this entry */ @@ -26,7 +27,7 @@ static struct { RNUMBER r1; /* assigned ray starting index */ SUBPROC pr; /* PID, i/o descriptors */ FILE *infp; /* file pointer to read from process */ - int nr; /* number rays to sum (0 if free) */ + int nr; /* number of rays to sum (0 if free) */ } kida[MAXPROCESS]; /* our child processes */ @@ -94,7 +95,7 @@ free_binq(BINQ *bp) /* Add modifier values to accumulation record in queue and clear */ -void +static void queue_modifiers() { MODCONT *mpin, *mpout; @@ -146,7 +147,7 @@ queue_output(BINQ *bp) } return; } - b_last = NULL; /* else insert in output queue */ + b_last = NULL; /* insert in output queue */ for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx; b_cur = b_cur->next) b_last = b_cur; @@ -184,9 +185,6 @@ queue_ready() int nready = 0; BINQ *bp; - if (accumulate <= 0) /* just accumulating? */ - return(0); - for (bp = out_bq; bp != NULL && bp->nadded >= accumulate && bp->ndx == lastdone+nready*accumulate+1; bp = bp->next) @@ -203,10 +201,7 @@ output_catchup(int nmax) int nout = 0; BINQ *bp; int i; - - if (accumulate <= 0) /* just accumulating? */ - return(0); - /* else output ready results */ + /* output ready results */ while (out_bq != NULL && out_bq->nadded >= accumulate && out_bq->ndx == lastdone+1) { if ((nmax > 0) & (nout >= nmax)) @@ -274,30 +269,20 @@ set_stdout(const LUENT *le, void *p) int in_rchild() { -#ifdef _WIN32 - error(WARNING, "multiprocessing unsupported -- running solo"); - nproc = 1; - return(1); -#else - /* try to fork ourselves */ - while (nchild < nproc) { - int p0[2], p1[2]; - int pid; - /* prepare i/o pipes */ + int rval; + + while (nchild < nproc) { /* fork until target reached */ errno = 0; - if (pipe(p0) < 0 || pipe(p1) < 0) - error(SYSTEM, "pipe() call failed!"); - pid = fork(); /* fork parent process */ - if (pid == 0) { /* if in child, set up & return true */ - close(p0[1]); close(p1[0]); + rval = open_process(&kida[nchild].pr, NULL); + if (rval < 0) + error(SYSTEM, "open_process() call failed"); + if (rval == 0) { /* if in child, set up & return true */ lu_doall(&modconttab, set_stdout, NULL); lu_done(&ofiletab); while (nchild--) { /* don't share other pipes */ close(kida[nchild].pr.w); fclose(kida[nchild].infp); } - dup2(p0[0], 0); close(p0[0]); - dup2(p1[1], 1); close(p1[1]); inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f'; outfmt = 'd'; header = 0; @@ -310,17 +295,12 @@ in_rchild() waitflush = xres = 0; account = accumulate = 0; } - return(1); /* child return value */ + return(1); /* return "true" in child */ } - if (pid < 0) - error(SYSTEM, "fork() call failed!"); - /* connect parent's pipes */ - close(p0[0]); close(p1[1]); - kida[nchild].pr.r = p1[0]; - kida[nchild].pr.w = p0[1]; - kida[nchild].pr.pid = pid; - kida[nchild].pr.running = 1; - kida[nchild].infp = fdopen(p1[0], "rb"); + if (rval != PIPE_BUF) + error(CONSISTENCY, "bad value from open_process()"); + /* connect to child's output */ + kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb"); if (kida[nchild].infp == NULL) error(SYSTEM, "out of memory in in_rchild()"); #ifdef getc_unlocked @@ -328,27 +308,29 @@ in_rchild() #endif kida[nchild++].nr = 0; /* mark as available */ } - return(0); /* parent return value */ -#endif + return(0); /* return "false" in parent */ } /* Close child processes */ void -end_children() +end_children(int immed) { int status; while (nchild > 0) { nchild--; - fclose(kida[nchild].infp); - kida[nchild].pr.r = -1; /* close(-1) error is ignored */ - if ((status = close_process(&kida[nchild].pr)) > 0) { +#ifdef SIGKILL + if (immed) /* error mode -- quick exit */ + kill(kida[nchild].pr.pid, SIGKILL); +#endif + if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) { sprintf(errmsg, "rendering process returned bad status (%d)", status); error(WARNING, errmsg); } + fclose(kida[nchild].infp); } } @@ -418,9 +400,7 @@ tryagain: /* catch up with output? */ void parental_loop() { -#define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2)) - static int ignore_warning_given = 0; - int qlimit = (accumulate == 1) ? 1 : MAXIQ-1; + const int qlimit = (accumulate == 1) ? 1 : MAXIQ-1; int ninq = 0; FVECT orgdir[2*MAXIQ]; int i, n; @@ -432,19 +412,26 @@ parental_loop() if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */ (orgdir[2*ninq+1][1] == 0.0) & (orgdir[2*ninq+1][2] == 0.0)) { - if (accumulate != 1) { - if (!ignore_warning_given++) - error(WARNING, - "dummy ray(s) ignored during accumulation\n"); - continue; + if (ninq) { /* preempt our queue */ + i = next_child_nq(0); + n = ninq; + memset(orgdir[2*n++], 0, sizeof(FVECT)*2); + n *= sizeof(FVECT)*2; + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) + error(SYSTEM, "pipe write error"); + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; + ninq = 0; } - while (next_child_nq(1) >= 0) - ; /* clear the queue */ - lastdone = lastray = 0; - if ((yres <= 0) | (xres <= 0)) + if ((yres <= 0) | (xres <= 0) && + (lastray+1) % accumulate == 0) { + while (next_child_nq(1) >= 0) + ; /* clear the queue */ + lastdone = lastray %= accumulate; waitflush = 1; /* flush next */ + } put_zero_record(++lastray); - } else if (++ninq >= qlimit || accumulate > 1 && + } else if (++ninq >= qlimit || lastray/accumulate != (lastray+ninq)/accumulate) { i = next_child_nq(0); /* manages output */ n = ninq; @@ -455,34 +442,126 @@ parental_loop() error(SYSTEM, "pipe write error"); kida[i].r1 = lastray+1; lastray += kida[i].nr = ninq; /* mark as busy */ - ninq = 0; if (lastray < lastdone) { /* RNUMBER wrapped? */ while (next_child_nq(1) >= 0) ; - lastdone = lastray = 0; + lastray -= ninq; + lastdone = lastray %= accumulate; } + ninq = 0; } if (raysleft && !--raysleft) break; /* preemptive EOI */ } while (next_child_nq(1) >= 0) /* empty results queue */ ; - /* output accumulated record */ - if (accumulate <= 0 || account < accumulate) { - end_children(); /* frees up file descriptors */ - if (account < accumulate) { - error(WARNING, "partial accumulation in final record"); - accumulate -= account; - } - for (i = 0; i < nmods; i++) - mod_output(out_bq->mca[i]); - end_record(); - free_binq(out_bq); + if (account < accumulate) { + error(WARNING, "partial accumulation in final record"); + free_binq(out_bq); /* XXX just ignore it */ out_bq = NULL; } + free_binq(NULL); /* clean up */ + lu_done(&ofiletab); if (raysleft) error(USER, "unexpected EOF on input"); - free_binq(NULL); /* clean up */ +} + + +/* Wait for the next available child by monitoring "to" pipes */ +static int +next_child_ready() +{ + fd_set writeset, errset; + int i, n; + + for (i = nchild; i--; ) /* see if there's one free first */ + if (!kida[i].nr) + return(i); + /* prepare select() call */ + FD_ZERO(&writeset); FD_ZERO(&errset); + n = 0; + for (i = nchild; i--; ) { + FD_SET(kida[i].pr.w, &writeset); + FD_SET(kida[i].pr.r, &errset); + if (kida[i].pr.w >= n) + n = kida[i].pr.w + 1; + if (kida[i].pr.r >= n) + n = kida[i].pr.r + 1; + } + errno = 0; + n = select(n, NULL, &writeset, &errset, NULL); + if (n < 0) + error(SYSTEM, "select() error in next_child_ready()"); + n = -1; /* identify waiting child */ + for (i = nchild; i--; ) { + if (FD_ISSET(kida[i].pr.r, &errset)) + error(USER, "rendering process died"); + if (FD_ISSET(kida[i].pr.w, &writeset)) + kida[n = i].nr = 0; + } + return(n); /* first available child */ +} + + +/* Modified parental loop for full accumulation mode (-c 0) */ +void +feeder_loop() +{ + static int ignore_warning_given = 0; + int ninq = 0; + FVECT orgdir[2*MAXIQ]; + int i, n; + /* load rays from stdin & process */ +#ifdef getc_unlocked + flockfile(stdin); /* avoid lock/unlock overhead */ +#endif + while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) { + if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */ + (orgdir[2*ninq+1][1] == 0.0) & + (orgdir[2*ninq+1][2] == 0.0)) { + if (!ignore_warning_given++) + error(WARNING, + "dummy ray(s) ignored during accumulation\n"); + continue; + } + if (++ninq >= MAXIQ) { + i = next_child_ready(); /* get eager child */ + n = sizeof(FVECT)*2 * ninq; /* give assignment */ + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) + error(SYSTEM, "pipe write error"); + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; + ninq = 0; + if (lastray < lastdone) /* RNUMBER wrapped? */ + lastdone = lastray = 0; + } + if (raysleft && !--raysleft) + break; /* preemptive EOI */ + } + if (ninq) { /* polish off input */ + i = next_child_ready(); + n = sizeof(FVECT)*2 * ninq; + if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n) + error(SYSTEM, "pipe write error"); + kida[i].r1 = lastray+1; + lastray += kida[i].nr = ninq; + ninq = 0; + } + memset(orgdir, 0, sizeof(FVECT)*2); /* get results */ + for (i = nchild; i--; ) { + writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2); + queue_results(i); + } + if (recover) /* and from before? */ + queue_modifiers(); + end_children(0); /* free up file descriptors */ + for (i = 0; i < nmods; i++) + mod_output(out_bq->mca[i]); /* output accumulated record */ + end_record(); + free_binq(out_bq); /* clean up */ + out_bq = NULL; + free_binq(NULL); lu_done(&ofiletab); -#undef MAXIQ + if (raysleft) + error(USER, "unexpected EOF on input"); }