ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
(Generate patch)

Comparing ray/src/rt/rc3.c (file contents):
Revision 2.4 by greg, Mon Jun 11 05:07:55 2012 UTC vs.
Revision 2.15 by greg, Thu Jun 21 17:14:32 2012 UTC

# Line 6 | Line 6 | static const char RCSid[] = "$Id$";
6   * Controlling process for multiple children
7   */
8  
9 + #include <signal.h>
10   #include "rcontrib.h"
10 #include "platform.h"
11   #include "rtprocess.h"
12   #include "selcall.h"
13  
14 + #define MAXIQ           (int)(PIPE_BUF/(sizeof(FVECT)*2))
15 +
16   /* Modifier contribution queue (results waiting to be output) */
17   typedef struct s_binq {
18 <        int             ndx;            /* index for this entry */
19 <        int             nadded;         /* accumulated so far */
18 >        RNUMBER         ndx;            /* index for this entry */
19 >        RNUMBER         nadded;         /* accumulated so far */
20          struct s_binq   *next;          /* next in queue */
21          MODCONT         *mca[1];        /* contrib. array (extends struct) */
22   } BINQ;
# Line 22 | Line 24 | typedef struct s_binq {
24   static BINQ     *out_bq = NULL;         /* output bin queue */
25   static BINQ     *free_bq = NULL;        /* free queue entries */
26  
27 < static SUBPROC  kida[MAXPROCESS];       /* child processes */
28 < static FILE     *inq_fp[MAXPROCESS];    /* input streams */
27 > static struct {
28 >        RNUMBER r1;                     /* assigned ray starting index */
29 >        SUBPROC pr;                     /* PID, i/o descriptors */
30 >        FILE    *infp;                  /* file pointer to read from process */
31 >        int     nr;                     /* number of rays to sum (0 if free) */
32 > } kida[MAXPROCESS];             /* our child processes */
33  
34  
35   /* Get new bin queue entry */
# Line 37 | Line 43 | new_binq()
43                  bp = free_bq;
44                  free_bq = bp->next;
45                  bp->next = NULL;
46 <                bp->nadded = 1;
46 >                bp->nadded = 0;
47                  return(bp);
48          }
49                                          /* else allocate fresh */
# Line 54 | Line 60 | new_binq()
60                  /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
61          }
62          bp->ndx = 0;
63 <        bp->nadded = 1;
63 >        bp->nadded = 0;
64          bp->next = NULL;
65          return(bp);
66   memerr:
# Line 90 | Line 96 | free_binq(BINQ *bp)
96  
97  
98   /* Add modifier values to accumulation record in queue and clear */
99 < void
99 > static void
100   queue_modifiers()
101   {
102          MODCONT *mpin, *mpout;
# Line 106 | Line 112 | queue_modifiers()
112                          addcolor(mpout->cbin[j], mpin->cbin[j]);
113                  memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
114          }
115 +        out_bq->nadded++;
116   }
117  
118  
# Line 141 | Line 148 | queue_output(BINQ *bp)
148                  }
149                  return;
150          }
151 <        b_last = NULL;                  /* else insert in output queue */
151 >        b_last = NULL;                  /* insert in output queue */
152          for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
153                                  b_cur = b_cur->next)
154                  b_last = b_cur;
# Line 179 | Line 186 | queue_ready()
186          int     nready = 0;
187          BINQ    *bp;
188  
182        if (accumulate <= 0)            /* just accumulating? */
183                return(0);
184
189          for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
190                                  bp->ndx == lastdone+nready*accumulate+1;
191                                  bp = bp->next)
# Line 198 | Line 202 | output_catchup(int nmax)
202          int     nout = 0;
203          BINQ    *bp;
204          int     i;
205 <
202 <        if (accumulate <= 0)            /* just accumulating? */
203 <                return(0);
204 <                                        /* else output ready results */
205 >                                        /* output ready results */
206          while (out_bq != NULL && out_bq->nadded >= accumulate
207                                  && out_bq->ndx == lastdone+1) {
208                  if ((nmax > 0) & (nout >= nmax))
# Line 230 | Line 231 | put_zero_record(int ndx)
231          for (i = nmods; i--; )
232                  memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
233          bp->ndx = ndx;
234 +        bp->nadded = 1;
235          queue_output(bp);
236          output_catchup(0);
237   }
238  
239  
240 + /* Get results from child process and add to queue */
241 + static void
242 + queue_results(int k)
243 + {
244 +        BINQ    *bq = new_binq();       /* get results holder */
245 +        int     j;
246 +
247 +        bq->ndx = kida[k].r1;
248 +        bq->nadded = kida[k].nr;
249 +                                        /* read from child */
250 +        for (j = 0; j < nmods; j++)
251 +                if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
252 +                                        kida[k].infp) != bq->mca[j]->nbins)
253 +                        error(SYSTEM, "read error from render process");
254 +                        
255 +        queue_output(bq);               /* put results in output queue */
256 +        kida[k].nr = 0;                 /* mark child as available */
257 + }
258 +
259 +
260   /* callback to set output spec to NULL (stdout) */
261   static int
262   set_stdout(const LUENT *le, void *p)
# Line 248 | Line 270 | set_stdout(const LUENT *le, void *p)
270   int
271   in_rchild()
272   {
273 < #ifdef _WIN32
274 <        error(WARNING, "multiprocessing unsupported -- running solo");
275 <        nproc = 1;
254 <        return(1);
255 < #else
256 <                                        /* try to fork ourselves */
257 <        while (nchild < nproc) {
258 <                int     p0[2], p1[2];
259 <                int     pid;
260 <                                        /* prepare i/o pipes */
273 >        int     rval;
274 >
275 >        while (nchild < nproc) {        /* fork until target reached */
276                  errno = 0;
277 <                if (pipe(p0) < 0 || pipe(p1) < 0)
278 <                        error(SYSTEM, "pipe() call failed!");
279 <                pid = fork();           /* fork parent process */
280 <                if (pid == 0) {         /* if in child, set up & return true */
266 <                        close(p0[1]); close(p1[0]);
277 >                rval = open_process(&kida[nchild].pr, NULL);
278 >                if (rval < 0)
279 >                        error(SYSTEM, "open_process() call failed");
280 >                if (rval == 0) {        /* if in child, set up & return true */
281                          lu_doall(&modconttab, set_stdout, NULL);
282                          lu_done(&ofiletab);
283 <                        while (nchild--) {
284 <                                close(kida[nchild].w);
285 <                                fclose(inq_fp[nchild]);
283 >                        while (nchild--) {      /* don't share other pipes */
284 >                                close(kida[nchild].pr.w);
285 >                                fclose(kida[nchild].infp);
286                          }
273                        dup2(p0[0], 0); close(p0[0]);
274                        dup2(p1[1], 1); close(p1[1]);
287                          inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
288                          outfmt = 'd';
289                          header = 0;
278                        waitflush = xres = 1;
290                          yres = 0;
291                          raysleft = 0;
292 <                        account = accumulate = 1;
293 <                        return(1);      /* child return value */
292 >                        if (accumulate == 1) {
293 >                                waitflush = xres = 1;
294 >                                account = accumulate = 1;
295 >                        } else {        /* parent controls accumulation */
296 >                                waitflush = xres = 0;
297 >                                account = accumulate = 0;
298 >                        }
299 >                        return(1);      /* return "true" in child */
300                  }
301 <                if (pid < 0)
302 <                        error(SYSTEM, "fork() call failed!");
303 <                                        /* connect parent's pipes */
304 <                close(p0[0]); close(p1[1]);
305 <                kida[nchild].r = p1[0];
289 <                kida[nchild].w = p0[1];
290 <                kida[nchild].pid = pid;
291 <                kida[nchild].running = -1;
292 <                inq_fp[nchild] = fdopen(p1[0], "rb");
293 <                if (inq_fp[nchild] == NULL)
301 >                if (rval != PIPE_BUF)
302 >                        error(CONSISTENCY, "bad value from open_process()");
303 >                                        /* connect to child's output */
304 >                kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
305 >                if (kida[nchild].infp == NULL)
306                          error(SYSTEM, "out of memory in in_rchild()");
307   #ifdef getc_unlocked
308 <                flockfile(inq_fp[nchild]);      /* avoid mutex overhead */
308 >                flockfile(kida[nchild].infp);   /* avoid mutex overhead */
309   #endif
310 <                ++nchild;
310 >                kida[nchild++].nr = 0;  /* mark as available */
311          }
312 <        return(0);                      /* parent return value */
301 < #endif
312 >        return(0);                      /* return "false" in parent */
313   }
314  
315  
316   /* Close child processes */
317   void
318 < end_children()
318 > end_children(int immed)
319   {
320          int     status;
321          
322 <        while (nchild-- > 0) {
323 <                kida[nchild].r = -1;    /* close(-1) error is ignored */
324 <                if ((status = close_process(&kida[nchild])) > 0) {
322 >        while (nchild > 0) {
323 >                nchild--;
324 > #ifdef SIGKILL
325 >                if (immed)              /* error mode -- quick exit */
326 >                        kill(kida[nchild].pr.pid, SIGKILL);
327 > #endif
328 >                if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
329                          sprintf(errmsg,
330                                  "rendering process returned bad status (%d)",
331                                          status);
332                          error(WARNING, errmsg);
333                  }
334 <                fclose(inq_fp[nchild]); /* performs actual close() */
334 >                fclose(kida[nchild].infp);
335          }
336   }
337  
338  
339 < /* Wait for the next available child, managing output queue as well */
339 > /* Wait for the next available child, managing output queue simultaneously */
340   static int
341 < next_child_nq(int force_wait)
341 > next_child_nq(int flushing)
342   {
343          static struct timeval   polling;
344          struct timeval          *pmode;
345          fd_set                  readset, errset;
346 <        int                     i, j, n, nr, nqr;
346 >        int                     i, n, nr, nqr;
347  
348 <        if (!force_wait)                /* see if there's one free */
348 >        if (!flushing)                  /* see if there's one free */
349                  for (i = nchild; i--; )
350 <                        if (kida[i].running < 0)
350 >                        if (!kida[i].nr)
351                                  return(i);
352  
353 <        nqr = queue_ready();            /* wait mode or polling? */
354 <        if (!nqr | force_wait | (accumulate <= 0))
340 <                pmode = NULL;
341 <        else
353 >        nqr = queue_ready();            /* choose blocking mode or polling */
354 >        if ((nqr > 0) & !flushing)
355                  pmode = &polling;
356 < tryagain:
357 <        n = 0;                          /* catch up with output? */
358 <        if ((pmode == &polling) & (nqr > nchild))
359 <                n = nqr - nchild;
360 <        if ((pmode == NULL) & (nqr > 0) | (n > 0))
361 <                nqr -= output_catchup(n);
356 >        else
357 >                pmode = NULL;
358 > tryagain:                               /* catch up with output? */
359 >        if (pmode == &polling) {
360 >                if (nqr > nchild)       /* don't get too far behind */
361 >                        nqr -= output_catchup(nqr-nchild);
362 >        } else if (nqr > 0)             /* clear output before blocking */
363 >                nqr -= output_catchup(0);
364                                          /* prepare select() call */
365          FD_ZERO(&readset); FD_ZERO(&errset);
366          n = nr = 0;
367          for (i = nchild; i--; ) {
368 <                if (kida[i].running > 0) {
369 <                        FD_SET(kida[i].r, &readset);
368 >                if (kida[i].nr) {
369 >                        FD_SET(kida[i].pr.r, &readset);
370                          ++nr;
371                  }
372 <                FD_SET(kida[i].r, &errset);
373 <                if (kida[i].r >= n)
374 <                        n = kida[i].r + 1;
372 >                FD_SET(kida[i].pr.r, &errset);
373 >                if (kida[i].pr.r >= n)
374 >                        n = kida[i].pr.r + 1;
375          }
376          if (!nr)                        /* nothing to wait for? */
377                  return(-1);
378          if ((nr > 1) | (pmode == &polling)) {
379                  errno = 0;
380 <                i = select(n, &readset, NULL, &errset, pmode);
381 <                if (!i) {
380 >                nr = select(n, &readset, NULL, &errset, pmode);
381 >                if (!nr) {
382                          pmode = NULL;   /* try again, blocking this time */
383                          goto tryagain;
384                  }
385 <                if (i < 0)
385 >                if (nr < 0)
386                          error(SYSTEM, "select() error in next_child_nq()");
387          } else
388                  FD_ZERO(&errset);
389          n = -1;                         /* read results from child(ren) */
390          for (i = nchild; i--; ) {
391 <                BINQ    *bq;
377 <                if (FD_ISSET(kida[i].r, &errset))
391 >                if (FD_ISSET(kida[i].pr.r, &errset))
392                          error(USER, "rendering process died");
393 <                if (!FD_ISSET(kida[i].r, &readset))
394 <                        continue;
381 <                bq = new_binq();        /* get results holder */
382 <                bq->ndx = kida[i].running;
383 <                                        /* read from child */
384 <                for (j = 0; j < nmods; j++) {
385 <                        nr = bq->mca[j]->nbins;
386 <                        if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), nr,
387 <                                                        inq_fp[i]) != nr)
388 <                                error(SYSTEM, "read error from render process");
389 <                }
390 <                queue_output(bq);       /* add results to output queue */
391 <                kida[i].running = -1;   /* mark child as available */
392 <                n = i;
393 >                if (FD_ISSET(kida[i].pr.r, &readset))
394 >                        queue_results(n = i);
395          }
396          return(n);                      /* first available child */
397   }
# Line 400 | Line 402 | void
402   parental_loop()
403   {
404          static int      ignore_warning_given = 0;
405 <        FVECT           orgdir[2];
406 <        double          d;
407 <        int             i;
405 >        int             qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
406 >        int             ninq = 0;
407 >        FVECT           orgdir[2*MAXIQ];
408 >        int             i, n;
409                                          /* load rays from stdin & process */
410   #ifdef getc_unlocked
411          flockfile(stdin);               /* avoid lock/unlock overhead */
412   #endif
413 <        while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
414 <                d = normalize(orgdir[1]);
415 <                                                        /* asking for flush? */
416 <                if ((d == 0.0) & (accumulate != 1)) {
417 <                        if (!ignore_warning_given++)
418 <                                error(WARNING,
413 >        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
414 >                if (orgdir[2*ninq+1][0] == 0.0 &&       /* asking for flush? */
415 >                                (orgdir[2*ninq+1][1] == 0.0) &
416 >                                (orgdir[2*ninq+1][2] == 0.0)) {
417 >                        if (accumulate != 1) {
418 >                                if (!ignore_warning_given++)
419 >                                        error(WARNING,
420                                  "dummy ray(s) ignored during accumulation\n");
421 <                        continue;
422 <                }
419 <                if ((d == 0.0) | (lastray+1 < lastray)) {
421 >                                continue;
422 >                        }
423                          while (next_child_nq(1) >= 0)
424                                  ;                       /* clear the queue */
425                          lastdone = lastray = 0;
423                }
424                if (d == 0.0) {
426                          if ((yres <= 0) | (xres <= 0))
427 <                                waitflush = 1;          /* flush right after */
427 >                                waitflush = 1;          /* flush next */
428                          put_zero_record(++lastray);
429 <                } else {                                /* else assign ray */
430 <                        i = next_child_nq(0);
431 <                        if (writebuf(kida[i].w, (char *)orgdir,
432 <                                        sizeof(orgdir)) != sizeof(orgdir))
429 >                } else if (++ninq >= qlimit ||
430 >                            lastray/accumulate != (lastray+ninq)/accumulate) {
431 >                        i = next_child_nq(0);           /* manages output */
432 >                        n = ninq;
433 >                        if (accumulate != 1)            /* request flush? */
434 >                                memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
435 >                        n *= sizeof(FVECT)*2;           /* send assignment */
436 >                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
437                                  error(SYSTEM, "pipe write error");
438 <                        kida[i].running = ++lastray;
438 >                        kida[i].r1 = lastray+1;
439 >                        lastray += kida[i].nr = ninq;   /* mark as busy */
440 >                        ninq = 0;
441 >                        if (lastray < lastdone) {       /* RNUMBER wrapped? */
442 >                                while (next_child_nq(1) >= 0)
443 >                                        ;
444 >                                lastdone = lastray = 0;
445 >                        }
446                  }
447                  if (raysleft && !--raysleft)
448                          break;                          /* preemptive EOI */
449          }
450          while (next_child_nq(1) >= 0)           /* empty results queue */
451                  ;
452 <                                                /* output accumulated record */
453 <        if (accumulate <= 0 || account < accumulate) {
454 <                if (account < accumulate) {
443 <                        error(WARNING, "partial accumulation in final record");
444 <                        accumulate -= account;
445 <                }
446 <                for (i = 0; i < nmods; i++)
447 <                        mod_output(out_bq->mca[i]);
448 <                end_record();
449 <                free_binq(out_bq);
452 >        if (account < accumulate) {
453 >                error(WARNING, "partial accumulation in final record");
454 >                free_binq(out_bq);              /* XXX just ignore it */
455                  out_bq = NULL;
456          }
457 +        free_binq(NULL);                        /* clean up */
458 +        lu_done(&ofiletab);
459          if (raysleft)
460                  error(USER, "unexpected EOF on input");
461 <        free_binq(NULL);                        /* clean up */
461 > }
462 >
463 >
464 > /* Wait for the next available child by monitoring "to" pipes */
465 > static int
466 > next_child_ready()
467 > {
468 >        fd_set                  writeset, errset;
469 >        int                     i, n;
470 >
471 >        for (i = nchild; i--; )         /* see if there's one free first */
472 >                if (!kida[i].nr)
473 >                        return(i);
474 >                                        /* prepare select() call */
475 >        FD_ZERO(&writeset); FD_ZERO(&errset);
476 >        n = 0;
477 >        for (i = nchild; i--; ) {
478 >                FD_SET(kida[i].pr.w, &writeset);
479 >                FD_SET(kida[i].pr.r, &errset);
480 >                if (kida[i].pr.w >= n)
481 >                        n = kida[i].pr.w + 1;
482 >                if (kida[i].pr.r >= n)
483 >                        n = kida[i].pr.r + 1;
484 >        }
485 >        errno = 0;
486 >        n = select(n, NULL, &writeset, &errset, NULL);
487 >        if (n < 0)
488 >                error(SYSTEM, "select() error in next_child_ready()");
489 >        n = -1;                         /* identify waiting child */
490 >        for (i = nchild; i--; ) {
491 >                if (FD_ISSET(kida[i].pr.r, &errset))
492 >                        error(USER, "rendering process died");
493 >                if (FD_ISSET(kida[i].pr.w, &writeset))
494 >                        kida[n = i].nr = 0;
495 >        }
496 >        return(n);                      /* first available child */
497 > }
498 >
499 >
500 > /* Modified parental loop for full accumulation mode (-c 0) */
501 > void
502 > feeder_loop()
503 > {
504 >        static int      ignore_warning_given = 0;
505 >        int             ninq = 0;
506 >        FVECT           orgdir[2*MAXIQ];
507 >        int             i, n;
508 >                                        /* load rays from stdin & process */
509 > #ifdef getc_unlocked
510 >        flockfile(stdin);               /* avoid lock/unlock overhead */
511 > #endif
512 >        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
513 >                if (orgdir[2*ninq+1][0] == 0.0 &&       /* asking for flush? */
514 >                                (orgdir[2*ninq+1][1] == 0.0) &
515 >                                (orgdir[2*ninq+1][2] == 0.0)) {
516 >                        if (!ignore_warning_given++)
517 >                                error(WARNING,
518 >                                "dummy ray(s) ignored during accumulation\n");
519 >                        continue;
520 >                }
521 >                if (++ninq >= MAXIQ) {
522 >                        i = next_child_ready();         /* get eager child */
523 >                        n = sizeof(FVECT)*2 * ninq;     /* give assignment */
524 >                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
525 >                                error(SYSTEM, "pipe write error");
526 >                        kida[i].r1 = lastray+1;
527 >                        lastray += kida[i].nr = ninq;
528 >                        ninq = 0;
529 >                        if (lastray < lastdone)         /* RNUMBER wrapped? */
530 >                                lastdone = lastray = 0;
531 >                }
532 >                if (raysleft && !--raysleft)
533 >                        break;                          /* preemptive EOI */
534 >        }
535 >        if (ninq) {                             /* polish off input */
536 >                i = next_child_ready();
537 >                n = sizeof(FVECT)*2 * ninq;
538 >                if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
539 >                        error(SYSTEM, "pipe write error");
540 >                kida[i].r1 = lastray+1;
541 >                lastray += kida[i].nr = ninq;
542 >                ninq = 0;
543 >        }
544 >        memset(orgdir, 0, sizeof(FVECT)*2);     /* get results */
545 >        for (i = nchild; i--; ) {
546 >                writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2);
547 >                queue_results(i);
548 >        }
549 >        if (recover)                            /* and from before? */
550 >                queue_modifiers();
551 >        end_children(0);                        /* free up file descriptors */
552 >        for (i = 0; i < nmods; i++)
553 >                mod_output(out_bq->mca[i]);     /* output accumulated record */
554 >        end_record();
555 >        free_binq(out_bq);                      /* clean up */
556 >        out_bq = NULL;
557 >        free_binq(NULL);
558          lu_done(&ofiletab);
559 +        if (raysleft)
560 +                error(USER, "unexpected EOF on input");
561   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines