ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.18
Committed: Thu Nov 15 19:41:03 2012 UTC (11 years, 5 months ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad4R2P2, rad4R2, rad4R2P1
Changes since 2.17: +20 -27 lines
Log Message:
Tweaks and fixes related to flushing with -c >1

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.18 static const char RCSid[] = "$Id: rc3.c,v 2.17 2012/11/15 15:26:52 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * Accumulate ray contributions for a set of materials
6     * Controlling process for multiple children
7     */
8    
9 greg 2.14 #include <signal.h>
10 greg 2.1 #include "rcontrib.h"
11     #include "selcall.h"
12    
/* Number of origin+direction FVECT pairs that fit within PIPE_BUF bytes */
#define MAXIQ		((int)(PIPE_BUF/(2*sizeof(FVECT))))
14    
15 greg 2.1 /* Modifier contribution queue (results waiting to be output) */
16     typedef struct s_binq {
17 greg 2.6 RNUMBER ndx; /* index for this entry */
18     RNUMBER nadded; /* accumulated so far */
19 greg 2.1 struct s_binq *next; /* next in queue */
20     MODCONT *mca[1]; /* contrib. array (extends struct) */
21     } BINQ;
22    
23     static BINQ *out_bq = NULL; /* output bin queue */
24     static BINQ *free_bq = NULL; /* free queue entries */
25    
26 greg 2.6 static struct {
27     RNUMBER r1; /* assigned ray starting index */
28     SUBPROC pr; /* PID, i/o descriptors */
29     FILE *infp; /* file pointer to read from process */
30 greg 2.10 int nr; /* number of rays to sum (0 if free) */
31 greg 2.6 } kida[MAXPROCESS]; /* our child processes */
32 greg 2.1
33    
34 greg 2.4 /* Get new bin queue entry */
35 greg 2.1 static BINQ *
36     new_binq()
37     {
38 greg 2.4 BINQ *bp;
39 greg 2.1 int i;
40    
41 greg 2.4 if (free_bq != NULL) { /* something already available? */
42     bp = free_bq;
43 greg 2.1 free_bq = bp->next;
44     bp->next = NULL;
45 greg 2.6 bp->nadded = 0;
46 greg 2.1 return(bp);
47     }
48     /* else allocate fresh */
49 greg 2.4 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
50 greg 2.1 if (bp == NULL)
51     goto memerr;
52     for (i = nmods; i--; ) {
53     MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
54     bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
55     sizeof(DCOLOR)*(mp->nbins-1));
56     if (bp->mca[i] == NULL)
57     goto memerr;
58     memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
59     /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
60     }
61     bp->ndx = 0;
62 greg 2.6 bp->nadded = 0;
63 greg 2.1 bp->next = NULL;
64     return(bp);
65     memerr:
66     error(SYSTEM, "out of memory in new_binq()");
67     return(NULL);
68     }
69    
70    
71     /* Free a bin queue entry */
72     static void
73     free_binq(BINQ *bp)
74     {
75     int i;
76    
77     if (bp == NULL) { /* signal to release our free list */
78     while ((bp = free_bq) != NULL) {
79     free_bq = bp->next;
80     for (i = nmods; i--; )
81     free(bp->mca[i]);
82     /* Note: we don't own bp->mca[i]->binv */
83     free(bp);
84     }
85     return;
86     }
87     /* clear sums for next use */
88     /* for (i = nmods; i--; )
89     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
90     */
91     bp->ndx = 0;
92     bp->next = free_bq; /* push onto free list */
93     free_bq = bp;
94     }
95    
96    
97     /* Add modifier values to accumulation record in queue and clear */
98 greg 2.10 static void
99 greg 2.1 queue_modifiers()
100     {
101     MODCONT *mpin, *mpout;
102     int i, j;
103    
104     if ((accumulate > 0) | (out_bq == NULL))
105     error(CONSISTENCY, "bad call to queue_modifiers()");
106    
107     for (i = nmods; i--; ) {
108     mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
109     mpout = out_bq->mca[i];
110     for (j = mpout->nbins; j--; )
111     addcolor(mpout->cbin[j], mpin->cbin[j]);
112     memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
113     }
114 greg 2.5 out_bq->nadded++;
115 greg 2.1 }
116    
117    
118 greg 2.4 /* Sum one modifier record into another (updates nadded) */
119 greg 2.1 static void
120     add_modbin(BINQ *dst, BINQ *src)
121     {
122     int i, j;
123    
124     for (i = nmods; i--; ) {
125     MODCONT *mpin = src->mca[i];
126     MODCONT *mpout = dst->mca[i];
127     for (j = mpout->nbins; j--; )
128     addcolor(mpout->cbin[j], mpin->cbin[j]);
129     }
130 greg 2.4 dst->nadded += src->nadded;
131 greg 2.1 }
132    
133    
134 greg 2.2 /* Queue values for later output */
135     static void
136 greg 2.1 queue_output(BINQ *bp)
137     {
138     BINQ *b_last, *b_cur;
139    
140     if (accumulate <= 0) { /* just accumulating? */
141     if (out_bq == NULL) {
142     bp->next = NULL;
143     out_bq = bp;
144     } else {
145     add_modbin(out_bq, bp);
146     free_binq(bp);
147     }
148 greg 2.2 return;
149 greg 2.1 }
150 greg 2.10 b_last = NULL; /* insert in output queue */
151 greg 2.1 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
152     b_cur = b_cur->next)
153     b_last = b_cur;
154 greg 2.4
155 greg 2.1 if (b_last != NULL) {
156     bp->next = b_cur;
157     b_last->next = bp;
158     } else {
159     bp->next = out_bq;
160     out_bq = bp;
161     }
162 greg 2.3 if (accumulate == 1) /* no accumulation? */
163 greg 2.2 return;
164     b_cur = out_bq; /* else merge accumulation entries */
165     while (b_cur->next != NULL) {
166 greg 2.4 if (b_cur->nadded >= accumulate ||
167 greg 2.2 (b_cur->ndx-1)/accumulate !=
168     (b_cur->next->ndx-1)/accumulate) {
169     b_cur = b_cur->next;
170     continue;
171 greg 2.1 }
172 greg 2.2 add_modbin(b_cur, b_cur->next);
173     b_last = b_cur->next;
174     b_cur->next = b_last->next;
175     b_last->next = NULL;
176     free_binq(b_last);
177 greg 2.1 }
178 greg 2.2 }
179    
180    
181 greg 2.4 /* Count number of records ready for output */
182     static int
183     queue_ready()
184     {
185     int nready = 0;
186     BINQ *bp;
187    
188     for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
189     bp->ndx == lastdone+nready*accumulate+1;
190     bp = bp->next)
191     ++nready;
192    
193     return(nready);
194     }
195    
196    
197     /* Catch up with output queue by producing ready results */
198 greg 2.2 static int
199 greg 2.4 output_catchup(int nmax)
200 greg 2.2 {
201     int nout = 0;
202     BINQ *bp;
203     int i;
204 greg 2.10 /* output ready results */
205 greg 2.4 while (out_bq != NULL && out_bq->nadded >= accumulate
206     && out_bq->ndx == lastdone+1) {
207     if ((nmax > 0) & (nout >= nmax))
208     break;
209 greg 2.2 bp = out_bq; /* pop off first entry */
210     out_bq = bp->next;
211     bp->next = NULL;
212 greg 2.1 for (i = 0; i < nmods; i++) /* output record */
213 greg 2.2 mod_output(bp->mca[i]);
214 greg 2.1 end_record();
215 greg 2.2 free_binq(bp); /* free this entry */
216 greg 2.1 lastdone += accumulate;
217     ++nout;
218     }
219     return(nout);
220     }
221    
222    
223 greg 2.3 /* Put a zero record in results queue & output */
224 greg 2.1 void
225 greg 2.3 put_zero_record(int ndx)
226 greg 2.1 {
227     BINQ *bp = new_binq();
228     int i;
229    
230     for (i = nmods; i--; )
231     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
232     bp->ndx = ndx;
233 greg 2.6 bp->nadded = 1;
234 greg 2.1 queue_output(bp);
235 greg 2.4 output_catchup(0);
236 greg 2.1 }
237    
238    
239 greg 2.6 /* Get results from child process and add to queue */
240     static void
241     queue_results(int k)
242     {
243     BINQ *bq = new_binq(); /* get results holder */
244     int j;
245    
246     bq->ndx = kida[k].r1;
247     bq->nadded = kida[k].nr;
248     /* read from child */
249     for (j = 0; j < nmods; j++)
250     if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
251     kida[k].infp) != bq->mca[j]->nbins)
252     error(SYSTEM, "read error from render process");
253    
254     queue_output(bq); /* put results in output queue */
255     kida[k].nr = 0; /* mark child as available */
256     }
257    
258    
259 greg 2.1 /* callback to set output spec to NULL (stdout) */
260     static int
261     set_stdout(const LUENT *le, void *p)
262     {
263     (*(MODCONT *)le->data).outspec = NULL;
264     return(0);
265     }
266    
267    
268     /* Start child processes if we can */
269     int
270     in_rchild()
271     {
272 greg 2.9 int rval;
273    
274     while (nchild < nproc) { /* fork until target reached */
275 greg 2.3 errno = 0;
276 greg 2.9 rval = open_process(&kida[nchild].pr, NULL);
277     if (rval < 0)
278     error(SYSTEM, "open_process() call failed");
279     if (rval == 0) { /* if in child, set up & return true */
280 greg 2.18 lu_doall(&modconttab, &set_stdout, NULL);
281 greg 2.4 lu_done(&ofiletab);
282 greg 2.5 while (nchild--) { /* don't share other pipes */
283 greg 2.6 close(kida[nchild].pr.w);
284     fclose(kida[nchild].infp);
285 greg 2.4 }
286 greg 2.1 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
287     outfmt = 'd';
288     header = 0;
289     yres = 0;
290     raysleft = 0;
291 greg 2.6 if (accumulate == 1) {
292     waitflush = xres = 1;
293     account = accumulate = 1;
294     } else { /* parent controls accumulation */
295     waitflush = xres = 0;
296     account = accumulate = 0;
297     }
298 greg 2.9 return(1); /* return "true" in child */
299 greg 2.1 }
300 greg 2.9 if (rval != PIPE_BUF)
301     error(CONSISTENCY, "bad value from open_process()");
302     /* connect to child's output */
303     kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
304 greg 2.6 if (kida[nchild].infp == NULL)
305 greg 2.2 error(SYSTEM, "out of memory in in_rchild()");
306 greg 2.3 #ifdef getc_unlocked
307 greg 2.6 flockfile(kida[nchild].infp); /* avoid mutex overhead */
308 greg 2.3 #endif
309 greg 2.6 kida[nchild++].nr = 0; /* mark as available */
310 greg 2.1 }
311 greg 2.9 return(0); /* return "false" in parent */
312 greg 2.1 }
313    
314    
315     /* Close child processes */
316     void
317 greg 2.12 end_children(int immed)
318 greg 2.1 {
319     int status;
320    
321 greg 2.5 while (nchild > 0) {
322     nchild--;
323 greg 2.15 #ifdef SIGKILL
324 greg 2.12 if (immed) /* error mode -- quick exit */
325     kill(kida[nchild].pr.pid, SIGKILL);
326 greg 2.15 #endif
327 greg 2.12 if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
328 greg 2.1 sprintf(errmsg,
329     "rendering process returned bad status (%d)",
330     status);
331     error(WARNING, errmsg);
332     }
333 greg 2.9 fclose(kida[nchild].infp);
334 greg 2.2 }
335 greg 2.1 }
336    
337    
338 greg 2.5 /* Wait for the next available child, managing output queue simultaneously */
339 greg 2.1 static int
340 greg 2.5 next_child_nq(int flushing)
341 greg 2.1 {
342 greg 2.2 static struct timeval polling;
343 greg 2.4 struct timeval *pmode;
344 greg 2.2 fd_set readset, errset;
345 greg 2.6 int i, n, nr, nqr;
346 greg 2.1
347 greg 2.5 if (!flushing) /* see if there's one free */
348 greg 2.1 for (i = nchild; i--; )
349 greg 2.6 if (!kida[i].nr)
350 greg 2.1 return(i);
351 greg 2.4
352 greg 2.5 nqr = queue_ready(); /* choose blocking mode or polling */
353     if ((nqr > 0) & !flushing)
354     pmode = &polling;
355     else
356 greg 2.4 pmode = NULL;
357 greg 2.5 tryagain: /* catch up with output? */
358     if (pmode == &polling) {
359     if (nqr > nchild) /* don't get too far behind */
360     nqr -= output_catchup(nqr-nchild);
361     } else if (nqr > 0) /* clear output before blocking */
362     nqr -= output_catchup(0);
363 greg 2.1 /* prepare select() call */
364     FD_ZERO(&readset); FD_ZERO(&errset);
365     n = nr = 0;
366     for (i = nchild; i--; ) {
367 greg 2.6 if (kida[i].nr) {
368     FD_SET(kida[i].pr.r, &readset);
369 greg 2.1 ++nr;
370     }
371 greg 2.6 FD_SET(kida[i].pr.r, &errset);
372     if (kida[i].pr.r >= n)
373     n = kida[i].pr.r + 1;
374 greg 2.1 }
375 greg 2.3 if (!nr) /* nothing to wait for? */
376     return(-1);
377 greg 2.2 if ((nr > 1) | (pmode == &polling)) {
378 greg 2.1 errno = 0;
379 greg 2.5 nr = select(n, &readset, NULL, &errset, pmode);
380     if (!nr) {
381 greg 2.2 pmode = NULL; /* try again, blocking this time */
382     goto tryagain;
383     }
384 greg 2.5 if (nr < 0)
385 greg 2.3 error(SYSTEM, "select() error in next_child_nq()");
386 greg 2.1 } else
387     FD_ZERO(&errset);
388     n = -1; /* read results from child(ren) */
389     for (i = nchild; i--; ) {
390 greg 2.6 if (FD_ISSET(kida[i].pr.r, &errset))
391 greg 2.1 error(USER, "rendering process died");
392 greg 2.6 if (FD_ISSET(kida[i].pr.r, &readset))
393     queue_results(n = i);
394 greg 2.1 }
395 greg 2.3 return(n); /* first available child */
396 greg 2.1 }
397    
398    
399     /* Run parental oversight loop */
400     void
401     parental_loop()
402     {
403 greg 2.17 const int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
404 greg 2.6 int ninq = 0;
405     FVECT orgdir[2*MAXIQ];
406     int i, n;
407 greg 2.1 /* load rays from stdin & process */
408     #ifdef getc_unlocked
409     flockfile(stdin); /* avoid lock/unlock overhead */
410     #endif
411 greg 2.6 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
412 greg 2.18 const int zero_ray = orgdir[2*ninq+1][0] == 0.0 &&
413     (orgdir[2*ninq+1][1] == 0.0) &
414     (orgdir[2*ninq+1][2] == 0.0);
415     ninq += !zero_ray;
416     /* Zero ray cannot go in input queue */
417     if (zero_ray ? ninq : ninq >= qlimit ||
418 greg 2.6 lastray/accumulate != (lastray+ninq)/accumulate) {
419 greg 2.5 i = next_child_nq(0); /* manages output */
420 greg 2.6 n = ninq;
421 greg 2.18 if (accumulate > 1) /* need terminator? */
422 greg 2.6 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
423     n *= sizeof(FVECT)*2; /* send assignment */
424     if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
425 greg 2.1 error(SYSTEM, "pipe write error");
426 greg 2.6 kida[i].r1 = lastray+1;
427     lastray += kida[i].nr = ninq; /* mark as busy */
428     if (lastray < lastdone) { /* RNUMBER wrapped? */
429     while (next_child_nq(1) >= 0)
430     ;
431 greg 2.17 lastray -= ninq;
432     lastdone = lastray %= accumulate;
433 greg 2.6 }
434 greg 2.17 ninq = 0;
435 greg 2.1 }
436 greg 2.18 if (zero_ray) { /* put bogus record? */
437     if ((yres <= 0) | (xres <= 1) &&
438     (lastray+1) % accumulate == 0) {
439     while (next_child_nq(1) >= 0)
440     ; /* clear the queue */
441     lastdone = lastray = accumulate-1;
442     waitflush = 1; /* flush next */
443     }
444     put_zero_record(++lastray);
445     }
446 greg 2.1 if (raysleft && !--raysleft)
447     break; /* preemptive EOI */
448     }
449     while (next_child_nq(1) >= 0) /* empty results queue */
450     ;
451 greg 2.10 if (account < accumulate) {
452     error(WARNING, "partial accumulation in final record");
453     free_binq(out_bq); /* XXX just ignore it */
454 greg 2.3 out_bq = NULL;
455 greg 2.1 }
456 greg 2.10 free_binq(NULL); /* clean up */
457     lu_done(&ofiletab);
458 greg 2.1 if (raysleft)
459     error(USER, "unexpected EOF on input");
460 greg 2.10 }
461    
462    
463     /* Wait for the next available child by monitoring "to" pipes */
464     static int
465     next_child_ready()
466     {
467     fd_set writeset, errset;
468 greg 2.11 int i, n;
469 greg 2.10
470     for (i = nchild; i--; ) /* see if there's one free first */
471     if (!kida[i].nr)
472     return(i);
473     /* prepare select() call */
474     FD_ZERO(&writeset); FD_ZERO(&errset);
475     n = 0;
476     for (i = nchild; i--; ) {
477     FD_SET(kida[i].pr.w, &writeset);
478     FD_SET(kida[i].pr.r, &errset);
479     if (kida[i].pr.w >= n)
480     n = kida[i].pr.w + 1;
481     if (kida[i].pr.r >= n)
482     n = kida[i].pr.r + 1;
483     }
484     errno = 0;
485     n = select(n, NULL, &writeset, &errset, NULL);
486     if (n < 0)
487     error(SYSTEM, "select() error in next_child_ready()");
488     n = -1; /* identify waiting child */
489     for (i = nchild; i--; ) {
490     if (FD_ISSET(kida[i].pr.r, &errset))
491     error(USER, "rendering process died");
492     if (FD_ISSET(kida[i].pr.w, &writeset))
493     kida[n = i].nr = 0;
494     }
495     return(n); /* first available child */
496     }
497    
498    
499     /* Modified parental loop for full accumulation mode (-c 0) */
500     void
501     feeder_loop()
502     {
503     static int ignore_warning_given = 0;
504     int ninq = 0;
505     FVECT orgdir[2*MAXIQ];
506     int i, n;
507     /* load rays from stdin & process */
508     #ifdef getc_unlocked
509     flockfile(stdin); /* avoid lock/unlock overhead */
510     #endif
511     while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
512     if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
513     (orgdir[2*ninq+1][1] == 0.0) &
514     (orgdir[2*ninq+1][2] == 0.0)) {
515     if (!ignore_warning_given++)
516     error(WARNING,
517     "dummy ray(s) ignored during accumulation\n");
518     continue;
519     }
520     if (++ninq >= MAXIQ) {
521     i = next_child_ready(); /* get eager child */
522     n = sizeof(FVECT)*2 * ninq; /* give assignment */
523     if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
524     error(SYSTEM, "pipe write error");
525     kida[i].r1 = lastray+1;
526     lastray += kida[i].nr = ninq;
527     if (lastray < lastdone) /* RNUMBER wrapped? */
528     lastdone = lastray = 0;
529 greg 2.18 ninq = 0;
530 greg 2.10 }
531     if (raysleft && !--raysleft)
532     break; /* preemptive EOI */
533     }
534     if (ninq) { /* polish off input */
535     i = next_child_ready();
536     n = sizeof(FVECT)*2 * ninq;
537     if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
538     error(SYSTEM, "pipe write error");
539     kida[i].r1 = lastray+1;
540     lastray += kida[i].nr = ninq;
541     ninq = 0;
542     }
543 greg 2.13 memset(orgdir, 0, sizeof(FVECT)*2); /* get results */
544     for (i = nchild; i--; ) {
545     writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2);
546 greg 2.10 queue_results(i);
547     }
548     if (recover) /* and from before? */
549     queue_modifiers();
550 greg 2.12 end_children(0); /* free up file descriptors */
551 greg 2.10 for (i = 0; i < nmods; i++)
552     mod_output(out_bq->mca[i]); /* output accumulated record */
553     end_record();
554     free_binq(out_bq); /* clean up */
555     out_bq = NULL;
556     free_binq(NULL);
557 greg 2.1 lu_done(&ofiletab);
558 greg 2.10 if (raysleft)
559     error(USER, "unexpected EOF on input");
560 greg 2.1 }