ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
(Generate patch)

Comparing ray/src/rt/rc3.c (file contents):
Revision 2.2 by greg, Sat Jun 9 16:47:27 2012 UTC vs.
Revision 2.6 by greg, Tue Jun 12 17:20:44 2012 UTC

# Line 13 | Line 13 | static const char RCSid[] = "$Id$";
13  
14   /* Modifier contribution queue (results waiting to be output) */
15   typedef struct s_binq {
16 <        int             ndx;            /* index for this entry */
17 <        int             n2add;          /* number left to add */
16 >        RNUMBER         ndx;            /* index for this entry */
17 >        RNUMBER         nadded;         /* accumulated so far */
18          struct s_binq   *next;          /* next in queue */
19          MODCONT         *mca[1];        /* contrib. array (extends struct) */
20   } BINQ;
# Line 22 | Line 22 | typedef struct s_binq {
22   static BINQ     *out_bq = NULL;         /* output bin queue */
23   static BINQ     *free_bq = NULL;        /* free queue entries */
24  
25 < static SUBPROC  kida[MAXPROCESS];       /* child processes */
26 < static FILE     *inq_fp[MAXPROCESS];    /* input streams */
25 > static struct {
26 >        RNUMBER r1;                     /* assigned ray starting index */
27 >        SUBPROC pr;                     /* PID, i/o descriptors */
28 >        FILE    *infp;                  /* file pointer to read from process */
29 >        int     nr;                     /* number rays to sum (0 if free) */
30 > } kida[MAXPROCESS];             /* our child processes */
31  
32  
33 < /* Get new (empty) bin queue entry */
33 > /* Get new bin queue entry */
34   static BINQ *
35   new_binq()
36   {
37 <        BINQ    *bp = free_bq;
37 >        BINQ    *bp;
38          int     i;
39  
40 <        if (bp != NULL) {               /* something already available? */
40 >        if (free_bq != NULL) {          /* something already available? */
41 >                bp = free_bq;
42                  free_bq = bp->next;
43                  bp->next = NULL;
44 <                bp->n2add = accumulate-1;
44 >                bp->nadded = 0;
45                  return(bp);
46          }
47                                          /* else allocate fresh */
48 <        bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
48 >        bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
49          if (bp == NULL)
50                  goto memerr;
51          for (i = nmods; i--; ) {
# Line 53 | Line 58 | new_binq()
58                  /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
59          }
60          bp->ndx = 0;
61 <        bp->n2add = accumulate-1;
61 >        bp->nadded = 0;
62          bp->next = NULL;
63          return(bp);
64   memerr:
# Line 105 | Line 110 | queue_modifiers()
110                          addcolor(mpout->cbin[j], mpin->cbin[j]);
111                  memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
112          }
113 +        out_bq->nadded++;
114   }
115  
116  
117 < /* Sum one modifier record into another (doesn't update n2add) */
117 > /* Sum one modifier record into another (updates nadded) */
118   static void
119   add_modbin(BINQ *dst, BINQ *src)
120   {
# Line 120 | Line 126 | add_modbin(BINQ *dst, BINQ *src)
126                  for (j = mpout->nbins; j--; )
127                          addcolor(mpout->cbin[j], mpin->cbin[j]);
128          }
129 +        dst->nadded += src->nadded;
130   }
131  
132  
# Line 143 | Line 150 | queue_output(BINQ *bp)
150          for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
151                                  b_cur = b_cur->next)
152                  b_last = b_cur;
153 +
154          if (b_last != NULL) {
155                  bp->next = b_cur;
156                  b_last->next = bp;
# Line 150 | Line 158 | queue_output(BINQ *bp)
158                  bp->next = out_bq;
159                  out_bq = bp;
160          }
161 <        if (accumulate <= 1)            /* no accumulating? */
161 >        if (accumulate == 1)            /* no accumulation? */
162                  return;
163          b_cur = out_bq;                 /* else merge accumulation entries */
164          while (b_cur->next != NULL) {
165 <                if (b_cur->n2add <= 0 ||
165 >                if (b_cur->nadded >= accumulate ||
166                                  (b_cur->ndx-1)/accumulate !=
167                                  (b_cur->next->ndx-1)/accumulate) {
168                          b_cur = b_cur->next;
169                          continue;
170                  }
171                  add_modbin(b_cur, b_cur->next);
164                b_cur->n2add--;
172                  b_last = b_cur->next;
173                  b_cur->next = b_last->next;
174                  b_last->next = NULL;
# Line 170 | Line 177 | queue_output(BINQ *bp)
177   }
178  
179  
180 < /* Get current with output FIFO by producing ready results */
180 > /* Count number of records ready for output */
181   static int
182 < output_catchup()
182 > queue_ready()
183   {
184 +        int     nready = 0;
185 +        BINQ    *bp;
186 +
187 +        if (accumulate <= 0)            /* just accumulating? */
188 +                return(0);
189 +
190 +        for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
191 +                                bp->ndx == lastdone+nready*accumulate+1;
192 +                                bp = bp->next)
193 +                ++nready;
194 +
195 +        return(nready);
196 + }
197 +
198 +
199 + /* Catch up with output queue by producing ready results */
200 + static int
201 + output_catchup(int nmax)
202 + {
203          int     nout = 0;
204          BINQ    *bp;
205          int     i;
# Line 181 | Line 207 | output_catchup()
207          if (accumulate <= 0)            /* just accumulating? */
208                  return(0);
209                                          /* else output ready results */
210 <        while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
210 >        while (out_bq != NULL && out_bq->nadded >= accumulate
211 >                                && out_bq->ndx == lastdone+1) {
212 >                if ((nmax > 0) & (nout >= nmax))
213 >                        break;
214                  bp = out_bq;                    /* pop off first entry */
215                  out_bq = bp->next;
216                  bp->next = NULL;
# Line 196 | Line 225 | output_catchup()
225   }
226  
227  
228 < /* Put a zero record in results queue */
228 > /* Put a zero record in results queue & output */
229   void
230 < zero_record(int ndx)
230 > put_zero_record(int ndx)
231   {
232          BINQ    *bp = new_binq();
233          int     i;
# Line 206 | Line 235 | zero_record(int ndx)
235          for (i = nmods; i--; )
236                  memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
237          bp->ndx = ndx;
238 +        bp->nadded = 1;
239          queue_output(bp);
240 +        output_catchup(0);
241   }
242  
243  
244 + /* Get results from child process and add to queue */
245 + static void
246 + queue_results(int k)
247 + {
248 +        BINQ    *bq = new_binq();       /* get results holder */
249 +        int     j;
250 +
251 +        bq->ndx = kida[k].r1;
252 +        bq->nadded = kida[k].nr;
253 +                                        /* read from child */
254 +        for (j = 0; j < nmods; j++)
255 +                if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
256 +                                        kida[k].infp) != bq->mca[j]->nbins)
257 +                        error(SYSTEM, "read error from render process");
258 +                        
259 +        queue_output(bq);               /* put results in output queue */
260 +        kida[k].nr = 0;                 /* mark child as available */
261 + }
262 +
263 +
264   /* callback to set output spec to NULL (stdout) */
265   static int
266   set_stdout(const LUENT *le, void *p)
# Line 233 | Line 284 | in_rchild()
284                  int     p0[2], p1[2];
285                  int     pid;
286                                          /* prepare i/o pipes */
287 +                errno = 0;
288                  if (pipe(p0) < 0 || pipe(p1) < 0)
289                          error(SYSTEM, "pipe() call failed!");
290                  pid = fork();           /* fork parent process */
291                  if (pid == 0) {         /* if in child, set up & return true */
292                          close(p0[1]); close(p1[0]);
293 +                        lu_doall(&modconttab, set_stdout, NULL);
294 +                        lu_done(&ofiletab);
295 +                        while (nchild--) {      /* don't share other pipes */
296 +                                close(kida[nchild].pr.w);
297 +                                fclose(kida[nchild].infp);
298 +                        }
299                          dup2(p0[0], 0); close(p0[0]);
300                          dup2(p1[1], 1); close(p1[1]);
301                          inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
302                          outfmt = 'd';
303                          header = 0;
246                        waitflush = xres = 1;
304                          yres = 0;
305                          raysleft = 0;
306 <                        account = accumulate = 1;
307 <                        lu_doall(&modconttab, set_stdout, NULL);
308 <                        nchild = -1;
306 >                        if (accumulate == 1) {
307 >                                waitflush = xres = 1;
308 >                                account = accumulate = 1;
309 >                        } else {        /* parent controls accumulation */
310 >                                waitflush = xres = 0;
311 >                                account = accumulate = 0;
312 >                        }
313                          return(1);      /* child return value */
314                  }
315                  if (pid < 0)
316                          error(SYSTEM, "fork() call failed!");
317 <                                        /* connect our pipes */
317 >                                        /* connect parent's pipes */
318                  close(p0[0]); close(p1[1]);
319 <                kida[nchild].r = p1[0];
320 <                kida[nchild].w = p0[1];
321 <                kida[nchild].pid = pid;
322 <                kida[nchild].running = -1;
323 <                inq_fp[nchild] = fdopen(p1[0], "rb");
324 <                if (inq_fp[nchild] == NULL)
319 >                kida[nchild].pr.r = p1[0];
320 >                kida[nchild].pr.w = p0[1];
321 >                kida[nchild].pr.pid = pid;
322 >                kida[nchild].pr.running = 1;
323 >                kida[nchild].infp = fdopen(p1[0], "rb");
324 >                if (kida[nchild].infp == NULL)
325                          error(SYSTEM, "out of memory in in_rchild()");
326 <                ++nchild;
326 > #ifdef getc_unlocked
327 >                flockfile(kida[nchild].infp);   /* avoid mutex overhead */
328 > #endif
329 >                kida[nchild++].nr = 0;  /* mark as available */
330          }
331          return(0);                      /* parent return value */
332   #endif
# Line 275 | Line 339 | end_children()
339   {
340          int     status;
341          
342 <        while (nchild-- > 0) {
343 <                kida[nchild].r = -1;    /* close(-1) error is ignored */
344 <                if ((status = close_process(&kida[nchild])) > 0) {
342 >        while (nchild > 0) {
343 >                nchild--;
344 >                fclose(kida[nchild].infp);      
345 >                kida[nchild].pr.r = -1;         /* close(-1) error is ignored */
346 >                if ((status = close_process(&kida[nchild].pr)) > 0) {
347                          sprintf(errmsg,
348                                  "rendering process returned bad status (%d)",
349                                          status);
350                          error(WARNING, errmsg);
351                  }
286                fclose(inq_fp[nchild]); /* performs actual close() */
352          }
353   }
354  
355  
356 < /* Wait for the next available child, managing output queue as well */
356 > /* Wait for the next available child, managing output queue simultaneously */
357   static int
358 < next_child_nq(int force_wait)
358 > next_child_nq(int flushing)
359   {
360          static struct timeval   polling;
361 <        struct timeval          *pmode = force_wait | (accumulate <= 0) ?
297 <                                        (struct timeval *)NULL : &polling;
361 >        struct timeval          *pmode;
362          fd_set                  readset, errset;
363 <        int                     i, j, n, nr;
363 >        int                     i, n, nr, nqr;
364  
365 <        if (!force_wait)                /* see if there's one free */
365 >        if (!flushing)                  /* see if there's one free */
366                  for (i = nchild; i--; )
367 <                        if (kida[i].running < 0)
367 >                        if (!kida[i].nr)
368                                  return(i);
369 +
370 +        nqr = queue_ready();            /* choose blocking mode or polling */
371 +        if ((nqr > 0) & !flushing)
372 +                pmode = &polling;
373 +        else
374 +                pmode = NULL;
375 + tryagain:                               /* catch up with output? */
376 +        if (pmode == &polling) {
377 +                if (nqr > nchild)       /* don't get too far behind */
378 +                        nqr -= output_catchup(nqr-nchild);
379 +        } else if (nqr > 0)             /* clear output before blocking */
380 +                nqr -= output_catchup(0);
381                                          /* prepare select() call */
382          FD_ZERO(&readset); FD_ZERO(&errset);
383          n = nr = 0;
384          for (i = nchild; i--; ) {
385 <                if (kida[i].running > 0) {
386 <                        FD_SET(kida[i].r, &readset);
385 >                if (kida[i].nr) {
386 >                        FD_SET(kida[i].pr.r, &readset);
387                          ++nr;
388                  }
389 <                FD_SET(kida[i].r, &errset);
390 <                if (kida[i].r >= n)
391 <                        n = kida[i].r + 1;
389 >                FD_SET(kida[i].pr.r, &errset);
390 >                if (kida[i].pr.r >= n)
391 >                        n = kida[i].pr.r + 1;
392          }
393 <        if (!nr)                        /* nothing going on */
393 >        if (!nr)                        /* nothing to wait for? */
394                  return(-1);
319 tryagain:
320        if (pmode == NULL)              /* about to block, so catch up */
321                output_catchup();
395          if ((nr > 1) | (pmode == &polling)) {
396                  errno = 0;
397                  nr = select(n, &readset, NULL, &errset, pmode);
398 <                if (!nr & (pmode == &polling)) {
398 >                if (!nr) {
399                          pmode = NULL;   /* try again, blocking this time */
400                          goto tryagain;
401                  }
402 <                if (nr <= 0)
403 <                        error(SYSTEM, "select call error in next_child_nq()");
402 >                if (nr < 0)
403 >                        error(SYSTEM, "select() error in next_child_nq()");
404          } else
405                  FD_ZERO(&errset);
406          n = -1;                         /* read results from child(ren) */
407          for (i = nchild; i--; ) {
408 <                BINQ    *bq;
336 <                if (FD_ISSET(kida[i].r, &errset))
408 >                if (FD_ISSET(kida[i].pr.r, &errset))
409                          error(USER, "rendering process died");
410 <                if (!FD_ISSET(kida[i].r, &readset))
411 <                        continue;
340 <                bq = new_binq();        /* get results holder */
341 <                bq->ndx = kida[i].running;
342 <                                        /* read from child */
343 <                for (j = 0; j < nmods; j++) {
344 <                        n = bq->mca[j]->nbins;
345 <                        nr = fread(bq->mca[j]->cbin,sizeof(DCOLOR),n,inq_fp[i]);
346 <                        if (nr != n)
347 <                                error(SYSTEM, "read error from render process");
348 <                }
349 <                queue_output(bq);       /* put results in output queue */
350 <                kida[i].running = -1;   /* mark child as available */
351 <                n = i;
410 >                if (FD_ISSET(kida[i].pr.r, &readset))
411 >                        queue_results(n = i);
412          }
413 <        return(n);                      /* last available child */
413 >        return(n);                      /* first available child */
414   }
415  
416  
# Line 358 | Line 418 | tryagain:
418   void
419   parental_loop()
420   {
421 + #define MAXIQ           (int)(PIPE_BUF/(sizeof(FVECT)*2))
422          static int      ignore_warning_given = 0;
423 <        FVECT           orgdir[2];
423 >        int             qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
424 >        int             ninq = 0;
425 >        FVECT           orgdir[2*MAXIQ];
426          double          d;
427 +        int             i, n;
428                                          /* load rays from stdin & process */
429   #ifdef getc_unlocked
430          flockfile(stdin);               /* avoid lock/unlock overhead */
431   #endif
432 <        while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
432 >        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
433                  d = normalize(orgdir[1]);
434 <                                                        /* asking for flush? */
435 <                if ((d == 0.0) & (accumulate != 1)) {
436 <                        if (!ignore_warning_given++)
437 <                                error(WARNING,
434 >                if (d == 0.0) {                         /* asking for flush? */
435 >                        if (accumulate != 1) {
436 >                                if (!ignore_warning_given++)
437 >                                        error(WARNING,
438                                  "dummy ray(s) ignored during accumulation\n");
439 <                        continue;
440 <                }
377 <                if ((d == 0.0) | (lastray+1 < lastray)) {
439 >                                continue;
440 >                        }
441                          while (next_child_nq(1) >= 0)
442 <                                ;
442 >                                ;                       /* clear the queue */
443                          lastdone = lastray = 0;
381                }
382                if (d == 0.0) {
444                          if ((yres <= 0) | (xres <= 0))
445 <                                waitflush = 1;          /* flush right after */
446 <                        zero_record(++lastray);
447 <                } else {                                /* else assign */
448 <                        int     avail = next_child_nq(0);
449 <                        if (writebuf(kida[avail].w, (char *)orgdir,
450 <                                        sizeof(FVECT)*2) != sizeof(FVECT)*2)
445 >                                waitflush = 1;          /* flush next */
446 >                        put_zero_record(++lastray);
447 >                } else if (++ninq >= qlimit || accumulate > 1 &&
448 >                            lastray/accumulate != (lastray+ninq)/accumulate) {
449 >                        i = next_child_nq(0);           /* manages output */
450 >                        n = ninq;
451 >                        if (accumulate != 1)            /* request flush? */
452 >                                memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
453 >                        n *= sizeof(FVECT)*2;           /* send assignment */
454 >                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
455                                  error(SYSTEM, "pipe write error");
456 <                        kida[avail].running = ++lastray;
456 >                        kida[i].r1 = lastray+1;
457 >                        lastray += kida[i].nr = ninq;   /* mark as busy */
458 >                        ninq = 0;
459 >                        if (lastray < lastdone) {       /* RNUMBER wrapped? */
460 >                                while (next_child_nq(1) >= 0)
461 >                                        ;
462 >                                lastdone = lastray = 0;
463 >                        }
464                  }
465                  if (raysleft && !--raysleft)
466                          break;                          /* preemptive EOI */
# Line 397 | Line 469 | parental_loop()
469                  ;
470                                                  /* output accumulated record */
471          if (accumulate <= 0 || account < accumulate) {
472 <                int     i;
472 >                end_children();                 /* frees up file descriptors */
473                  if (account < accumulate) {
474                          error(WARNING, "partial accumulation in final record");
475                          accumulate -= account;
# Line 405 | Line 477 | parental_loop()
477                  for (i = 0; i < nmods; i++)
478                          mod_output(out_bq->mca[i]);
479                  end_record();
480 +                free_binq(out_bq);
481 +                out_bq = NULL;
482          }
483          if (raysleft)
484                  error(USER, "unexpected EOF on input");
485          free_binq(NULL);                        /* clean up */
486          lu_done(&ofiletab);
487 + #undef MAXIQ
488   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines