ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
(Generate patch)

Comparing ray/src/rt/rc3.c (file contents):
Revision 2.1 by greg, Sat Jun 9 07:16:47 2012 UTC vs.
Revision 2.4 by greg, Mon Jun 11 05:07:55 2012 UTC

# Line 14 | Line 14 | static const char RCSid[] = "$Id$";
14   /* Modifier contribution queue (results waiting to be output) */
15   typedef struct s_binq {
16          int             ndx;            /* index for this entry */
17 <        int             n2add;          /* number left to add */
17 >        int             nadded;         /* accumulated so far */
18          struct s_binq   *next;          /* next in queue */
19          MODCONT         *mca[1];        /* contrib. array (extends struct) */
20   } BINQ;
# Line 23 | Line 23 | static BINQ    *out_bq = NULL;         /* output bin queue */
23   static BINQ     *free_bq = NULL;        /* free queue entries */
24  
25   static SUBPROC  kida[MAXPROCESS];       /* child processes */
26 + static FILE     *inq_fp[MAXPROCESS];    /* input streams */
27  
28  
29 < /* Get new (empty) bin queue entry */
29 > /* Get new bin queue entry */
30   static BINQ *
31   new_binq()
32   {
33 <        BINQ    *bp = free_bq;
33 >        BINQ    *bp;
34          int     i;
35  
36 <        if (bp != NULL) {               /* something already available? */
36 >        if (free_bq != NULL) {          /* something already available? */
37 >                bp = free_bq;
38                  free_bq = bp->next;
39                  bp->next = NULL;
40 <                bp->n2add = accumulate-1;
40 >                bp->nadded = 1;
41                  return(bp);
42          }
43                                          /* else allocate fresh */
44 <        bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
44 >        bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
45          if (bp == NULL)
46                  goto memerr;
47          for (i = nmods; i--; ) {
# Line 52 | Line 54 | new_binq()
54                  /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
55          }
56          bp->ndx = 0;
57 <        bp->n2add = accumulate-1;
57 >        bp->nadded = 1;
58          bp->next = NULL;
59          return(bp);
60   memerr:
# Line 107 | Line 109 | queue_modifiers()
109   }
110  
111  
112 < /* Sum one modifier record into another (doesn't update n2add) */
112 > /* Sum one modifier record into another (updates nadded) */
113   static void
114   add_modbin(BINQ *dst, BINQ *src)
115   {
# Line 119 | Line 121 | add_modbin(BINQ *dst, BINQ *src)
121                  for (j = mpout->nbins; j--; )
122                          addcolor(mpout->cbin[j], mpin->cbin[j]);
123          }
124 +        dst->nadded += src->nadded;
125   }
126  
127  
128 < /* Queue output, catching up with and freeing FIFO entries when possible */
129 < static int
128 > /* Queue values for later output */
129 > static void
130   queue_output(BINQ *bp)
131   {
129        int     nout = 0;
132          BINQ    *b_last, *b_cur;
131        int     i;
133  
134          if (accumulate <= 0) {          /* just accumulating? */
135                  if (out_bq == NULL) {
# Line 138 | Line 139 | queue_output(BINQ *bp)
139                          add_modbin(out_bq, bp);
140                          free_binq(bp);
141                  }
142 <                return(0);
142 >                return;
143          }
144          b_last = NULL;                  /* else insert in output queue */
145          for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
146                                  b_cur = b_cur->next)
147                  b_last = b_cur;
148 +
149          if (b_last != NULL) {
150                  bp->next = b_cur;
151                  b_last->next = bp;
# Line 151 | Line 153 | queue_output(BINQ *bp)
153                  bp->next = out_bq;
154                  out_bq = bp;
155          }
156 <        if (accumulate > 1) {           /* merge accumulation entries */
157 <                b_cur = out_bq;
158 <                while (b_cur->next != NULL) {
159 <                        if (b_cur->n2add <= 0 ||
160 <                                        (b_cur->ndx-1)/accumulate !=
161 <                                        (b_cur->next->ndx-1)/accumulate) {
162 <                                b_cur = b_cur->next;
163 <                                continue;
164 <                        }
163 <                        add_modbin(b_cur, b_cur->next);
164 <                        b_cur->n2add--;
165 <                        b_last = b_cur->next;
166 <                        b_cur->next = b_last->next;
167 <                        b_last->next = NULL;
168 <                        free_binq(b_last);
156 >        if (accumulate == 1)            /* no accumulation? */
157 >                return;
158 >        b_cur = out_bq;                 /* else merge accumulation entries */
159 >        while (b_cur->next != NULL) {
160 >                if (b_cur->nadded >= accumulate ||
161 >                                (b_cur->ndx-1)/accumulate !=
162 >                                (b_cur->next->ndx-1)/accumulate) {
163 >                        b_cur = b_cur->next;
164 >                        continue;
165                  }
166 +                add_modbin(b_cur, b_cur->next);
167 +                b_last = b_cur->next;
168 +                b_cur->next = b_last->next;
169 +                b_last->next = NULL;
170 +                free_binq(b_last);
171          }
172 <                                        /* output ready results */
173 <        while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
174 <                b_cur = out_bq;                 /* pop off first entry */
175 <                out_bq = b_cur->next;
176 <                b_cur->next = NULL;
172 > }
173 >
174 >
175 > /* Count number of records ready for output */
176 > static int
177 > queue_ready()
178 > {
179 >        int     nready = 0;
180 >        BINQ    *bp;
181 >
182 >        if (accumulate <= 0)            /* just accumulating? */
183 >                return(0);
184 >
185 >        for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
186 >                                bp->ndx == lastdone+nready*accumulate+1;
187 >                                bp = bp->next)
188 >                ++nready;
189 >
190 >        return(nready);
191 > }
192 >
193 >
194 > /* Catch up with output queue by producing ready results */
195 > static int
196 > output_catchup(int nmax)
197 > {
198 >        int     nout = 0;
199 >        BINQ    *bp;
200 >        int     i;
201 >
202 >        if (accumulate <= 0)            /* just accumulating? */
203 >                return(0);
204 >                                        /* else output ready results */
205 >        while (out_bq != NULL && out_bq->nadded >= accumulate
206 >                                && out_bq->ndx == lastdone+1) {
207 >                if ((nmax > 0) & (nout >= nmax))
208 >                        break;
209 >                bp = out_bq;                    /* pop off first entry */
210 >                out_bq = bp->next;
211 >                bp->next = NULL;
212                  for (i = 0; i < nmods; i++)     /* output record */
213 <                        mod_output(b_cur->mca[i]);
213 >                        mod_output(bp->mca[i]);
214                  end_record();
215 <                free_binq(b_cur);               /* free this entry */
215 >                free_binq(bp);                  /* free this entry */
216                  lastdone += accumulate;
217                  ++nout;
218          }
# Line 184 | Line 220 | queue_output(BINQ *bp)
220   }
221  
222  
223 < /* Put a zero record in results queue */
223 > /* Put a zero record in results queue & output */
224   void
225 < zero_record(int ndx)
225 > put_zero_record(int ndx)
226   {
227          BINQ    *bp = new_binq();
228          int     i;
# Line 195 | Line 231 | zero_record(int ndx)
231                  memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
232          bp->ndx = ndx;
233          queue_output(bp);
234 +        output_catchup(0);
235   }
236  
237  
# Line 221 | Line 258 | in_rchild()
258                  int     p0[2], p1[2];
259                  int     pid;
260                                          /* prepare i/o pipes */
261 +                errno = 0;
262                  if (pipe(p0) < 0 || pipe(p1) < 0)
263                          error(SYSTEM, "pipe() call failed!");
264                  pid = fork();           /* fork parent process */
265                  if (pid == 0) {         /* if in child, set up & return true */
266                          close(p0[1]); close(p1[0]);
267 +                        lu_doall(&modconttab, set_stdout, NULL);
268 +                        lu_done(&ofiletab);
269 +                        while (nchild--) {
270 +                                close(kida[nchild].w);
271 +                                fclose(inq_fp[nchild]);
272 +                        }
273                          dup2(p0[0], 0); close(p0[0]);
274                          dup2(p1[1], 1); close(p1[1]);
275                          inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
# Line 235 | Line 279 | in_rchild()
279                          yres = 0;
280                          raysleft = 0;
281                          account = accumulate = 1;
238                        lu_doall(&modconttab, set_stdout, NULL);
239                        nchild = -1;
282                          return(1);      /* child return value */
283                  }
284                  if (pid < 0)
285                          error(SYSTEM, "fork() call failed!");
286 <                                        /* connect our pipes */
286 >                                        /* connect parent's pipes */
287                  close(p0[0]); close(p1[1]);
288                  kida[nchild].r = p1[0];
289                  kida[nchild].w = p0[1];
290                  kida[nchild].pid = pid;
291                  kida[nchild].running = -1;
292 +                inq_fp[nchild] = fdopen(p1[0], "rb");
293 +                if (inq_fp[nchild] == NULL)
294 +                        error(SYSTEM, "out of memory in in_rchild()");
295 + #ifdef getc_unlocked
296 +                flockfile(inq_fp[nchild]);      /* avoid mutex overhead */
297 + #endif
298                  ++nchild;
299          }
300          return(0);                      /* parent return value */
# Line 260 | Line 308 | end_children()
308   {
309          int     status;
310          
311 <        while (nchild-- > 0)
311 >        while (nchild-- > 0) {
312 >                kida[nchild].r = -1;    /* close(-1) error is ignored */
313                  if ((status = close_process(&kida[nchild])) > 0) {
314                          sprintf(errmsg,
315                                  "rendering process returned bad status (%d)",
316                                          status);
317                          error(WARNING, errmsg);
318                  }
319 +                fclose(inq_fp[nchild]); /* performs actual close() */
320 +        }
321   }
322  
323  
324 < /* Wait for the next available child */
324 > /* Wait for the next available child, managing output queue as well */
325   static int
326   next_child_nq(int force_wait)
327   {
328 <        fd_set  readset, errset;
329 <        int     i, j, n, nr;
328 >        static struct timeval   polling;
329 >        struct timeval          *pmode;
330 >        fd_set                  readset, errset;
331 >        int                     i, j, n, nr, nqr;
332  
333          if (!force_wait)                /* see if there's one free */
334                  for (i = nchild; i--; )
335                          if (kida[i].running < 0)
336                                  return(i);
337 +
338 +        nqr = queue_ready();            /* wait mode or polling? */
339 +        if (!nqr | force_wait | (accumulate <= 0))
340 +                pmode = NULL;
341 +        else
342 +                pmode = &polling;
343 + tryagain:
344 +        n = 0;                          /* catch up with output? */
345 +        if ((pmode == &polling) & (nqr > nchild))
346 +                n = nqr - nchild;
347 +        if ((pmode == NULL) & (nqr > 0) | (n > 0))
348 +                nqr -= output_catchup(n);
349                                          /* prepare select() call */
350          FD_ZERO(&readset); FD_ZERO(&errset);
351          n = nr = 0;
# Line 293 | Line 358 | next_child_nq(int force_wait)
358                  if (kida[i].r >= n)
359                          n = kida[i].r + 1;
360          }
361 <        if (!nr)                        /* nothing going on */
361 >        if (!nr)                        /* nothing to wait for? */
362                  return(-1);
363 <        if (nr > 1) {                   /* call select if multiple busy */
363 >        if ((nr > 1) | (pmode == &polling)) {
364                  errno = 0;
365 <                if (select(n, &readset, NULL, &errset, NULL) < 0)
366 <                        error(SYSTEM, "select call error in next_child_nq()");
365 >                i = select(n, &readset, NULL, &errset, pmode);
366 >                if (!i) {
367 >                        pmode = NULL;   /* try again, blocking this time */
368 >                        goto tryagain;
369 >                }
370 >                if (i < 0)
371 >                        error(SYSTEM, "select() error in next_child_nq()");
372          } else
373                  FD_ZERO(&errset);
374          n = -1;                         /* read results from child(ren) */
# Line 312 | Line 382 | next_child_nq(int force_wait)
382                  bq->ndx = kida[i].running;
383                                          /* read from child */
384                  for (j = 0; j < nmods; j++) {
385 <                        n = sizeof(DCOLOR)*bq->mca[j]->nbins;
386 <                        nr = readbuf(kida[i].r, (char *)bq->mca[j]->cbin, n);
387 <                        if (nr != n)
385 >                        nr = bq->mca[j]->nbins;
386 >                        if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), nr,
387 >                                                        inq_fp[i]) != nr)
388                                  error(SYSTEM, "read error from render process");
389                  }
390 <                queue_output(bq);       /* put results in output queue */
390 >                queue_output(bq);       /* add results to output queue */
391                  kida[i].running = -1;   /* mark child as available */
392                  n = i;
393          }
394 <        return(n);                      /* last available child */
394 >        return(n);                      /* first available child */
395   }
396  
397  
# Line 332 | Line 402 | parental_loop()
402          static int      ignore_warning_given = 0;
403          FVECT           orgdir[2];
404          double          d;
405 +        int             i;
406                                          /* load rays from stdin & process */
407   #ifdef getc_unlocked
408          flockfile(stdin);               /* avoid lock/unlock overhead */
# Line 347 | Line 418 | parental_loop()
418                  }
419                  if ((d == 0.0) | (lastray+1 < lastray)) {
420                          while (next_child_nq(1) >= 0)
421 <                                ;
421 >                                ;                       /* clear the queue */
422                          lastdone = lastray = 0;
423                  }
424                  if (d == 0.0) {
425                          if ((yres <= 0) | (xres <= 0))
426                                  waitflush = 1;          /* flush right after */
427 <                        zero_record(++lastray);
428 <                } else {                                /* else assign */
429 <                        int     avail = next_child_nq(0);
430 <                        if (writebuf(kida[avail].w, (char *)orgdir,
431 <                                        sizeof(FVECT)*2) != sizeof(FVECT)*2)
427 >                        put_zero_record(++lastray);
428 >                } else {                                /* else assign ray */
429 >                        i = next_child_nq(0);
430 >                        if (writebuf(kida[i].w, (char *)orgdir,
431 >                                        sizeof(orgdir)) != sizeof(orgdir))
432                                  error(SYSTEM, "pipe write error");
433 <                        kida[avail].running = ++lastray;
433 >                        kida[i].running = ++lastray;
434                  }
435                  if (raysleft && !--raysleft)
436                          break;                          /* preemptive EOI */
# Line 368 | Line 439 | parental_loop()
439                  ;
440                                                  /* output accumulated record */
441          if (accumulate <= 0 || account < accumulate) {
371                int     i;
442                  if (account < accumulate) {
443                          error(WARNING, "partial accumulation in final record");
444                          accumulate -= account;
# Line 376 | Line 446 | parental_loop()
446                  for (i = 0; i < nmods; i++)
447                          mod_output(out_bq->mca[i]);
448                  end_record();
449 +                free_binq(out_bq);
450 +                out_bq = NULL;
451          }
452          if (raysleft)
453                  error(USER, "unexpected EOF on input");

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines